GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Message Queue Sử dụng Dead Letter Exchange trong RabbitMQ

Sử dụng Dead Letter Exchange trong RabbitMQ

Đăng vào 13/06/2020 . Được đăng bởi GP Coder . 7847 Lượt xem . Toàn màn hình

Nội dung

  • 1 Dead Letter Exchange trong RabbitMQ
  • 2 Ví dụ sử dụng Dead Letter Exchange trong RabbitMQ

Dead Letter Exchange trong RabbitMQ

Dead Letter Exchange là gì?

Dead Letter là một tin nhắn không thể gửi đến người nhận. Dead Letter Queue (DLQ), là hàng đợi chứa tin nhắn chưa được gửi, không thể được gửi đến đích của chúng vì lý do này hay lý do khác.

Trong hàng đợi tin nhắn, DLQ là một dịch vụ được cài đặt để lưu trữ các tin nhắn đáp ứng một hoặc nhiều sự kiện sau:

  • Tin nhắn bị từ chối (rejected) bởi một Queue Exchange.
  • Message hết hạn (expire) do Time to live (TTL).
  • Vượt quá giới hạn chiều dài hàng đợi (length limit).

Dead Letter Exchange là một Exchange bình thường, có thể là một trong 4 loại Exchange (Direct, Fanout, Topic, Headers).

Điều gì xảy ra với Dead Letter Message?

  • Gửi tới một Dead Letter Exchange.
  • Thêm một số thông tin vào header của Message trước khi gửi đến Dead Letter Exchange.

Cấu hình Dead Letter Exchange sử dụng Optional Queue Arguments

Để gán một Dead Letter Exchange cho một Queue sử dụng agruments x-dead-letter-exchange khi định nghĩa Queue.


channel.exchangeDeclare("gpcoder.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "gpcoder.exchange.name");
channel.queueDeclare("GPCoderQueue", false, false, false, args);

Đoạn code trên đơn giản tạo một Exchange mới gọi là gpcoder.exchange.name và Exchange mới này là Dead Letter Exchange cho hàng đợi mới được tạo. Lưu ý rằng Exchange  không phải được khai báo khi hàng đợi được khai báo, nhưng nó phải tồn tại tại thời điểm các Message được Dead Letter, nếu không thì các Message sẽ bị hủy bỏ.

Theo mặc định RabbitMQ sẽ lấy khoá định tuyến từ Message ban đầu được gửi đến Exchange. Chúng ta cũng có thể chỉ định một khóa định tuyến sẽ được sử dụng khi Dead Letter Message nếu muốn.

args.put("x-dead-letter-routing-key", "some-routing-key");

Routing Dead-Lettered Message

Dead-lettered message được định tuyến tới Dead Letter Exchange theo thứ tự ưu tiên sau:

  • Theo khoá định tuyến được chỉ định cho hàng đợi Dead Letter Message được chỉ định.
  • Theo khoá định tuyến ban đầu Message được publish.

Ví dụ: Nếu một Message ban đầu được publish đến Exchange với khoá định tuyến foo. Sau đó Message này bị reject và nó trở thành một Dead Letter Message. Và nó được publish đến Dead Letter Exchange với khoá định tuyến foo. Giả sử chúng ta đã chỉ định một khóa định tuyến sẽ được sử dụng khi Dead Letter Message là bar, lúc này Message được publish đến Dead Letter Exchange với khoá định tuyến bar.

Dead-lettered message được re-published với tính năng publisher confirm được bật mặc định để đảm bảo rằng Dead-letter queue phải gửi xác nhận Message (ack) đã được lưu trữ trước khi nó bị xoá ở Queue gốc.

Message bị thay đổi như thế nào khi chuyển sang Dead-Lettered Message?

Header bị thay đổi:

  • Exchange name bị thay thế bởi Dead-letter exchange name.
  • Routing key có thể bị bởi một Routing key khác.
  • Nếu điều trên xảy ra, thì CC và BCC header cũng sẽ bị remove.

Thêm header x-death, với một mảng các giá trị:

  • queue: tên của Queue mà Message được publish trước khi nó trở thành Dead-Lettered Message.
  • reason: lý do xảy ra Dead-Lettered Message. Có thể là: rejected, expired, maxlen.
  • time: thời gian Message bị dead lettered.
  • exchange: tên Exchange mà Message được publish, nó có thể là dead letter exchange nếu Message bị dead lettered nhiều lần.
  • routing-keys: là routing key (bao gồm CC keys nhưng không bao gồm BCC keys) của Message được publish.
  • count: số lần mà Message bị dead-lettered trong Queue này vì lý do này.
  • original-expiration: nếu Message bị dead-lettered vì lý do TTL, nào là giá trị của thuộc tính expiration ban đầu của Message. Thuộc tính expiration sẽ bị remove từ dead-lettering message để ngăn việc expiring lần nữa trong Queue mà nó được định tuyến đến.

3 header được thêm cho mỗi dead-lettering event đầu tiên:

  • x-first-death-reason
  • x-first-death-queue
  • x-first-death-exchange

Chúng có cùng các giá trị như reason, queue, và exchange của event xảy ra Dead-Lettered ban đầu. Sau khi thêm, các header này không bao giờ được sửa đổi.

Lưu ý rằng: mảng giá trị x-death được sắp xếp gần đây nhất trước, do đó, Dead-Lettered gần đây nhất sẽ được ghi lại trong mục đầu tiên.

Ví dụ sử dụng Dead Letter Exchange trong RabbitMQ

Trong ví dụ này tôi sử dụng Dead Letter Exchange để mô phỏng trường hợp Retry xử lý sau mỗi 300 millisecond nếu ngay lần nhận Message đó không thể xử lý thành công.

  1. Tạo một WorkQueue và bind đến WorkExchange.
  2. Tạo một RetryQueue và bind đến RetryExchange.
    • Gán agruments: x-dead-letter-exchange đến WorkExchange
    • Gán agruments: x-message-ttl là 300 ms.
  3. Producer publish một Message đến WorkExchange. Sau đó, WorkExchange sẽ chuyển Message đến WorkQueue.
  4. Consumer nhận Message từ WorkQueue và cố gắng xử lý nó.
  5. Trường hợp xử lý thất bại, Consumer sẽ publish Message đó đến RetryExchange. Sau đó, RetryExchange sẽ chuyển Message đến RetryQueue.
  6. Message sẽ lưu tại RetryQueue trong 300 ms.
  7. Khi Message bị expire, nó sẽ được chuyển đến WorkExchange và được chuyển đến WorkQueue.
  8. Khi đó Consumer có thể nhận lại Message từ WorkQueue và xử lý lại.

Hãy xem code implement:


package com.gpcoder.deadletterexchange;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class App {

    // Exchange
    private static final String WORK_EXCHANGE_NAME = "GPcoder.WorkExchange";
    private static final String RETRY_EXCHANGE_NAME = "GPcoder.RetryExchange";

    // Queue
    private static final String WORK_QUEUE_NAME = "WorkQueue";
    private static final String RETRY_QUEUE_NAME = "RetryQueue";

    private static final int RETRY_DELAY = 300; // in ms

    private static Channel channel;
    private static int RETRY_COUNT = 0;

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();

        channel = connection.createChannel();

        // Create the WorkQueue
        channel.exchangeDeclare(WORK_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
        channel.queueBind(WORK_QUEUE_NAME, WORK_EXCHANGE_NAME, "", null);

        // Create the RetryQueue
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", WORK_EXCHANGE_NAME);
        arguments.put("x-message-ttl", RETRY_DELAY);
        channel.exchangeDeclare(RETRY_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(RETRY_QUEUE_NAME, true, false, false, arguments);
        channel.queueBind(RETRY_QUEUE_NAME, RETRY_EXCHANGE_NAME, "", null);

        // basicPublish - ( exchange, routingKey, basicProperties, body)
        String message = "GPCoder Message";
        System.out.println("[" + LocalDateTime.now() + "] [Work] [Send]: " + message);
        channel.basicPublish(WORK_EXCHANGE_NAME, "", null, message.getBytes());

        consumer(WORK_QUEUE_NAME);
    }

    private static void consumer(String queueName) throws IOException {
        // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String content = new String(message.getBody());
            System.out.println("[" + LocalDateTime.now() + "] [Received] [" + queueName + "]: " + content);
            System.out.println("");
            if (RETRY_COUNT < 5) { publishToRetryExchange(content); RETRY_COUNT++; } else { RETRY_COUNT = 0; } }; CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag);
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }

    // Publish to RetryQueue on failure
    private static void publishToRetryExchange(String message) throws IOException {
        System.out.println("[" + LocalDateTime.now() + "] [Retry" + RETRY_COUNT + "] [Re-Publish]: " + message);
        channel.basicPublish(RETRY_EXCHANGE_NAME, "", null, message.getBytes());
    }
}


