본문 바로가기

server/아키텍쳐

rabbitmq 심화 (persistent / cluster)

 

rabbitmq persistent

delivery_mode는 RabbitMQ에서 메시지의 배달 모드를 지정하는 데 사용되는 옵션. 이 옵션은 메시지를 메모리에만 저장할지 아니면 디스크에 영구적으로 저장할지를 결정

  1. Transient
    • 메시지를 메모리에만 저장. RabbitMQ가 종료되면 메모리에 있는 메시지는 모두 삭제됨
    • 메시지는 RabbitMQ 서버의 메모리에서만 보관되므로 메모리가 부족한 경우 메시지 손실이 발생할 수 있음
    • 메시지를 가장 빠르게 처리하고자 할 때 사용
  2. Persistent
    • 메시지를 디스크에 저장. RabbitMQ가 종료되어도 메시지는 디스크에 보존됨.
    • 메시지를 디스크에 영구적으로 저장하여 메시지 손실을 방지할 수 있으며 재부팅 후에도 메시지가 유지되므로 안정성을 높일 수 있다.
    • 메시지의 지속성이 중요하고 메모리가 아니라 디스크에 저장되어야 할 때 사용됨. 신뢰성이나 데이터 손실 방지를 위해 사용된다.

 

 

# 디스크 저장

import pika

# RabbitMQ 서버에 연결합니다.
credentials = pika.PlainCredentials('admin', 'rabbitpassword')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()

# 메시지를 저장할 큐를 선언합니다.
channel.queue_declare(queue='disk_persistent_queue', durable=True)

# 메시지를 보냅니다. 메시지는 디스크에 저장됩니다.
channel.basic_publish(exchange='',
                      routing_key='disk_persistent_queue',
                      body='Hello, RabbitMQ!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 메시지를 디스크에 저장합니다. (persistent)
                      ))
print(" [x] Sent 'Hello, RabbitMQ!'")

# 연결을 닫습니다.
connection.close()

 

큐사 생성된거 확인

 

docker-compose를 종료 후 다시 실행해 보면 해당 메시지는 다시 복구 된것을 확인할수 있다. (메모리에 저장되었던 hello큐는 삭제됨)

 

 


rabbitmq cluster

 

rabbitmqctl cluster_status 명령어를 통해 현재 cluster 상태를 확인할 수 있다.

현재는 1개의 노드만을 사용중이다.

 

 

 

 

3개의 노드를 띄우기 위해서 docker-compose를 다음과 같이 수정한다.

version: "3.7"

services:
  rabbitmq1:
    image: rabbitmq:3.8.11-management-alpine
    hostname: my-rabbitmq1 # 호스트 이름 변경
    ports:
      - "5672:5672"
      - "15672:15672"
      - "4369:4369" # EPMD 포트 매핑
    environment:
      RABBITMQ_NODENAME: rabbit@my-rabbitmq1 # 노드 이름 변경
    volumes:
      - ./rabbitmq1:/var/lib/rabbitmq
      - ./rabbitmq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    networks:
      - rabbitmq-network

  rabbitmq2:
    image: rabbitmq:3.8.11-management-alpine
    hostname: my-rabbitmq2 # 호스트 이름 변경
    ports:
      - "5673:5672"
      - "15673:15672"
      - "4368:4369" # EPMD 포트 매핑
    environment:
      RABBITMQ_NODENAME: rabbit@my-rabbitmq2 # 노드 이름 변경
    volumes:
      - ./rabbitmq2:/var/lib/rabbitmq
      - ./rabbitmq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    networks:
      - rabbitmq-network

  rabbitmq3:
    image: rabbitmq:3.8.11-management-alpine
    hostname: my-rabbitmq3 # 호스트 이름 변경
    ports:
      - "5674:5672"
      - "15674:15672"
      - "4367:4369" # EPMD 포트 매핑
    environment:
      RABBITMQ_NODENAME: rabbit@my-rabbitmq3 # 노드 이름 변경
    volumes:
      - ./rabbitmq3:/var/lib/rabbitmq
      - ./rabbitmq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    networks:
      - rabbitmq-network

networks:
  rabbitmq-network:
    driver: bridge

 

 

docker-compose up을 통해 노드를 실행 한 후

rabbitmq2와 rabbitmq3에 접속 후 cluster를 설정해준다. (rabbitmq1에 join한다)

# rabbitmq 앱을 종료한다.
$ rabbitmqctl stop_app

# join_cluster를 이용해 my-rabbitmq1에 조인한다.
$rabbitmqctl join_cluster rabbit@my-rabbitmq1

# 앱을 실행한다.
$rabbitmqctl start_app

 

rabbitmq1의 노드에 접속 후 다음의 명령어를 통해 cluster를 확인 할 수 있다.

$ rabbitmqctl cluster_status

# rabbitmqctl cluster_status
Cluster status of node rabbit@my-rabbitmq1 ...
Basics

Cluster name: rabbit@my-rabbitmq1

Disk Nodes

rabbit@my-rabbitmq1
rabbit@my-rabbitmq2
rabbit@my-rabbitmq3

Running Nodes

rabbit@my-rabbitmq1
rabbit@my-rabbitmq2
rabbit@my-rabbitmq3

Versions

rabbit@my-rabbitmq1: RabbitMQ 3.8.11 on Erlang 23.2.3
rabbit@my-rabbitmq2: RabbitMQ 3.8.11 on Erlang 23.2.3
rabbit@my-rabbitmq3: RabbitMQ 3.8.11 on Erlang 23.2.3

 

웹 관리자 화면에서도 정상적으로 등록된것을 확인할 수 있다

 

 


 

HA

x-ha-policy 속성을 설정하여 이 큐를 HA 큐로 만든 후 메시지를 전송 및 수신하면 된다.

import pika

# RabbitMQ 연결 설정
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# 큐 정의 (HA 큐로 만들려면 x-ha-policy 속성을 설정해야 합니다.)
channel.queue_declare(queue='ha_queue', durable=True, arguments={'x-ha-policy': 'all'})

# 메시지 전송
channel.basic_publish(exchange='',
                      routing_key='ha_queue',
                      body='Hello, RabbitMQ!',
                      properties=pika.BasicProperties(
                         delivery_mode = 2,  # 메시지를 영구 저장
                      ))
print(" [x] Sent 'Hello, RabbitMQ!'")

# 메시지 수신 콜백 함수
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 큐에서 메시지 수신
channel.basic_consume(queue='ha_queue',
                      on_message_callback=callback,
                      auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

'server > 아키텍쳐' 카테고리의 다른 글

rabbitmq start  (0) 2024.05.04
airflow scheduler high cpu usage  (1) 2021.11.29
캐시  (0) 2021.07.14
Consistent Hashing (일관된 해싱)  (0) 2021.07.12
airflow 시간대가 다른 두개의 dag을 ExternalTaskSensor 사용하기  (0) 2021.04.16