본문 바로가기

server/system design

redis pub/sub을 이용한 서버 아키텍처 구현

Pub/Sub(발행-구독) 아키텍처는 비동기적 데이터 흐름확장성이 필요한 상황에서 주로 사용됩니다.일반적으로 다음의 상황들에 적용하게 됩니다.

- 이벤트 기반 시스템 ( 이메일/푸시 알림 , 데이터 실시간 업데이트)

- 마이크로서비스 간 통신 (주문 처리 시스템, 사용자 인증)

- 스트리밍 데이터 처리

- 데이터 동기화

- 장애 복구 및 재처리 (결제 시스템)

- 확장 가능한 비동기 작업 처리 ( 백그라운드 처리 )

 

 

이번 예제에서는 Redis와 Python의 FastAPI를 사용하여 비동기 웹 서버를 만들고 관리하는 설정을 구성할 수 있습니다. 아래는 전체적인 구성 예제입니다.


주요 구성 요소

  1. FastAPI 웹 서버: 요청을 받아 작업을 Queue에 넣습니다.
  2. Redis: Queue 역할을 합니다.
  3. Worker: Redis에서 작업을 가져와 처리합니다.

 

(queue로 하면 되는데, 왜 pub/sub으로 구현했는지는 마지막 세션의 kafka로의 확장을 위함입니다. 하지만 pub/sub으로 하면 work가 여러개(멀티프로세싱)일 경우 같은 task를 여러번 처리하게 되므로, 중복 처리 로직이 들어가야 하는 단점이 생깁니다. - queue로 하면 중복처리는 안해도 되지만, 확장이 어려워집니다)


 

참고로 웹서버와 워커 로직의 분리한 이유는 다음과 같습니다.

- 비동기 작업 처리

  • 웹 서버는 사용자 요청에 즉각 응답해야 합니다. 하지만 작업(예: 데이터 처리, 파일 업로드 등)이 오래 걸리면 응답이 지연됩니다.
  • 워커를 사용하면 웹 서버는 작업을 큐에 넣고 즉시 응답할 수 있으며, 작업은 워커가 처리합니다.

- 시스템 확장성

  • 웹 서버와 워커를 분리하면 각각 독립적으로 확장 가능합니다.
    • 요청량이 많아지면 웹 서버를 추가.
    • 작업량이 많아지면 워커를 추가.
  • 이를 통해 효율적인 리소스 사용과 고가용성을 보장합니다.

-   장애 격리

  • 작업 중 오류가 발생하더라도 워커만 영향을 받으며, 웹 서버는 계속 동작할 수 있습니다.

 

1. redis 실행

(로컬 실행이기 때문에 따로 보안 설정은 하지 않았습니다.)

docker-compose 로 레디스를 실행합니다.

version: "3.9"

services:
  redis:
    image: "redis:alpine"
    ports:
      - "6379:6379"

 

2. 웹서버

먼저 도커로 실행된 레디스에 클라이언트를 접속하게 됩니다.

/enqueue : 사용자 요청을  API 호출로 받은 후 redis에 저장

/task/{task_id} : 해당 task의 상태 확인 및 완료 시 redis에서 삭제

from fastapi import FastAPI
import redis
import json
import uuid

app = FastAPI()

# Redis 연결
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

@app.post("/enqueue/")
async def enqueue_task(data: dict):
    task_id = str(uuid.uuid4())  # 고유한 Task ID 생성
    task_data = {"id": task_id, "data": data, "status": "queued"}
    
    # Redis에 작업 상태 저장 (key: task_id, value: task_data)
    redis_client.set(task_id, json.dumps(task_data), ex=3600)  # 1시간 후 자동 삭제
    
    # Redis Pub/Sub 채널로 작업 발행
    redis_client.publish("task_channel", json.dumps(task_data))
    
    return {"message": "Task published", "task_id": task_id}

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    # Redis에서 작업 상태 조회
    task_data = redis_client.get(task_id)
    if task_data:
        task = json.loads(task_data)
        
        # 작업 상태를 'completed'로 업데이트
        task["status"] = "completed"
        redis_client.set(task["id"], json.dumps(task))

        # 작업 데이터 삭제
        print(f"Deleting task {task['id']} from Redis.")
        redis_client.delete(task["id"])
        
        return {"task_id": task["id"], "status": task["status"], "data": task["data"]}
    return {"message": "Task not found", "task_id": task_id}

 

 

3. worker

워커의 경우 웹서버와 따로 실행되게 됩니다. sub을 통해 계속해서 레디스의 데이터를 처리하게 됩니다.

import redis
import json

redis_client = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)

# 메시지 처리 함수
def process_task(task):
    task_data = json.loads(task)
    task_id = task_data["id"]

    # 중복 확인: 작업 ID가 이미 처리된 Set에 있는지 검사
    if redis_client.sismember("processed_tasks", task_id):
        print(f"[SKIP] Task {task_id} already processed.")
        return

    # 작업 처리
    print(f"[PROCESSING] Task {task_id} is being processed.")

    # 작업 완료 기록
    redis_client.sadd("processed_tasks", task_id)
    print(f"[DONE] Task {task_id} completed.")


pubsub = redis_client.pubsub()
pubsub.subscribe("task_channel")

print("Worker is running and waiting for tasks...")

# Pub/Sub 메시지 수신 및 처리
for message in pubsub.listen():
    if message["type"] == "message":
        process_task(message["data"])

 

 

 

 

전체적인 실행 결과는 다음과 같습니다.

1. 사용자가 /enqueue API를 호출합니다. uuid로 task_id가 발급되며, 해당 task_id를 통해 상태를 확인하게 됩니다.

 

2. worker에서 레디스에 데이터가 들어오면 처리하게 됩니다.

 

(참고로 다른 워커에서는 해당 task를 받았지만 이미 처리중이므로 skip 됩니다)

 

3. 클라쪽에서는 /task/{task_id}로 계속해서 완료되었는지 API를 호출하게 됩니다.

(완료가 되었다면 redis에서도 삭제하게 됩니다.)

 

 

 

번외

pub/sub을 redis 대신에 Kafka를 사용하는 경우

카프카의 경우 분산 메시징에 특화되어 있습니다. 대규모 서비스를 원한다면 kafka를 사용해야 겠지만, 확장성과 장애 관련 처리를 고려해서 선택해야 합니다.

Redis vs Kafka

주요 목적 데이터 구조 기반 메시징 및 캐싱 분산 메시징 및 로그 스트리밍
Pub/Sub 지원
확장성 단일 노드 확장에 한계 분산 환경에서 무한 확장 가능
메시지 영속성 선택적 (RDB 또는 AOF 설정) 기본적으로 영속성 지원
적합한 사용 사례 실시간 알림, 간단한 작업 큐 대규모 분산 작업, 데이터 파이프라인

 

위의 코드에서는 redis접속 부분을 카프카로 변경해주면 적용이 가능해집니다.

다음번에는 카프카를 통한 데이터 처리를 포스팅해보겠습니다.