GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Message Queue Sử dụng Headers Exchange trong RabbitMQ

Sử dụng Headers Exchange trong RabbitMQ

Đăng vào 04/06/2020 . Được đăng bởi GP Coder . 4948 Lượt xem . Toàn màn hình

Trong các bài viết trước, chúng ta đã cùng tìm hiểu về Direct Exchange, Fanout Exchange và Topic Exchange. Trong bài này, tôi sẽ giới thiệu với các bạn một loại exchange rất mạnh mẽ khác của RabbitM là Headers Exchange.

Nội dung

  • 1 Flow của một Message trong Headers Exchange
  • 2 Ví dụ Topic Exchange trong RabbitMQ

Flow của một Message trong Headers Exchange

Header exchange (amq.headers) được thiết kế để định tuyến với nhiều thuộc tính, để dàng thực hiện dưới dạng header của message hơn là routing key. Header exchange bỏ đi routing key mà thay vào đó định tuyến dựa trên header của message. Trường hợp này, broker cần một hoặc nhiều thông tin từ application developer, cụ thể là, nên quan tâm đến những tin nhắn với tiêu đề nào phù hợp hoặc tất cả chúng.

Headers Exchange rất giống với Topic Exchange, nhưng nó định tuyến dựa trên các giá trị header thay vì routing key.

Một Message được coi là phù hợp nếu giá trị của header bằng với giá trị được chỉ định khi ràng buộc.

Flow của một Message trong Headers Exchange như sau:

  • Một hoặc nhiều Queue được tạo và binding tới một Headers Exchange sử dụng các header property (H).
  • Một Producer sẽ tạo một Message với các header property (MH) và publish tới Exchange.
  • Một Message được Exchange chuyển đến Queue nếu Header H match với Header MH.
  • Consumer đăng ký tới Queue để nhận Message.

Có 2 loại matching được sử dụng để kiểm tra một Header của binding queue có match với một header từ message đến:

  • any: tương tự như logic OR, được biểu diễn trong các ràng buộc header property là {“x-match“, “any“, …} . Nghĩa là, một Message được gửi tới Exchange phải chứa ít nhất một trong các header mà Queue được liên kết, sau đó Message sẽ được chuyển đến Queue.
  • all: tương tự như logic AND, được biểu diễn trong các ràng buộc header property là {“x-match“, “and“, …} . Nghĩa là, các Message có tất cả các header được liệt kê của nó sẽ được chuyển tiếp đến Queue.

Ví dụ Topic Exchange trong RabbitMQ

Trong ví dụ này, tôi tạo một Headers Exchange có tên GPCoderHeadersExchange, tạo 3 Queue binding tới Headers Exchange này:

  • QDeveloper : Queue này sẽ nhận tất cả message có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”}.
  • QManager : Queue này nhận tất cả message có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”} hoặc {“manager”, “Manager Channel”}.
  • QPublished : Queue này nhận tất cả message có header là {“dev”, “Developer Channel”} và {“access”, “publish”}.

Một số class của chương trình:

  • ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
  • HeadersExchangeChannel :  class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, publish/ subscribe message, …
  • Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue.
  • Producer: để gửi Message đến Exchange.
  • Consumer: để nhận Message từ Queue.
  • App: giả lập việc gửi nhận Message thông qua Headers Exchange của RabbitMQ.

ConnectionManager.java


package com.gpcoder.headersexchange;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

    private ConnectionManager() {
        super();
    }

    public static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        return factory.newConnection();
    }
}

TopicExchangeChannel.java

</pre>
<pre>package com.gpcoder.headersexchange;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Map;

public class HeadersExchangeChannel {

    private String exchangeName;
    private Channel channel;
    private Connection connection;

    public HeadersExchangeChannel(Connection connection, String exchangeName) throws IOException {
        this.exchangeName = exchangeName;
        this.connection = connection;
        this.channel = connection.createChannel();
    }

