GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Message Queue Sử dụng Direct Exchange trong RabbitMQ

Sử dụng Direct Exchange trong RabbitMQ

Đăng vào 26/05/2020 . Được đăng bởi GP Coder . 7780 Lượt xem . Toàn màn hình

Trong các bài viết trước chúng ta đã tìm hiểu về Default Exchange, cách tạo Work Queue với RabbitMQ. Trong bài này, chúng ta sẽ cùng tìm hiểu về Direct Exchange trong RabbitMQ.

Nội dung

  • 1 Flow của một Message trong Direct Exchange
  • 2 Ví dụ Direct Exchange trong RabbitMQ

Flow của một Message trong Direct Exchange

Direct Exchange (trao đổi trực tiếp – amq.direct) định tuyến message đến Queue dựa vào routing key.

Một Exchange không xác định tên (empty String), đây là loại Default Exchange, một dạng đặc biệt của là Direct Exchange. Default Exchange được liên kết ngầm định với mọi Queue với khóa định tuyến bằng với tên Queue.

Flow của một Message trong Direct Exchange như sau:

  • Một Producer sẽ tạo một Message và publish tới Exchange.
  • Một Queue sẽ binding tới Exchange sử dụng routing key. Chúng ta có thể tạo nhiều Queue và binding tới Exchange, có thể sử dụng cùng routing key, hoặc các routing key khác nhau.
  • Một Message tới Exchange với thông tin routing key. Dựa vào thông tin routing key, message sẽ được chuyển đến một hoặc nhiều Queue đã binding có cùng routing key với routing key của Message.

Chúng ta đã thấy cách hoạt động của Default Exchange ở các bài viết trước. Trong phần tiếp theo của bài viết này, chúng ta sẽ thấy cách hoạt động của Direct Exchange với routing key cụ thể.

Ví dụ Direct Exchange trong RabbitMQ

Trong ví dụ này, tôi tạo một Direct Exchange có tên GPCoderDirectExchange, tạo 3 Queue binding tới Direct Exchange này với 3 routing key:

  • QDeveloper sẽ binding với routing key devGroup.
  • QManager sẽ binding với routing key managerGroup.
  • QGeneral sẽ binding với routing key generalGroup.

Một số class của chương trình:

  • ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
  • DirectExchangeChannel :  class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, publish/ subscribe message, …
  • Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue, Routing Key.
  • Producer: để gửi Message đến Exchange.
  • Consumer: để nhận Message từ Queue.
  • App: giả lập việc gửi nhận Message thông qua Direct Exchange của RabbitMQ.

ConnectionManager.java


package com.gpcoder.directexchange;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

    private ConnectionManager() {
        super();
    }

    public static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        return factory.newConnection();
    }
}

DirectExchangeChannel.java


package com.gpcoder.directexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectExchangeChannel {

    private String exchangeName;
    private Channel channel;
    private Connection connection;

    public DirectExchangeChannel(Connection connection, String exchangeName) throws IOException {
        this.exchangeName = exchangeName;
        this.connection = connection;
        this.channel = connection.createChannel();
    }

    public void declareExchange() throws IOException {
        // exchangeDeclare( exchange, builtinExchangeType, durable)
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
    }

    public void declareQueues(String ...queueNames) throws IOException {
        for (String queueName : queueNames) {
            // queueDeclare  - (queueName, durable, exclusive, autoDelete, arguments)
            channel.queueDeclare(queueName, true, false, false, null);
        }
    }

    public void performQueueBinding(String queueName, String routingKey) throws IOException {
        // Create bindings - (queue, exchange, routingKey)
        channel.queueBind(queueName, exchangeName, routingKey);
    }

    public void subscribeMessage(String queueName) throws IOException {
        // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
            System.out.println("[Received] [" + queueName + "]: " + consumerTag);
            System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody()));
        }), consumerTag -> {
            System.out.println(consumerTag);
        });
    }

    public void publishMessage(String message, String routingKey) throws IOException {
        // basicPublish - ( exchange, routingKey, basicProperties, body)
        System.out.println("[Send] [" + routingKey + "]: " + message);
        channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
    }
}


Constant.java


package com.gpcoder.directexchange;

public final class Constant {

    // Exchange

    public static final String EXCHANGE_NAME = "GPCoderDirectExchange";

    // Routing key

    public static final String DEV_ROUTING_KEY = "devGroup";

    public static final String MANAGER_ROUTING_KEY = "managerGroup";

