rabbitmq persistent
delivery_mode는 RabbitMQ에서 메시지의 배달 모드를 지정하는 데 사용되는 옵션. 이 옵션은 메시지를 메모리에만 저장할지 아니면 디스크에 영구적으로 저장할지를 결정
- Transient
- 메시지를 메모리에만 저장. RabbitMQ가 종료되면 메모리에 있는 메시지는 모두 삭제됨
- 메시지는 RabbitMQ 서버의 메모리에서만 보관되므로 메모리가 부족한 경우 메시지 손실이 발생할 수 있음
- 메시지를 가장 빠르게 처리하고자 할 때 사용
- Persistent
- 메시지를 디스크에 저장. RabbitMQ가 종료되어도 메시지는 디스크에 보존됨.
- 메시지를 디스크에 영구적으로 저장하여 메시지 손실을 방지할 수 있으며 재부팅 후에도 메시지가 유지되므로 안정성을 높일 수 있다.
- 메시지의 지속성이 중요하고 메모리가 아니라 디스크에 저장되어야 할 때 사용됨. 신뢰성이나 데이터 손실 방지를 위해 사용된다.
# 디스크 저장
import pika
# RabbitMQ 서버에 연결합니다.
credentials = pika.PlainCredentials('admin', 'rabbitpassword')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()
# 메시지를 저장할 큐를 선언합니다.
channel.queue_declare(queue='disk_persistent_queue', durable=True)
# 메시지를 보냅니다. 메시지는 디스크에 저장됩니다.
channel.basic_publish(exchange='',
routing_key='disk_persistent_queue',
body='Hello, RabbitMQ!',
properties=pika.BasicProperties(
delivery_mode=2, # 메시지를 디스크에 저장합니다. (persistent)
))
print(" [x] Sent 'Hello, RabbitMQ!'")
# 연결을 닫습니다.
connection.close()
큐사 생성된거 확인
docker-compose를 종료 후 다시 실행해 보면 해당 메시지는 다시 복구 된것을 확인할수 있다. (메모리에 저장되었던 hello큐는 삭제됨)
rabbitmq cluster
rabbitmqctl cluster_status 명령어를 통해 현재 cluster 상태를 확인할 수 있다.
현재는 1개의 노드만을 사용중이다.
3개의 노드를 띄우기 위해서 docker-compose를 다음과 같이 수정한다.
version: "3.7"
services:
rabbitmq1:
image: rabbitmq:3.8.11-management-alpine
hostname: my-rabbitmq1 # 호스트 이름 변경
ports:
- "5672:5672"
- "15672:15672"
- "4369:4369" # EPMD 포트 매핑
environment:
RABBITMQ_NODENAME: rabbit@my-rabbitmq1 # 노드 이름 변경
volumes:
- ./rabbitmq1:/var/lib/rabbitmq
- ./rabbitmq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
networks:
- rabbitmq-network
rabbitmq2:
image: rabbitmq:3.8.11-management-alpine
hostname: my-rabbitmq2 # 호스트 이름 변경
ports:
- "5673:5672"
- "15673:15672"
- "4368:4369" # EPMD 포트 매핑
environment:
RABBITMQ_NODENAME: rabbit@my-rabbitmq2 # 노드 이름 변경
volumes:
- ./rabbitmq2:/var/lib/rabbitmq
- ./rabbitmq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
networks:
- rabbitmq-network
rabbitmq3:
image: rabbitmq:3.8.11-management-alpine
hostname: my-rabbitmq3 # 호스트 이름 변경
ports:
- "5674:5672"
- "15674:15672"
- "4367:4369" # EPMD 포트 매핑
environment:
RABBITMQ_NODENAME: rabbit@my-rabbitmq3 # 노드 이름 변경
volumes:
- ./rabbitmq3:/var/lib/rabbitmq
- ./rabbitmq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
networks:
- rabbitmq-network
networks:
rabbitmq-network:
driver: bridge
docker-compose up을 통해 노드를 실행 한 후
rabbitmq2와 rabbitmq3에 접속 후 cluster를 설정해준다. (rabbitmq1에 join한다)
# rabbitmq 앱을 종료한다.
$ rabbitmqctl stop_app
# join_cluster를 이용해 my-rabbitmq1에 조인한다.
$rabbitmqctl join_cluster rabbit@my-rabbitmq1
# 앱을 실행한다.
$rabbitmqctl start_app
rabbitmq1의 노드에 접속 후 다음의 명령어를 통해 cluster를 확인 할 수 있다.
$ rabbitmqctl cluster_status
# rabbitmqctl cluster_status
Cluster status of node rabbit@my-rabbitmq1 ...
Basics
Cluster name: rabbit@my-rabbitmq1
Disk Nodes
rabbit@my-rabbitmq1
rabbit@my-rabbitmq2
rabbit@my-rabbitmq3
Running Nodes
rabbit@my-rabbitmq1
rabbit@my-rabbitmq2
rabbit@my-rabbitmq3
Versions
rabbit@my-rabbitmq1: RabbitMQ 3.8.11 on Erlang 23.2.3
rabbit@my-rabbitmq2: RabbitMQ 3.8.11 on Erlang 23.2.3
rabbit@my-rabbitmq3: RabbitMQ 3.8.11 on Erlang 23.2.3
웹 관리자 화면에서도 정상적으로 등록된것을 확인할 수 있다
HA
x-ha-policy 속성을 설정하여 이 큐를 HA 큐로 만든 후 메시지를 전송 및 수신하면 된다.
import pika
# RabbitMQ 연결 설정
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 큐 정의 (HA 큐로 만들려면 x-ha-policy 속성을 설정해야 합니다.)
channel.queue_declare(queue='ha_queue', durable=True, arguments={'x-ha-policy': 'all'})
# 메시지 전송
channel.basic_publish(exchange='',
routing_key='ha_queue',
body='Hello, RabbitMQ!',
properties=pika.BasicProperties(
delivery_mode = 2, # 메시지를 영구 저장
))
print(" [x] Sent 'Hello, RabbitMQ!'")
# 메시지 수신 콜백 함수
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 큐에서 메시지 수신
channel.basic_consume(queue='ha_queue',
on_message_callback=callback,
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
'server > 아키텍쳐' 카테고리의 다른 글
rabbitmq start (0) | 2024.05.04 |
---|---|
airflow scheduler high cpu usage (1) | 2021.11.29 |
캐시 (0) | 2021.07.14 |
Consistent Hashing (일관된 해싱) (0) | 2021.07.12 |
airflow 시간대가 다른 두개의 dag을 ExternalTaskSensor 사용하기 (0) | 2021.04.16 |