Publisher confirms là một extension của RabbitMQ để thực hiện publish Message đáng tin cậy. Khi publisher confirms được bật trên một Channel, các Message mà Producer publish phải được xác nhận bởi Broker là đã nhận thành công/ thất bại. Từ đó chúng ta có thể ghi log, thông báo lỗi và / hoặc retry gửi tin nhắn. Trong bài viết này tôi sẽ hướng dẫn bạn sử dụng tính năng này.
Nội dung
Enabling Publisher Confirms trên một Channel
Publishers confirms không được enable theo mặc định. Để enable chúng ta gọi phương thức confirmSelect().
Channel channel = connection.createChannel(); channel.confirmSelect();
Phương thức này phải được gọi trên mọi Channel mà ta muốn sử dụng, chỉ nên được kích hoạt một lần cho một Channel, không phải cho mọi Message được publish.
Để biết một Message đã được publisher confirm hay chưa, chúng ta đăng ký một callback để được notify về kết quả publish:
Channel channel = connection.createChannel(); channel.confirmSelect(); channel.addConfirmListener((sequenceNumber, multiple) -> { // code when message is confirmed }, (sequenceNumber, multiple) -> { // code when message is nack-ed });
Có 2 callback:
- Một cho Message được xác nhậ.
- Một cho tin nhắn nack-ed (tin nhắn có thể được coi là bị mất bởi boker).
Mỗi callback có 2 tham số:
- sequence number (số thứ tự): một số xác định Message được xác nhận hoặc nack-ed. Nó tương ứng với số Message được publish.
- multiple: đây là một giá trị boolean. Nếu false, chỉ có một Message được xác nhận / nack-ed, nếu true, tất cả các Message có số thứ tự <= được xác nhận / nack-ed.
Chúng ta có thể lấy một sequence number của một Message trước khi nó được publish thông qua phương thức sau:
long sequenceNumber = channel.getNextPublishSeqNo());
Ví dụ Publish và Confirm từng Message
Trong ví dụ này, tôi sẽ tạo một Direct Exchange, 1 channel và enable tính năng Publisher Confirm trên Channel này.
package com.gpcoder.publisherconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class SinglePublisherConfirm { private static final String EXCHANGE_NAME = "PublishingMessage.DirectExchange"; private static final String QUEUE_NAME = "PublishingMessage.DirectQueue1"; private static final String ROUTING_KEY = "batchMessage"; private static final int NUM_OF_MESSAGE = 6; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // Create connection ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); // Create channel Channel channel = connection.createChannel(); // Create direct exchange - exchange, builtinExchangeType, durable channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); // Create queue - (queueName, durable, exclusive, autoDelete, arguments) channel.queueDeclare(QUEUE_NAME, true, false, false, null); // Bind queue to exchange - (queue, exchange, routingKey) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // Enabling Publisher Confirms on a Channel AMQP.Confirm.SelectOk confirmed = channel.confirmSelect(); System.out.println("Enabled published confirm: " + confirmed); // Handling Publisher Confirms Asynchronously channel.addConfirmListener((sequenceNumber, multiple) -> { // code when message is confirmed System.out.println("[Confirmed - multiple] " + multiple); System.out.println("[Confirmed - sequenceNumber] " + sequenceNumber); }, (sequenceNumber, multiple) -> { // code when message is nack-ed // Message was lost, we just print the info for debug; // otherwise, this case should be handled differently System.out.println("Not-Acknowledging for message with id " + sequenceNumber); }); // Publish messages to the channel and put the ids to the queue for (int i = 1; i <= NUM_OF_MESSAGE; i++) { String message = "Message " + i; System.out.println("[Send] [" + channel.getNextPublishSeqNo() + "] " + message); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes()); channel.waitForConfirmsOrDie(300); // in ms } DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("[Received] : " + new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("[Canceled]" + consumerTag); }; // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback) channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
Output chương trình:
Enabled published confirm: #method<confirm.select-ok>() [Send] [1] Message 1 [Confirmed - multiple] false [Confirmed - sequenceNumber] 1 [Send] [2] Message 2 [Confirmed - multiple] false [Confirmed - sequenceNumber] 2 [Send] [3] Message 3 [Confirmed - multiple] false [Confirmed - sequenceNumber] 3 [Send] [4] Message 4 [Confirmed - multiple] false [Confirmed - sequenceNumber] 4 [Send] [5] Message 5 [Confirmed - multiple] false [Confirmed - sequenceNumber] 5 [Send] [6] Message 6 [Confirmed - multiple] false [Confirmed - sequenceNumber] 6 [Received] : Message 1 [Received] : Message 2 [Received] : Message 3 [Received] : Message 4 [Received] : Message 5 [Received] : Message 6
Như bạn thấy, các Message được xác nhận từng cái một thông qua confirm callback.
Ví dụ Publish và Confirm một Batch các Message
Tương tự như trên, nhưng chúng ta yêu cầu gửi confirm callback theo batch.
package com.gpcoder.publisherconfirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class BatchPublisherConfirm { private static final String EXCHANGE_NAME = "PublishingMessage.DirectExchange"; private static final String QUEUE_NAME = "PublishingMessage.DirectQueue1"; private static final String ROUTING_KEY = "batchMessage"; private static final int NUM_OF_MESSAGE = 6; private static final int BATCH_SIZE = 3; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // Create connection ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); // Create channel Channel channel = connection.createChannel(); // Create direct exchange - exchange, builtinExchangeType, durable channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); // Create queue - (queueName, durable, exclusive, autoDelete, arguments) channel.queueDeclare(QUEUE_NAME, true, false, false, null); // Bind queue to exchange - (queue, exchange, routingKey) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // Enabling Publisher Confirms on a Channel AMQP.Confirm.SelectOk confirmed = channel.confirmSelect(); System.out.println("Enabled published confirm: " + confirmed); // Handling Publisher Confirms Asynchronously channel.addConfirmListener((sequenceNumber, multiple) -> { // code when message is confirmed System.out.println("[Confirmed - multiple] " + multiple); System.out.println("[Confirmed - sequenceNumber] " + sequenceNumber); }, (sequenceNumber, multiple) -> { // code when message is nack-ed // Message was lost, we just print the info for debug; // otherwise, this case should be handled differently System.out.println("Not-Acknowledging for message with id " + sequenceNumber); }); // Publish messages to the channel and put the ids to the queue for (int i = 1; i <= NUM_OF_MESSAGE; i++) { String message = "Message " + i; System.out.println("[Send] [" + channel.getNextPublishSeqNo() + "] " + message); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes()); if (i % BATCH_SIZE == 0) { channel.waitForConfirmsOrDie(300); // in ms } } DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("[Received] : " + new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("[Canceled]" + consumerTag); }; // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback) channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
Output chương trình:
Enabled published confirm: #method<confirm.select-ok>() [Send] [1] Message 1 [Send] [2] Message 2 [Send] [3] Message 3 [Confirmed - multiple] true [Confirmed - sequenceNumber] 2 [Confirmed - multiple] false [Confirmed - sequenceNumber] 3 [Send] [4] Message 4 [Send] [5] Message 5 [Send] [6] Message 6 [Confirmed - multiple] false [Confirmed - sequenceNumber] 4 [Confirmed - multiple] true [Confirmed - sequenceNumber] 6 [Received] : Message 1 [Received] : Message 2 [Received] : Message 3 [Received] : Message 4 [Received] : Message 5 [Received] : Message 6
Như bạn thấy, trường hợp batch thì Message được xác nhận một lần có thể là 1 hay nhiều message.
- [Confirmed – multiple] true và [Confirmed – sequenceNumber] 2 : nghĩa là đã xác nhận ok cho Message có số thứ tự <= 2.
- [Confirmed – multiple] false và [Confirmed – sequenceNumber] 3 : nghĩa là đã xác nhận ok cho 1 Message có số thứ tự là 3.
- [Confirmed – multiple] false và [Confirmed – sequenceNumber] 4 : nghĩa là đã xác nhận ok cho 1 Message có số thứ tự là 4.
- [Confirmed – multiple] true và [Confirmed – sequenceNumber] 6 : nghĩa là đã xác nhận ok cho Message có số thứ tự <= 6.
Tài liệu tham khảo: