본문 바로가기

server/system design

postgreSQL CDC를 활용한 엘라스틱서치로 데이터 실시간 연동

 

전체 소스 : https://github.com/uiandwe/postgresSQL-to-es

 

만일 사용하는 디비가 postgreSQL이면서 실시간 데이터 검색 / 데이터 처리가 일반적으로 CDC를 활용해서 데이터를 필요한 곳으로 이관합니다.

 

물론, PostgreSQL은 강력한 기능과 안정성으로 널리 사용되는 데이터베이스이지만, 실시간 검색이나 데이터 처리를 위해 데이터를 효율적으로 연동할 필요가 있습니다. 특히 대규모 데이터 처리나 빠른 검색 응답이 요구되는 환경에서는 데이터를 엘라스틱서치(Elasticsearch)와 같은 검색 엔진으로 이관해 활용하는 것이 효과적입니다.

이 과정에서 핵심 기술로 사용되는 것이 바로 CDC(Change Data Capture)입니다. CDC는 데이터베이스의 변경 사항을 실시간으로 캡처하여 이를 외부 시스템으로 전달하는 기술로, 데이터 동기화에 필요한 정확성과 최신성을 보장합니다. PostgreSQL의 CDC 기능은 이를 구현하는 데 매우 적합하며, Debezium과 같은 오픈소스 도구를 활용하면 효율적으로 데이터 변경 사항을 추적할 수 있습니다.

이번 글에서는 PostgreSQL의 CDC를 활용해 데이터를 엘라스틱서치로 실시간 연동하는 방법을 단계별로 알아보겠습니다. CDC의 기본 개념부터 설정 방법, 실시간 연동을 구현하는 방법까지 살펴보며, 실시간 데이터 동기화의 이점을 극대화하는 방법을 소개합니다.

 

0. 카프카 커넥터

CDC를 PostgreSQL과 엘라스틱서치 간의 실시간 데이터 연동에 활용하는 데 있어 중요한 역할을 하는 도구 중 하나는 카프카 커넥터(Kafka Connector)입니다. Apache Kafka는 대규모 데이터 처리와 스트리밍을 위한 분산 메시징 시스템으로, 데이터의 흐름을 효율적으로 관리하고 처리하는 데 탁월한 성능을 제공합니다. 카프카는 실시간 데이터 스트리밍을 위한 핵심 기술로 자리잡고 있으며, 다양한 시스템 간에 데이터를 손쉽게 전송할 수 있는 플랫폼을 제공합니다.

 

카프카 커넥터는 Kafka와 외부 시스템 간의 데이터 흐름을 간소화해주는 플러그인입니다. PostgreSQL에서 발생하는 데이터 변경 사항을 Kafka 토픽으로 전송하고, 이후 Kafka 소비자(consumer)가 이를 받아 엘라스틱서치로 전송하는 구조를 구현할 수 있습니다. 이를 통해 실시간 데이터 동기화와 고도화된 검색 기능을 제공할 수 있습니다.

 

Kafka Connect에는 다음과 같은 다양한 커넥터가 제공됩니다.

  • RDBMS(오라클, SQL 서버, DB2, 포스트그레스, MySQL)
  • 클라우드 객체 저장소(Amazon S3, Azure Blob Storage, Google Cloud Storage)
  • 메시지 큐(ActiveMQ, IBM MQ, RabbitMQ)
  • NoSQL 및 문서 저장소(Elasticsearch, MongoDB, Cassandra)
  • 클라우드 데이터웨어하우스(Snowflake, Google BigQuery, Amazon Redshift)

Kafka Connect의 특징

  • Kafka를 사용하여 외부 시스템을 연결함으로써 커넥터의 개발, 배포 및 관리를 간소화합니다.
  • 대규모 클러스터를 배포하고 개발 및 테스트를 위한 설정을 제공하는 분산형 독립형 클러스터입니다.
  • REST API를 사용하여 커넥터를 관리할 수 있습니다.
  • Kafka Connect는 오프셋 커밋 프로세스를 처리하는 데 도움이 됩니다.
  • 기본적으로 Kafka Connect는 분산되어 있으며 확장 가능합니다.

 

