GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Message Queue Sử dụng Alternate Exchange trong RabbitMQ

Sử dụng Alternate Exchange trong RabbitMQ

Đăng vào 10/06/2020 . Được đăng bởi GP Coder . 4601 Lượt xem . Toàn màn hình

Khi một Message đến Exchange, nếu không tìm thấy Queue nào phù hợp cho Message, Message sẽ tự động bị hủy. RabbitMQ cung cấp một tiện ích mở rộng AMQP được gọi là Alternate Exchange, để collect các Message không thể gửi được trước khi chúng bị huỷ. Chúng ta sẽ biết được cách làm việc và cài đặt của Alternate Exchange trong bài viết này.

Nội dung

  • 1 Flow của một Message trong Alternate Exchange
  • 2 Ví dụ binding Exchange to Exchange trong RabbitMQ

Flow của một Message trong Alternate Exchange

Alternate Exchange được định nghĩa để collect các Message không thể gửi được (rejected/ discarded/ unrouted) trước khi chúng bị huỷ.

Bất kỳ 4 loại Exchange: Direct, Fanout, Topic, Headers có thể được chỉ định như một Alternate Exchange cho một Exchange khác thuộc bất kỳ loại nào. Tuy nhiên, ta nên sử dụng Fanout Exchange như một Alternate Exchange vì nó chuyển tiếp tin nhắn vô điều kiện.

Để chỉ định một Alternate Exchange cho một Exchange GPCoder.AltTopicExchange, chúng ta chỉ cần thêm arguments: alternate-exchange=”GPCoder.AltFanoutExchange” cho GPCoder.AltTopicExchange. Khi đó GPCoder.AltFanoutExchange trở thành một Alternate Exchange cho GPCoder.AltTopicExchange.

Flow của một Message trong Alternate Exchange:

  • Một Producer publish một Message đến source Exchange với một routing key dựa trên loại của Exchange. Trong trường hợp này là GPCoder.AltTopicExchange.
  • Một Fanout Exchange (GPCoder.AltFanoutExchange), được chỉ định là một AlternateExchange cho GPCoder.AltTopicExchange.
  • Nếu một Message có routing key match với bất kỳ routing key pattern nào mà Queue đã binding với GPCoder.AltTopicExchange, thì Message sẽ được chuyển đến Queue match đó.
  • Nếu không match với bất kỳ routing key pattern nào, khi đó Message sẽ bị reject.
  • Theo mặc định của RabbitMQ, một Message bị reject sẽ bị huỷ. Trong trường hợp chúng ta có Alternate Exchange, nó sẽ nhận các Message bị reject và chuyển đến Queue.
  • Cuối cùng, Consumer có thể binding đến Queue của Alternate Exchange để xử lý.

Ví dụ binding Exchange to Exchange trong RabbitMQ

Một số class của chương trình:

  • ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
  • ExchangeChannel :  class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, binding Exchange đến Exchange, publish/ subscribe message, …
  • Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue.
  • AlternateExchangeProducer : để gửi Message đến GPCoder.AltFanoutExchange.
  • TopicExchangeProducer : để gửi Message đến GPCoderTopicExchange.
  • AlternateExchangeConsumer : để nhận Message từ Queue được binding đến GPCoder.AltFanoutExchange.
  • TopicExchangeConsumer : để nhận Message từ Queue được binding đến GPCoder.AltTopicExchange.
  • App: giả lập việc gửi nhận Message thông qua Topic Exchange của RabbitMQ.

ConnectionManager.java


package com.gpcoder.alternateexchange;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

    private ConnectionManager() {
        super();
    }

    public static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        return factory.newConnection();
    }
}

ExchangeChannel.java


