저번 글에 이어서 kafka를 이용해서 구현해 보겠습니다. (지난번 포스팅 : https://uiandwe.tistory.com/92183 )
큐 역활을 했던 redis 대신 kafka를 사용하는 이유는 대용량 메시지처리와 함께 팬아웃(갑작스런 대규모 요청) 등을 빠르고 안전하게 서비스 하기 위해 kafka를 사용합니다. 단점으로는 kafka를 운영하는 시간과 filaover 지점이 늘어나게 됩니다.
주요 구성 요소
- FastAPI 웹 서버: 요청을 받아 작업을 kafka 토픽에 넣습니다.
- kafka: Queue 역할을 합니다.
- Worker: 토픽에서 작업을 가져와 처리합니다. 처리 완료확인을 위해 redis에 데이터를 넣습니다.
- redis : 캐시 역활
Kafka는 메시지를 처리한 후 상태를 저장하는 기능이 기본적으로 제공되지 않으므로, 상태 확인과 작업 완료 처리를 위해 별도의 저장소(예: Redis, 데이터베이스)를 사용해야 합니다. (만일 굳이 카프카를 써야 한다면 새로운 토픽을 생성하고 관리해줘야 합니다.)
docker-compose는 다음과 같습니다. 주키퍼는 카프카를 관리감독하는 역활을 합니다.
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
웹서버는 다음과 같습니다.
기존 레디스만 있던 코드에서 카프카부분만 추가되었습니다.
import redis
import json
import uuid
from fastapi import APIRouter, BackgroundTasks
from confluent_kafka import Producer
router = APIRouter(prefix="/enqueue")
# Kafka Producer 설정
kafka_config = {
"bootstrap.servers": "localhost:9092"
}
producer = Producer(kafka_config)
# 작업 추가 엔드포인트
@router.post("")
async def enqueue_task(data: dict, background_tasks: BackgroundTasks):
task_id = str(uuid.uuid4()) # 고유한 Task ID 생성
task_data = {"id": task_id, "data": data, "status": "queued"}
# Kafka로 메시지 전송
producer.produce("task_topic", key=task_id, value=json.dumps(task_data))
producer.flush()
return {"message": "Task enqueued", "task_id": task_id}
from redis import Redis
# Redis 클라이언트 초기화
redis_client = Redis(host="localhost", port=6379, decode_responses=True)
@router.get("/task/{task_id}")
async def get_task_status(task_id: str):
task_data = redis_client.get(task_id)
if task_data:
task = json.loads(task_data)
# 작업 상태를 'completed'로 업데이트
task["status"] = "completed"
redis_client.set(task["id"], json.dumps(task))
# 작업 데이터 삭제
print(f"Deleting task {task['id']} from Redis.")
redis_client.delete(task["id"])
return {"task_id": task["id"], "status": task["status"]}
return {"message": "Task not found", "task_id": task_id}
다음으로 워커 부분입니다.
워커에서는 큐에서 가져오는 부분은 카프카에서, 처리 후 완료되는 부분은 레디스로 처리하였습니다.
from confluent_kafka import Consumer, KafkaException
from redis import Redis
import json
import time
# Kafka Consumer 설정
kafka_config = {
"bootstrap.servers": "localhost:9092",
"group.id": "worker_group",
"auto.offset.reset": "earliest"
}
consumer = Consumer(kafka_config)
# Redis 클라이언트 초기화
redis_client = Redis(host="localhost", port=6379, decode_responses=True)
# Kafka 토픽 구독
consumer.subscribe(["task_topic"])
def process_task(task_data):
task_id = task_data["id"]
# 작업 상태를 'processing'으로 업데이트
task_data["status"] = "processing"
redis_client.set(task_id, json.dumps(task_data))
print(f"[PROCESSING] Task {task_id}: {task_data['data']}")
time.sleep(2) # 작업 처리 시간 시뮬레이션
# 작업 상태를 'completed'으로 업데이트
task_data["status"] = "completed"
redis_client.set(task_id, json.dumps(task_data))
print(f"[DONE] Task {task_id} completed.")
try:
print("Worker is running and waiting for tasks...")
while True:
msg = consumer.poll(1.0) # 메시지 폴링
if msg is None:
continue # 메시지가 없으면 대기
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
continue
else:
print(f"[ERROR] {msg.error()}")
break
# 메시지 처리
task_data = json.loads(msg.value().decode("utf-8"))
process_task(task_data)
except KeyboardInterrupt:
print("Worker stopped.")
finally:
consumer.close()
동작 방식은 기존과 같습니다. 먼저 사용자가 웹서버로 post요청시 task id가 발급됩니다.
워커에서는 토픽에 들어온 테스크를 처리 후 완료 메시지를 레디스에 저장합니다.
다시 웹서버에서 /task/{task_id}를 통해 해당 task가 완료 되었는지 확인합니다.
이제 대용량 서버 작업이나, 대규모 요청을 처리해야 하는 로직이 필요하다면 위와 같은 구성을 통해 서버 아키텍처를 구현하여 사용하면 됩니다.
끝
'server > system design' 카테고리의 다른 글
postgreSQL CDC를 활용한 엘라스틱서치로 데이터 실시간 연동 (0) | 2025.01.14 |
---|---|
redis pub/sub을 이용한 서버 아키텍처 구현 (0) | 2025.01.08 |
[25 Computer Papers] 4. Cassandra - A Decentralized Structured Storage System (0) | 2024.09.21 |
레디스 트러블 슈팅 (1) | 2024.09.19 |
[25 Computer Papers] 3. Bigtable: A Distributed Storage System for Structured Data (0) | 2024.09.11 |