GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Message Queue Sử dụng publisher confirm trong RabbitMQ

Sử dụng publisher confirm trong RabbitMQ

Đăng vào 16/06/2020 Được đăng bởi GP Coder 2471 Lượt xem

Publisher confirms là một extension của RabbitMQ để thực hiện publish Message đáng tin cậy. Khi publisher confirms được bật trên một Channel, các Message mà Producer  publish phải được xác nhận bởi Broker là đã nhận thành công/ thất bại. Từ đó chúng ta có thể ghi log, thông báo lỗi và / hoặc retry gửi tin nhắn. Trong bài viết này tôi sẽ hướng dẫn bạn sử dụng tính năng này.

Nội dung

  • 1 Enabling Publisher Confirms trên một Channel
  • 2 Ví dụ Publish và Confirm từng Message
  • 3 Ví dụ Publish và Confirm một Batch các Message

Enabling Publisher Confirms trên một Channel

Publishers confirms không được enable theo mặc định. Để enable chúng ta gọi phương thức confirmSelect().


Channel channel = connection.createChannel();
channel.confirmSelect();

Phương thức này phải được gọi trên mọi Channel mà ta muốn sử dụng, chỉ nên được kích hoạt một lần cho một Channel, không phải cho mọi Message được publish.

Để biết một Message đã được publisher confirm hay chưa, chúng ta đăng ký một callback để được notify về kết quả publish:


Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
    // code when message is confirmed
}, (sequenceNumber, multiple) -> {
    // code when message is nack-ed
});

Có 2 callback:

  • Một cho Message được xác nhậ.
  • Một cho tin nhắn nack-ed (tin nhắn có thể được coi là bị mất bởi boker).

Mỗi callback có 2 tham số:

  • sequence number (số thứ tự): một số xác định Message được xác nhận hoặc nack-ed. Nó tương ứng với số Message được publish.
  • multiple: đây là một giá trị boolean. Nếu false, chỉ có một Message được xác nhận / nack-ed, nếu true, tất cả các Message có số thứ tự <= được xác nhận / nack-ed.

Chúng ta có thể lấy một sequence number của một Message trước khi nó được publish thông qua phương thức sau:


long sequenceNumber = channel.getNextPublishSeqNo());

Ví dụ Publish và Confirm từng Message

Trong ví dụ này, tôi sẽ tạo một Direct Exchange, 1 channel và enable tính năng Publisher Confirm trên Channel này.


package com.gpcoder.publisherconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class SinglePublisherConfirm {

    private static final String EXCHANGE_NAME = "PublishingMessage.DirectExchange";
    private static final String QUEUE_NAME = "PublishingMessage.DirectQueue1";
    private static final String ROUTING_KEY = "batchMessage";
    private static final int NUM_OF_MESSAGE = 6;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Create connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Create direct exchange - exchange, builtinExchangeType, durable
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

        // Create queue - (queueName, durable, exclusive, autoDelete, arguments)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // Bind queue to exchange - (queue, exchange, routingKey)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        // Enabling Publisher Confirms on a Channel
        AMQP.Confirm.SelectOk confirmed = channel.confirmSelect();
        System.out.println("Enabled published confirm: " + confirmed);

        // Handling Publisher Confirms Asynchronously
        channel.addConfirmListener((sequenceNumber, multiple) -> {
            // code when message is confirmed
            System.out.println("[Confirmed - multiple] " + multiple);
            System.out.println("[Confirmed - sequenceNumber] " + sequenceNumber);
        }, (sequenceNumber, multiple) -> {
            // code when message is nack-ed
            // Message was lost, we just print the info for debug;
            // otherwise, this case should be handled differently
            System.out.println("Not-Acknowledging for message with id " + sequenceNumber);
        });

        // Publish messages to the channel and put the ids to the queue
        for (int i = 1; i <= NUM_OF_MESSAGE; i++) { 
            String message = "Message " + i; 
            System.out.println("[Send] [" + channel.getNextPublishSeqNo() + "] " + message); 
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes()); 
            channel.waitForConfirmsOrDie(300); // in ms 
        } 

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("[Received] : " + new String(message.getBody()));
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("[Canceled]" + consumerTag);
        };

        // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}


Output chương trình:


Enabled published confirm: #method<confirm.select-ok>()
[Send] [1] Message 1
[Confirmed - multiple] false
[Confirmed - sequenceNumber] 1
[Send] [2] Message 2
[Confirmed - multiple] false
[Confirmed - sequenceNumber] 2
[Send] [3] Message 3
[Confirmed - multiple] false
[Confirmed - sequenceNumber] 3
[Send] [4] Message 4
[Confirmed - multiple] false
[Confirmed - sequenceNumber] 4
[Send] [5] Message 5
[Confirmed - multiple] false
[Confirmed - sequenceNumber] 5
[Send] [6] Message 6
[Confirmed - multiple] false
[Confirmed - sequenceNumber] 6

[Received] : Message 1
[Received] : Message 2
[Received] : Message 3
[Received] : Message 4
[Received] : Message 5
[Received] : Message 6

Như bạn thấy, các Message được xác nhận từng cái một thông qua confirm callback.

