[RabbitMQ] 게시, 구독(Publish / Subscribe)

참조


소개

  • 이번 포스팅에서는 하나의 메시지를 여러개의 작업자에게 배달 할 수 있는 방법에 대해서 알아 봅니다.
  • 이 패턴은 게시(Publish) / 구독(Subscribe) 으로 알려져 있습니다.

시나리오

  • 게시(Publish) / 구독(Subscribe) 패턴을 이해하기 위해서, 간단한 로깅 시스템을 만들어 봅ㄴ디ㅏ.
  • 이것은 두개의 프로그램으로 구성되어 있는데, 첫 번째는 로그 메시지를 방출하고 두번째는 그것을 받아서 출력합니다.
  • 로깅 시스템에서 모든 동작하는 Receiver 프로그램 복사본은 메시지를 받을 것입니다.
  • 하나의 Receiver는 로그를 디스크에 저장하고, 동시에 다른 Receiver는 콘솔 창에 출력합니다.
  • 필수적으로, 게시된 로그 메시지는 모든 Receiver 들에게 브로드캐스트 됩니다.

Exchange

  • 앞서 가볍게 RabbitMQ 의 용어를 다시 정의합니다.
    • Producer는 메시지를 전송하는 사용자 응용프로그램입니다.
    • Queue는 메시지를 저장하는 버퍼입니다.
    • Consumer는 메시지를 수신하는 사용자 응용프로그램입니다.

  • RabbitMQ에서 메시징 모델의 중심 생각은 Producer는 어떠한 메시지라도 큐에 직접 전송하지 않는다는 것입니다.
  • 실제 Producer는 메시지가 큐에 전달되었는지 여부를 전혀 알지 못합니다.
  • 대신에 Producer는 오직 메시지를 exchange에 전송할 뿐입니다.
  • Exchange는 매우 간단합니다.
  • 한쪽에서 exchange는 producer로부터 메시지를 받고 다른 쪽에서 그 메시지를 queue에게 건네줍니다.
  • Exchange는 전달받은 메시지를 가지고 무엇을 할지를 정확히 알아야만 합니다.
  • 사용 가능한 몇몇 Exchange type들이 있습니다.
  • direct, topic, headers, fanout 들이 그 type 들에 속합니다.
  • 여기서 fanout 에 집중합니다.
  • fanout 타입으로 logs 라고 불리는 Exchange를 만들어 봅니다.

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
  • fanout exchange는 이름에서 유추할 수 있듯이, 이것은 모든 메시지를 알고 있는 모든 큐들에게 전달이 됩니다.
  • 이제 다음과 같이 이름이 있는 exchange 에게 게시를 할 수 있게 되었습니다.
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

임시 큐

  • 이전에 특정 이름을 갖는 큐를 사용했습니다.
  • 큐가 이름을 갖는 것은 아주 중요한데, 우리는 작업자에게 동일한 큐를 지정해야 하기 때문입니다.
  • Producer와 Consumer 사이에 큐를 공유할 때 큐의 이름은 매우 중요합니다.
  • 하지만 우리 로거의 경우에는 그렇지 않습니다.
  • 로그의 부분이 아닌 모든 메시지를 받기를 원합니다.
  • 또한, 오래된 로그가 아닌 현재 나오고 있는 로그에 관심이 있습니다. 이를 위해 2가지가 필요합니다.
    • 먼저 RabbitMQ에 연결할 때마다 새로운 빈 큐가 필요합니다.
    • 이렇게 하기 위해서는 서버가 임의의 이름으로 새롭게 생성해 주는 큐가 필요합니다.
result = channel.queue_declare(queue='')
  • 두 번째로, Consumer의 연결을 종료하면 큐는 자동적으로 삭제되어야 합니다.
result = channel.queue_declare(queue='', exclusive=True)

Bindings

  • 지금까지 fanout exchange와 queue을 생성했습니다.
  • 이제 exchange에게 메시지를 queue에게 전송하라고 지시해야 합니다.
  • 여기서 Exchange와 queue 사이의 관계를 binding 이라고 부릅니다.
channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

publish.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

receive.py

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
728x90

이 글을 공유하기

댓글

Designed by JB FACTORY