GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Message Queue Sử dụng Topic Exchange (Publish/Subscribe) trong RabbitMQ

Sử dụng Topic Exchange (Publish/Subscribe) trong RabbitMQ

Đăng vào 01/06/2020 . Được đăng bởi GP Coder . 9757 Lượt xem . Toàn màn hình

Trong các bài viết trước, chúng ta đã cùng tìm hiểu về Direct Exchange và Fanout Exchange. Trong bài này, tôi sẽ giới thiệu với các bạn một loại exchange khác là Topic Exchange.

Nội dung

  • 1 Flow của một Message trong Topic Exchange
  • 2 Ví dụ Topic Exchange trong RabbitMQ

Flow của một Message trong Topic Exchange

Topic exchange (amq.topic) định tuyến message tới một hoặc nhiều queue dựa trên sự trùng khớp giữa routing key và pattern. Topic exchange được sử dụng để thực hiện định tuyến thông điệp multicast. Loại Exchange này thường được sử dụng để thực hiện các biến thể của Pub/Sub pattern.

Ví dụ một vài trường hợp sử dụng:

  • Phân phối dữ liệu liên quan đến vị trí địa lý cụ thể.
  • Xử lý tác vụ nền được thực hiện bởi nhiều workers, mỗi công việc có khả năng xử lý các nhóm tác vụ cụ thể.
  • Cập nhật tin tức liên quan đến một category hoặc gắn tag.
  • Điều phối các dịch vụ của các loại khác nhau trong cloud.

Một topic exchange sẽ sử dụng wildcard để gắn routing key với một routing pattern khai báo trong binding. Consumer có thể đăng ký những topic mà nó quan tâm.

Routing Key trong Topic Exchange:

  • Một Routing Key trong Topic Exchange phải bao gồm 0 hoặc nhiều từ phân cách bởi dấu chấm (.).
  • Routing Key trong Topic Exchange còn gọi là Routing Pattern.
  • Routing Pattern tương tự như Regular expression, nhưng chỉ các wildcard *, . và # được sử phép.

Ý nghĩa các wildcard được sử dụng là:

  • * : có nghĩa là chính xác một từ được phép.
  • # : có nghĩa là 0 hoặc nhiều số từ được phép.
  • . : có nghĩa là dấu phân cách từ. Nhiều từ chính được phân tách bằng dấu phân cách dấu chấm.

Ví dụ:

  • java.* : được đăng ký bởi tất cả những key với pattern bắt đầu bằng java và theo sau là chính xác một từ bất kỳ.
    • Những key sau là hợp lệ: java.core, java.gpcoder.
    • Những key sau là không hợp lệ: java, java.core.gpcoder.
  • java.*.gpcoder : được đăng ký bởi tất cả những key với pattern bắt đầu bằng java, theo sau là chính xác một từ bất kỳ và kết thúc là gpcoder.
    • Những key sau là hợp lệ: java.core.gpcoder, java.collection.gpcoder.
    • Những key sau là không hợp lệ: java.gpcoder, java.core.variable.gpcoder.
  • java.# : được đăng ký bởi tất cả các key bắt đầu với java.
    • Những key sau là hợp lệ: java, java.gpcoder, java.core.gpcoder.
    • Những key sau là không hợp lệ: core.java, core-java.com.
  • java.#.gpcoder : được đăng ký bởi tất cả những key với pattern bắt đầu bằng java và kết thúc là gpcoder.
    • Những key sau là hợp lệ: java.gpcoder, java.core.gpcoder, java.collection.map.gpcoder.
    • Những key sau là không hợp lệ: java.gpcoder.com, java.core.gpcoder.com.
  • #.gpcoder.com : được đăng ký bởi tất cả những key với pattern kết thúc là gpcoder.com.
    • Những key sau là hợp lệ: gpcoder.com, java.gpcoder.com, core.java.gpcoder.com.
    • Những key sau là không hợp lệ: java.gpcoder, gpcoder.com.java.

Flow của một Message trong Topic Exchange như sau:

  • Một Queue được tạo và binding tới một Topic Exchange với một routing key pattern (P).
  • Một Producer sẽ tạo một Message với một routing key (K) và publish tới Exchange.
  • Một Message được Exchange chuyển đến Queue nếu Pattern P match với Key K.
  • Consumer đăng ký tới Queue để nhận Message.

Ví dụ Topic Exchange trong RabbitMQ

Trong ví dụ này, tôi tạo một Topic Exchange có tên GPCoderTopicExchange, tạo 2 Queue binding tới Topic Exchange này:

  • QJava : Queue này sẽ nhận tất cả message có Key match với routing key “java.*.gpcoder.com“. Nghĩa là chỉ nhận các message cho một topic Java cụ thể từ gpcoder.com, chẳng hạn: java.core.gpcoder.com, java.collection.gpcoder.com.
  • QAll : Queue này nhận tất cả message có Key match với routing key “#.gpcoder.com“. Nghĩa là nhận tất cả message từ gpcoder.com, chẳng hạn: design-pattern.gpcoder, java.gpcoder.com, creational.design-pattern.gpcoder.com.

Một số class của chương trình:

  • ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
  • TopicExchangeChannel :  class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, publish/ subscribe message, …
  • Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue.
  • Producer: để gửi Message đến Exchange.
  • Consumer: để nhận Message từ Queue.
  • App: giả lập việc gửi nhận Message thông qua Topic Exchange của RabbitMQ.

ConnectionManager.java


package com.gpcoder.topicexchange;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

    private ConnectionManager() {
        super();
    }

    public static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        return factory.newConnection();
    }
}

TopicExchangeChannel.java


package com.gpcoder.topicexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class TopicExchangeChannel {

    private String exchangeName;
    private Channel channel;
    private Connection connection;

    public TopicExchangeChannel(Connection connection, String exchangeName) throws IOException {
        this.exchangeName = exchangeName;
        this.connection = connection;
        this.channel = connection.createChannel();
    }

    public void declareExchange() throws IOException {
        // exchangeDeclare( exchange, builtinExchangeType, durable)
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true);
    }

    public void declareQueues(String ...queueNames) throws IOException {
        for (String queueName : queueNames) {
            // queueDeclare  - (queueName, durable, exclusive, autoDelete, arguments)
            channel.queueDeclare(queueName, true, false, false, null);
        }
    }

    public void performQueueBinding(String queueName, String routingKey) throws IOException {
        // Create bindings - (queue, exchange, routingKey)
        channel.queueBind(queueName, exchangeName, routingKey);
    }

    public void subscribeMessage(String queueName) throws IOException {
        // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
            System.out.println("[Received] [" + queueName + "]: " + consumerTag);
            System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody()));
        }), consumerTag -> {
            System.out.println(consumerTag);
        });
    }

    public void publishMessage(String message, String messageKey) throws IOException {
        // basicPublish - ( exchange, routingKey, basicProperties, body)
        System.out.println("[Send] [" + messageKey + "]: " + message);
        channel.basicPublish(exchangeName, messageKey, null, message.getBytes());
    }
}

Constant.java


package com.gpcoder.topicexchange;

public final class Constant {

    // Exchange

    public static final String EXCHANGE_NAME = "GPCoderTopicExchange";

    // Queue

    public static final String JAVA_QUEUE_NAME = "QJava";

    public static final String GENERAL_QUEUE_NAME = "QAll";

    private Constant() {
        super();
    }

    // Routing key pattern

    public static final String JAVA_ROUTING_KEY = "java.*.gpcoder.com";

    public static final String GPCODER_ROUTING_KEY = "#.gpcoder.com";

    // Message key

    public static final String JAVA_CORE_MSG_KEY = "java.core.gpcoder.com";

    public static final String JAVA_MSG_KEY = "java.gpcoder.com";

    public static final String DESIGN_PATTERN_MSG_KEY = "design-pattern.gpcoder.com";

    public static final String NOT_MATCHING_MSG_KEY = "java.collection.gpcoder.com.vn";
}


Producer.java


package com.gpcoder.topicexchange;

import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.topicexchange.Constant.*;

public class Producer {

    private TopicExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new TopicExchangeChannel(connection, EXCHANGE_NAME);

        // Create topic exchange
        channel.declareExchange();

        // Create queues
        channel.declareQueues(JAVA_QUEUE_NAME, GENERAL_QUEUE_NAME);

        // Binding queues with routing key
        channel.performQueueBinding(JAVA_QUEUE_NAME, JAVA_ROUTING_KEY);
        channel.performQueueBinding(GENERAL_QUEUE_NAME, GPCODER_ROUTING_KEY);
    }

    public void send(String message, String messageKey) throws IOException {
        // Send message
        channel.publishMessage(message, messageKey);
    }
}