    public static final String GENERAL_ROUTING_KEY = "generalGroup";

    // Queue

    public static final String DEV_QUEUE_NAME = "QDeveloper";

    public static final String MANAGER_QUEUE_NAME = "QManager";

    public static final String GENERAL_QUEUE_NAME = "QGeneral";

    private Constant() {
        super();
    }
}


Producer.java


package com.gpcoder.directexchange;

import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.directexchange.Constant.*;

public class Producer {
    private DirectExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new DirectExchangeChannel(connection, EXCHANGE_NAME);

        // Create direct exchange
        channel.declareExchange();

        // Create queues
        channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, GENERAL_QUEUE_NAME);

        // Binding queues with routing keys
        channel.performQueueBinding(DEV_QUEUE_NAME, DEV_ROUTING_KEY);
        channel.performQueueBinding(MANAGER_QUEUE_NAME, MANAGER_ROUTING_KEY);
        channel.performQueueBinding(GENERAL_QUEUE_NAME, GENERAL_ROUTING_KEY);
    }

    public void send(String message, String routingKey) throws IOException {
        // Send message
        channel.publishMessage(message, routingKey);
    }
}


Consumer.java


package com.gpcoder.directexchange;

import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.directexchange.Constant.*;

public class Consumer {

    private DirectExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new DirectExchangeChannel(connection, EXCHANGE_NAME);

        // Create direct exchange
        channel.declareExchange();

        // Create queues
        channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, GENERAL_QUEUE_NAME);

        // Binding queues with routing keys
        channel.performQueueBinding(DEV_QUEUE_NAME, DEV_ROUTING_KEY);
        channel.performQueueBinding(MANAGER_QUEUE_NAME, MANAGER_ROUTING_KEY);
        channel.performQueueBinding(GENERAL_QUEUE_NAME, GENERAL_ROUTING_KEY);
    }

    public void subscribe() throws IOException {
        // Subscribe message
        channel.subscribeMessage(DEV_QUEUE_NAME);
        channel.subscribeMessage(MANAGER_QUEUE_NAME);
        channel.subscribeMessage(GENERAL_QUEUE_NAME);
    }

}

App.java


package com.gpcoder.directexchange;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.directexchange.Constant.*;

public class App {

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create producer
        Producer producer = new Producer();
        producer.start();

        // Publish some message
        producer.send("This message for all developers", DEV_ROUTING_KEY);
        producer.send("This message for all managers", MANAGER_ROUTING_KEY);
        producer.send("This message for everyone", GENERAL_ROUTING_KEY);

        Consumer consumer = new Consumer();
        consumer.start();
        consumer.subscribe();
    }
}

Output chương trình:


[Send] [devGroup]: This message for all developers
[Send] [managerGroup]: This message for all managers
[Send] [generalGroup]: This message for everyone
[Received] [QDeveloper]: amq.ctag-F_4tm402_GYRx8FBn6rLPw
[Received] [QDeveloper]: This message for all developers
[Received] [QManager]: amq.ctag-DbVV5-XhzLyFtFd5Kij8DQ
[Received] [QManager]: This message for all managers
[Received] [QGeneral]: amq.ctag-feJgzR4t-P2tjvBWEb13yA
[Received] [QGeneral]: This message for everyone

Như bạn thấy,  Consumer/ Producer chỉ gửi/nhận đúng Message ở Queue mà nó binding.

Tài liệu tham khảo:

  • https://www.rabbitmq.com/tutorials/amqp-concepts.html
5.0
09
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Message Queue Được gắn thẻ: Message Queue, RabbitMQ

Work Queues trong RabbitMQ
Sử dụng Fanout Exchange trong RabbitMQ

Có thể bạn muốn xem:

  • Sử dụng Topic Exchange (Publish/Subscribe) trong RabbitMQ (01/06/2020)
  • Sử dụng Alternate Exchange trong RabbitMQ (10/06/2020)
  • Kết nối JMS Client với ActiveMQ (07/05/2020)
  • Kết nối AMQP Client với RabbitMQ Server (20/05/2020)
  • Giới thiệu RabbitMQ Management Interface (17/05/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (97386 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (97034 lượt xem)
  • Giới thiệu Design Patterns (86706 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (85534 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (83084 lượt xem)

Nội dung bài viết

  • 1 Flow của một Message trong Direct Exchange
  • 2 Ví dụ Direct Exchange trong RabbitMQ

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link