package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ExchangeChannel {

    private String exchangeName;
    private Channel channel;
    private Connection connection;

    public ExchangeChannel(Connection connection, String exchangeName) throws IOException {
        this.exchangeName = exchangeName;
        this.connection = connection;
        this.channel = connection.createChannel();
    }

    public void declareExchange(BuiltinExchangeType exchangeType) throws IOException {
        // exchangeDeclare( exchange, builtinExchangeType, durable)
        channel.exchangeDeclare(exchangeName, exchangeType, true);
    }

    public void declareExchangeWithAlternateExchagne(BuiltinExchangeType exchangeType, String alternateExchangeName) throws IOException {
        // Declare the topic exchange and set an alternate-exchange
        // exchangeDeclare( exchange, builtinExchangeType, durable, autoDelete, arguments)
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", alternateExchangeName);
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, arguments);
    }

    public void declareQueues(String ...queueNames) throws IOException {
        for (String queueName : queueNames) {
            // queueDeclare  - (queueName, durable, exclusive, autoDelete, arguments)
            channel.queueDeclare(queueName, true, false, false, null);
        }
    }

    public void performQueueBinding(String queueName, String routingKey) throws IOException {
        // Create bindings - (queue, exchange, routingKey)
        channel.queueBind(queueName, exchangeName, routingKey);
    }

    public void subscribeMessage(String queueName) throws IOException {
        // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
            System.out.println("[Received] [" + queueName + "]: " + consumerTag);
            System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody()));
        }), consumerTag -> {
            System.out.println(consumerTag);
        });
    }

    public void publishMessage(String message, String routingKey) throws IOException {

        // basicPublish - ( exchange, routingKey, basicProperties, body)
        System.out.println("[Send] [" + routingKey + "]: " + message);
        channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    }
}

Constant.java


package com.gpcoder.alternateexchange;

public final class Constant {

    // Exchange

    public static final String TOPIC_EXCHANGE_NAME = "GPCoder.AltTopicExchange";

    public static final String ALTERNATE_EXCHANGE_NAME = "GPCoder.AltFanoutExchange";

    // Queue

    public static final String JAVA_QUEUE_NAME = "QJava";

    public static final String ALL_QUEUE_NAME = "QAll";

    public static final String UNKNOWN_QUEUE_NAME = "QUnknown";

    // Routing key pattern

    public static final String JAVA_ROUTING_KEY = "java.*.gpcoder.com";

    public static final String GPCODER_ROUTING_KEY = "#.gpcoder.com";

    // Message key

    public static final String JAVA_MSG_KEY = "java.gpcoder.com";

    private Constant() {
        super();
    }
}


AlternateExchangeProducer.java


package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.ALTERNATE_EXCHANGE_NAME;
import static com.gpcoder.alternateexchange.Constant.UNKNOWN_QUEUE_NAME;

public class AlternateExchangeProducer {

    private ExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new ExchangeChannel(connection, ALTERNATE_EXCHANGE_NAME);

        // Create fanout exchange
        channel.declareExchange(BuiltinExchangeType.FANOUT);

        // Create queues
        channel.declareQueues(UNKNOWN_QUEUE_NAME);

        channel.performQueueBinding(UNKNOWN_QUEUE_NAME, "");
    }
}


TopicExchangeProducer.java


package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.*;

public class TopicExchangeProducer {

    private ExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new ExchangeChannel(connection, TOPIC_EXCHANGE_NAME);

        // Create topic exchange
        channel.declareExchangeWithAlternativeExchagne(BuiltinExchangeType.TOPIC, ALTERNATIVE_EXCHANGE_NAME);

        // Create queues
        channel.declareQueues(JAVA_QUEUE_NAME, ALL_QUEUE_NAME);

        // Binding queues
        channel.performQueueBinding(JAVA_QUEUE_NAME, JAVA_ROUTING_KEY);
        channel.performQueueBinding(ALL_QUEUE_NAME, GPCODER_ROUTING_KEY);
    }

    public void send(String message, String messageKey) throws IOException {
        // Send message
        channel.publishMessage(message, messageKey);
    }
}


AlternateExchangeConsumer.java


package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.ALTERNATE_EXCHANGE_NAME;
import static com.gpcoder.alternateexchange.Constant.UNKNOWN_QUEUE_NAME;

