from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
import os
# dbt project directory inside the Airflow container.
# Override with the DBT_PROJECT_DIR environment variable.
DBT_DIR = os.getenv("DBT_PROJECT_DIR", "/opt/airflow/dbt/demo_dbt")
# dbt CLI binary that Cosmos invokes.
# Override with the DBT_EXECUTABLE_PATH environment variable.
DBT_EXECUTABLE_PATH = os.getenv("DBT_EXECUTABLE_PATH", "/home/airflow/.local/bin/dbt")

# Cosmos configuration shared by the dbt task group.
project_config = ProjectConfig(dbt_project_path=DBT_DIR)
profile_config = ProfileConfig(
    profile_name="demo_dbt",
    target_name="dev",
    # profiles.yml lives in <project>/profiles/, not in ~/.dbt.
    profiles_yml_filepath=os.path.join(DBT_DIR, "profiles", "profiles.yml"),
)
exec_config = ExecutionConfig(dbt_executable_path=DBT_EXECUTABLE_PATH)
from datetime import datetime, date, timezone
from decimal import Decimal
def _to_jsonable(row: dict) -> dict:
out = {}
for k, v in row.items():
if isinstance(v, datetime):
# tz 미지정이면 UTC로 간주
if v.tzinfo is None:
v = v.replace(tzinfo=timezone.utc)
out[k] = v.isoformat(timespec="microseconds") # 예: 2025-09-01T01:02:03.123456+00:00
elif isinstance(v, date):
out[k] = v.isoformat() # YYYY-MM-DD
elif isinstance(v, Decimal):
# 정밀도 필요 없으면 float, 필요하면 str로
out[k] = float(v)
else:
out[k] = v
return out
def load_users_to_bq(proc_date: str, proc_date_yyyymmdd: str, **_):
    """Replace one day's BigQuery ``users`` partition with MySQL data.

    Reads the MySQL ``users`` rows whose ``signup_ts`` falls on ``proc_date``.
    It then loads them into ``<project>.<dataset>.users$<proc_date_yyyymmdd>``
    with WRITE_TRUNCATE, so re-running the same day overwrites that partition
    only.

    Args:
        proc_date: Processing date as ``YYYY-MM-DD`` (Airflow ``{{ ds }}``).
        proc_date_yyyymmdd: The same date as ``YYYYMMDD`` (``{{ ds_nodash }}``),
            used as the partition decorator.
        **_: Extra keyword arguments, ignored.

    Raises:
        ValueError: if ``proc_date`` is not ``YYYY-MM-DD`` or does not denote
            the same day as ``proc_date_yyyymmdd``.

    If MySQL returns no rows for the day, nothing is loaded and any existing
    BigQuery partition for that day is left as it is.

    Settings come from the environment:
    MYSQL_HOST / MYSQL_USER / MYSQL_PASSWORD / MYSQL_DB for MySQL, and
    GCP_PROJECT_ID / BQ_RAW_DATASET / BQ_LOCATION for BigQuery.
    """
    # Imported inside the task rather than at module level, presumably to keep
    # DAG-file parsing light.
    from datetime import date, timedelta

    import pymysql
    from google.api_core.exceptions import NotFound
    from google.cloud import bigquery

    # Validate the dates up front: rows for one day must never be written
    # into another day's partition.
    day_start = date.fromisoformat(proc_date)
    if day_start.strftime("%Y%m%d") != proc_date_yyyymmdd:
        raise ValueError(
            f"proc_date ({proc_date}) and proc_date_yyyymmdd "
            f"({proc_date_yyyymmdd}) refer to different days"
        )
    day_end = day_start + timedelta(days=1)

    # MySQL connection settings
    mysql_host = os.getenv("MYSQL_HOST", "mysql")
    mysql_user = os.getenv("MYSQL_USER", "app")
    mysql_password = os.getenv("MYSQL_PASSWORD", "app")
    mysql_db = os.getenv("MYSQL_DB", "appdb")
    # BigQuery settings (the project is resolved from the client below)
    dataset = os.getenv("BQ_RAW_DATASET", "dbt_tutorial")  # raw dataset name
    location = os.getenv("BQ_LOCATION", "asia-northeast3")

    # 1) Fetch the target day's rows from MySQL.
    # A half-open [day, day+1) range on the bare column is equivalent to
    # DATE(signup_ts) = day, but still lets MySQL use an index on signup_ts.
    conn = pymysql.connect(
        host=mysql_host,
        user=mysql_user,
        password=mysql_password,
        database=mysql_db,
        cursorclass=pymysql.cursors.DictCursor,
    )
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT user_id, email, country_code, signup_ts, updated_at
                FROM users
                WHERE signup_ts >= %s AND signup_ts < %s
                """,
                (day_start, day_end),
            )
            rows = cur.fetchall()

    if not rows:
        print(f"[INFO] No rows for {proc_date}. Skip load.")
        return

    # 2) Make sure the BigQuery dataset exists (one client is reused for everything).
    client = bigquery.Client()
    project = os.getenv("GCP_PROJECT_ID") or client.project
    dataset_ref = bigquery.Dataset(f"{project}.{dataset}")
    try:
        client.get_dataset(dataset_ref)
    except NotFound:
        dataset_ref.location = location
        client.create_dataset(dataset_ref)
        print(f"[INFO] Created dataset {project}.{dataset} in {location}")

    # 3) Replace only that day's partition using the partition decorator.
    table_base = "users"
    table_with_partition = f"{project}.{dataset}.{table_base}${proc_date_yyyymmdd}"
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        # Same spec as the table DDL (PARTITION BY DATE(signup_ts)). This lets
        # the first load create the table correctly when it does not exist yet.
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="signup_ts",
        ),
        schema=[
            bigquery.SchemaField("user_id", "INTEGER"),
            bigquery.SchemaField("email", "STRING"),
            bigquery.SchemaField("country_code", "STRING"),
            bigquery.SchemaField("signup_ts", "TIMESTAMP"),
            bigquery.SchemaField("updated_at", "TIMESTAMP"),
        ],
    )
    json_rows = [_to_jsonable(r) for r in rows]
    load_job = client.load_table_from_json(
        json_rows, table_with_partition, job_config=job_config
    )
    load_job.result()  # block until the load job finishes; raises on failure
    print(f"[OK] Loaded {len(rows)} rows into {table_with_partition}")
with DAG(
    dag_id="mysql_to_bq_users",
    start_date=datetime(2025, 8, 31),
    schedule="@daily",  # one run per day; the logical date ({{ ds }}) is the day processed
    catchup=False,  # missed intervals are NOT auto-scheduled; run backfills manually if needed
    max_active_runs=1,
    tags=["mysql", "bigquery", "dbt", "cosmos", "staging", "mart"],
) as dag:
    # Jinja templates that Airflow renders at run time.
    PROC_DATE = "{{ ds }}"  # YYYY-MM-DD
    PROC_DATE_YYYYMMDD = "{{ ds_nodash }}"  # YYYYMMDD

    extract_users = PythonOperator(
        task_id="extract_users_from_mysql_to_bq",
        python_callable=load_users_to_bq,
        op_kwargs={
            "proc_date": PROC_DATE,
            "proc_date_yyyymmdd": PROC_DATE_YYYYMMDD,
        },
    )

    # dbt runtime vars, so every model (users included) processes only that day.
    operator_args = {
        "vars": {
            "proc_date": PROC_DATE,
            "proc_date_yyyymmdd": PROC_DATE_YYYYMMDD,
        }
    }

    dbt_tg = DbtTaskGroup(
        group_id="dbt_build",
        project_config=project_config,
        profile_config=profile_config,
        execution_config=exec_config,
        operator_args=operator_args,
    )

    # Load the raw partition first, then build the dbt models on top of it.
    extract_users >> dbt_tg
mysql/init/01_seed.sql
-- Source table read by the daily MySQL -> BigQuery extract.
CREATE TABLE IF NOT EXISTS users (
    user_id      BIGINT PRIMARY KEY,
    email        VARCHAR(255),
    country_code VARCHAR(2),
    signup_ts    DATETIME,  -- stored without time zone
    updated_at   DATETIME,
    -- The daily extract selects rows by the signup_ts date; an index on it
    -- lets a range predicate avoid a full table scan.
    KEY idx_users_signup_ts (signup_ts)
);

-- NOTE(review): for user_id 9 and 10, updated_at (2025-08-31) is earlier
-- than signup_ts (2025-09-01); this looks unintended, so confirm.
INSERT INTO users (user_id, email, country_code, signup_ts, updated_at) VALUES
(1, 'alice@example.com', 'KR', '2025-08-10 03:12:00', '2025-08-10 08:00:00'),
(2, 'bob@example.com', 'US', '2025-08-10 11:30:00', '2025-08-10 15:10:00'),
(3, 'cindy@example.com', 'JP', '2025-08-11 09:00:00', '2025-08-11 10:00:00'),
(4, 'dave@example.com', 'KR', '2025-08-12 01:00:00', '2025-08-12 01:10:00'),
(5, 'erin@example.com', 'US', '2025-08-12 08:05:00', '2025-08-12 08:05:00'),
(6, 'finn@example.com', 'KR', '2025-08-12 11:45:00', '2025-08-12 11:45:00'),
(7, '1111@example.com', 'US', '2025-08-31 08:05:00', '2025-08-31 08:05:00'),
(8, '2222@example.com', 'KR', '2025-08-31 11:45:00', '2025-08-31 11:45:00'),
(9, '333@example.com', 'KR', '2025-09-01 03:12:00', '2025-08-31 08:05:00'),
(10, '444@example.com', 'US', '2025-09-01 11:30:00', '2025-08-31 11:45:00');
BigQuery에 users 테이블 생성 (signup_ts 날짜 기준 파티션 테이블)
-- Target table for the daily load. Replace <프로젝트> with your GCP project ID.
-- One partition per DATE(signup_ts); the DAG overwrites a single day via
-- the users$YYYYMMDD partition decorator.
CREATE TABLE IF NOT EXISTS `<프로젝트>.dbt_tutorial.users`
(
  user_id       INT64,
  email         STRING,
  country_code  STRING,
  signup_ts     TIMESTAMP,
  updated_at    TIMESTAMP
)
PARTITION BY DATE(signup_ts);




'Database > 모델링' 카테고리의 다른 글
| 3. Airflow + Astronomer Cosmos + dbt (0) | 2025.09.09 |
|---|---|
| 프로젝트는 절대로 생각대로 흘러가지 않는다. (0) | 2011.04.04 |
| [설계] CRC카드 , 클래스다이어그램 (0) | 2011.04.02 |
| DA# 교육에서 그렸던거... (0) | 2010.11.24 |
| 간단한 DB 이해(초보) (0) | 2010.09.25 |