
[kafka] Kafka Connect to ES

 

On a Mac (macOS, specifically Apple Silicon), I tried to bring up a Kafka cluster on minikube, but it wouldn't run due to image architecture issues, so I'm going with docker-compose instead.

Setting up a 3-node Kafka (KRaft) cluster

First, the compose configuration that brings up three Kafka brokers and a Connect container:

version: '3.8'

services:
  kafka-1:
    image: confluentinc/cp-kafka:7.5.1
    platform: linux/arm64
    container_name: kafka-1
    ports:
      - "9092:9092"
    environment:
      CLUSTER_ID: "HhJtYRlRR5ujD1e5KuBL9Q"

      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093"


      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:19092,EXTERNAL://localhost:9092

      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER

      KAFKA_LOG_DIRS: /tmp/kraft-logs
    networks:
      - kafka-net

  kafka-2:
    image: confluentinc/cp-kafka:7.5.1
    platform: linux/arm64
    container_name: kafka-2
    ports:
      - "9093:9092"
    environment:
      CLUSTER_ID: "HhJtYRlRR5ujD1e5KuBL9Q"

      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093"

      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:19092,EXTERNAL://localhost:9093

      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER

      KAFKA_LOG_DIRS: /tmp/kraft-logs
    networks:
      - kafka-net

  kafka-3:
    image: confluentinc/cp-kafka:7.5.1
    platform: linux/arm64
    container_name: kafka-3
    ports:
      - "9094:9092"
    environment:
      CLUSTER_ID: "HhJtYRlRR5ujD1e5KuBL9Q"

      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093"

      KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:19092,EXTERNAL://localhost:9094

      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER

      KAFKA_LOG_DIRS: /tmp/kraft-logs
    networks:
      - kafka-net

  connect:
    image: confluentinc/cp-kafka-connect:7.5.1
    platform: linux/arm64
    container_name: connect
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka-1:19092,kafka-2:19092,kafka-3:19092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "_connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "_connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "_connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java"
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge
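
Bring the cluster up and check that all four containers stay running. The CLUSTER_ID above is just a fixed base64-encoded UUID; any valid one works, and you can generate a fresh one with the kafka-storage tool bundled in the cp-kafka image, e.g.:

$ docker run --rm confluentinc/cp-kafka:7.5.1 kafka-storage random-uuid
$ docker-compose up -d
$ docker-compose ps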

 

 

 

 

Verifying Kafka Connect

Once the containers are up, first check that the Connect REST API is responding:

$ curl localhost:8083

{"version":"7.5.1-ccs","commit":"18394206009bb2f31244031f70fdb5e5826e2408","kafka_cluster_id":"HhJtYRlRR5ujD1e5KuBL9Q"}%

     
= Connect is talking to the Kafka cluster
= The Connect worker booted successfully
= Distributed mode is working

 

Next, check at the network level that the Kafka broker port is actually reachable from the Connect container:

$ docker exec -it connect bash -c "nc -vz kafka-1 19092"

Ncat: Version 7.70 ( https://nmap.org/ncat )
Ncat: Connected to 172.19.0.2:19092.
Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.
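
Beyond raw TCP reachability, listing topics through one of the brokers is a quick way to confirm the KRaft quorum actually formed (kafka-topics ships in the cp-kafka image):

$ docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --list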

 

 

 

 

Adding Elasticsearch & Kibana (as the data sink)

To build the Kafka → Connect → Elasticsearch pipeline, add an Elasticsearch container as the data store and a Kibana container for visualization.


  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.24
    platform: linux/arm64
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"
    networks:
      - kafka-net

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.24
    platform: linux/arm64
    container_name: kibana
    depends_on:
      - elasticsearch
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    networks:
      - kafka-net

 

 

After re-running docker-compose, confirm Elasticsearch is responding:

 

$ curl localhost:9200

{
  "name" : "b7ecaadffc2a",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "koPtOLnPRrenlvOY973Kgg",
  ......................
  "tagline" : "You Know, for Search"
}

 

 

 

Installing the Elasticsearch sink plugin for Kafka Connect

To use the Elasticsearch Sink Connector from Kafka Connect, download the plugin ZIP from Confluent Hub and mount it into the Connect container.

 

 

Download the connector plugin:
https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
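
For example, after downloading the ZIP into the project root, unpack it into the directory that gets mounted below (the test-kafka/plugins layout is just the one used in this post):

$ unzip confluentinc-kafka-connect-elasticsearch-15.1.0.zip -d ./test-kafka/plugins/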

 

 

Plugin path and schema settings added to the Connect service:

  connect:
    image: confluentinc/cp-kafka-connect:7.5.1
    platform: linux/arm64
    container_name: connect
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka-1:19092,kafka-2:19092,kafka-3:19092"

      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"

      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"

      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"

      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"


      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"

      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"

    volumes:
      - ./test-kafka/plugins/confluentinc-kafka-connect-elasticsearch-15.1.0/lib:/etc/kafka-connect/jars

    networks:
      - kafka-net
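
Recreate only the Connect container so it picks up the new mount and plugin path:

$ docker-compose up -d --force-recreate connect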

 

 

 

Verifying the Elasticsearch sink plugin loaded

After restarting docker-compose, list the connector plugins via the Connect REST API to confirm the ES plugin shows up:

$ curl http://localhost:8083/connector-plugins | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   427  100   427    0     0   3161      0 --:--:-- --:--:-- --:--:--  3162
[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "15.1.0"
  },
  ......................
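
To pull out just the Elasticsearch connector instead of scanning the full list (assuming jq is available):

$ curl -s localhost:8083/connector-plugins | jq '.[] | select(.class | test("Elasticsearch"))'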

 

 

Creating the Elasticsearch Sink Connector

Now create the Sink Connector instance that actually reads records from the Kafka topic and pushes them into an Elasticsearch index. In the response, the tasks array should contain a created task.

$ curl -X PUT http://localhost:8083/connectors/es-sink/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "true",
    "schema.ignore": "true",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }'

{"name":"es-sink","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"test-topic","connection.url":"http://elasticsearch:9200","key.ignore":"true","schema.ignore":"true","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","name":"es-sink"},"tasks":[{"connector":"es-sink","task":0}],"type":"sink"}%




Sending test messages with a Kafka producer

Now push some messages into Kafka and check that they end up in ES. A simple producer using kafka-python (producer.py):

from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092", "localhost:9093", "localhost:9094"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

topic = "test-topic"

for i in range(10):
    msg = {"message": f"hello-{i}"}
    producer.send(topic, msg)
    print("Sent:", msg)
    time.sleep(0.5)

producer.flush()
print("Done.")

 

 

 

Checking the data landed in Elasticsearch

Now verify that documents actually made it into ES: check the document count and fetch a sample document. With key.ignore=true the connector derives each document _id as topic+partition+offset (hence the test-topic+0+0 _id below); the count of 20 rather than 10 indicates the producer script was run twice here.

$ curl -X GET "http://localhost:9200/test-topic/_count?pretty"

{
  "count" : 20,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}



$ curl -X GET "http://localhost:9200/test-topic/_search?size=1&pretty"

{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 20,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test-topic",
        "_type" : "_doc",
        "_id" : "test-topic+0+0",
        "_score" : 1.0,
        "_source" : {
          "message" : "hello-0"
        }
      }
    ]
  }
}

 

 

Checking the data in Kibana
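
To browse the documents in Kibana, create an index pattern for the test-topic index, either through the UI (Stack Management → Index Patterns) or via the index pattern API (a sketch for Kibana 7.17; the kbn-xsrf header is required on write requests):

$ curl -X POST "http://localhost:5601/api/index_patterns/index_pattern" \
  -H "kbn-xsrf: true" -H "Content-Type: application/json" \
  -d '{"index_pattern": {"title": "test-topic"}}'

Then open Discover, select the pattern, and the hello-N documents should show up.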