본문 바로가기

server/kafka

ksqldb

 

1. ksqlDB?

ksqlDB는 Apache Kafka®를 위해 특별히 제작된 이벤트 스트리밍 데이터베이스,  Kafka 토픽에 저장된 실시간 데이터 스트림을 대상으로, 마치 관계형 데이터베이스(RDB)의 테이블을 다루듯 SQL을 사용하여 데이터를 처리하고 분석할 수 있게 해준다. Java나 Scala 같은 프로그래밍 언어로 복잡한 코드를 작성하는 대신, 선언적인 SQL 구문으로 스트림 처리 애플리케이션을 만들 수 있도록 지원하는 것이 핵심이다.

 

전통적으로 Kafka의 데이터를 처리하기 위해서는 Kafka Streams나 다른 스트림 처리 프레임워크를 사용하여 복잡한 애플리케이션을 개발해야 했다. 이는 상당한 학습 비용과 개발 시간을 요구했으며, 데이터 분석가나 SQL에 익숙한 개발자들이 스트림 데이터에 접근하기 어렵게 만들었다. ksqlDB는 이러한 복잡성을 해결하기 위해 등장했다. Kafka Streams의 강력한 처리 능력 위에 사용하기 쉬운 SQL 인터페이스를 얹어, 더 많은 사람이 스트림 처리 기술의 혜택을 누릴 수 있도록 장벽을 낮추었다.

 

언제 ksqlDB를 사용해야 하는가?

  • 실시간 모니터링 및 이상 탐지: 시스템 로그, 사용자 활동, IoT 센서 데이터를 실시간으로 분석하여 특정 패턴이나 비정상적인 이벤트를 즉시 감지하고 알림을 보낼 때
  • 스트리밍 ETL: 여러 소스로부터 들어오는 원본 데이터를 실시간으로 정제, 변환, 보강하여 다른 시스템(예: 데이터 웨어하우스, 검색 엔진)으로 적재하는 데이터 파이프라인을 구축할 때
  • 실시간 분석 대시보드: 끊임없이 변화하는 데이터를 기반으로 최신 상태를 유지하는 머티리얼라이즈드 뷰(Materialized View)를 생성하고, 이를 통해 동적인 대시보드를 구현할 때
  • 이벤트 기반 마이크로서비스: 서비스 간의 데이터 흐름을 SQL을 통해 간단하게 정의하고 구현하여 이벤트 기반 아키텍처를 손쉽게 구축하고자 할 때

 


실습

 

1단계 서버 실행

docker-compose.yaml

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    networks:
      - kafka-net
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    ports:
      # 외부(로컬 PC)에서 접속할 포트
      - "29092:29092"
    depends_on:
      - zookeeper
    networks:
      - kafka-net
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' # 내부/외부 리스너 설정
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL # ksqlDB가 내부적으로 사용할 토픽을 위한 설정
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.0
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    depends_on:
      - kafka
    networks:
      - kafka-net
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092 # 내부 리스너로 접속
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SERVICE_ID: ksql-service

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.3.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    networks:
      - kafka-net
    tty: true # CLI 세션을 유지하기 위함

  kafbat:
    image:  ghcr.io/kafbat/kafka-ui:latest
    container_name: kafbat
    ports:
      - "8080:8080"
    depends_on:
      - kafka
      - zookeeper
    networks:
      - kafka-net
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb-server:8088
      DYNAMIC_CONFIG_ENABLED: 'true'


networks:
  kafka-net:
    driver: bridge

 

 

 

 

 

2단계: 테스트 데이터 준비

토픽추가 및 데이터 입력

$ docker exec -i kafka kafka-console-producer --broker-list kafka:9092 --topic user_clicks_topic
{"user_id": "user-1", "url": "/home", "timestamp": "2025-07-12T14:00:00Z"}
{"user_id": "user-2", "url": "/products/a-123", "timestamp": "2025-07-12T14:01:00Z"}
{"user_id": "user-1", "url": "/cart", "timestamp": "2025-07-12T14:01:30Z"}
{"user_id": "user-3", "url": "/products/b-456", "timestamp": "2025-07-12T14:02:00Z"}

 

 

$ docker exec -i kafka kafka-console-producer --broker-list kafka:9092 --topic user_profiles_topic --property "parse.key=true" --property "key.separator=:"
user-1:{"name": "Alice", "city": "Seoul"}
user-2:{"name": "Bob", "city": "Busan"}
user-3:{"name": "Charlie", "city": "Incheon"}

 

 

 

ksqlDB에 접속

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v7.3.0, Server v7.3.0 located at http://ksqldb-server:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

접속하면 ksql> 프롬프트가 나타납니다. 이제부터 모든 ksql> 명령어는 이 창에서 실행

 

'user_profiles_topic'  토픽 데이터를 보고 싶다면 다음과 같이 실행 

ksql> PRINT 'user_profiles_topic' FROM BEGINNING;


Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2025/07/12 05:03:25.326 Z, key: user-1, value: {"name": "Alice", "city": "Seoul"}, partition: 0
rowtime: 2025/07/12 05:03:25.338 Z, key: user-2, value: {"name": "Bob", "city": "Busan"}, partition: 0

 

 


 

3단계 스트리밍 데이터 생성

  CREATE STREAM CREATE TABLE
개념 이벤트 스트림 (append-only) 상태 저장 테이블 (key-value store)
처리 방식 이벤트가 시간순으로 계속 발생 각 키에 대해 가장 최신 상태만 유지
용도 실시간 로그, 트랜잭션, 클릭 스트림 사용자 상태, 계좌 잔액, 설정 정보
토픽 메시지 모든 이벤트 유지 키가 동일하면 덮어씀 (compacted topic)
주문 이벤트, 센서 데이터 사용자 정보, 현재 재고 상태

 

CREATE STREAM user_clicks (
  user_id VARCHAR,
  url VARCHAR,
  timestamp VARCHAR
) WITH (
  KAFKA_TOPIC='user_clicks_topic',
  VALUE_FORMAT='JSON'
);

 

 

이번엔 데이터를 실시간으로 넣으면서 확인해 보자.

랜덤하게 데이터를 생성해준다. 

 

 

producer.py

import json
import random
import time
from datetime import datetime
from kafka.vendor import six
from six.moves import range

from kafka import KafkaProducer

KAFKA_BROKER = 'localhost:29092' 
TOPIC_NAME = 'user_clicks_topic'

producer = KafkaProducer(
    bootstrap_servers=[KAFKA_BROKER],
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 샘플 데이터 생성
USER_IDS = [f'user_{i}' for i in range(1, 11)]
URLS = ['/main', '/products/101', '/products/205', '/cart', '/checkout/payment', '/mypage/orders']


try:
    while True:
        user_id = random.choice(USER_IDS)
        url = random.choice(URLS)
        
        click_event = {
            'USER_ID': user_id,
            'URL': url,
            'TIMESTAMP': datetime.now().isoformat() # ISO 8601 형식의 타임스탬프
        }
        
        producer.send(TOPIC_NAME, key=user_id, value=click_event)
        print(f"Sent: {click_event}")
        
        producer.flush()
        
        time.sleep(1)

except KeyboardInterrupt:
    print("\nProducer를 종료합니다.")
finally:
    producer.close()

 

 

결과물 확인

ksql> SELECT * FROM user_clicks WHERE url LIKE '%/products/%' EMIT CHANGES;

 

 

 

 

 

ksqlDB의 규칙을 명확히 해야 한다. CREATE TABLE ... AS SELECT ... 같은 영속적인 쿼리에서는 Kafka 토픽 이름을 FROM 절에 직접 사용할 수 없습니다.

먼저 해당 토픽의 구조(스키마)를 ksqlDB에 알려주는 STREAM을 선언해야 한다. 이 STREAM은 데이터를 복사하거나 새로 만드는 것이 아니라, 단순히 ksqlDB가 해당 토픽을 SQL로 조회할 수 있도록 "이름과 컬럼 정보를 등록"하는 역할을 한다.

 

CREATE TABLE user_profiles (
  user_id VARCHAR PRIMARY KEY,
  name VARCHAR,
  city VARCHAR
) WITH (
  KAFKA_TOPIC='user_profiles_topic',
  VALUE_FORMAT='JSON'
);

 

 

 

 

 

4단계: 실시간 필터링 및 조인 쿼리 실행

 

 

이제 이 예시의 핵심인, "상품 상세 페이지(/products/...)를 본 사용자"를 실시간으로 찾은다음, user_profiles 테이블과 조인하여 데이터를 보강하는 연속 쿼리를 실행한다

 

다음 쿼리는 PRODUCT_VIEWERS_ENRICHED 라는 새로운 스트림을 생성하여 조인된 데이터를 확인할수 있다. 

CREATE STREAM PRODUCT_VIEWERS_ENRICHED AS
SELECT
  c.user_id AS user_id,
  u.name AS user_name,
  u.city AS user_city,
  c.url AS product_url,
  c.timestamp AS view_time
FROM user_clicks c
LEFT JOIN user_profiles u ON c.user_id = u.user_id
WHERE c.url LIKE '%/products/%'
EMIT CHANGES;

 

 

PRINT 'PRODUCT_VIEWERS_ENRICHED' FROM BEGINNING;

데이터를 계속 밀어넣고 있으므로, 조건이 맞는 (products 로 호출되는 로그) 를 계속해서 검색되어 볼 수 있다. 

 

 

 

PRODUCT_VIEWERS_ENRICHED는 Kafka 토픽이 아니라 ksqlDB의 스트림(Stream) 또는 테이블(Table)의 이름입니다.


ksqlDB 객체와 Kafka 토픽의 관계

ksqlDB에서 사용하는 스트림과 테이블은 Kafka 토픽 위에 만들어진 논리적인 추상화 계층이다

  1. 스트림/테이블: SQL로 데이터를 다루기 쉽게 만든 ksqlDB의 객체입니다. PRINT, SELECT, WHERE 등의 구문을 사용할 수 있다
  2. Kafka 토픽: 실제 데이터가 저장되는 물리적인 공간

보통 ksqlDB 스트림/테이블은 특정 Kafka 토픽을 읽거나 쓰도록 연결되어 있다

 

ksql> DESCRIBE PRODUCT_VIEWERS_ENRICHED;

Name                 : PRODUCT_VIEWERS_ENRICHED
 Field       | Type
--------------------------------------
 USER_ID     | VARCHAR(STRING)  (key)
 USER_NAME   | VARCHAR(STRING)
 USER_CITY   | VARCHAR(STRING)
 PRODUCT_URL | VARCHAR(STRING)
 VIEW_TIME   | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

 

 

기존 스트림 삭제 

DROP STREAM IF EXISTS PRODUCT_VIEWERS_ENRICHED;

 

 

결과물을 스트림으로만 볼수 있었지만, 이번엔 스트림데이터를 토픽에 저장할수 있다.

CREATE STREAM PRODUCT_VIEWERS_ENRICHED
WITH (
    KAFKA_TOPIC = 'product_viewers_enriched_topic', -- 결과를 저장할 새로운 Kafka 토픽
    VALUE_FORMAT = 'JSON'
) AS
SELECT
    P.USER_ID AS USER_ID,
    P.NAME AS USER_NAME, 
    P.CITY AS USER_CITY, 
    C.URL AS VIEWED_PRODUCT_URL,
    C.TIMESTAMP AS VIEW_TIMESTAMP
FROM
    user_clicks C
LEFT JOIN
    user_profiles P ON C.USER_ID = P.USER_ID
WHERE C.URL LIKE '%/products/%'
EMIT CHANGES;

 

해당 토픽을 보면 스트림의 데이터가 저장되고 있다.

PRINT 'product_viewers_enriched_topic' FROM BEGINNING;

 

 

토픽에 데이터가 있으므로, consumer를 통해서 원하는 곳에서 데이터를 가져올수 있다. 

consumer.py

import json
from kafka import KafkaConsumer

KAFKA_BROKER = 'localhost:29092' 
TOPIC_NAME = 'product_viewers_enriched_topic'

# KafkaConsumer를 생성합니다.
consumer = KafkaConsumer(
    TOPIC_NAME, 
    bootstrap_servers=[KAFKA_BROKER],
    auto_offset_reset='earliest', 
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print(f"'{TOPIC_NAME}' 토픽의 메시지를 수신합니다. Ctrl+C를 눌러 종료하세요.")

try:
    for message in consumer:
        print(f"Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
        
except KeyboardInterrupt:
    print("\nConsumer를 종료합니다.")
finally:
    consumer.close()

 

 

 

 

 

 

 

 

끝!

 

 

 

 

 

 

 

 

 

'server > kafka' 카테고리의 다른 글

kafka-reassign-partitions  (3) 2025.07.30
fluentd grpc ruby 버전 에러  (3) 2025.07.29
kafka reset offset  (0) 2025.07.13
ELK 실습  (0) 2025.04.28
kafka -> fluentd -> kafka 로 데이터 전송하기 (local)  (0) 2025.03.19