public class AlternateExchangeConsumer {

    private ExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new ExchangeChannel(connection, ALTERNATE_EXCHANGE_NAME);

        // Create fanout exchange
        channel.declareExchange(BuiltinExchangeType.FANOUT);

        // Create queues
        channel.declareQueues(UNKNOWN_QUEUE_NAME);

        channel.performQueueBinding(UNKNOWN_QUEUE_NAME, "");
    }

    public void subscribe() throws IOException {
        // Subscribe message
        channel.subscribeMessage(UNKNOWN_QUEUE_NAME);
    }
}


TopicExchangeConsumer.java


package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.*;

public class TopicExchangeConsumer {

    private ExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new ExchangeChannel(connection, TOPIC_EXCHANGE_NAME);

        // Create topic exchange
        channel.declareExchangeWithAlternateExchagne(BuiltinExchangeType.TOPIC, ALTERNATE_EXCHANGE_NAME);

        // Create queues
        channel.declareQueues(JAVA_QUEUE_NAME, ALL_QUEUE_NAME);

        // Binding queues
        channel.performQueueBinding(JAVA_QUEUE_NAME, JAVA_ROUTING_KEY);
        channel.performQueueBinding(ALL_QUEUE_NAME, GPCODER_ROUTING_KEY);
    }

    public void subscribe() throws IOException {
        // Subscribe message
        channel.subscribeMessage(JAVA_QUEUE_NAME);
        channel.subscribeMessage(ALL_QUEUE_NAME);
    }
}


App.java


package com.gpcoder.alternateexchange;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.JAVA_MSG_KEY;

public class App {

    public static void main(String[] args) throws IOException, TimeoutException {
        AlternateExchangeProducer producer1 = new AlternateExchangeProducer();
        producer1.start();

        TopicExchangeProducer producer2 = new TopicExchangeProducer();
        producer2.start();

        // Publish some messages
        producer2.send("[1] Head First Design Pattern", JAVA_MSG_KEY);
        producer2.send("[2] Unknown Message", "random-gpcoder");

        AlternateExchangeConsumer consumer1 = new AlternateExchangeConsumer();
        consumer1.start();
        consumer1.subscribe();

        TopicExchangeConsumer consumer2 = new TopicExchangeConsumer();
        consumer2.start();
        consumer2.subscribe();
    }
}


Output chương trình:


[Send] [java.gpcoder.com]: [1] Head First Design Pattern
[Send] [random-gpcoder]: [2] Unknown Message
[Received] [QUnknown]: amq.ctag-qe5VOGnCFLq8_Qu_ajUo_g
[Received] [QUnknown]: [2] Unknown Message
[Received] [QAll]: amq.ctag-Nt0tVcCjOXEr-BTF20roCw
[Received] [QAll]: [1] Head First Design Pattern

Như bạn thấy, Message thứ 2 không match với bất kỳ routing key nào, nên được chuyển xuống Queue QUnknown của Alternate Exchange (GPCoder.AltFanoutExchange).

Tài liệu tham khảo:

  • https://www.rabbitmq.com/ae.html
5.0
09
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Message Queue Được gắn thẻ: Message Queue, RabbitMQ

Sử dụng binding Exchange to Exchange trong RabbitMQ
Sử dụng Dead Letter Exchange trong RabbitMQ

Có thể bạn muốn xem:

  • Giới thiệu JMS – Java Message Services (30/04/2020)
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud (02/10/2020)
  • Sử dụng binding Exchange to Exchange trong RabbitMQ (07/06/2020)
  • Kết nối JMS Client với ActiveMQ (07/05/2020)
  • Cài đặt ActiveMQ (04/05/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (98059 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (97700 lượt xem)
  • Giới thiệu Design Patterns (87764 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (86434 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (83839 lượt xem)

Nội dung bài viết

  • 1 Flow của một Message trong Alternate Exchange
  • 2 Ví dụ binding Exchange to Exchange trong RabbitMQ

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link