처음에 커넥터를 debezium으로 시도했다가 일주일간 안되서 포기했습니다... 예제나 최신 버전에서도 도저희 엘라스틱서치의 커넥터가 설치가 안되더군요 (공식이미지를 사용해도 안되었습니다...) 그냥 포기하고 카프카 커넥터로 진행하였습니다.


 

1. 전체 환경 설정

전체적인 환경은 docker-compose로 만듭니다.

docker-compose.yaml은 다음과 같습니다.

(아래에 나오지만, 플로그인 폴더를 위해 connect-plugin, 디비 저장을 위해 postgres-data, es_data 를 만들어줬습니다.)

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: postgres:15
    container_name: postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: exampledb
      PGDATA: /var/lib/postgresql/data
      PGOPTIONS: "-c wal_level=logical -c max_replication_slots=4 -c max_wal_senders=4"
    ports:
      - "5432:5432"
    volumes:
      - ./postgres-data:/var/lib/postgresql/data
    command: >
      postgres -c wal_level=logical
      

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.4.0
    container_name: kafka-connect
    depends_on:
      - kafka
      - postgres
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: "1"
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
      CONNECT_DEBEZIUM_DATABASE_INCLUDE: mydb
    volumes:
      - ./connect-plugins:/etc/kafka-connect/jars


  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.10.2
    container_name: elasticsearch
    environment:
      discovery.type: single-node
      bootstrap.memory_lock: "true"
      xpack.security.enabled: "false"      # 보안 비활성화
      xpack.security.transport.ssl.enabled: "false"
      xpack.security.enrollment.enabled: "false"
      xpack.security.http.ssl.enabled: "false"
      ES_JAVA_OPTS: "-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - "9200:9200"
    volumes:
      - ./es_data:/usr/share/elasticsearch/data

 

 

2. postgres 설정

 

docker 접속

docker exec -it postgres psql -U postgres -d exampledb

 

PostgreSQL Publication 생성

CREATE PUBLICATION debezium_pub FOR ALL TABLES;

 

 

wal_level 확인

SHOW wal_level;
SHOW max_replication_slots;
SHOW max_wal_senders;

 

 wal_level
-----------
 logical
(1 row)

 max_replication_slots
-----------------------
 4
(1 row)

 max_wal_senders
-----------------
 4
(1 row)

 

만일 wal_level이 logical이 아니라면 debezium 설정시 다음과 같이 에러가 발생합니다.

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nError while validating connector config: Postgres server wal_level property must be 'logical' but is: 'replica'\nYou can also find the above list of errors at the endpoint /connector-plugins/{connectorType}/config/validate"}

 

 

 

3. 커넥터 확인

커넥터는 카프카에 데이터가 들어왔을시 활성화되는 퍼블리셔라고 보면 됩니다.

각각의 target (여기서는 postgreSQL) 과 sink (여기서는 엘라스틱서치) 로 존재하며 필요한 정보(디비 아이디/비번/토픽 등)를 알려줘서 접속을 통해서 데이터가 이관됩니다.

 

먼저 posetgres 플러그인과 엘라스틱서치 플러그인이있는지 확인합니다. (없다면 ./connect-plugin 폴더에 추가해주면 됩니다.)

curl -s http://localhost:8083/connector-plugins

 

 

 

PostgreSQL 커넥터 등록

register-postgres.json을  PostgreSQL 커넥터를 등록합니다.

설정 파일은 다음과 같습니다.

{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "exampledb",
      "database.server.name": "postgres_server",
      "table.include.list": "public.test_table",
      "plugin.name": "pgoutput",
      "slot.name": "debezium",
      "topic.prefix": "postgres_server",
      "publication.name": "debezium_publication",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.exampledb",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.add.fields": "op,ts_ms",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }
curl -X POST -H "Content-Type: application/json" \
    --data @config/register-postgres.json \
    http://localhost:8083/connectors

 

엘라스틱서치 커넥터 등록

register-es-sink 는 다음과 같습니다.

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "postgres_server.public.test_table",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}

 

다음으로 등록하면 됩니다.

curl -X POST -H "Content-Type: application/json" \
--data @./config/register-es-sink.json \
http://localhost:8083/connectors

 

 

 

