
4. MySQL to BigQuery using dbt

 

 

import os
from datetime import datetime, date, timezone
from decimal import Decimal

from airflow import DAG
from airflow.operators.python import PythonOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig

DBT_DIR = "/opt/airflow/dbt/demo_dbt"   # path to your dbt project

# Cosmos configuration
project_config = ProjectConfig(dbt_project_path=DBT_DIR)
profile_config = ProfileConfig(
    profile_name="demo_dbt",
    target_name="dev",
    profiles_yml_filepath=f"{DBT_DIR}/profiles/profiles.yml",
)
exec_config = ExecutionConfig(
    dbt_executable_path="/home/airflow/.local/bin/dbt"
)


def _to_jsonable(row: dict) -> dict:
    out = {}
    for k, v in row.items():
        if isinstance(v, datetime):
            # treat naive datetimes as UTC
            if v.tzinfo is None:
                v = v.replace(tzinfo=timezone.utc)
            out[k] = v.isoformat(timespec="microseconds")  # e.g. 2025-09-01T01:02:03.123456+00:00
        elif isinstance(v, date):
            out[k] = v.isoformat()  # YYYY-MM-DD
        elif isinstance(v, Decimal):
            # float is fine when precision is not critical; use str to preserve exact precision
            out[k] = float(v)
        else:
            out[k] = v
    return out
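# Illustration: _to_jsonable makes a pymysql DictCursor row JSON-serializable
# before it is passed to load_table_from_json, e.g.
#   {"user_id": 7, "signup_ts": datetime(2025, 8, 31, 8, 5)}
#   becomes
#   {"user_id": 7, "signup_ts": "2025-08-31T08:05:00.000000+00:00"}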


def load_users_to_bq(proc_date: str, proc_date_yyyymmdd: str, **_):
    """
    MySQL의 users 테이블에서 날짜별(signup_ts 기준) 데이터를 읽어
    BigQuery의 dbt_tutorial.users 파티션(YYYYMMDD)에 WRITE_TRUNCATE로 적재.
    """
    import pymysql
    from google.cloud import bigquery
    from google.api_core.exceptions import NotFound

    # MySQL connection settings
    mysql_host = os.getenv("MYSQL_HOST", "mysql")
    mysql_user = os.getenv("MYSQL_USER", "app")
    mysql_password = os.getenv("MYSQL_PASSWORD", "app")
    mysql_db = os.getenv("MYSQL_DB", "appdb")

    # BigQuery settings
    project = os.getenv("GCP_PROJECT_ID") or bigquery.Client().project
    dataset = os.getenv("BQ_RAW_DATASET", "dbt_tutorial")  # dataset for raw source data
    location = os.getenv("BQ_LOCATION", "asia-northeast3")

    # 1) Fetch the rows for the target date from MySQL
    conn = pymysql.connect(
        host=mysql_host, user=mysql_user, password=mysql_password,
        database=mysql_db, cursorclass=pymysql.cursors.DictCursor
    )
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT user_id, email, country_code, signup_ts, updated_at
                FROM users
                WHERE DATE(signup_ts) = %s
                """,
                (proc_date,)
            )
            rows = cur.fetchall()

    if not rows:
        print(f"[INFO] No rows for {proc_date}. Skip load.")
        return

    # 2) Ensure the BigQuery dataset exists
    client = bigquery.Client()
    dataset_ref = bigquery.Dataset(f"{project}.{dataset}")
    try:
        client.get_dataset(dataset_ref)
    except NotFound:
        dataset_ref.location = location
        client.create_dataset(dataset_ref)
        print(f"[INFO] Created dataset {project}.{dataset} in {location}")

    # 3) Replace only that day's partition via the partition decorator ($YYYYMMDD)
    table_base = "users"
    table_with_partition = f"{project}.{dataset}.{table_base}${proc_date_yyyymmdd}"

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        schema=[
            bigquery.SchemaField("user_id", "INTEGER"),
            bigquery.SchemaField("email", "STRING"),
            bigquery.SchemaField("country_code", "STRING"),
            bigquery.SchemaField("signup_ts", "TIMESTAMP"),
            bigquery.SchemaField("updated_at", "TIMESTAMP"),
        ]
    )
    json_rows = [_to_jsonable(r) for r in rows]
    load_job = client.load_table_from_json(json_rows, table_with_partition, job_config=job_config)
    load_job.result()
    print(f"[OK] Loaded {len(rows)} rows into {table_with_partition}")

with DAG(
    dag_id="mysql_to_bq_users",
    start_date=datetime(2025, 8, 31),
    schedule="@daily",         # 매일 실행(논리일자=ds)
    catchup=False,              # 과거일자 백필 가능
    max_active_runs=1,
    tags=["mysql","bigquery","dbt","cosmos","staging","mart"],
) as dag:

    # Templated date variables (Jinja)
    PROC_DATE = "{{ ds }}"
    PROC_DATE_YYYYMMDD = "{{ ds_nodash }}"

    extract_users = PythonOperator(
        task_id="extract_users_from_mysql_to_bq",
        python_callable=load_users_to_bq,
        op_kwargs={
            "proc_date": PROC_DATE,
            "proc_date_yyyymmdd": PROC_DATE_YYYYMMDD,
        },
    )

    # dbt runtime vars (every model, including users, processes only that day's data)
    operator_args = {
        "vars": {
            "proc_date": PROC_DATE,
            "proc_date_yyyymmdd": PROC_DATE_YYYYMMDD,
        }
    }

    dbt_tg = DbtTaskGroup(
        group_id="dbt_build",
        project_config=project_config,
        profile_config=profile_config,
        execution_config=exec_config,
        operator_args=operator_args,
    )

    extract_users >> dbt_tg
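After a run, one can confirm that the expected rows landed in that day's partition. A minimal sketch with the BigQuery Python client, assuming the client's default project contains the dbt_tutorial dataset and Application Default Credentials are available:

# Count the rows loaded into a single day's partition of the raw users table
# (dataset/table names follow the DAG above; the date value is an example).
from google.cloud import bigquery

client = bigquery.Client()
job = client.query(
    """
    SELECT DATE(signup_ts) AS signup_date, COUNT(*) AS row_count
    FROM `dbt_tutorial.users`
    WHERE DATE(signup_ts) = @proc_date
    GROUP BY signup_date
    """,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("proc_date", "DATE", "2025-08-31"),
        ]
    ),
)
for row in job.result():
    print(row.signup_date, row.row_count)   # the seed below yields 2 rows for 2025-08-31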

 

 

mysql/init/01_seed.sql

CREATE TABLE IF NOT EXISTS users (
  user_id BIGINT PRIMARY KEY,
  email VARCHAR(255),
  country_code VARCHAR(2),
  signup_ts DATETIME,
  updated_at DATETIME
);

INSERT INTO users (user_id, email, country_code, signup_ts, updated_at) VALUES
(1, 'alice@example.com', 'KR', '2025-08-10 03:12:00', '2025-08-10 08:00:00'),
(2, 'bob@example.com',   'US', '2025-08-10 11:30:00', '2025-08-10 15:10:00'),
(3, 'cindy@example.com', 'JP', '2025-08-11 09:00:00', '2025-08-11 10:00:00'),
(4, 'dave@example.com',  'KR', '2025-08-12 01:00:00', '2025-08-12 01:10:00'),
(5, 'erin@example.com',  'US', '2025-08-12 08:05:00', '2025-08-12 08:05:00'),
(6, 'finn@example.com',  'KR', '2025-08-12 11:45:00', '2025-08-12 11:45:00'),
(7, '1111@example.com',  'US', '2025-08-31 08:05:00', '2025-08-31 08:05:00'),
(8, '2222@example.com',  'KR', '2025-08-31 11:45:00', '2025-08-31 11:45:00'),
(9, '333@example.com',  'KR', '2025-09-01 03:12:00', '2025-08-31 08:05:00'),
(10, '444@example.com',  'US', '2025-09-01 11:30:00', '2025-08-31 11:45:00');
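To sanity-check the seed data against the DAG's date filter, the same WHERE DATE(signup_ts) = %s query can be run directly. A minimal sketch, assuming the MySQL container is reachable on localhost with the app/app/appdb defaults used in load_users_to_bq:

# Quick check of the DAG's date filter against the seed rows
# (host is an assumption; credentials match the defaults in load_users_to_bq).
import pymysql

conn = pymysql.connect(
    host="localhost", user="app", password="app",
    database="appdb", cursorclass=pymysql.cursors.DictCursor,
)
with conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT user_id, signup_ts FROM users WHERE DATE(signup_ts) = %s",
            ("2025-08-31",),
        )
        for row in cur.fetchall():
            print(row)   # expects user_id 7 and 8 from the seed above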

 


Create the users table in BigQuery

CREATE TABLE IF NOT EXISTS `<project>.dbt_tutorial.users` (
  user_id INT64,
  email STRING,
  country_code STRING,
  signup_ts TIMESTAMP,
  updated_at TIMESTAMP
)
PARTITION BY DATE(signup_ts);
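The same day-partitioned table can also be created programmatically. A minimal sketch with the BigQuery Python client, assuming GCP_PROJECT_ID and credentials are set up as in the DAG:

# Create the day-partitioned raw table with the Python client instead of DDL
# (GCP_PROJECT_ID / ADC setup is assumed, as in the DAG above).
import os
from google.cloud import bigquery

client = bigquery.Client()
project = os.getenv("GCP_PROJECT_ID") or client.project

table = bigquery.Table(
    f"{project}.dbt_tutorial.users",
    schema=[
        bigquery.SchemaField("user_id", "INTEGER"),
        bigquery.SchemaField("email", "STRING"),
        bigquery.SchemaField("country_code", "STRING"),
        bigquery.SchemaField("signup_ts", "TIMESTAMP"),
        bigquery.SchemaField("updated_at", "TIMESTAMP"),
    ],
)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY, field="signup_ts"
)
client.create_table(table, exists_ok=True)   # no-op if the table already exists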