GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Message Queue Kết nối JMS Client với ActiveMQ

Kết nối JMS Client với ActiveMQ

Đăng vào 07/05/2020 . Được đăng bởi GP Coder . 7620 Lượt xem . Toàn màn hình

Trong bài này, chúng ta sẽ cùng tìm hiểu cách tạo JMS Client (Producer và Consumer) để kết nối đến ActiveMQ server, cũng như sự khác biệt trong việc phân phối tin nhắn giữa Queue va Topic.

Nội dung

  • 1 Tạo ActiveMQ project
  • 2 Tạo Producer và Consumer
  • 3 Chạy ứng dụng

Tạo ActiveMQ project

Tạo maven project và mở file pom.xml, khai báo dependency như sau:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gpcoder</groupId>
    <artifactId>activemq-example</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client -->
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.50.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-compiler-plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- include all the dependencies into the jar for easier to execute the application -->
            <!-- https://mvnrepository.com/artifact/org.fusesource.mvnplugins/maven-uberize-plugin -->
            <plugin>
                <groupId>org.fusesource.mvnplugins</groupId>
                <artifactId>maven-uberize-plugin</artifactId>
                <version>1.45</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>uberize</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>

Chúng ta chỉ cần sử dụng 1 trong 2 thư viện JMS Client: activemq-client hoặc qpid-jms-client. Trong bài này, tôi muốn giới thiệu với các bạn cả 2, nên cần include cả 2 thư viện này.

  • activemq-client : nếu muốn kết nối thông qua giao thức TCP.
  • qpid-jms-client : nếu muốn kết nối thông qua giao thức AMQP.

Tạo Producer và Consumer

Tạo Producer

Các bước thực hiện:

  • Tạo ConnectionFactory : xác định remote URI đến ActiveMQ server. ActiveMQ hỗ trợ nhiều loại giao thức khác nhau, trong ví dụ này, tôi sẽ sử dụng giới các bạn cách tạo ConnectionFactory sử dụng giao thức AMQP và TCP.
    • remoteURI sử dụng AMQP: amqp://localhost:5672
    • remoteURI sử dụng TCP: tcp://localhost:61616
  • Tạo Connection từ ConnectionFactory, cần cung cấp username và password.
  • Tạo Session: mỗi Connection có thể có nhiều Session quản lý những thứ như Transaction và lưu giữ Message riêng biệt.
  • Tạo Destination: là một địa chỉ của một Topic hoặc Queue cụ thể được lưu trữ bởi JMS broker.
  • Tạo Producer: producer dành riêng cho một Destination, nó chỉ có thể gửi tin nhắn đến một Topic hoặc Queue cụ thể.
  • Tạo Message: Mỗi khi muốn gửi tin nhắn đến Topic hoặc Queue, cần phải tạo một đối tượng Message. Có một vài loại khác nhau: text, binary, object, IO stream. Trong ví dụ này, tôi chỉ gửi một tin nhắn dạng text, đây là một trong những dạng đơn giản nhất.
  • Gửi Message: thực hiện gửi message đến JMS Broker.
  • Đóng kết nối: sau khi sử dụng xong cần đóng kết nối. Điều này nói với JMS broker rằng nó có thể giải phóng các tài nguyên được sử dụng cho kết nối đó.

Ví dụ:


package com.gpcoder;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;

import javax.jms.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;

class Producer {

    public static void main(String[] args) throws Exception {

        System.out.println("Create a ConnectionFactory");
        // ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672");

        System.out.println("Create a Connection");
        Connection connection = connectionFactory.createConnection("admin", "admin");
        connection.start();

        System.out.println("Create a Session");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        System.out.println("Create a Topic/ Queue based on the given parameter");
        Destination destination = null;
        if (args.length > 0 && args[0].equalsIgnoreCase("QUEUE")) {
            destination = session.createQueue("gpcoder-jms-queue");
        } else if (args.length > 0 && args[0].equalsIgnoreCase("TOPIC")) {
            destination = session.createTopic("gpcoder-jms-topic");
        } else {
            System.out.println("Error: You must specify Queue or Topic");
            connection.close();
            System.exit(1);
        }

        System.out.println("Create a Producer to send messages to one Topic or Queue.");
        MessageProducer producer = session.createProducer(destination);

        System.out.println("Start sending messages ... ");
        try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in));) {
            String response;
            do {
                System.out.print("Enter message: ");
                response = br.readLine().trim();
                TextMessage msg = session.createTextMessage(response);
                producer.send(msg);
            } while (!response.equalsIgnoreCase("close"));
        }

        System.out.println("Shutdown JMS connection and free resources");
        connection.close();
        System.exit(1);
    }
}

Tạo Consumer

Các bước thực hiện tương tự như tạo Producer, khác biệt duy nhất là thay vì tạo Producer để gửi tin nhắn, ta tạo Consumer để nhận tin nhắn.


package com.gpcoder;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;

import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import javax.jms.*;

class Consumer {

