본문 바로가기

server/아키텍쳐

airflow scheduler high cpu usage

 

CPU 99.7% 사용중 ........

전혀 dag이 실행중이지 않는데도 CPU를 사용입니다.

 

너무 잦은 DAG 파일 검색으로 인한 CPU부하와 함께 기존 하위 버전 v1.10.*부터 아래와 같은 버그가 있었습니다.

  1. 스케줄러 버그 - 스케쥴러가 중단없이 계속 반복됨
  2. 웹서버의 높은 CPU 부하

또한 airflow의 디폴트 값 설정값은 속도를 중시하에 셋팅 되어 있습니다. 자신만의 CPU 상황과 dag+task의 수에 맞게 [scheduler] / [webserver] / [core]의 환경변수를 조절 해야 합니다.

 

아래는 이번에 수정한 환경변수 목록 입니다. 이중에서 dag_dir_list_interval / min_file_process_interval 를 높게 설정한것만으로도 CPU 부하를 줄일 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# [scheduler]
 
# 스케줄러의 실행 빈도
scheduler_heartbeat_sec = 60
 
# 스케줄러가 dag을 예약하기 위한 병렬 스레드 수
max_threads = 1
 
# 실행할 프로세스 수를 정의 (CPU CORES 이하)
# DAG를 구문 분석 하는 스케줄러 의 용량이 줄어들고 DAG가 웹 서버 에 나타나는 데 걸리는 시간이 늘어날 수 있습니다.
parsing_processes = 2
 
 
# 스케줄러 "루프" 당 DagRun 을 생성할 최대 DAG 수
max_dagruns_to_create_per_loop = 5
 
 
# DAG 폴더에서 새 파일을 검색해야 하는 시간(초)
dag_dir_list_interval = 3600
 
# 스케줄러가 DAG를 구문 분석하고 DAG에 대한 업데이트가 반영되는 시간
min_file_process_interval = 3600
 
 
# [webserver]
# gunicorn 웹서버가 worker 시작전 대기하는 시간(초)
web_server_worker_timeout = 300
 
# worker 배치를 새로 고치기 전에 기다리는 시간(초)
worker_refresh_interval = 300
 
# Gunicorn 웹서버를 실행 수 ex) 2 * NUM_CPU_CORES + 1
workers = 4
 
 
# [celery]
# CeleryExecutor가 태스크 상태를 동기화하는 데 사용하는 프로세스 수
sync_parallelism = 1
 
 
# [core]
# DAG 직렬화 업데이트 간격
min_serialized_dag_update_interval = 600
# DAG 직렬화 데이터 가져오는 간격
min_serialized_dag_fetch_interval = 60
cs

 

scheduler_heartbeat_sec를 너무 크게 설정하면 위와 같이 경고 문구가 뜨지만 정상 작동한다.

 

스케쥴 대기 상태에서 7~10%로 하락

dag 실행시엔 30% 내외로 하락

 

 

번외

DAG 작성시 DAG 로드 시간에 영향을 미치는 중요한 요소 중 하나는 import로 로드에 많은 시간이 소요될 수 있고 많은 오버헤드를 생성할 수 있지만 로컬 가져오기로 변환하여 쉽게 피할 수 있다는 것입니다.

최상위 import로 numpy모듈을 가져온다면 로컬 import로 한다면 보다 훨씬 더 느리게 구문 분석을  진행됩니다.

 

나쁜 예:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from datetime import datetime
 
from airflow import DAG
from airflow.operators.python import PythonOperator
 
import numpy as np  # <-- THIS IS A VERY BAD IDEA! DON'T DO THAT!
 
with DAG(
    dag_id="example_python_operator",
    schedule_interval=None,
    start_date=datetime(202111),
    catchup=False,
    tags=["example"],
as dag:
 
    def print_array():
        """Print Numpy array."""
        a = np.arange(15).reshape(35)
        print(a)
        return a
 
    run_this = PythonOperator(
        task_id="print_the_context",
        python_callable=print_array,
    )
cs

 

좋은예

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from datetime import datetime
 
from airflow import DAG
from airflow.operators.python import PythonOperator
 
with DAG(
    dag_id="example_python_operator",
    schedule_interval=None,
    start_date=datetime(202111),
    catchup=False,
    tags=["example"],
as dag:
 
    def print_array():
        """Print Numpy array."""
        import numpy as np  # <- THIS IS HOW NUMPY SHOULD BE IMPORTED IN THIS CASE
 
        a = np.arange(15).reshape(35)
        print(a)
        return a
 
    run_this = PythonOperator(
        task_id="print_the_context",
        python_callable=print_array,
    )
cs

 

 

끝!

 

 

참고사항

https://github.com/apache/airflow/issues/13637

https://stackoverflow.com/questions/42419834/airbnb-airflow-using-all-system-resources

https://docs.aws.amazon.com/mwaa/latest/userguide/best-practices-tuning.html

https://github.com/apache/airflow/blob/main/airflow/config_templates/default_airflow.cfg