[RabbitMQ] Work Queues

참조


소개

  • 작업 큐(Work Queue) 의 주된 아이디어는 자원 집약적인 작업을 즉시 수행하지 않고 완료 될 때까지 기다리자 않아야 합니다.
  • 대신 나중에 수행 할 작업을 예약 합니다.

시나리오

  • 작업을 메시지로 캡슐화 하여 큐로 보냅니다.
  • 백그라운드에서 실행 중인 작업자 프로세스는 작업을 팝업하고 결국 작업을 실행합니다.
  • 많은 작업자를 실행하면 작업이 그들 사이에 공유 됩니다.

Worker

import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

channel.basic_consume(queue='task_queue',
                      auto_ack=True,
                      on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()     

NetTask

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.queue_declare(queue='task_queue')

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message)
print(" [x] Sent %r" % message)

라운드-로빈 배분(Round-robin Dispatching)

  • Task Queue 를 사용하는 이점 중 하나는 쉽게 병렬 작업을 할 수 있는 능력입니다.
  • 만약 우리가 작업의 잔무를 처리하고 있다면, 우리는 작업자를 좀 더 추가하여 쉽게 규모를 키울 수 있습니다.
  • 먼저 동시에 두 개의 Worker.py 파일을 실행합니다.
  • 이들은 모두 Queue로부터 메시지를 얻으려 할 것입니다.
  • 3개의 파이썬 파일을 실행해야 합니다.
  • 그 중 2개는 Worker 를 실행합니다.
  • 해당 서비스는 2개의 Consumer, 즉 C1 과 C2가 되는 것입니다.
  • 기본적으로 RabbitMQ는 각 메시지들을 연속적으로 다음 Consumer 에게 전송합니다.
  • 평균적으로 모든 Consumer 는 같은 수의 메시지들을 얻습니다.
  • 이런 방식의 메시지 분산은 Round-robin 이라 불립니다.
# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....


메시지 승인(Acknowledgement)

  • 작업을 수행하는 것은 몇 초가 걸릴 수도 있습니다.
  • 만약 Consumer 중의 하나가 긴 작업을 시작했고 그 작업을 부분적으로 수행하고 죽을 수 있습니다.
  • 앞에서 작성한 코드에서는 RabbitMQ가 메시지를 Consumer에게 전달하면 즉시 메모리로부터 그 메시지를 제거합니다.
  • 이런 경유에 만약 작업자를 죽이면 진행중이었던 그 메시지를 잃게 됩니다.
  • 또한 그 특별한 작업자에게 전달되어 아직 수행되지 않은 모든 메시지들도 잃게 됩니다.
  • 하지만, 작업들을 잃게 되는 것을 원하지 않는다면, 작업자가 죽는 경우 그 작업은 다른 작업에게 전달되어야 합니다.
  • 메시지를 잃어버리지 않기 위해 RabbitMQ는 Message Acknowledgement 를 제공합니다.
  • Consumer 는 RabbitMQ에게 Ack(nowledgement) 를 되돌려 주는데, 이는 특정 메시지가 처리되었음을 알려주어 RabbitMQ가 그 메시지를 지울 수 있도록 합니다.
  • 메시지 타임 아웃은 없습니다.
  • RabbitMQ는 Consumer가 죽을 때 그 메시지를 재 배달 합니다.
  • Message Acknowlegdement는 기본으로 동작하게 되어 있습니다.
  • 이전 예제에서 noAck 파라미터를 true로 설정함으로 명시적으로 이 기능을 사용하지 않도록 하였습니다.
  • 해당 플래그를 제거하여 작업자가 적절한 Ack를 보내도록 설정합니다.
import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.baskc_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue',
                      on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()            
  • 위 코드를 사용하면 메시지를 처리하는 동안 CTRL_C 를 사용하여 작업자를 종료 시키더라도 아무것도 손실되지 않습니다.
  • 작업자가 종료된 직후 모든 미확인 메시지가 재전송 됩니다.
  • 수신 확인은 전송을 수신 한 동일한 채널을 통해 전송되어야 합니다.
  • 다른 채널을 사용하여 확인 응답을 시도하면 채널 수준 프로토콜에서 예외가 발생합니다.