    public void declareExchange() throws IOException {
        // exchangeDeclare( exchange, builtinExchangeType, durable)
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS, true);
    }

    public void declareQueues(String ...queueNames) throws IOException {
        for (String queueName : queueNames) {
            // queueDeclare  - (queueName, durable, exclusive, autoDelete, arguments)
            channel.queueDeclare(queueName, true, false, false, null);
        }
    }

    public void performQueueBinding(String queueName, Map<String, Object> headers) throws IOException {
        // Create bindings - (queue, exchange, routingKey)
        channel.queueBind(queueName, exchangeName, "", headers);
    }

    public void subscribeMessage(String queueName) throws IOException {
        // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
            System.out.println("[Received] [" + queueName + "]: " + consumerTag);
            System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody()));
        }), consumerTag -> {
            System.out.println(consumerTag);
        });
    }

    public void publishMessage(String message, Map<String, Object> headers) throws IOException {
        BasicProperties properties = new BasicProperties()
                .builder().headers(headers).build();

        // basicPublish - ( exchange, routingKey, basicProperties, body)
        System.out.println("[Send] [" + headers + "]: " + message);
        channel.basicPublish(exchangeName, "", properties, message.getBytes());
    }
}

Constant.java


package com.gpcoder.headersexchange;

public final class Constant {

    // Exchange

    public static final String EXCHANGE_NAME = "GPCoderHeadersExchange";

    // Queue

    public static final String DEV_QUEUE_NAME = "QDeveloper";

    public static final String MANAGER_QUEUE_NAME = "QManager";

    public static final String PUBLISHED_QUEUE_NAME = "QPublished";

    private Constant() {
        super();
    }
}


Producer.java


package com.gpcoder.headersexchange;

import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.headersexchange.Constant.*;

public class Producer {

    private HeadersExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new HeadersExchangeChannel(connection, EXCHANGE_NAME);

        // Create headers exchange
        channel.declareExchange();

        // Create headers
        Map<String, Object> devHeaders = new HashMap<>();
        devHeaders.put("x-match", "any"); // Match any of the header
        devHeaders.put("dev", "Developer Channel");
        devHeaders.put("general", "General Channel");

        Map<String, Object> managerHeaders = new HashMap<>();
        managerHeaders.put("x-match", "any"); // Match any of the header
        managerHeaders.put("dev", "Developer Channel");
        managerHeaders.put("manager", "Manager Channel");
        managerHeaders.put("general", "General Channel");

        Map<String, Object> publishedHeaders = new HashMap<>();
        publishedHeaders.put("x-match", "all"); // Match all of the header
        publishedHeaders.put("general", "General Channel");
        publishedHeaders.put("access", "publish");

        // Create queues
        channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, PUBLISHED_QUEUE_NAME);

        // Binding queues with headers
        channel.performQueueBinding(DEV_QUEUE_NAME, devHeaders);
        channel.performQueueBinding(MANAGER_QUEUE_NAME, managerHeaders);
        channel.performQueueBinding(PUBLISHED_QUEUE_NAME, publishedHeaders);
    }

    public void send(String message, Map<String, Object> headers) throws IOException {
        // Send message
        channel.publishMessage(message, headers);
    }
}

Consumer.java


package com.gpcoder.headersexchange;

import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.headersexchange.Constant.*;

public class Consumer {

    private HeadersExchangeChannel channel;

    public void start() throws IOException, TimeoutException {
        // Create connection
        Connection connection = ConnectionManager.createConnection();

        // Create channel
        channel = new HeadersExchangeChannel(connection, EXCHANGE_NAME);

        // Create headers exchange
        channel.declareExchange();

        // Create headers
        Map<String, Object> devHeaders = new HashMap<>();
        devHeaders.put("x-match", "any"); // Match any of the header
        devHeaders.put("dev", "Developer Channel");
        devHeaders.put("general", "General Channel");

        Map<String, Object> managerHeaders = new HashMap<>();
        managerHeaders.put("x-match", "any"); // Match any of the header
        managerHeaders.put("dev", "Developer Channel");
        managerHeaders.put("manager", "Manager Channel");
        managerHeaders.put("general", "General Channel");

        Map<String, Object> publishedHeaders = new HashMap<>();
        publishedHeaders.put("x-match", "all"); // Match all of the header
        publishedHeaders.put("general", "General Channel");
        publishedHeaders.put("access", "publish");

        // Create queues
        channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, PUBLISHED_QUEUE_NAME);