Ví dụ Publish và Confirm một Batch các Message

Tương tự như trên, nhưng chúng ta yêu cầu gửi confirm callback theo batch.


package com.gpcoder.publisherconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class BatchPublisherConfirm {

    private static final String EXCHANGE_NAME = "PublishingMessage.DirectExchange";
    private static final String QUEUE_NAME = "PublishingMessage.DirectQueue1";
    private static final String ROUTING_KEY = "batchMessage";
    private static final int NUM_OF_MESSAGE = 6;
    private static final int BATCH_SIZE = 3;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Create connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Create direct exchange - exchange, builtinExchangeType, durable
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

        // Create queue - (queueName, durable, exclusive, autoDelete, arguments)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // Bind queue to exchange - (queue, exchange, routingKey)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        // Enabling Publisher Confirms on a Channel
        AMQP.Confirm.SelectOk confirmed = channel.confirmSelect();
        System.out.println("Enabled published confirm: " + confirmed);

        // Handling Publisher Confirms Asynchronously
        channel.addConfirmListener((sequenceNumber, multiple) -> {
            // code when message is confirmed
            System.out.println("[Confirmed - multiple] " + multiple);
            System.out.println("[Confirmed - sequenceNumber] " + sequenceNumber);
        }, (sequenceNumber, multiple) -> {
            // code when message is nack-ed
            // Message was lost, we just print the info for debug;
            // otherwise, this case should be handled differently
            System.out.println("Not-Acknowledging for message with id " + sequenceNumber);
        });

        // Publish messages to the channel and put the ids to the queue
        for (int i = 1; i <= NUM_OF_MESSAGE; i++) { 
            String message = "Message " + i; 
            System.out.println("[Send] [" + channel.getNextPublishSeqNo() + "] " + message); 
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes()); 

            if (i % BATCH_SIZE == 0) { 
                channel.waitForConfirmsOrDie(300); // in ms 
            } 
        } 

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("[Received] : " + new String(message.getBody()));
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("[Canceled]" + consumerTag);
        };

        // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

Output chương trình:


Enabled published confirm: #method<confirm.select-ok>()
[Send] [1] Message 1
[Send] [2] Message 2
[Send] [3] Message 3
[Confirmed - multiple] true
[Confirmed - sequenceNumber] 2
[Confirmed - multiple] false
[Confirmed - sequenceNumber] 3
[Send] [4] Message 4
[Send] [5] Message 5
[Send] [6] Message 6
[Confirmed - multiple] false
[Confirmed - sequenceNumber] 4
[Confirmed - multiple] true
[Confirmed - sequenceNumber] 6

[Received] : Message 1
[Received] : Message 2
[Received] : Message 3
[Received] : Message 4
[Received] : Message 5
[Received] : Message 6

Như bạn thấy, trường hợp batch thì Message được xác nhận một lần có thể là 1 hay nhiều message.

  • [Confirmed – multiple] true và [Confirmed – sequenceNumber] 2 : nghĩa là đã xác nhận ok cho Message có số thứ tự <= 2.
  • [Confirmed – multiple] false và [Confirmed – sequenceNumber] 3 : nghĩa là đã xác nhận ok cho 1 Message có số thứ tự là 3.
  • [Confirmed – multiple] false và [Confirmed – sequenceNumber] 4 : nghĩa là đã xác nhận ok cho 1 Message có số thứ tự là 4.
  • [Confirmed – multiple] true và [Confirmed – sequenceNumber] 6 : nghĩa là đã xác nhận ok cho Message có số thứ tự <= 6.

Tài liệu tham khảo:

  • https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
5.0
09
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé!

Shares

Chuyên mục: Message Queue Được gắn thẻ: Message Queue, RabbitMQ

Sử dụng Dead Letter Exchange trong RabbitMQ
Kết nối RabbitMQ sử dụng Web STOMP Plugin

Có thể bạn muốn xem:

  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud (02/10/2020)
  • Work Queues trong RabbitMQ (23/05/2020)
  • Giới thiệu RabbitMQ Management Interface (17/05/2020)
  • Sử dụng Topic Exchange (Publish/Subscribe) trong RabbitMQ (01/06/2020)
  • Sử dụng Direct Exchange trong RabbitMQ (26/05/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020
  • Sử dụng Alternate Exchange trong RabbitMQ 10/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (54265 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (53067 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (52708 lượt xem)
  • Xây dựng ứng dụng Client-Server với Socket trong Java (48767 lượt xem)
  • Giới thiệu Design Patterns (47932 lượt xem)

Nội dung bài viết

  • 1 Enabling Publisher Confirms trên một Channel
  • 2 Ví dụ Publish và Confirm từng Message
  • 3 Ví dụ Publish và Confirm một Batch các Message

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP Performance PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Thread Pool Unit Test Webservice

Liên kết website

Design Pattern

  • Refactoring Guru
  • Source Making

Lập trình Java

  • JavaTpoint
  • JavaWorld
  • Journaldev
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2022 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

sponsored

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Google
Hacker News
Line
LinkedIn
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
Xing
Yahoo! Mail
Powered by WP Socializer

Copy short link

Copy link
Powered by WP Socializer