I wanted to bring up a Kafka cluster on minikube on a Mac (macOS, specifically Apple Silicon), but it would not run because of image-architecture problems, so I am proceeding with docker-compose instead.
Setting up a 3-node Kafka (KRaft) cluster
First, the compose configuration that brings up three Kafka brokers and a Connect container.
version: '3.8'
services:
  kafka-1:
    image: confluentinc/cp-kafka:7.5.1
    platform: linux/arm64
    container_name: kafka-1
    ports:
      - "9092:9092"
    environment:
      CLUSTER_ID: "HhJtYRlRR5ujD1e5KuBL9Q"
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093"
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:19092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-logs
    networks:
      - kafka-net

  kafka-2:
    image: confluentinc/cp-kafka:7.5.1
    platform: linux/arm64
    container_name: kafka-2
    ports:
      - "9093:9092"
    environment:
      CLUSTER_ID: "HhJtYRlRR5ujD1e5KuBL9Q"
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093"
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:19092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-logs
    networks:
      - kafka-net

  kafka-3:
    image: confluentinc/cp-kafka:7.5.1
    platform: linux/arm64
    container_name: kafka-3
    ports:
      - "9094:9092"
    environment:
      CLUSTER_ID: "HhJtYRlRR5ujD1e5KuBL9Q"
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093"
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:19092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-logs
    networks:
      - kafka-net

  connect:
    image: confluentinc/cp-kafka-connect:7.5.1
    platform: linux/arm64
    container_name: connect
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka-1:19092,kafka-2:19092,kafka-3:19092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "_connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "_connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "_connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java"
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge
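With the file saved (as docker-compose.yml, an assumption about the filename), a minimal sketch for bringing the cluster up and checking that the KRaft quorum formed; kafka-metadata-quorum ships with Kafka 3.x, so it is available inside the broker containers:

$ docker compose up -d
$ docker exec kafka-1 kafka-metadata-quorum --bootstrap-server kafka-1:19092 describe --status

The status output should list all three node IDs as voters, with one of them as the current leader.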
Verifying Kafka Connect
Once the containers are up, first check that the Connect REST API is responding.
$ curl localhost:8083
{"version":"7.5.1-ccs","commit":"18394206009bb2f31244031f70fdb5e5826e2408","kafka_cluster_id":"HhJtYRlRR5ujD1e5KuBL9Q"}%
= Connect가 Kafka 클러스터와 연결됨
= Connect 부팅 성공
= 분산 모드 정상 작동
Next, check at the network level that the Kafka broker port is actually reachable:
$ docker exec -it connect bash -c "nc -vz kafka-1 19092"
Ncat: Version 7.70 ( https://nmap.org/ncat )
Ncat: Connected to 172.19.0.2:19092.
Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.
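Listing topics from inside a broker is another quick sanity check; Connect should have created the internal topics named in the compose file (_connect-configs, _connect-offsets, _connect-status):

$ docker exec kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --list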
Elasticsearch & Kibana 추가 (데이터 싱크 목적)
Kafka → Connect → Elasticsearch 구조를 만들기 위해,
저장소 역할을 할 Elasticsearch와 시각화 도구인 Kibana 컨테이너를 추가한다.
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.24
    platform: linux/arm64
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
    networks:
      - kafka-net

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.24
    platform: linux/arm64
    container_name: kibana
    depends_on:
      - elasticsearch
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    networks:
      - kafka-net
After re-running docker-compose, check that Elasticsearch is up.
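A sketch of bringing up just the new services, leaving the existing Kafka containers untouched:

$ docker compose up -d elasticsearch kibana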
$ curl localhost:9200
{
  "name" : "b7ecaadffc2a",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "koPtOLnPRrenlvOY973Kgg",
  "version" : {
    "number" : "8.11.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "6f9ff581fbcde658e6f69d6ce03050f060d1fd0c",
    "build_date" : "2023-11-11T10:05:59.421038163Z",
    "build_snapshot" : false,
    "lucene_version" : "9.8.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}
Installing the Elasticsearch plugin for Kafka Connect
To use the Kafka Connect → Elasticsearch Sink Connector, download the plugin ZIP from Confluent Hub and mount it into the Connect container.
Download the connector plugin file:
https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
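A minimal sketch of unpacking it, assuming the ZIP from Confluent Hub is named confluentinc-kafka-connect-elasticsearch-15.1.0.zip and that test-kafka/ sits next to the compose file (the path referenced by the volume mount below):

$ mkdir -p test-kafka/plugins
$ unzip confluentinc-kafka-connect-elasticsearch-15.1.0.zip -d test-kafka/plugins/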
Add the plugin path and schema settings to the connect service. Note that the volume path must start with ./ so compose treats it as a bind mount rather than a named volume:
  connect:
    image: confluentinc/cp-kafka-connect:7.5.1
    platform: linux/arm64
    container_name: connect
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka-1:19092,kafka-2:19092,kafka-3:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - ./test-kafka/plugins/confluentinc-kafka-connect-elasticsearch-15.1.0/lib:/etc/kafka-connect/jars
    networks:
      - kafka-net
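Then recreate the Connect container so it picks up the new mount and environment (a sketch):

$ docker compose up -d --force-recreate connect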
Verifying the Elasticsearch Sink plugin loaded
After the restart, check through the Connect REST API that the Elasticsearch plugin appears in the connector-plugin list.
$ curl http://localhost:8083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "15.1.0"
  },
......................
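To pull out just the Elasticsearch entry rather than scanning the whole list:

$ curl -s localhost:8083/connector-plugins | jq '.[] | select(.class | endswith("ElasticsearchSinkConnector"))'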
Creating the Elasticsearch Sink Connector
Now create the Sink Connector instance that actually reads data from the Kafka topic and pushes it into an Elasticsearch index.
A task must appear under tasks in the response.
$ curl -X PUT http://localhost:8083/connectors/es-sink/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }'
{"name":"es-sink","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"test-topic","connection.url":"http://elasticsearch:9200","key.ignore":"true","schema.ignore":"true","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","name":"es-sink"},"tasks":[{"connector":"es-sink","task":0}],"type":"sink"}%
Sending test messages with a Kafka producer
Now actually put messages into Kafka and check whether they arrive in ES.
A simple producer using kafka-python (producer.py):
from kafka import KafkaProducer
import json
import time

# Connect to all three brokers via their EXTERNAL listeners
# and JSON-serialize message values
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092", "localhost:9093", "localhost:9094"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

topic = "test-topic"

# Send 10 small JSON messages, half a second apart
for i in range(10):
    msg = {"message": f"hello-{i}"}
    producer.send(topic, msg)
    print("Sent:", msg)
    time.sleep(0.5)

# Block until all buffered records are actually delivered
producer.flush()
print("Done.")
Verifying the data landed in Elasticsearch
Now check whether documents actually made it into ES.
Check the document count / fetch a sample document (the count of 20 below suggests the 10-message producer was run twice):
$ curl -X GET "http://localhost:9200/test-topic/_count?pretty"
{
  "count" : 20,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
$ curl -X GET "http://localhost:9200/test-topic/_search?size=1&pretty"
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 20,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test-topic",
        "_type" : "_doc",
        "_id" : "test-topic+0+0",
        "_score" : 1.0,
        "_source" : {
          "message" : "hello-0"
        }
      }
    ]
  }
}
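As a cross-check on the Kafka side, the same records can be read back with the console consumer:

$ docker exec kafka-1 kafka-console-consumer --bootstrap-server kafka-1:19092 --topic test-topic --from-beginning --max-messages 10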
Checking the data in Kibana
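Open http://localhost:5601, create an index pattern for test-topic under Stack Management, and browse the documents in Discover. To first confirm from the CLI that Kibana is up (the /api/status endpoint is standard in Kibana 7.x):

$ curl -s localhost:5601/api/status | jq '.status.overall.state'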
