본문 바로가기

server/kafka

Kafka producer/consumer 튜닝 (acks, batch.size, linger.ms 등)

 

resources:
    requests:
      cpu: 500m
      memory: 512Mi

 

top - 05:43:42 up 1 day, 10:29,  0 users,  load average: 0.64, 0.88, 1.29
Tasks:   5 total,   1 running,   4 sleeping,   0 stopped,   0 zombie
%Cpu(s):  6.8 us,  1.0 sy,  0.0 ni, 91.4 id,  0.2 wa,  0.0 hi,  0.6 si,  0.0 st
MiB Mem :   7945.9 total,    805.2 free,   3249.5 used,   3891.1 buff/cache
MiB Swap:      0.0 total,      0.0 free,      0.0 used.   4355.2 avail Mem

 

 

CPU는 6% 메모리는 50% 정도를 사용하고 있다.

 

 

평소때는 초당 20개씩 쌓이지만, 피크일때는 200개정도 쌓인다. (주기적으로 알람을 보낼때 사용자 로그가 쌓이고 있다) 

사실 데이터양이 늘어도 CPU50%를 넘지 않는다. 참고로 초당 600개가 넘을때에도 fluentd두대로 충분히 커버 가능하면서 자원은 70%를 넘지 않았다.

 

현재는 CPU와 메모리가 노는김에 튜닝을 해볼까 해서, 한번 시도해보았다. 

 


 

 

먼저 producer로는 fluentd를 사용하고 있다. 

현재 셋팅되어 있는 코드이다.

<match **>
  @type kafka2

  brokers <카프카서버>
  use_event_time true

  # buffer settings
  <buffer topic>
    @type file
    path /var/log/fluent/buffer/td/
    flush_interval 3s
  </buffer>

  # data type settings
  <format>
    @type json
  </format>

  # topic settings
  topic_key topic
  
  # producer settings
  required_acks 1
  compression_codec lz4

</match>

 

 

producer의 경우에는 두가지 정도로 튜닝하는데, 크게 로그 수집 or 스트리밍 데이터 수집 으로 보면 된다. 

각각의 튜닝은 

 

  • 로그 수집기(Logstash 또는 Fluentd) → Kafka
    ex : 이벤트 수가 적고 요청이 잦다면, batch.size=64KB, linger.ms=50ms, compression=lz4 로 튜닝해 네트워크/처리 효율을 높임.
  • 실시간 마켓 가격 데이터 수집
    ex : 일부 유실 허용 가능하므로 acks=0, batch.size=32KB, linger.ms=5ms 설정해 지연 최소화.

로 보면 된다. 

 

 

 

 

나는 다음과 같이 튜닝해 보았다. 

 

1. flush_interval 3s → 더 길게 (5~10s)

  • 현 상태: 3초마다 flush
  • 문제점: 트래픽이 많지 않을 경우 너무 자주 flush → I/O 낭비 가능성 있음 
  • 개선안: 5s로 변경 
  • 지연(latency)과 처리량(throughput) 사이의 트레이드오프 이므로 적절하게 설정해야 함 (1초로 하면 빠르게 자주 보냄 -> I/O 많아짐, 10초로 느리게 보냄 -> I/O는 적어지지만 데이터가 느리게 감)

  

2. chunk_limit_size 8mb
  • 너무 작으면 batch가 쪼개짐 → Kafka 전송 요청 수 증가
  • 너무 크면 flush 지연 → 기본값(8MB) 정도 유지

 

3. queued_chunks_limit_size 512

 

  • 버퍼 디렉터리 내에 적재 가능한 청크 수 제한
  • 안 걸어두면 디스크가 가득 차는 위험 존재
  • 자원이 충분하다면 128~512 정도 권장

 

4. linger_ms 0 -> 50

- 현상태 : 0 (메시지마다 바로 전송)

- 데이터가 많으면 벌크로 보내지 않아, I/O 낭비 

