RabbitMQ - Demo with python

rabbitMQ

1. Message-boker ?

  • Là chương trình đóng vai trò trung gian lưu trữ cũng như điều phối (valadating, transforming, routing messages) các yêu cầu (message) giữa sender và reciever.
  • Mesage-boker có 2 hình thức giao tiếp cơ bản là:
    • Publish và Subscribe (Topics)
    • Point-to-Point (Queues)

2. RabbitMQ là gì ?

  • RabbitMQ là một message boker (message-oriented middleware) hay còn gọi là phần mềm quản lý hàng đợi message (thường được gọi là môi giới message hay trình quản lý message). Nói đơn giản đây là phần mềm định nghĩa hàng đợi một ứng dụng khác có thể kết nối tới để bỏ message vào và gửi message dựa trên nó.

    Message: ở đây có thể chứa nhiều kiểu thông tin. Ví dụ như thông tin về một process/task để khởi động một ứng dụng nào đó (nằm trên một server khác), hoặc có thể là một message chứa text đơn giản.

  • RabbitMQ hỗ trợ nhiều giao thức (AMQP, STOMP, MQTT, HTTP and Websockets), tuy nhiên phương thức phố biến nhất mà rabbitmq sử dụng là AMQP - Advanced Message Queue Protocol

message-queue-small

  • Phần mềm quản lý hàng đợi chứa các message cho đến khi ứng dụng nhận đến lấy message.
  • Một số thuật ngữ trong RabbitMQ
    • Producer: Bên phát hành message (publisher)
    • Consumer: Bên nhận tin (subscriber)
    • Exchange: Làm nhiệm vụ điều hướng message từ producer đến các queue bên trong do các message không được công khai trực tiếp trong queue.
    • Routing key: Là một khóa mà exchange dùng nó để quyết định cách đưa vào hàng đợi. Routing key có thể hiểu như một địa chỉ của message.
    • Queues: có nhiệm vụ lưu trữ bản tin được gửi lên
    • Connection: Là một kết nối TCP giữa ứng dụng của bạn và RabbitMQ
    • Channel: Một channel là một kết nối ảo bên trong một connection. Khi bạn đẩy đi hoặc nhận các message từ hàng đợi, tất cả phải đi qua channel
    • Binding: Là một kết nối giữa hàng đợi và exchange
    • User: người dùng có thể kết nối đến #RabbitMQ bằng username/password. Mỗi người dùng được cấp quyền như đọc, ghi và cấu hình quyền bên trong một instance. User còn có quyền trên một host ảo.
    • Virtual Host: Cung cấp chức năng tách ứng dụng dùng trên cùng #RabbitMQ. Người dùng khác nhau có quyền hạn khác nhau trên virtual host, hàng đợi hay exchange khác nhau. Chúng chỉ tồn tại trong một virtual host.

3. Tại sao dùng RabbitMQ ?

  • Problems:

    • Đối với các hệ thống sử dụng kiến trúc microservice thì việc gọi chéo giữa các service quá nhiều khiến luồng xử lý khá phức tạp.
    • Mức độ trao đổi data giữa các thành phần tăng lên khiến cho việc lập trình trở nên khó khăn (maintain).
    • Khi phát triển làm sao để dev tập trung vào business logic thay vì các công việc trao đổi ở tầng infrastructure.
    • Với các hệ thống phân tán, khi việc giao tiếp giữa các thành phần đòi hỏi chúng phải biết nhau. Nhưng điều này rắc rối cho việc viết code. Một thành phần phải viết quá nhiều đâm ra rất khó maintain, debug
  • Sử dụng RabbitMQ:

    • Theo dõi được message và có thể retry (service ngừng hoạt động thì message vẫn còn trong queue).
    • Một producer không cần phải biết comsumer. Nó chỉ việc gởi message đến các queue trong message-boker. Consumer chỉ việc đăng ký nhận message từ queue này.
    • Vì producer giao tiếp với consumer trung gian qua message broker nên dù producer và consumer có khác biệt nhau về ngôn ngữ thì giao tiếp vẫn thành công.(Hiện nay rabbitmq đã hỗ trợ rất nhiều ngôn ngữ khác nhau).
    • Một đặc tính của rabbitmq là bất đồng bộ(asynchronous). Producer không thể biết khi nào message đến được consumer hay khi nào message được consumer xử lý xong. Đối với producer, đẩy message đến message broker là xong việc. Consumer sẽ lấy message về khi nó muốn. Đặc tính này có thể được tận dụng để xây dựng các hệ thống lưu trữ và xử lý log.

