airflow + dbt를 테스트한 dag의 모습입니다.

dbt 초기엔 Airflow에서 BashOperator 하나로 dbt run/dbt test를 돌리며, GCS→BigQuery 적재는 staging/mart 두 계층으로 나눠 관리했습니다.
문제는 빌드가 실패했을 때 어느 모델에서 깨졌는지 빠르게 추적하기 어렵고, retry도 잡 전체를 다시 돌려야 한다는 점이었습니다.
Cosmos를 붙이면 Airflow 그래프에 dbt 모델이 각각의 태스크로 나타나니,
- 실패 지점이 정확히 어느 모델인지 즉시 보이고,
- 그 모델만 부분 재시도 / 부분 실행(select) 할 수 있어 운영이 쉬워집니다.
- 왜 Cosmos?
- Airflow에서 dbt 모델을 개별 태스크로 쪼개서 보이고, 의존성 그래프도 자동 생성 → 문제 지점 파악/재시도/부분 실행이 쉬워짐.
- dbt build를 단일 Bash로 던지는 방식보다 관찰성(Observability)과 운영성이 훨씬 올라감.
- 어떻게 붙였나?
- astronomer-cosmos[dbt-bigquery,google] 설치
- 기존 DAG에 DbtTaskGroup 추가 → vars로 처리 일자(= Airflow ds) 를 런타임에 전달
- stage_external_sources(dbt-external-tables 매크로)는 별도 BashOperator로 앞단에서 실행
- 주의할 점
- Cosmos는 DAG 파싱 시점에 dbt ls/dep를 실행 → Jinja var가 필수면 파싱용 기본값을 넣어 Broken DAG를 피해야 함
- 컨테이너에서 dbt 바이너리 경로를 ExecutionConfig로 명시
requirements.txt
astronomer-cosmos[dbt-bigquery,google]
dbt_bq_from_gcs.py
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
DBT_DIR = "/opt/airflow/dbt/demo_dbt"
project_config = ProjectConfig(
dbt_project_path=DBT_DIR,
)
profile_config = ProfileConfig(
profile_name="demo_dbt",
target_name="dev",
profiles_yml_filepath=f"{DBT_DIR}/profiles/profiles.yml",
)
exec_config = ExecutionConfig(
dbt_executable_path="/home/airflow/.local/bin/dbt" # 컨테이너에서 dbt 설치
)
with DAG(
dag_id="dbt_bq_from_gcs",
start_date=datetime(2025, 8, 1),
schedule="@daily",
catchup=False,
max_active_runs=1,
tags=["dbt", "cosmos", "bigquery", "gcs", "staging", "mart"],
) as dag:
operator_args = {
"vars": {
"proc_date": "{{ ds }}",
"proc_date_yyyymmdd": "{{ ds_nodash }}",
}
}
stage_external = BashOperator(
task_id="stage_external_sources",
bash_command=(
f"cd {DBT_DIR} && "
"dbt run-operation stage_external_sources "
"--vars '{\"ext_full_refresh\": true, "
"\"proc_date\": \"{{ ds }}\", "
"\"proc_date_yyyymmdd\": \"{{ ds_nodash }}\"}'"
),
)
dbt_tg = DbtTaskGroup(
group_id="dbt_build", # 그룹 이름
project_config=project_config,
profile_config=profile_config,
execution_config=exec_config,
operator_args=operator_args,
)
stage_external >> dbt_tg
models/sources/gcs_raw.yml
- Cosmos는 DAG 파싱 시점에 dbt ls/dep를 실행 → Jinja var가 필수면 파싱용 기본값을 넣어 Broken DAG를 피해야 함
version: 2
sources:
- name: raw
schema: dbt_tutorial
tables:
- name: orders_ext
external:
# 전일/특정일 폴더만 바라보도록 vars 사용
# 예) gs://your-bucket/data/orders/20250810/*.parquet
location: "gs://your-bucket/data/orders/{{ var('proc_date_yyyymmdd', '19700101') }}/*.parquet"
options:
format: PARQUET
실행 결과
그래프를 보면 각 dbt 단계별로 실행가능해집니다.


Cosmos를 붙였더니 Airflow 그래프에서 dbt 모델이 1st-class 시민이 됩니다.
장애 원인 파악, 부분 재시도, 모듈별 실행이 쉬워져 운영 비용이 확 줄었고, 날짜 파티션 전략과 맞물려 안전한 재실행/백필이 가능해졌습니다.
'Database > 모델링' 카테고리의 다른 글
| 4. mysql to bigquery using dbt (0) | 2025.10.05 |
|---|---|
| 프로젝트는 절대로 생각대로 흘러가지 않는다. (0) | 2011.04.04 |
| [설계] CRC카드 , 클래스다이어그램 (0) | 2011.04.02 |
| DA# 교육에서 그렸던거... (0) | 2010.11.24 |
| 간단한 DB 이해(초보) (0) | 2010.09.25 |