- 50ms로 변경 

 

 

5. batch_size  16384 (16KB) -> 65536 (64KB)

- 현상태 :  16384 (16KB)

- linger_ms와 마찬가지로 배치 사이즈를 크게 해서 여러개를 한번에 전송 

 

 

 

추가로 worker와 멀티프로세서 관련은 셋팅을 못했는데, 사용하는 패키지가 지원하지 않아, 셋팅을 할수가 없었다...(할수 있었으면 워커와 멀티프로세서만 늘렸어도 충분했을꺼 같다.)

그리고 required_acks도 바꾸지 않았는데, ack를 모두 확인하는 all 을 하면 느려질꺼 같아 1로 그대로 두었다. (혹시나 데이터 누락이 생긴다면 변경해주면 된다)

 

최종 결과물 

<match **>
  @type kafka2

  brokers <카프카 서버>
  use_event_time true

  # buffer settings
  <buffer topic>
    @type file
    path /var/log/fluent/buffer/td/
    flush_interval 5s
    chunk_limit_size 8MB
    queued_chunks_limit_size 128
  </buffer>

  # data type settings
  <format>
    @type json
  </format>

  # topic settings
  topic_key topic
  
  # producer settings
  required_acks 1
  compression_codec lz4
  batch_size 65536
  linger_ms 50
  max_send_retries 5

</match>

 

 

 

 

 


 

이번엔 consumer 차례이다. 

Kafka consumer 역할은 Logstash 를 사용중이며, 카프카의 데이터를 엘라스틱 서치에 전송하는 역활을 한다.

 

변경전

input {
  kafka {
    bootstrap_servers => "카프카서버"
    codec => "json"
    auto_offset_reset => "latest"
    enable_auto_commit => "true"
    group_id => "data_logstash-01"
    client_id => "data_es_logstash-01"
    topics => "토픽이름"
  }
}

output {
  elasticsearch {
    codec => "json"
    hosts => "엘라스틱서치서버"
    index => "인덱스-%{+YYYYMMdd}"
    workers => 1
  }
}

 

 

 

변경후

input {
  kafka {
    bootstrap_servers => "카프카서버"
    codec => "json"
    auto_offset_reset => "latest"
    enable_auto_commit => "true"
    group_id => "data_logstash-01"
    client_id => "data_es_logstash-01"
    topics => "토픽이름"
    
    
    consumer_threads => 3     # 스레드 1 -> 3으로 증가
    fetch_min_bytes => 16384  # 배치 사이즈 증가
    fetch_max_wait_ms => 1000 # batch 대기 증가
    max_poll_records => 1000  # 기본 500 -> 1000 batch 읽기 증가
  }
}

output {
  elasticsearch {
    codec => "json"
    hosts => "엘라스틱서치서버"
    index => "인덱스-%{+YYYYMMdd}"
    
    flush_size => 2000        # bulk 1000 -> 2000 전송 사이즈 증가 
    idle_flush_time => 5      # flush 1 -> 5 대기 시간 증가 
    retry_on_conflict => 3    # 저번 충돌시 최대 3회 재시도 
    retry_max_interval => 10  # 전송 실패시 60s -> 10s 재시도 지연 방지 
    workers => 4              # 병렬 전송 증가
  }
}

 

 

 

만일 CPU와 메모리가 남고, I/O도 남는다면 전체적으로 벌크 형식으로 수정하여, 최적의 효과를 볼수 있다. (물론 지연 시간이 늘어날수도 있으니 모니터링을 잘 보면서 튜닝을 하면 된다.)

'server > kafka' 카테고리의 다른 글

[kafka] KIP-98 Exactly Once Delivery and Transactional Messaging  (0) 2025.11.21
[kafka] kafka connect to es  (1) 2025.11.17
kafka-reassign-partitions  (3) 2025.07.30
fluentd grpc ruby 버전 에러  (3) 2025.07.29
ksqldb  (1) 2025.07.17