메시지 내구성

  • 앞에서 Consumer 가 죽더라도 작업을 일지 않게 하는 방법을 배웠습니다.
  • 하지만 RabbitMQ 서버가 중지되면 모든 작업들을 잃게 됩니다.
  • RabbitMQ가 멈추면, RabbitMQ는 Queue와 메시지를 모두 잃게 됩니다.
  • 메시지를 잃지 않게 하기 위해서는 2가지가 필요합니다.
  • 우리는 Queue와 메시지 모두 내구성을 갖도록 해야 합니다.
  • 먼저, RabbitMQ가 Queue를 잃지 않도록 하기 위해서는 Queue를 durable 로 선언합니다.
channel.queue_declare(queue='hello', durable=True)
  • 위 명령 현재 상황에서 동작하지 않습니다.
  • 내구성이 없는 "hello" 라 불리는 queue를 이미 선언했기 때문입니다.
  • RabbitMQ는 이미 존재하는 queue를 다른 파라미터로 재정의하는 것을 허용하지 않습니다.
  • 그러므로 다른 이름으로 queue를 선언해야 합니다.
channel.queue_declare(queue='task_queue', durable=True)
  • "task_queue" queue는 RabbitMQ가 재시작 되더라도 메시지를 잃어버리지 않습니다.
  • 이제 메시지가 IBasicProperites.SetPersistent를 true 로 설정해서 영속적이 되도록 설정해야 합니다.
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

메시지를 영속적으로 설정하더라도 메시지는 잃어 버릴수 있습니다. 비록 RabbitMQ에게 메시지를 디스크에 저장하라고 하더라도, RabbitMQ가 메시지를 받고 아직 저장하지 않은 짧은 시간이 존재합니다. 영속성 보증은 강하지 않습니다. 만약 더 강한 보증이 필요하다면, publisher confirm을 사용해야 합니다.


공평한 분배

  • 예를 들어 2개의 작업자가 있는 경우에, 모든 홀수번째 메시지는 무겁고, 모든 짝수 번째 메시지는 가볍다고 한다면, 한 작업자는 꾸준히 바쁠 것이고 나머지 작업자는 거의 동작하지 않을 것입니다.
  • RabbitMQ는 그것에 관해 어떠한 것도 알지 못하며 공평하게 메시지를 분배할 것입니다.
  • 이것은 메시지가 Queue에 들어올 때 RabbitMQ가 메시지를 분배하기 때문에 발생합니다.
  • RabbitMQ는 Consumer에 대한 승인되지 않은 메시지의 수는 살펴보지 않습니다.
  • RabbitMQ는 맹목적으로 모든 n번째 메시지를 n번째 consumer에게 분배합니다.
  • 이 문제를 해결하기 위해 BasicQos 메서드를 prefetchCount = 1 설정으로 사용할 수 있습니다.
  • 이것은 RabbitMQ가 작업자에게 한번에 하나 이상의 메시지를 주지 않도록 합니다.
  • 다시 말해, 작업자가 이전 메시지를 처리하고 승인하기 전까지는 새로운 메시지를 분배하지 않습니다.
  • 대신에, 바쁘지 않은 다른 작업자에게 분배 됩니다.

channel.basic_qos(prefetch_count=1)
  • prefetchsize는 "특정한 제한 없음" 을 의미하는 0으로만 설정할 수 있습니다.
  • prefetchSize는 아직 구현되지 않았다고 RabbitMQ 공식 문서에서 설명하고 있습니다.
  • 만약 모든 작업자가 바쁘다면, 큐에는 메시지가 점점 채워질 것입니다.
  • 큐를 모니터링하고 있다가 큐가 점점 채워지면 작업자를 더 추가하거나, 다른 전략을 사용해야 합니다.

전체 소스

New Task

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

Worker

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()
728x90

이 글을 공유하기

댓글

Designed by JB FACTORY