본문 바로가기

server/아키텍쳐

2. airflow + dbt

 

이 예제는 개발(로컬/경량)–운영(클라우드/규모) 분리를 전제로 한다.

  • 개발/속도: DuckDB 어댑터로 빠르게 모델을 검증하고, seed·단위테스트 중심으로 피드백 루프를 짧게 한다.
  • 운영/확장: BigQuery를 타깃으로 외부 테이블(원천: GCS) → staging → mart의 표준 3계층을 구축한다. 스토리지-컴퓨트 분리를 활용해 대용량에도 비용/성능 균형을 맞춘다.

 

디렉토리 구조 

airflow-dbt-demo/
├─ docker-compose.yml
├─ .env                    
├─ airflow/
│  ├─ Dockerfile
│  ├─ requirements.txt
│  ├─ dags/
│     └─ dbt_duckdb_demo.py
└─ dbt/
   └─ demo_dbt/
      ├─ dbt_project.yml
      ├─ models/
      │  ├─ marts/order_counts.sql
      │  └─ schema.yml
      ├─ seeds/
      │  └─ orders.csv
      └─ profiles/profiles.yml

 

 

airflow/requirements.txt

requests
pandas
numpy
pathlib
beautifulsoup4
psycopg2-binary
sqlalchemy
pymongo
apache-airflow
apache-airflow-providers-ssh
apache-airflow-providers-papermill
dbt-core
dbt-postgres
dbt-duckdb

 

 

 

 

기본 태스크 순서: dbt debug → dbt deps → dbt seed --full-refresh → dbt run → dbt test

 

airflow/dags/dbt_duckdb_demo.py

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

DBT_PROJECT_DIR = "/opt/airflow/dbt/demo_dbt"

with DAG(
    dag_id="dbt_duckdb_demo",
    start_date=datetime(2025, 8, 1),
    schedule=None,           # 수동 실행
    catchup=False,
    tags=["dbt", "duckdb", "demo"],
) as dag:

    dbt_debug = BashOperator(
        task_id="dbt_debug",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt debug"
    )

    dbt_deps = BashOperator(
        task_id="dbt_deps",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt deps"
    )

    dbt_seed = BashOperator(
        task_id="dbt_seed",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt seed --full-refresh"
    )

    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt run"
    )

    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt test"
    )

    dbt_debug >> dbt_deps >> dbt_seed >> dbt_run >> dbt_test

 

 

 

 

docker-compose

x-airflow-common: &airflow-common
  build: ./airflow
  env_file:
    - .env
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "false"
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
    DBT_PROFILES_DIR: /opt/airflow/dbt/demo_dbt/profiles
    GOOGLE_APPLICATION_CREDENTIALS: /opt/airflow/gcp/sa.json
    GCP_PROJECT_ID: your-gcp-project-id
    BQ_LOCATION: asia-northeast3
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/logs:/opt/airflow/logs
    - ./dbt:/opt/airflow/dbt
    - ./gcp:/opt/airflow/gcp:ro            
  user: "50000:0" 
  depends_on:
    postgres:
      condition: service_healthy

 

 

 

 

 

 

Airflow DAG: 외부테이블 생성 → staging → mart → test

airflow/dags/dbt_bq_from_gcs.py

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

DBT_DIR = "/opt/airflow/dbt/demo_dbt"

with DAG(
    dag_id="dbt_bq_from_gcs",
    start_date=datetime(2025, 8, 1),
    schedule=None,
    catchup=False,
    tags=["dbt", "bigquery", "gcs", "staging", "mart"],
) as dag:

    dbt_debug = BashOperator(
        task_id="dbt_debug",
        bash_command=f"cd {DBT_DIR} && dbt debug"
    )

    dbt_deps = BashOperator(
        task_id="dbt_deps",
        bash_command=f"cd {DBT_DIR} && dbt deps"
    )

    # 외부 테이블 생성/갱신
    stage_external = BashOperator(
        task_id="stage_external_sources",
        bash_command=(
            f'cd {DBT_DIR} && '
            'dbt run-operation stage_external_sources --args \'{"ext_full_refresh": true}\''
        ),
    )

    # 전체 모델 실행(staging, mart 순차)
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"cd {DBT_DIR} && dbt run"
    )

    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {DBT_DIR} && dbt test"
    )

    dbt_debug >> dbt_deps >> stage_external >> dbt_run >> dbt_test

 

 

 

 

dbt 셋팅 

 

dbt/packages.yml

외부 테이블 생성을 자동화하는 패키지.

packages:
  - package: dbt-labs/dbt_external_tables
    version: 0.11.1

 

 

 

