본문 바로가기

server/kafka

kafka -> fluentd -> kafka 로 데이터 전송하기 (local)

이번에 받은 업무가 kafka에 로그 데이터가 있는 것을 fluentd로 consumer 하고 kafka로 다시 전송하는 작업을 진행하기 전에 local에서 테스트 한 코드를 여기에 적는다. (실제 적용은 pub/sub -> fluentd -> kafka 로 되어 있다.)

 

 

1. kafka docker-compose로 실행하기 

$docker-compose up

으로 바로 실행

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-net

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
    networks:
      - kafka-net

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
      DYNAMIC_CONFIG_ENABLED: true
    depends_on:
      - kafka
    networks:
      - kafka-net

networks:
  kafka-net:

 

 

 

2. kafka-ui에서 토픽 만들기 

localhost:8080으로 접속하면 kafka-ui로 들어갈수 있다. 

메뉴에서 토픽으로 들어가서 이름과 파티션은 1로 넣고 생성하면 된다. 

- source-topic : 첫번째 카프카에서 시작하는 토픽 (원본 데이터)

- sink-topic : 두번째 카프카에서 받는 토픽 

 

 

3. fluentd docker 만들기 

docker는 매우 심플하다. kafka 플러그인만 설치하면 된다. 

FROM fluent/fluentd:v1.16.3-debian-1.0

USER root

RUN gem install fluent-plugin-kafka --no-document

USER fluent

 

중요한것은 fluent.conf 파일이다. 

코드를 보면 <source> 에서 source-topic의 데이터를 가져온다음 

<match> 에서 해당 데이터를 sink-topic로 보내는것을 알수 있다. 

 

<source>에서 brokers가 kafka로 되어 있는데, 바로 다음세션에서  네트워크 설정으로 docker-compose와 연동시킬것이다.

<system>
  log_level debug
</system>

<source>
  @type kafka_group
  brokers kafka:9093
  topics source-topic
  format json
  consumer_group fluentd-group
</source>

<match source-topic>
  @type kafka2
  brokers kafka:9093
  default_topic sink-topic
  use_event_time true
  compression_codec gzip

  <format>
    @type json
  </format>

  <buffer topic>
    @type memory
    flush_interval 3s
  </buffer>
</match>

 

fluntd는 다음 명령어 빌드하고

$docker build -t fluentd-custom:latest .

 

docker-compose의 네트워크도 확인해보자. (docker와 네트워크가 다르므로 같게 설정해야 한다.)

$ docker network ls  
NETWORK ID     NAME                      DRIVER    SCOPE
a65523f02dd1   bridge                    bridge    local
fb1c6e28a871   host                      host      local
dfd977c8a5cb   none                      null      local
fce124a584a6   test-fluentd_kafka-net    bridge    local

 

네트워크 이름이 "test-fluentd_kafka-net"으로 되어 있다. 

이제 이것을 docker 실행시 넣어주면 된다. 

$ docker run --rm \
  --name fluentd \
  --network test-fluentd_kafka-net \
  -v $(pwd)/fluent.conf:/fluentd/etc/fluent.conf \
  -v $(pwd)/buffer:/var/log/fluent/buffer/td \
  -p 24224:24224 \
  -p 24224:24224/udp \
  fluentd-custom

 

 

4. 이제 마지막 sink-topic의 컨슈머 단이다. (데이터 확인용)

kafka 로 접속해서 컨슈머로 데이터를 확인해준다. 이제 데이터가 들어오면 바로바로 콘솔에 출력이 될것이다.

docker exec -it kafka kafka-console-consumer \
  --bootstrap-server localhost:9093 \
  --topic sink-topic \
  --from-beginning

 

 

5. 마지막으로 실제 데이터를 생성할 python 파일이다. 

해당 python에서 source-topic으로 데이터를 넣어줄것이다.  실행시 패키지 "kafka-python"을 설치해줘야 한다. 

아래 코드에서는 단순하게 message, timestamp를 넘겨준다. 

from kafka import KafkaProducer
import json
from datetime import datetime, timezone
import time 

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)


for _ in range(100):
    message = {
        "message": "hello from python!",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }

    producer.send('source-topic', message)
    producer.flush()
    time.sleep(1)

 

 

이제 파이썬을 실행해보면 결국

파이썬 -> 카프카 -> fluentd -> 카프카 -> 컨슈머  순으로 출력되는것을 볼 수 있다. 

 

파이썬을 실행해보면 다음 화면을 볼수 있다. 

 

혹시 안된다면 단계 단계마다 데이터가 가는지 확인이 필요할것이다!!

 

+ 추가 

fluentd에 filter를 추가해서 데이터를 변경해서 내보낼수 있다. 

이번에 fluent.conf를 다음과 같이 수정해보자. 

- processed_at 칼럼 추가 

- upper_message 칼럼 추가

<system>
  log_level debug
</system>

<source>
  @type kafka_group
  brokers kafka:9093
  topics source-topic
  format json
  consumer_group fluentd-group
</source>

<filter source-topic>
  @type record_transformer
  enable_ruby true
  <record>
    tag_from = ${tag}
    processed_at ${time.strftime('%Y-%m-%dT%H:%M:%S%z')}
    upper_message ${record["message"].upcase}
  </record>
  remove_keys timestamp
</filter>


<match source-topic>
  @type kafka2
  brokers kafka:9093
  default_topic sink-topic
  use_event_time true
  compression_codec gzip

  <format>
    @type json
  </format>

  <buffer topic>
    @type memory
    flush_interval 3s
  </buffer>
</match>

 

후에 fluentd 도커만 다시 실행해보면 다음과 같이 출력된다. 

 

 

끝!