CPU 99.7% 사용중 ........
전혀 dag이 실행중이지 않는데도 CPU를 사용입니다.
너무 잦은 DAG 파일 검색으로 인한 CPU부하와 함께 기존 하위 버전 v1.10.*부터 아래와 같은 버그가 있었습니다.
- 스케줄러 버그 - 스케쥴러가 중단없이 계속 반복됨
- 웹서버의 높은 CPU 부하
또한 airflow의 디폴트 값 설정값은 속도를 중시하에 셋팅 되어 있습니다. 자신만의 CPU 상황과 dag+task의 수에 맞게 [scheduler] / [webserver] / [core]의 환경변수를 조절 해야 합니다.
아래는 이번에 수정한 환경변수 목록 입니다. 이중에서 dag_dir_list_interval / min_file_process_interval 를 높게 설정한것만으로도 CPU 부하를 줄일 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# [scheduler]
# 스케줄러의 실행 빈도
scheduler_heartbeat_sec = 60
# 스케줄러가 dag을 예약하기 위한 병렬 스레드 수
max_threads = 1
# 실행할 프로세스 수를 정의 (CPU CORES 이하)
# DAG를 구문 분석 하는 스케줄러 의 용량이 줄어들고 DAG가 웹 서버 에 나타나는 데 걸리는 시간이 늘어날 수 있습니다.
parsing_processes = 2
# 스케줄러 "루프" 당 DagRun 을 생성할 최대 DAG 수
max_dagruns_to_create_per_loop = 5
# DAG 폴더에서 새 파일을 검색해야 하는 시간(초)
dag_dir_list_interval = 3600
# 스케줄러가 DAG를 구문 분석하고 DAG에 대한 업데이트가 반영되는 시간
min_file_process_interval = 3600
# [webserver]
# gunicorn 웹서버가 worker 시작전 대기하는 시간(초)
web_server_worker_timeout = 300
# worker 배치를 새로 고치기 전에 기다리는 시간(초)
worker_refresh_interval = 300
# Gunicorn 웹서버를 실행 수 ex) 2 * NUM_CPU_CORES + 1
workers = 4
# [celery]
# CeleryExecutor가 태스크 상태를 동기화하는 데 사용하는 프로세스 수
sync_parallelism = 1
# [core]
# DAG 직렬화 업데이트 간격
min_serialized_dag_update_interval = 600
# DAG 직렬화 데이터 가져오는 간격
min_serialized_dag_fetch_interval = 60
|
cs |
scheduler_heartbeat_sec를 너무 크게 설정하면 위와 같이 경고 문구가 뜨지만 정상 작동한다.
스케쥴 대기 상태에서 7~10%로 하락
dag 실행시엔 30% 내외로 하락
번외
DAG 작성시 DAG 로드 시간에 영향을 미치는 중요한 요소 중 하나는 import로 로드에 많은 시간이 소요될 수 있고 많은 오버헤드를 생성할 수 있지만 로컬 가져오기로 변환하여 쉽게 피할 수 있다는 것입니다.
최상위 import로 numpy모듈을 가져온다면 로컬 import로 한다면 보다 훨씬 더 느리게 구문 분석을 진행됩니다.
나쁜 예:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import numpy as np # <-- THIS IS A VERY BAD IDEA! DON'T DO THAT!
with DAG(
dag_id="example_python_operator",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
def print_array():
"""Print Numpy array."""
a = np.arange(15).reshape(3, 5)
print(a)
return a
run_this = PythonOperator(
task_id="print_the_context",
python_callable=print_array,
)
|
cs |
좋은예
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
dag_id="example_python_operator",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
def print_array():
"""Print Numpy array."""
import numpy as np # <- THIS IS HOW NUMPY SHOULD BE IMPORTED IN THIS CASE
a = np.arange(15).reshape(3, 5)
print(a)
return a
run_this = PythonOperator(
task_id="print_the_context",
python_callable=print_array,
)
|
cs |
끝!
참고사항
https://github.com/apache/airflow/issues/13637
https://stackoverflow.com/questions/42419834/airbnb-airflow-using-all-system-resources
https://docs.aws.amazon.com/mwaa/latest/userguide/best-practices-tuning.html
https://github.com/apache/airflow/blob/main/airflow/config_templates/default_airflow.cfg
'server > 아키텍쳐' 카테고리의 다른 글
rabbitmq 심화 (persistent / cluster) (0) | 2024.05.05 |
---|---|
rabbitmq start (0) | 2024.05.04 |
캐시 (0) | 2021.07.14 |
Consistent Hashing (일관된 해싱) (0) | 2021.07.12 |
airflow 시간대가 다른 두개의 dag을 ExternalTaskSensor 사용하기 (0) | 2021.04.16 |