Consumer.java


package com.gpcoder.topicexchange;

import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.topicexchange.Constant.*;

public class Consumer {

    private TopicExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new TopicExchangeChannel(connection, EXCHANGE_NAME);

        // Create topic exchange
        channel.declareExchange();

        // Create queues
        channel.declareQueues(JAVA_QUEUE_NAME, GENERAL_QUEUE_NAME);

        // Binding queues with routing key
        channel.performQueueBinding(JAVA_QUEUE_NAME, JAVA_ROUTING_KEY);
        channel.performQueueBinding(GENERAL_QUEUE_NAME, GPCODER_ROUTING_KEY);
    }

    public void subscribe() throws IOException {
        // Subscribe message
        channel.subscribeMessage(JAVA_QUEUE_NAME);
        channel.subscribeMessage(GENERAL_QUEUE_NAME);
    }

}

App.java


package com.gpcoder.topicexchange;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.topicexchange.Constant.*;

public class App {

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create producers, queues and binding queues to Topic Exchange
        Producer producer = new Producer();
        producer.start();

        // Publish some message
        producer.send("[1] A new Java Core topic is published", JAVA_CORE_MSG_KEY);
        producer.send("[2] A new Java general topic is published", JAVA_MSG_KEY);
        producer.send("[3] A new Design Pattern topic is published", DESIGN_PATTERN_MSG_KEY);
        producer.send("[4] Not matching any routing key", NOT_MATCHING_MSG_KEY);

        // Create consumers, queues and binding queues to Topic Exchange
        Consumer consumer = new Consumer();
        consumer.start();
        consumer.subscribe();
    }
}

Output chương trình:


[Send] [java.core.gpcoder.com]: [1] A new Java Core topic is published
[Send] [java.gpcoder.com]: [2] A new Java general topic is published
[Send] [design-pattern.gpcoder.com]: [3] A new Design Pattern topic is published
[Send] [java.collection.gpcoder.com.vn]: [4] Not matching any routing key
[Received] [QJava]: amq.ctag-LiUyX8m4KIJu0Gb9WlpLdg
[Received] [QJava]: [1] A new Java Core topic is published
[Received] [QAll]: amq.ctag-nmlzYAixFEwB4P0-N6nltw
[Received] [QAll]: [1] A new Java Core topic is published
[Received] [QAll]: amq.ctag-nmlzYAixFEwB4P0-N6nltw
[Received] [QAll]: [2] A new Java general topic is published
[Received] [QAll]: amq.ctag-nmlzYAixFEwB4P0-N6nltw
[Received] [QAll]: [3] A new Design Pattern topic is published

Như bạn thấy:

  • Consumer binding đến Queue QJava chỉ nhận message được gởi từ message key “java.core.gpcoder.com”.
  • Consumer binding đến Queue QAll có thể nhận message gửi từ messag key “java.core.gpcoder.com”, “java.gpcoder.com” và “design-pattern.gpcoder.com”. Không nhận message từ “java.collection.gpcoder.com.vn” do message không phải gởi từ “gpcoder.com”.

Tài liệu tham khảo:

  • https://www.rabbitmq.com/tutorials/tutorial-three-java.html
  • https://www.rabbitmq.com/tutorials/tutorial-four-java.html
  • https://www.rabbitmq.com/tutorials/tutorial-five-java.html
  • https://www.rabbitmq.com/tutorials/amqp-concepts.html
5.0
16
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Message Queue Được gắn thẻ: Message Queue, RabbitMQ

Sử dụng Fanout Exchange trong RabbitMQ
Sử dụng Headers Exchange trong RabbitMQ

Có thể bạn muốn xem:

  • Sử dụng Direct Exchange trong RabbitMQ (26/05/2020)
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin (19/06/2020)
  • Giới thiệu RabbitMQ (11/05/2020)
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud (02/10/2020)
  • Sử dụng binding Exchange to Exchange trong RabbitMQ (07/06/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (97381 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (97029 lượt xem)
  • Giới thiệu Design Patterns (86695 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (85528 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (83078 lượt xem)

Nội dung bài viết

  • 1 Flow của một Message trong Topic Exchange
  • 2 Ví dụ Topic Exchange trong RabbitMQ

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link