본문 바로가기

server/system design

fastapi로 server sent event 구현 실습

 

실시간 주식 데이터를 제공하는 서비스를 생각해보자. 해당 데이터는 주식 시작과 함께 끊임없이 데이터가 변경된다. 그리고 해당 화면을 보는 유저는 계속해서 데이터를 제공받아야 한다.

클아이언트와 서버 개발자는 이걸 어떻게 개발해야 할까? (1초마다 데이터를 갱신한다고 가정하겠다.)

 

아마도 아래의 두가지를 생각할수 있다.

1.  1초마다 서버에 get 요청을 받은 후 화면 갱신

2. 스트림 데이터를 통해 1초마다 데이터를 받은 후 화면 갱신

 

 

1번 "1초마다 서버에 get 요청을 받은 후 화면 갱신" 일 경우 http 커넥션이 계속해서 발생하게 된다. 통신은 기본적으로 steless이므로 서버는 어떤 사용자가 어떤 데이터를 원하는지 계속해서 탐색해야 하는 상황이 된다.

 

2번 "스트림 데이터를 통해 1초마다 데이터를 받은 후 화면 갱신" 일 경우 커넥션 연겨리 지속적으로 유지되며, 클라이언트에서는 데이터를 계속해서 스트리밍하여 받아서 처리하면 됩니다.

 

스트림 데이터를 처리하기 위한 방법으로는 websocket 과 SSE(server-send events) 방식이 있습니다.

이번 포스팅에서는 sse를 통한 서버 구현을 해보겠습니다.

 

 

1. SSE(server-send events)

 

SSE(Server-Sent Events)는 서버와 클라이언트 간에 연결이 지속적으로 유지되는 방식으로 동작합니다. 이는 HTTP/1.1의 단일 연결을 통해 서버에서 클라이언트로 데이터를 푸시하며, 서버는 클라이언트에게 text/event-stream 형식으로 데이터를 스트리밍합니다.

만약 연결이 끊어지더라도, SSE는 기본적으로 클라이언트에서 자동으로 재연결을 시도합니다. (EventSource 객체가 이를 처리)

 

특징으로는 단방향 통신으로 데이터는 서버 → 클라이언트로만 전송됩니다. (클라이언트가 서버로 데이터를 보내려면 별도의 HTTP 요청이 필요) 또한 HTTP/1.1의 keep-alive 메커니즘을 사용하여 연결을 끊지 않고 유지합니다.

 

만일! 양방향 통신이 필요한 경우에는 websocket를 사용해야 합니다. 또한 HTTP기반으로 연결수가 많아지면 다른 방법을 사용해야 합니다.

 

2. 코드

먼저 서버 입니다. 해당 코드는 fastapi로 구현되었습니다.

크게 index.html 호출과 sse를 호출하는것으로 되어 있습니다.

sse는 헤더에 "Connection": "keep-alive",  # 연결 유지 / "Content-Type": "text/event-stream",  # SSE 응답 타입

두가지가 중요한 필드입니다.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import asyncio
import time

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")

# CORS 설정
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8000"], 
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


async def event_stream():
    """
    SSE를 통해 데이터를 스트리밍합니다.
    """
    try:
        while True:
            for i in range(1, 101):
                yield f"data: {{\"message\": \"Update #{i}\", \"timestamp\": {time.time()}}}\n\n"
                await asyncio.sleep(1)
    except asyncio.CancelledError as e:
        print(e)
        print("클라이언트 연결이 종료되었습니다.")
        raise
    except Exception as e:
        print(f"에러 발생: {e}")

@app.get("/sse")
async def sse_endpoint():

    headers = {
        "Cache-Control": "no-store",  # 데이터 캐싱 방지
        "Connection": "keep-alive",  # 연결 유지
        "Content-Type": "text/event-stream",  # SSE 응답 타입
    }
    return StreamingResponse(event_stream(), headers=headers)

@app.get("/")
async def serve_index():
    return FileResponse("static/index.html")

 

 

다음으로 클라이언트 입니다. 간단하게 스크립트에서 해당 sse api를 호출하는 형식입니다.

eventsource에서 비동기 호출을 통해 계속해서 데이터를 받아서 처리하게 됩니다.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<script>
    const eventSource = new EventSource("http://localhost:8000/sse");

eventSource.onmessage = (event) => {
  console.log("New message:", event.data);
};

eventSource.onerror = (error) => {
  console.error("Error with SSE:", error);
  eventSource.close(); // 오류 발생 시 연결 종료
};


</script>
<body>

</body>
</html>

 

 

3. 실행결과

서버 실행

gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app --timeout 120 --keep-alive 5

 

그리고 http://localhost:8000 해당 url로 접속해 보면 console 창에 데이터가 찍히고 있는것을 확인할수 있습니다.