        // Binding queues with headers
        channel.performQueueBinding(DEV_QUEUE_NAME, devHeaders);
        channel.performQueueBinding(MANAGER_QUEUE_NAME, managerHeaders);
        channel.performQueueBinding(PUBLISHED_QUEUE_NAME, publishedHeaders);
    }

    public void subscribe() throws IOException {
        // Subscribe message
        channel.subscribeMessage(DEV_QUEUE_NAME);
        channel.subscribeMessage(MANAGER_QUEUE_NAME);
        channel.subscribeMessage(PUBLISHED_QUEUE_NAME);
    }

}

App.java


package com.gpcoder.headersexchange;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class App {

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create producers, queues and binding queues to Headers Exchange
        Producer producer = new Producer();
        producer.start();

        // Publish some messages
        Map<String, Object> devHeader = new HashMap<>();
        devHeader.put("dev", "Developer Channel");
        producer.send("[1] Developer message", devHeader);

        Map<String, Object> managerHeader = new HashMap<>();
        managerHeader.put("manager", "Manager Channel");
        producer.send("[2] Manager message", managerHeader);

        Map<String, Object> generalHeader = new HashMap<>();
        generalHeader.put("general", "General Channel");
        producer.send("[3] General message", generalHeader);

        Map<String, Object> publishedHeader = new HashMap<>();
        publishedHeader.put("general", "General Channel");
        publishedHeader.put("access", "publish");
        producer.send("[4] Published message", publishedHeader);


        // Create consumers, queues and binding queues to Headers Exchange
        Consumer consumer = new Consumer();
        consumer.start();
        consumer.subscribe();
    }
}

Output của chương trình:


[Send] [{dev=Developer Channel}]: [1] Developer message
[Send] [{manager=Manager Channel}]: [2] Manager message
[Send] [{general=General Channel}]: [3] General message
[Send] [{general=General Channel, access=publish}]: [4] Published message
[Received] [QDeveloper]: amq.ctag-NEPntH2xJ4411lij29dCyg
[Received] [QDeveloper]: [1] Developer message
[Received] [QDeveloper]: amq.ctag-NEPntH2xJ4411lij29dCyg
[Received] [QDeveloper]: [3] General message
[Received] [QDeveloper]: amq.ctag-NEPntH2xJ4411lij29dCyg
[Received] [QDeveloper]: [4] Published message
[Received] [QManager]: amq.ctag-UBFWB3QQqjgbPNAdyx87NQ
[Received] [QManager]: [1] Developer message
[Received] [QManager]: amq.ctag-UBFWB3QQqjgbPNAdyx87NQ
[Received] [QManager]: [2] Manager message
[Received] [QManager]: amq.ctag-UBFWB3QQqjgbPNAdyx87NQ
[Received] [QManager]: [3] General message
[Received] [QManager]: amq.ctag-UBFWB3QQqjgbPNAdyx87NQ
[Received] [QManager]: [4] Published message
[Received] [QPublished]: amq.ctag-j0yil4i64E5lepsuKdKFdg
[Received] [QPublished]: [4] Published message

Như bạn thấy:

  • developer có thể nhận bất kỳ Message nào có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”}.
  • manager có thể nhận bất kỳ Message nào có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”} hoặc {“manager”, “Manager Channel”}.
  • customer có thể nhận bất kỳ Message nào có header là {“dev”, “Developer Channel”} và {“access”, “publish”}.

Tài liệu tham khảo:

  • https://www.rabbitmq.com/tutorials/amqp-concepts.html
5.0
07
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Message Queue Được gắn thẻ: Message Queue, RabbitMQ

Sử dụng Topic Exchange (Publish/Subscribe) trong RabbitMQ
Sử dụng binding Exchange to Exchange trong RabbitMQ

Có thể bạn muốn xem:

  • Giới thiệu RabbitMQ Management Interface (17/05/2020)
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud (02/10/2020)
  • Sử dụng publisher confirm trong RabbitMQ (16/06/2020)
  • Kết nối AMQP Client với RabbitMQ Server (20/05/2020)
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin (19/06/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (97387 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (97036 lượt xem)
  • Giới thiệu Design Patterns (86706 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (85534 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (83085 lượt xem)

Nội dung bài viết

  • 1 Flow của một Message trong Headers Exchange
  • 2 Ví dụ Topic Exchange trong RabbitMQ

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link