    public static void main(String[] args) throws JMSException {

        System.out.println("Create a ConnectionFactory");
        // ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672");

        System.out.println("Create a Connection");
        Connection connection = connectionFactory.createConnection("admin", "admin");
        connection.start();

        System.out.println("Create a Session");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        System.out.println("Create a Topic/ Queue based on the given parameter");
        Destination destination = null;
        if (args.length > 0 && args[0].equalsIgnoreCase("QUEUE")) {
            destination = session.createQueue("gpcoder-jms-queue");
        } else if (args.length > 0 && args[0].equalsIgnoreCase("TOPIC")) {
            destination = session.createTopic("gpcoder-jms-topic");
        } else {
            System.out.println("Error: You must specify Queue or Topic");
            connection.close();
            System.exit(1);
        }

        System.out.println("Create a Consumer to receive messages from one Topic or Queue.");
        MessageConsumer consumer = session.createConsumer(destination);

        System.out.println("Start receiving messages ... ");
        String body;
        do {
            Message msg = consumer.receive();
            body = ((TextMessage) msg).getText();
            System.out.println("Received = " + body);
        } while (!body.equalsIgnoreCase("close"));

        System.out.println("Shutdown JMS connection and free resources");
        connection.close();
        System.exit(1);
    }
}

Chạy ứng dụng

Trước hết, chúng ta cần package ứng dụng trên thành gói jar, chạy lệnh: mvn clean install

Sau khi chạy lệnh trên, trong thư mục target của project, chúng ta có gói: activemq-example-1.0-SNAPSHOT.jar

Start ActiveMQ Server: xem lại bài viết trước “Cài đặt ActiveMQ“.

Trường hợp JMS Queue

Mô hình P2P (Point to Point) đảm bảo chỉ có một người gửi và một người nhận tin nhắn.

Nhiều Consumer và một Producer

Mở 2 console và chạy lệnh sau để start 2 JMS Consumer:

java -cp target/activemq-example-1.0-SNAPSHOT.jar com.gpcoder.Consumer Queue

Mở thêm 1 console khác để start JMS Producer:

java -cp target/activemq-example-1.0-SNAPSHOT.jar com.gpcoder.Producer Queue

Chúng ta có kết quả như sau:

Nhập một vài giá trị ở cửa sổ Producer, chúng ta có kết quả sau:

Như bạn thấy, một tin nhắn chỉ được gửi cho một client tại một thời điểm và client thay phiên nhau nhận tin nhắn.

Hãy đóng các console trên bằng cách enter “exit” trên console của Producer hoặc Ctrl + C.

Producer gửi message trước khi Consumer start

Tiếp tục hãy test thử một trường hợp khác: Start Producer và gửi một vài message:

Sau đó start Consumer và check kết quả.

Bạn có thể thấy rằng, người nhận không cần active tại thời điểm Producer gửi message. JMS Broker sẽ deliver message ngay khi Consumer active.

Trường hợp JMS Topic

Mô hình Pub/ Sub (Publisher/ Subscriber) cho phép 1 người gửi và nhiều người nhận.

Mở 2 console và chạy lệnh sau để start 2 JMS Consumer:

java -cp target/activemq-example-1.0-SNAPSHOT.jar com.gpcoder.Consumer Topic

Mở thêm 1 console khác để start JMS Producer:

java -cp target/activemq-example-1.0-SNAPSHOT.jar com.gpcoder.Producer Topic

Chúng ta có kết quả như sau:

Nhập một vài giá trị ở cửa sổ Producer, chúng ta có kết quả sau:

Như bạn thấy, mỗi khi Producer gửi một tin nhắn thì tất cả consumer đều nhận được ngay tức thì. Khi bạn nhập “close” thì chương trình cũng kết thúc và đóng connection.

Hãy đóng các console trên bằng cách enter “exit” trên console của Producer hoặc Ctrl + C.

Tiếp tục hãy test thử một trường hợp khác: start Producer và gửi một vài message. Sau đó start Consumer. Bạn sẽ thấy rằng, Consumer không nhận được message của Producer đã gửi từ trước. Mỗi Consumer sẽ chỉ nhận được message từ Topic sau khi đã subscription.

Mở admin page của ActiveMQ để kiểm tra lại Topic và Queue đã tạo: http://localhost:8161/admin/

Trang Queues:

Trang Topics:

 

Tài liệu tham khảo:

  • https://activemq.apache.org/hello-world
4.9
11
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Message Queue Được gắn thẻ: ActiveMQ, JMS, Message Queue

Cài đặt ActiveMQ
Giới thiệu RabbitMQ

Có thể bạn muốn xem:

  • Sử dụng Direct Exchange trong RabbitMQ (26/05/2020)
  • Sử dụng Topic Exchange (Publish/Subscribe) trong RabbitMQ (01/06/2020)
  • Sử dụng publisher confirm trong RabbitMQ (16/06/2020)
  • Sử dụng Dead Letter Exchange trong RabbitMQ (13/06/2020)
  • Cài đặt ActiveMQ (04/05/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (97309 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (96950 lượt xem)
  • Giới thiệu Design Patterns (86585 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (85401 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (82971 lượt xem)

Nội dung bài viết

  • 1 Tạo ActiveMQ project
  • 2 Tạo Producer và Consumer
  • 3 Chạy ứng dụng

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link