Output chương trình:


[2020-05-03T22:57:03.614] [Work] [Send]: GPCoder Message
[2020-05-03T22:57:03.620] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:03.620] [Retry0] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:03.922] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:03.922] [Retry1] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:04.224] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:04.224] [Retry2] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:04.526] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:04.526] [Retry3] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:04.828] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:04.828] [Retry4] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:05.131] [Received] [WorkQueue]: GPCoder Message

Như bạn thấy, Message được tự động chuyển từ WorkQueue sang RetryQueue sau mỗi 300 ms nhờ vào Dead Letter Message. Đây là ví dụ cho trường hợp một Message hết hạn (expire) do Time to live (TTL).

Tương tự các bạn có thể set agruments là agruments.put(“x-max-length”, 10) để test trường hợp số lượng Message trong Queue vượt quá giới hạn chiều dài hàng đợi (length limit).

Tài liệu tham khảo:

  • https://www.rabbitmq.com/dlx.html
  • https://www.rabbitmq.com/ttl.html
  • https://www.rabbitmq.com/maxlength.html
5.0
14
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Message Queue Được gắn thẻ: Message Queue, RabbitMQ

Sử dụng Alternate Exchange trong RabbitMQ
Sử dụng publisher confirm trong RabbitMQ

Có thể bạn muốn xem:

  • Sử dụng Alternate Exchange trong RabbitMQ (10/06/2020)
  • Sử dụng publisher confirm trong RabbitMQ (16/06/2020)
  • Giới thiệu JMS – Java Message Services (30/04/2020)
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin (19/06/2020)
  • Giới thiệu RabbitMQ Management Interface (17/05/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (98427 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (98099 lượt xem)
  • Giới thiệu Design Patterns (88460 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (86907 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (84245 lượt xem)

Nội dung bài viết

  • 1 Dead Letter Exchange trong RabbitMQ
  • 2 Ví dụ sử dụng Dead Letter Exchange trong RabbitMQ

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link