airflow ExternalTaskSensor의 설명은 아래 블로그가 잘되어 있으니 참고!!
tommybebe.github.io/2020/11/30/airflow-external-task-sensor/
나의 경우 ExternalTaskSensor를 이용해서 2개의 dag이 끝나는 시점을 감지한 후 새로운 dag을 실행해야 하는 상황이었다.
* 감지해야 하는 2개의 dag을 dag_parent_A / dag_parent_B으로, 실행해야 하는 dag은 dag_child으로 지칭한다.
각 dag의 실행 시간은 아래와 같다.
dag_parent_A schedule: 0 0,6,12,18 * * *
dag_parent_B schedule: 0 3,9,15,21 * * *
dag_child schedule: 0 3,9,15,21 * * *
모두 6시간 간격으로 실행하는 거였지만 dag_parent_A의 실행시간이 3시간 빠르다!!
이럴 경우 ExternalTaskSensor에서 execution_date_fn를 수정해주면 된다.
execution_date_fn의 설명을 보면 "sensing 대상의 execution_date을 입력"이다.
wait_for_dag_parent_A = ExternalTaskSensor(
task_id='wait_for_dag_parent_A',
external_dag_id='dag_parent_A',
external_task_id='dag_parent_A_task',
start_date=datetime(2021, 4, 10),
execution_date_fn=lambda x: x,
mode='reschedule',
timeout=14400,
pool='dag_child'
)
wait_for_dag_parent_B = ExternalTaskSensor(
task_id='wait_for_dag_parent_B',
external_dag_id='dag_parent_B',
external_task_id='dag_parent_B_task',
start_date=datetime(2021, 4, 10),
execution_date_fn=lambda x: x - timedelta(hours=3), # sensing 대상의 execution_date, 매칭되지 않을시 성공하지 않음
mode='reschedule',
timeout=14400,
pool='dag_child'
)
wait_for_dag_parent_A >> t1
wait_for_dag_parent_B >> t1
위와 같이 execution_date_fn는 함수이므로 람다식으로 ExternalTaskSensor으로 감지해야 하는 Dag의 시작 시간(스케줄 시간)을 맞춰주면 된다. 나의 경우 3시간이 빨라서 timedelta() 함수로 3시간을 빼주었다.
주의 하자!! ExternalTaskSensor는 무조건 해당 dag의 스케줄 시간이 정확히 맞는 run만 sensing 한다.
스케쥴 시간이 맞지 않다면 timeout이 끝날 때까지 기다리다가 죽는다. execution_date_fn이 없는 경우엔 자신의 스케줄과 맞는 시간대만 체크하기 때문에 스케줄이 다른 dag을 감지할 경우 꼭 execution_date_fn를 추가하자!!
- 감지하는 parent dag이 error or run 중이라면 ExternalTaskSensor는 up_for_reschedule 상태가 되고 1분마다 queued로 다시 돌아와서 run 상태가 된다. 한마디로 1분마다 상태를 체크하는 형태가 된다.
'server > 아키텍쳐' 카테고리의 다른 글
캐시 (0) | 2021.07.14 |
---|---|
Consistent Hashing (일관된 해싱) (0) | 2021.07.12 |
코딩을 하기전 해야할 일들 (0) | 2018.12.28 |
web developer roadmap (0) | 2018.12.20 |
DSR ( direct server return ) (0) | 2018.12.11 |