dbt_project.yml

 

  • 레이어별 스키마 분리: staging은 stg 스키마, marts는 mart 스키마로 생성되도록 설정했다. 데이터셋을 논리 계층으로 분할해 거버넌스(권한, 수명, 비용 추적)가 쉬워진다.
  • 기본 materialization: 프로젝트 루트에서 +materialized: table을 지정해, 별도 지정이 없을 때 테이블로 생성되게 했다. 필요 시 모델 단위에서 view/incremental로 덮어쓰면 된다.

 

name: "demo_dbt"
version: "1.0"
config-version: 2
profile: "demo_dbt"

models:
  demo_dbt:
    +materialized: table

  staging:
      +schema: stg
      +tags: ["staging"]

  marts:
    +schema: mart
    +tags: ["mart"]

    
seeds:
  demo_dbt:
    +header: true
    +quote_columns: false

 

 

 

profiles/profiles.yml

BigQuery 서비스 계정 기반 연결 정의. dataset은 디폴트(미사용), 실제로는 폴더별 +schema에 따라 stg, mart 데이터셋이 생성된다.

demo_dbt:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
      project: "{{ env_var('GCP_PROJECT_ID') }}"
      dataset: analytics   # 기본값(미사용), 모델에서 +schema로 분리
      location: "{{ env_var('BQ_LOCATION') }}"
      threads: 4
      priority: interactive

 

 

 

 

models/sources/gcs_raw.yml

원천은 GCS Parquet다. 이 선언은 “BigQuery 외부테이블을 어떤 경로·옵션으로 만들 것인가”를 담는다.

여기서 GCS 버킷/경로를 환경에 맞게 바꾸면 된다. 
아래 예시는 gs://your-bucket/data/orders/*.parquet 를 외부 테이블로 만든다. 외부 테이블은 raw_ext 데이터셋에 정의된다.

version: 2

sources:
  - name: raw
    schema: dbt_tutorial
    tables:
      - name: orders_ext
        external:
          location: "gs://your-bucket/data/orders/*.parquet"
          options:
            format: PARQUET

 

 

 

staging 모델 models/staging/stg_orders.sql

너의 Parquet 스키마에 맞춰 컬럼명을 맞춰라. 예시는 order_id, customer_id, amount, order_ts 기준.

with src as (
  select * from {{ source('raw', 'orders_ext') }}
),
typed as (
  select
    cast(order_id as int64)         as order_id,
    cast(customer_id as int64)      as customer_id,
    cast(amount as numeric)         as amount,
    cast(order_ts as timestamp)     as order_ts
  from src
)
select * from typed

 

 

mart 모델 models/marts/mart_orders_daily.sql

일자 단위 매출 집계 예시.

select
  date(order_ts) as order_date,
  count(*)       as orders_cnt,
  sum(amount)    as revenue
from {{ ref('stg_orders') }}
group by 1
order by 1

 

 

macros/generate_schema_name.sql

+schema를 지정했을 때와 아닐 때의 스키마 이름 결정을 단일 매크로로 캡슐화했다. 이 패턴의 이점은 다음과 같다.

  • 환경·도메인·레이어 기준의 스키마 네이밍 규칙을 한 곳에서 강제할 수 있다.
  • 팀이 “프로젝트별 기본 스키마는 target.schema, 레이어 지정 시엔 custom_schema” 같은 룰을 정했다면, 매크로만 표준화하면 된다.
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- if custom_schema_name is none -%}
        {{ target.schema }}
    {%- else -%}
        {{ custom_schema_name }}
    {%- endif -%}
{%- endmacro %}

 

 

실행~

docker compose build
docker compose run --rm airflow-init
docker compose up -d

 

 

 

 

다음과 같이 빅쿼리에 테이블이 생성되고

 

mart_orders_daily에 집계도 잘되어 있는것을 볼수 있다. 

 

 


 

이번엔 스토리지의 모든 parquet 파일을 한꺼번에 동작하는게 아닌, 

해당 날짜에 전날의 데이터가 저장되고, 집계되도록 airflow를 수정하려 한다. 

 

models/sources/gcs_raw.yml

version: 2

sources:
  - name: raw
    schema: dbt_tutorial
    tables:
      - name: orders_ext
        external:
          # 전일/특정일 폴더만 바라보도록 vars 사용
          # 예) gs://your-bucket/data/orders/20250810/*.parquet
          location: "gs://your-bucket/data/orders/{{ var('proc_date_yyyymmdd') }}/*.parquet"
          options:
            format: PARQUET
            # 필요시: autodetect: true

 

 

models/marts/mart_orders_daily.sql

이렇게 하면 같은 날짜를 다시 돌려도 해당 파티션만 깔끔히 교체됩니다(리트라이·백필 안전).

{{ config(
    materialized='incremental',
    partition_by={'field': 'order_date', 'data_type':'date'},
    incremental_strategy='insert_overwrite',
    on_schema_change='sync_all_columns'
) }}