4. Exchange

Mặc định exchange là chuỗi “”. Một exchange có thể có nhiều queue. Exchange có 4 loại:

  • Fanout: Một Fanout exchange sẽ đẩy message đến toàn bộ hàng đợi gắn với nó.
  • Direct: Một Direct exchange sẽ đẩy message đến hàng đợi dựa theo khóa định tuyến – routing key (do producer khai báo). Ví dụ, nếu hàng đợi gắn với một exchange có binding key là pdfprocess, message được đẩy vào exchange với routing key là pdfprocess sẽ được đưa vào hàng đợi.
  • Topic: Một topic exchange sẽ làm một lá bài (gọi là wildcard) để gắn routing key với một routing pattern khai báo trong binding Consumer có thể đăng ký những topic mà nó quan tâm. Cú pháp được sử dụng ở đây là * và #. Ví dụ: - booking.* -> Được đăng ký bởi tất cả những key với pattern bắt đầu bằng booking. - booking.# -> Được đăng ký bởi tất cả các key booking hoặc bắt đầu với booking
  • Headers: Một header exchange sẽ dùng các thuộc tính header của message để định tuyến. Headers Exchange rất giống với Topic Exchange, nhưng nó định tuyến dựa trên các giá trị tiêu đề thay vì các khóa định tuyến.
  • Dead Letter Exchange: Nếu không tìm thấy hàng đợi phù hợp cho tin nhắn, tin nhắn sẽ tự động bị hủy. RabbitMQ cung cấp một tiện ích mở rộng AMQP được gọi là “Dead Letter Exchange” — Cung cấp chức năng để chụp các tin nhắn không thể gửi được.

5. Workflow của RabbitMQ ?

exchanges-bidings-routing-keys.

  1. Producer đẩy message vào exchange. Khi tạo exchange, phải mô tả nó thuộc loại gì.
  2. Sau khi exchange nhận message, nó chịu trách nhiệm định tuyến message. Exchange sẽ chịu trách về các thuộc tính của message, ví dụ routing key, phụ thuộc loại exchange.
  3. Việc binding phải được tạo từ exchange đến hàng đợi. Trong trường hợp như ảnh, ta sẽ có hai binding đến hai hàng đợi khác nhau từ một exchange. Exchange sẽ định tuyến message vào các hàng đợi dựa trên thuộc tính của của từng message.
  4. Các message nằm ở hàng đợi đến khi chúng được xử lý bởi một consumer.
  5. Consumer xử lý message.

6. Thử bắn và nhận msg với rabbitmq (python)

  • Tạo một service gởi - sender.py: với queue tên ‘demo’, exchange ’logs’ đẩy msg đến toàn bộ queue (exchange_type=‘fanout’). Bắn msg với routing_keys=‘key1’, nội dung trong body
import pika

# using CloudAMQP (https://www.cloudamqp.com/)
CLOUDAMQP_URL = 'amqp://vvrzbnja:[email protected]/vvrzbnja'

# establish a connection with RabbitMQ server.
params = pika.URLParameters(CLOUDAMQP_URL)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# create queue with name 'demo'
channel.queue_declare(queue='demo')
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Ready to send a message
channel.basic_publish(exchange='logs',
                      routing_key='key1',
                      body='Hello world!')

print(" Sent message")
# close connection
connection.close()
  • Service nhận message- receiver.py. Đăng ký nhận msg ở queue ‘demo’
import pika

CLOUDAMQP_URL = 'amqp://vvrzbnja:[email protected]/vvrzbnja'

# Access the CLODUAMQP_URL environment variable and parse it (fallback to localhost)
params = pika.URLParameters(CLOUDAMQP_URL)
connection = pika.BlockingConnection(params)
channel = connection.channel()  # start a channel
channel.queue_declare(queue='demo')  # Declare a queue


def callback(ch, method, properties, body):
    print(" [x] Received " + str(body))


channel.basic_consume('hello',
                      callback,
                      auto_ack=True)

print(' [*] Waiting for messages:')
channel.start_consuming()
connection.close()

  • Run
    $ python sender.py
    $ python receiver.py

Sender

Result

References: RabbitMQ for beginners - What is RabbitMQ? Những điều cần biết về RabbitMQ RabbitMQ và Kafka