이번에 받은 업무가 kafka에 로그 데이터가 있는 것을 fluentd로 consumer 하고 kafka로 다시 전송하는 작업을 진행하기 전에 local에서 테스트 한 코드를 여기에 적는다. (실제 적용은 pub/sub -> fluentd -> kafka 로 되어 있다.)
1. kafka docker-compose로 실행하기
$docker-compose up
으로 바로 실행
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- kafka-net
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
networks:
- kafka-net
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local-kafka
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
DYNAMIC_CONFIG_ENABLED: true
depends_on:
- kafka
networks:
- kafka-net
networks:
kafka-net:
2. kafka-ui에서 토픽 만들기
localhost:8080으로 접속하면 kafka-ui로 들어갈수 있다.
메뉴에서 토픽으로 들어가서 이름과 파티션은 1로 넣고 생성하면 된다.
- source-topic : 첫번째 카프카에서 시작하는 토픽 (원본 데이터)
- sink-topic : 두번째 카프카에서 받는 토픽
3. fluentd docker 만들기
docker는 매우 심플하다. kafka 플러그인만 설치하면 된다.
FROM fluent/fluentd:v1.16.3-debian-1.0
USER root
RUN gem install fluent-plugin-kafka --no-document
USER fluent
중요한것은 fluent.conf 파일이다.
코드를 보면 <source> 에서 source-topic의 데이터를 가져온다음
<match> 에서 해당 데이터를 sink-topic로 보내는것을 알수 있다.
<source>에서 brokers가 kafka로 되어 있는데, 바로 다음세션에서 네트워크 설정으로 docker-compose와 연동시킬것이다.
<system>
log_level debug
</system>
<source>
@type kafka_group
brokers kafka:9093
topics source-topic
format json
consumer_group fluentd-group
</source>
<match source-topic>
@type kafka2
brokers kafka:9093
default_topic sink-topic
use_event_time true
compression_codec gzip
<format>
@type json
</format>
<buffer topic>
@type memory
flush_interval 3s
</buffer>
</match>
fluntd는 다음 명령어 빌드하고
$docker build -t fluentd-custom:latest .
docker-compose의 네트워크도 확인해보자. (docker와 네트워크가 다르므로 같게 설정해야 한다.)
$ docker network ls
NETWORK ID NAME DRIVER SCOPE
a65523f02dd1 bridge bridge local
fb1c6e28a871 host host local
dfd977c8a5cb none null local
fce124a584a6 test-fluentd_kafka-net bridge local
네트워크 이름이 "test-fluentd_kafka-net"으로 되어 있다.
이제 이것을 docker 실행시 넣어주면 된다.
$ docker run --rm \
--name fluentd \
--network test-fluentd_kafka-net \
-v $(pwd)/fluent.conf:/fluentd/etc/fluent.conf \
-v $(pwd)/buffer:/var/log/fluent/buffer/td \
-p 24224:24224 \
-p 24224:24224/udp \
fluentd-custom
4. 이제 마지막 sink-topic의 컨슈머 단이다. (데이터 확인용)
kafka 로 접속해서 컨슈머로 데이터를 확인해준다. 이제 데이터가 들어오면 바로바로 콘솔에 출력이 될것이다.
docker exec -it kafka kafka-console-consumer \
--bootstrap-server localhost:9093 \
--topic sink-topic \
--from-beginning
5. 마지막으로 실제 데이터를 생성할 python 파일이다.
해당 python에서 source-topic으로 데이터를 넣어줄것이다. 실행시 패키지 "kafka-python"을 설치해줘야 한다.
아래 코드에서는 단순하게 message, timestamp를 넘겨준다.
from kafka import KafkaProducer
import json
from datetime import datetime, timezone
import time
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for _ in range(100):
message = {
"message": "hello from python!",
"timestamp": datetime.now(timezone.utc).isoformat()
}
producer.send('source-topic', message)
producer.flush()
time.sleep(1)
이제 파이썬을 실행해보면 결국
파이썬 -> 카프카 -> fluentd -> 카프카 -> 컨슈머 순으로 출력되는것을 볼 수 있다.
파이썬을 실행해보면 다음 화면을 볼수 있다.
혹시 안된다면 단계 단계마다 데이터가 가는지 확인이 필요할것이다!!
+ 추가
fluentd에 filter를 추가해서 데이터를 변경해서 내보낼수 있다.
이번에 fluent.conf를 다음과 같이 수정해보자.
- processed_at 칼럼 추가
- upper_message 칼럼 추가
<system>
log_level debug
</system>
<source>
@type kafka_group
brokers kafka:9093
topics source-topic
format json
consumer_group fluentd-group
</source>
<filter source-topic>
@type record_transformer
enable_ruby true
<record>
tag_from = ${tag}
processed_at ${time.strftime('%Y-%m-%dT%H:%M:%S%z')}
upper_message ${record["message"].upcase}
</record>
remove_keys timestamp
</filter>
<match source-topic>
@type kafka2
brokers kafka:9093
default_topic sink-topic
use_event_time true
compression_codec gzip
<format>
@type json
</format>
<buffer topic>
@type memory
flush_interval 3s
</buffer>
</match>
후에 fluentd 도커만 다시 실행해보면 다음과 같이 출력된다.
끝!