select
  order_date,
  count(*)    as orders_cnt,
  sum(amount) as revenue
from {{ ref('stg_orders') }}
where order_date = date('{{ var("proc_date") }}')
group by order_date

 

 

dag에는 실행시 날짜를 넣어주도록 변수를 추가한다. 

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

DBT_DIR = "/opt/airflow/dbt/demo_dbt"

with DAG(
    dag_id="dbt_bq_from_gcs",
    start_date=datetime(2025, 8, 1),
    schedule="@daily",
    catchup=False,
    max_active_runs=1,
    tags=["dbt", "bigquery", "gcs", "staging", "mart"],
) as dag:

    
    DBT_VARS = (
        "--vars '{"
        "\"proc_date\": \"{{ ds }}\", "
        "\"proc_date_yyyymmdd\": \"{{ ds_nodash }}\", "
        "\"ext_full_refresh\": true "
        "}'"
    )
    
    dbt_debug = BashOperator(
        task_id="dbt_debug",
        bash_command=f"cd {DBT_DIR} && dbt debug"
    )

    dbt_deps = BashOperator(
        task_id="dbt_deps",
        bash_command=f"cd {DBT_DIR} && dbt deps"
    )

    # 외부 테이블 생성/갱신
    stage_external = BashOperator(
        task_id="stage_external_sources",
        bash_command=(
            f"cd {DBT_DIR} && "
            f"dbt run-operation stage_external_sources {DBT_VARS} "
        )
    )

    # 전체 모델 실행(staging, mart 순차)
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"cd {DBT_DIR} && dbt run {DBT_VARS}"
    )

    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {DBT_DIR} && dbt test {DBT_VARS}"
    )

    dbt_debug >> dbt_deps >> stage_external >> dbt_run >> dbt_test

 

 

 

 

 

$ airflow dags list
dag_id          | fileloc                              | owners  | is_paused
================+======================================+=========+==========
dbt_bq_from_gcs | /opt/airflow/dags/dbt_bq_from_gcs.py | airflow | False
dbt_duckdb_demo | /opt/airflow/dags/dbt_duckdb_demo.py | airflow | False
test            | /opt/airflow/dags/test.py            | airflow | False

 

 

 

backfill <dag 이름> -s -e 를 통해서 지난 날짜의 데이터를 수행해보자.

 airflow dags backfill dbt_bq_from_gcs -s 2025-08-10 -e 2025-08-12 --reset-dagruns
[2025-08-17T07:19:21.617+0000] {dagbag.py:545} INFO - Filling up the DagBag from /opt/airflow/dags
Nothing to clear.
[2025-08-17T07:19:21.686+0000] {executor_loader.py:235} INFO - Loaded executor: LocalExecutor
[2025-08-17T07:19:21.818+0000] {backfill_job_runner.py:905} INFO - No run dates were found for the given dates and dag interval.
airflow@dbf78f46e040:/opt/airflow$ airflow dags backfill dbt_bq_from_gcs -s 2025-08-10 -e 2025-08-12 --reset-dagruns
[2025-08-17T07:21:08.757+0000] {dagbag.py:545} INFO - Filling up the DagBag from /opt/airflow/dags
Nothing to clear.
[2025-08-17T07:21:08.812+0000] {executor_loader.py:235} INFO - Loaded executor: LocalExecutor
[2025-08-17T07:21:09.165+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dbt_bq_from_gcs', 'dbt_debug', 'backfill__2025-08-10T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dbt_bq_from_gcs.py', '--cfg-path', '/tmp/tmphkoulyt7']
[2025-08-17T07:21:09.177+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dbt_bq_from_gcs', 'dbt_debug', 'backfill__2025-08-11T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dbt_bq_from_gcs.py', '--cfg-path', '/tmp/tmp2jm_f9p8']
[2025-08-17T07:21:09.189+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'dbt_bq_from_gcs', 'dbt_debug', 'backfill__2025-08-12T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dbt_bq_from_gcs.py', '--cfg-path', '/tmp/tmpn526ptzm']
[2025-08-17T07:21:13.832+0000] {local_executor.py:90} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'dbt_bq_from_gcs', 'dbt_debug', 'backfill__2025-08-11T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/dbt_bq_from_gcs.py', '--cfg-path', '/tmp/tmp2jm_f9p8']

 

 

'server > 아키텍쳐' 카테고리의 다른 글

1. dbt tutorial  (0) 2025.08.30
로또 시스템 아키텍처  (3) 2025.08.03
rabbitmq 심화 (persistent / cluster)  (0) 2024.05.05
rabbitmq start  (1) 2024.05.04
airflow scheduler high cpu usage  (1) 2021.11.29