데이터를 추가하면 카프카에서 새로운 테스크가 발생한것을 볼수 있습니다.

 

kafka             | [2025-01-08 14:58:28,232] INFO [Broker id=1] Add 1 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 10 (state.change.logger)
kafka             | [2025-01-08 14:58:28,233] TRACE [Controller id=1 epoch=1] Received response UpdateMetadataResponseData(errorCode=0) for request UPDATE_METADATA with correlation id 10 sent to broker kafka:9092 (id: 1 rack: null) (state.change.logger)

 

 

4. 데이터 확인

postgreSQL에 테이블과 데이터를 넣어줍니다. (해당 테이블의 이름은 이미 postgres 커넥터에 정의되어 있습니다. )

-- 1. 테스트용 테이블 생성
CREATE TABLE test_table (
    id SERIAL PRIMARY KEY,
    name VARCHAR(50) NOT NULL,
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);


-- 2. 샘플 데이터 삽입
INSERT INTO test_table (name, description) VALUES
('Alice', 'First test record'),
('Bob', 'Second test record'),
('Charlie', 'Third test record'),
('Diana', 'Fourth test record'),
('Eve', 'Fifth test record');

 

데이터를 넣는 즉시 카프카에 데이터가 들어가는것을 로그로 확인할수 있습니다.

kafka 토픽과 데이터가 있는지 확인해 줍니다.

$ docker exec -it kafka bash

$ kafka-topics --bootstrap-server localhost:9092 --list
connect-configs
connect-offsets
connect-status
postgres_server.public.test_table

 

해당 토픽의 데이터 확인은 다음과 같습니다.

$ docker exec -it kafka bash -c "kafka-console-consumer --bootstrap-server localhost:9092 --topic postgres_server.public.test_table --from-beginning"


{"id":1,"name":"Alice","description":"First test record","created_at":1736816257075566,"__op":"r","__ts_ms":1736784417004}
{"id":2,"name":"Bob","description":"Second test record","created_at":1736816257075566,"__op":"r","__ts_ms":1736784417009}
{"id":3,"name":"Charlie","description":"Third test record","created_at":1736816257075566,"__op":"r","__ts_ms":1736784417009}
{"id":4,"name":"Diana","description":"Fourth test record","created_at":1736816257075566,"__op":"r","__ts_ms":1736784417010}
{"id":5,"name":"Eve","description":"Fifth test record","created_at":1736816257075566,"__op":"r","__ts_ms":1736784417010}
{"id":6,"name":"Alice","description":"First test record","created_at":1736816677146491,"__op":"r","__ts_ms":1736784417010}
{"id":7,"name":"Alice","description":"First test record","created_at":1736816827563006,"__op":"c","__ts_ms":1736784427944}
{"id":8,"name":"Alice","description":"First test record","created_at":1736816944221514,"__op":"c","__ts_ms":1736784638039}
{"id":9,"name":"test","description":"First test record","created_at":1736816991621822,"__op":"c","__ts_ms":1736784638041}
{"id":10,"name":"test","description":"First test record","created_at":1736817044838026,"__op":"c","__ts_ms":1736784645142}

 

 

마지막으로 엘라스틱서치에 데이터가 들어가 있는지 확인해 줍니다.

curl --location 'http://localhost:9200/postgres_server.public.test_table/_search?pretty=null'


{
    "took": 98,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 10,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "postgres_server.public.test_table",
                "_id": "postgres_server.public.test_table+0+8",
                "_score": 1.0,
                "_source": {
                    "__ts_ms": 1736784638041,
                    "name": "test",
                    "description": "First test record",
                    "created_at": 1736816991621822,
                    "id": 9,
                    "__op": "c"
                }
            },
            {
                "_index": "postgres_server.public.test_table",
                "_id": "postgres_server.public.test_table+0+9",
                "_score": 1.0,
                "_source": {
                    "__ts_ms": 1736784645142,
                    "name": "test",
                    "description": "First test record",
                    "created_at": 1736817044838026,
                    "id": 10,
                    "__op": "c"
                }
            },
            ....
        ]
    }
}

 

 

끝!

 

 

참고

https://blog.nashtechglobal.com/kafka-connect-fundamentals/