Orchestrate dbt with Airflow on Spark: A Complete Guide
This guide demonstrates how to orchestrate scalable dbt pipelines on Ilum's Apache Spark cluster using Apache Airflow and Astronomer Cosmos. You will learn how to build a robust medallion architecture (Bronze → Silver → Gold) utilizing Kubernetes-native compute for efficient data transformation. We will cover automatic DAG generation, data quality testing, and incremental processing strategies.
For a comprehensive deep-dive into the architecture, benefits, and strategic considerations of running dbt on Spark with Airflow, see our detailed blog post: Orchestrate dbt on Spark with Airflow: A Guide to Modern Data Engineering on Ilum.
Prerequisites for Airflow and dbt on Ilum
- Ilum version 6.6.2 or later
- Airflow enabled with the 3.1.1-dbt image
- Spark SQL (Thrift Server) or Spark Connect enabled
- Basic familiarity with dbt and Airflow concepts
dbt-Airflow-Spark Architecture Overview
The integration combines four key components:
| Component | Technology |
|---|---|
| Compute Engine | Apache Spark on Ilum (Kubernetes-native) |
| Data Modeling | dbt-spark (dbt-core + Spark adapter) |
| Orchestration | Apache Airflow 3.1 with KubernetesExecutor |
| DAG Generation | Astronomer Cosmos |
Workflow: Git repository → gitSync → Airflow → Cosmos (auto-generates DAG) → Spark SQL / Spark Connect → Ilum Spark Cluster
┌──────────────────────────────────────────────────────────────────┐
│ Gitea Repository │
│ airflow.git/ilum_dbt_project/ │
│ ├── dbt_project.yml │
│ ├── seeds/crypto_prices_raw.csv │
│ └── models/{bronze,silver,gold}/*.sql │
└──────────────────────────────────────────────────────────────────┘
│ gitSync
▼
┌──────────────────────────────────────────────────────────────────┐
│ Airflow 3.1 (Kubernetes) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Astronomer Cosmos │ │
│ │ - Parses dbt project │ │
│ │ - Generates DAG with tasks for each model + test │ │
│ │ - Maintains dbt ref() dependencies │ │
│ └─────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
│
┌───────────────────┴───────────────────┐
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ Thrift Server │ │ Spark Connect │
│ (ilum-sql-thrift) │ │ (job-xxx-driver) │
│ Port: 10009 │ │ Port: 15002 │
└─────────────────────┘ └─────────────────────┘
│ │
└───────────────────┬───────────────────┘
▼
┌──────────────────────────────────────────────────────────────────┐
│ Ilum Spark Cluster │
│ Executes SQL queries, creates managed tables │
└──────────────────────────────────────────────────────────────────┘
Quick Start: Deploying dbt Pipelines
1. Enable Airflow with dbt Support
Install Ilum with Airflow and dbt pre-configured:
helm install ilum ilum/ilum \
  --set ilum-hive-metastore.enabled=true \
  --set ilum-core.metastore.enabled=true \
  --set ilum-core.metastore.type=hive \
  --set ilum-core.sql.enabled=true \
  --set ilum-sql.enabled=true \
  --set airflow.enabled=true \
  --set airflow.images.airflow.tag=3.1.1-dbt
When using the 3.1.1-dbt image tag, the complete dbt project and DAG described in this guide are automatically pre-loaded into the internal Gitea repository and synced to Airflow. You can trigger the example pipeline immediately after installation.
2. Access Airflow
Navigate to the Airflow UI via the Ilum console. The default credentials are typically admin:admin.
3. Verify the Connection
Starting from Ilum 6.6.2, the spark_thrift_default connection is automatically configured. Verify that it exists:
Admin → Connections → Search for spark_thrift_default
If missing, create it manually:
| Field | Value |
|---|---|
| Connection ID | spark_thrift_default |
| Connection Type | Spark or spark_sql |
| Host | ilum-sql-thrift-binary.<NAMESPACE>.svc.cluster.local |
| Port | 10009 |
4. Trigger the DAG
Find ilum_dbt_thrift_pipeline in the Airflow UI and trigger it manually. Monitor the Graph View to see the auto-generated task dependencies.
Figure 1: The Airflow DAG automatically generated by Astronomer Cosmos, reflecting the dbt model dependencies.
Project Structure
The dbt project follows a standard medallion architecture:
ilum_dbt_project/
├── dbt_project.yml
├── packages.yml
├── seeds/
│ └── crypto_prices_raw.csv
└── models/
├── bronze/
│ └── crypto_prices_bronze.sql
├── silver/
│ └── crypto_prices_silver_daily.sql
├── gold/
│ └── crypto_prices_gold_latest.sql
└── schema.yml
dbt Configuration
dbt_project.yml
The central configuration file defines project metadata, paths, and layer-specific settings:
name: "ilum_dbt_project"
version: "1.0.0"
config-version: 2
profile: "ilum_dbt_project"  # must match ProfileConfig.profile_name

model-paths: ["models"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
test-paths: ["tests"]

models:
  ilum_dbt_project:
    +materialized: table
    bronze:
      +tags: ["bronze"]
    silver:
      +tags: ["silver"]
    gold:
      +tags: ["gold"]

seeds:
  ilum_dbt_project:
    +column_types:
      date: date
      symbol: string
      price_usd: double
      volume_usd: double
      market_cap_usd: double
    crypto_prices_raw:
      +pre-hook:
        - "{{ drop_this_seed() }}"
Key features:
- Tags enable selective execution: dbt run --select tag:gold
- Column types ensure correct Spark table schemas
- The pre-hook drop_this_seed() provides idempotent seed loading for demos (a sketch of such a macro follows below)
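The drop_this_seed() macro itself is not shown in this guide. A minimal sketch of what such a macro might look like, assuming it simply drops the seed's target table so repeated dbt seed runs start from a clean slate (the macro body is illustrative):

{% macro drop_this_seed() %}
    -- invoked from the seed's pre-hook, so {{ this }} resolves to the seed's relation
    drop table if exists {{ this }}
{% endmacro %}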
Medallion Architecture Layers
Bronze Layer: Type Normalization
File: models/bronze/crypto_prices_bronze.sql
Converts raw seed data into typed, normalized tables with incremental processing:
{{ config(
    materialized='incremental',
    unique_key=['date', 'symbol']
) }}

select
    cast(date as date) as date,
    upper(symbol) as symbol,
    cast(price_usd as double) as price_usd,
    cast(volume_usd as double) as volume_usd,
    cast(market_cap_usd as double) as market_cap_usd
from {{ ref('crypto_prices_raw') }}

{% if is_incremental() %}
where date > (select max(date) from {{ this }})
{% endif %}
Benefits:
- Only processes new records after initial load
- Reduces compute costs for large datasets
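To make the incremental logic concrete, here is a rough sketch of the compiled SQL Spark receives on an incremental run, assuming the default schema configured in the profile; is_incremental() evaluates to true and {{ this }} resolves to the existing target table:

-- illustrative compiled form of the bronze model on an incremental run
select
    cast(date as date) as date,
    upper(symbol) as symbol,
    cast(price_usd as double) as price_usd,
    cast(volume_usd as double) as volume_usd,
    cast(market_cap_usd as double) as market_cap_usd
from default.crypto_prices_raw
where date > (select max(date) from default.crypto_prices_bronze)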
Silver Layer: Data Enrichment
File: models/silver/crypto_prices_silver_daily.sql
Adds 7-day moving averages using Spark window functions:
{{ config(materialized='table') }}

select
    date,
    symbol,
    price_usd,
    volume_usd,
    market_cap_usd,
    avg(price_usd) over (
        partition by symbol
        order by date
        rows between 6 preceding and current row
    ) as price_usd_7d_avg,
    avg(volume_usd) over (
        partition by symbol
        order by date
        rows between 6 preceding and current row
    ) as volume_usd_7d_avg
from {{ ref('crypto_prices_bronze') }}
Gold Layer: Business-Ready Views
File: models/gold/crypto_prices_gold_latest.sql
Produces analytics-ready data showing the latest 30 days per symbol:
{{ config(materialized='table') }}

with ranked as (
    select
        *,
        row_number() over (
            partition by symbol
            order by date desc
        ) as rn
    from {{ ref('crypto_prices_silver_daily') }}
)

select
    date,
    symbol,
    price_usd,
    price_usd_7d_avg,
    volume_usd_7d_avg,
    market_cap_usd
from ranked
where rn <= 30
Data Quality Tests
Define tests in models/schema.yml to create quality gates:
version: 2

seeds:
  - name: crypto_prices_raw
    columns:
      - name: date
        tests: [not_null]
      - name: symbol
        tests: [not_null]
      - name: price_usd
        tests: [not_null]

models:
  - name: crypto_prices_bronze
    description: "Bronze layer - typed and normalized crypto prices."
    columns:
      - name: date
        tests: [not_null]
      - name: symbol
        tests: [not_null]
      - name: price_usd
        tests: [not_null]
In Airflow: Cosmos converts each test into a separate task that blocks downstream models if it fails.
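Conceptually, each not_null test compiles to a query that selects the offending rows, and the test fails when any rows come back. A simplified sketch of the query dbt generates for the not_null test on crypto_prices_bronze.price_usd (schema name assumed):

-- any rows returned here mark the test task as failed
select *
from default.crypto_prices_bronze
where price_usd is null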
Generating Airflow DAGs with Cosmos
- Option A: Spark Thrift Server
- Option B: Spark Connect
File: dags/ilum_dbt_thrift.py
from datetime import datetime, timedelta
import os

from airflow.configuration import conf
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SparkThriftProfileMapping

DAGS_FOLDER = conf.get("core", "dags_folder")
DBT_PROJECT_PATH = os.path.join(DAGS_FOLDER, "ilum_dbt_project")
DBT_BIN = "/home/airflow/.local/bin/dbt"

profile_config = ProfileConfig(
    profile_name="ilum_dbt_project",
    target_name="prod",
    profile_mapping=SparkThriftProfileMapping(
        conn_id="spark_thrift_default",
        profile_args={
            "schema": "default",
            "threads": 4,
        },
    ),
)

dbt_dag = DbtDag(
    project_config=ProjectConfig(
        dbt_project_path=DBT_PROJECT_PATH,
    ),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        dbt_executable_path=DBT_BIN,
    ),
    dag_id="ilum_dbt_thrift_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "owner": "data-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
)
How it works:
- SparkThriftProfileMapping uses the Airflow connection spark_thrift_default
- Cosmos scans the dbt project and auto-generates tasks for each model, seed, and test
- Dependencies mirror dbt's ref() relationships
File: dags/ilum_dbt_connect.py
from datetime import datetime, timedelta
import os

from airflow.configuration import conf
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SparkSessionProfileMapping

DAGS_FOLDER = conf.get("core", "dags_folder")
DBT_PROJECT_PATH = os.path.join(DAGS_FOLDER, "ilum_dbt_project")
DBT_BIN = "/home/airflow/.local/bin/dbt"

profile_config = ProfileConfig(
    profile_name="ilum_dbt_project",
    target_name="prod",
    profile_mapping=SparkSessionProfileMapping(
        conn_id="spark_connect_default",
        profile_args={
            "schema": "default",
            "threads": 4,
        },
    ),
)

dbt_dag = DbtDag(
    project_config=ProjectConfig(
        dbt_project_path=DBT_PROJECT_PATH,
    ),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        dbt_executable_path=DBT_BIN,
    ),
    dag_id="ilum_dbt_connect_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "owner": "data-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
)
How it works:
- SparkSessionProfileMapping uses the Airflow connection spark_connect_default
- Cosmos scans the dbt project and auto-generates tasks for each model, seed, and test
- Dependencies mirror dbt's ref() relationships

Spark Connect Connection:

| Field | Value |
|---|---|
| Connection ID | spark_connect_default |
| Host | job-<id>-driver-svc.<NAMESPACE>.svc.cluster.local |
| Port | 15002 |
| Extra | {"connect": "sc://job-<id>-driver-svc.<NAMESPACE>.svc.cluster.local:15002"} |
Thrift vs Spark Connect
| Aspect | Thrift Server | Spark Connect |
|---|---|---|
| Endpoint | Central SQL server (shared) | Per-job endpoint (isolated) |
| Use case | Multiple tools sharing one endpoint | Isolated compute per project |
| Protocol | JDBC/Thrift | gRPC (native Spark API) |
| Connection | Stable service name | Dynamic job-based URL |
Tracking Data Lineage and Dependencies
Once the pipeline runs successfully, tables are stored in the Hive Metastore and accessible across Ilum components:
- Ilum SQL: Query tables directly
- Jupyter Notebooks: Analyze data interactively
- Spark Jobs: Use as input for other pipelines
- Lineage View: Visualize table dependencies in Ilum UI
Figure 2: The Ilum Data Lineage view visualizing the full medallion architecture (Bronze → Silver → Gold) and table dependencies.
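For example, after a successful run you can inspect the gold table from Ilum SQL with an ordinary query; the schema name comes from profile_args, and the symbol value is illustrative:

-- latest gold-layer rows for a single symbol
select date, symbol, price_usd, price_usd_7d_avg
from default.crypto_prices_gold_latest
where symbol = 'BTC'
order by date desc
limit 10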
Key Benefits
| Feature | Benefit |
|---|---|
| No dbt Cloud | Fully open-source, no subscription costs |
| Medallion pattern | Clean data architecture (bronze → silver → gold) |
| Incremental models | Process only new data, reduce compute costs |
| Quality gates | Failing dbt tests block downstream models |
| KubernetesExecutor | Each task isolated in separate pod |
| gitSync | Code changes auto-deployed from Gitea |
| Auto DAG generation | Cosmos creates tasks from dbt models automatically |
| Full lineage | Track model dependencies in Airflow UI and Ilum |
Troubleshooting
Connection Errors
If you see failed to resolve sockaddr errors in dbt logs:
[Errno -2] Name or service not known
Solution: Verify that the Thrift service exists:
kubectl get svc -n <NAMESPACE> | grep thrift
Ensure the connection host matches the service name exactly.
DAG Not Appearing
If the DAG doesn't show up in Airflow:
- Check gitSync logs to ensure the dbt project is synced
- Verify that DBT_PROJECT_PATH points to the correct directory
- Look for parsing errors in Airflow logs
Tests Failing
If dbt tests fail unexpectedly:
- Check the test task logs in Airflow
- Query the table directly via Ilum SQL to verify data quality (see the probe sketched below)
- Adjust test thresholds or fix upstream data issues
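A quick data-quality probe against the bronze table might look like the following, counting nulls in each tested column (schema name assumed):

-- locate which tested column is producing null values
select
    sum(case when date is null then 1 else 0 end) as null_dates,
    sum(case when symbol is null then 1 else 0 end) as null_symbols,
    sum(case when price_usd is null then 1 else 0 end) as null_prices
from default.crypto_prices_bronze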
Additional Resources
- Blog Post: Orchestrate dbt on Spark with Airflow - Comprehensive guide with architecture details and strategic benefits
- Astronomer Cosmos Documentation
- dbt-spark Adapter