본문 바로가기

Database/모델링

3. Airflow + Astronomer Cosmos + dbt

 

 

airflow + dbt를 테스트한 dag의 모습입니다.

dbt 초기엔 Airflow에서 BashOperator 하나로 dbt run/dbt test를 돌리며, GCS→BigQuery 적재는 staging/mart 두 계층으로 나눠 관리했습니다.
문제는 빌드가 실패했을 때 어느 모델에서 깨졌는지 빠르게 추적하기 어렵고, retry도 잡 전체를 다시 돌려야 한다는 점이었습니다.

Cosmos를 붙이면 Airflow 그래프에 dbt 모델이 각각의 태스크로 나타나니,

  • 실패 지점이 정확히 어느 모델인지 즉시 보이고,
  • 그 모델만 부분 재시도 / 부분 실행(select) 할 수 있어 운영이 쉬워집니다.

 

  • 왜 Cosmos?
    • Airflow에서 dbt 모델을 개별 태스크로 쪼개서 보이고, 의존성 그래프도 자동 생성 → 문제 지점 파악/재시도/부분 실행이 쉬워짐.
    • dbt build를 단일 Bash로 던지는 방식보다 관찰성(Observability)과 운영성이 훨씬 올라감.
  • 어떻게 붙였나?
    • astronomer-cosmos[dbt-bigquery,google] 설치
    • 기존 DAG에 DbtTaskGroup 추가 → vars로 처리 일자(= Airflow ds) 를 런타임에 전달
    • stage_external_sources(dbt-external-tables 매크로)는 별도 BashOperator로 앞단에서 실행
  • 주의할 점
    • Cosmos는 DAG 파싱 시점에 dbt ls/dep를 실행 → Jinja var가 필수면 파싱용 기본값을 넣어 Broken DAG를 피해야 함
    • 컨테이너에서 dbt 바이너리 경로를 ExecutionConfig로 명시

 

 

 

requirements.txt

astronomer-cosmos[dbt-bigquery,google]

 

 

dbt_bq_from_gcs.py

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig


DBT_DIR = "/opt/airflow/dbt/demo_dbt"


project_config = ProjectConfig(
    dbt_project_path=DBT_DIR,
)

profile_config = ProfileConfig(
    profile_name="demo_dbt",                   
    target_name="dev",                        
    profiles_yml_filepath=f"{DBT_DIR}/profiles/profiles.yml",
)


exec_config = ExecutionConfig(
    dbt_executable_path="/home/airflow/.local/bin/dbt"     # 컨테이너에서 dbt 설치
)


with DAG(
    dag_id="dbt_bq_from_gcs",
    start_date=datetime(2025, 8, 1),
    schedule="@daily",
    catchup=False,
    max_active_runs=1,
    tags=["dbt", "cosmos", "bigquery", "gcs", "staging", "mart"],
) as dag:

    operator_args = {
        "vars": {
            "proc_date": "{{ ds }}",
            "proc_date_yyyymmdd": "{{ ds_nodash }}",
        }
    }
    
    stage_external = BashOperator(
        task_id="stage_external_sources",
        bash_command=(
            f"cd {DBT_DIR} && "
            "dbt run-operation stage_external_sources "
            "--vars '{\"ext_full_refresh\": true, "
            "\"proc_date\": \"{{ ds }}\", "
            "\"proc_date_yyyymmdd\": \"{{ ds_nodash }}\"}'"
        ),
    )

    dbt_tg = DbtTaskGroup(
        group_id="dbt_build",                   # 그룹 이름
        project_config=project_config,
        profile_config=profile_config,
        execution_config=exec_config,
        operator_args=operator_args, 
    )
    
    stage_external >> dbt_tg

 

 

 

models/sources/gcs_raw.yml

  • Cosmos는 DAG 파싱 시점에 dbt ls/dep를 실행 → Jinja var가 필수면 파싱용 기본값을 넣어 Broken DAG를 피해야 함
version: 2

sources:
  - name: raw
    schema: dbt_tutorial
    tables:
      - name: orders_ext
        external:
          # 전일/특정일 폴더만 바라보도록 vars 사용
          # 예) gs://your-bucket/data/orders/20250810/*.parquet
          location: "gs://your-bucket/data/orders/{{ var('proc_date_yyyymmdd', '19700101') }}/*.parquet"
          options:
            format: PARQUET

 

 

 

실행 결과 

그래프를 보면 각 dbt 단계별로 실행가능해집니다. 

 

 

Cosmos를 붙였더니 Airflow 그래프에서 dbt 모델이 1st-class 시민이 됩니다.
장애 원인 파악, 부분 재시도, 모듈별 실행이 쉬워져 운영 비용이 확 줄었고, 날짜 파티션 전략과 맞물려 안전한 재실행/백필이 가능해졌습니다.