GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Message Queue Work Queues trong RabbitMQ

Work Queues trong RabbitMQ

Đăng vào 23/05/2020 . Được đăng bởi GP Coder . 6821 Lượt xem . Toàn màn hình

Trong bài viết trước, chúng ta đã cùng tìm hiểu về cách tạo RabbitMQ Client (Producer và Consumer) sử dụng AMQP library để kết nối đến RabbitMQ server. Trong bài này, chúng ta sẽ cùng tìm hiểu chi tiết hơn về Work Queues được sử dụng để phân phối các task đến nhiều Worker.

Nội dung

  • 1 Work Queues
  • 2 Ví dụ

Work Queues

Ý tưởng chính của Work Queues (còn gọi là Task Queues) là tránh thực hiện một nhiệm vụ (work/task) tốn nhiều tài nguyên ngay lập tức và phải chờ nó hoàn thành. Thay vào đó chúng ta sẽ lên lịch (schedule) và các nhiệm vụ sẽ được thực hiện sau.

Chúng ta gói gọn (encapsulate) một task dưới dạng Message và gửi nó đến Queue. Một tiến trình worker chạy background sẽ lấy các task và cuối cùng thực thi chúng.

Có thể tạo nhiều worker để thực hiện các task, các task sẽ được chia sẻ giữa chúng.

Ví dụ

Trong ví dụ bên dưới, tôi sẽ tạo:

  • class Thread Producer có nhiệm vụ tạo các task và đưa vào Queue (RabbitMQ Broker).
  • class Thread Consumer (đóng vai trò như Worker) có nhiệm vụ lấy các task từ Queue về xử lý.
  • Sử dụng method channel.basicQos(1) : Mặc định, RabbitQM sử dụng Round-robin để gửi Message đến Consumer kế tiếp một cách tuần tự. Mỗi Consumer có thời gian xử lý mỗi task khác nhau. Để tránh một Consumer nhận quá nhiều mà không có thời gian để xử lý, một Consumer quá rãnh không có thời gian thực hiện. Chúng ta có sẽ sử dụng option basicQos() để nói với RabbitMQ rằng chỉ gửi 1 Message cho Consumer, khi nào xử lý xong hãy gửi Message kế tiếp. Nhờ vậy thời gian hoàn thành sớm hơn.
  • Sử dụng thuộc tính autoAck = false : Trong bài viết trước, chúng ta sử dụng thuộc tính autoAck là true khi Consumer nhận Message, thuộc tính này chỉ ra răng một ACK message sẽ được auto gửi đến RabbitMQ để báo với RabbitMQ rằng một Message đã được Consumer nhận, xử lý và Rabbit có thể xoá nó. Một vấn đề đặt ra là nếu một Consumer xử lý Task trong một thời gian dài, chỉ một phần của Task được hoàn thành và nó die. Khi đó, Message đã bị xoá bởi RabbitQM và Task sẽ bị mất. Để giải quyết vấn đề này, chúng ta sẽ không auto gửi Message, mà chúng ta sẽ gửi một ACK message đến RabbitMQ khi nó hoàn thành xử lý Message.
  • class App : mô phỏng việc tạo Task bởi Producer và lấy task xử lý bởi Consumer. Tôi sẽ tạo 1 Producer để tạo ra 10 Tasks và 2 Consumer để thay phiên nhau xử lý các Task được tạo bởi Producer. Một lưu ý là Consumer 1 sẽ xử lý mỗi task trong 100 milliseconds, Consumer 2 sẽ xử lý mỗi task trong 300 milliseconds. Nếu theo cơ chế Round-robin dispatching của RabbitMQ thông thường thì mỗi Consumer sẽ xử ý 5 Tasks, thời gian xử lý mỗi Task của Consumer 2 lâu hơn nên Consumer 1 sẽ rỗi rảnh trong khi Consumer 2 vẫn còn việc phải xử lý. Tuy nhiên, mình đã sử dụng basicQos() nên sẽ không có chuyện một Consumer rỗi rảnh và một Consumer có nhiều Task cần làm. Hãy xem kết quả ở chương trình bên dưới nhé.

 

Producer.java


package com.gpcoder;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer implements Runnable {
    private final static String QUEUE_NAME = "gpcoder-queue";

    private String name;
    private int numOfMessage;

    public Producer(String name, int numOfMessage) {
        this.name = name;
        this.numOfMessage = numOfMessage;
    }

    @Override
    public void run() {
        System.out.println("Create a ConnectionFactory for " + name);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try ( Connection connection = factory.newConnection();
              Channel channel = connection.createChannel() ) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            System.out.println("Start sending messages ... ");
            int index = 1;
            while (index <= numOfMessage) {
                String message = " Task #" + index++;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] " + name + " Sent: '" + message + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("Close connection and free resources");
        }
    }
}

Consumer.java


package com.gpcoder;

import com.rabbitmq.client.*;

public class Consumer implements Runnable {

    private final static String QUEUE_NAME = "gpcoder-queue";
    private int numberConsumedMessage = 0;
    private String name;
    private int timeToFinishATask;

    public Consumer(String name, int timeToFinishATask) {
        this.name = name;
        this.timeToFinishATask = timeToFinishATask;
    }

    @Override
    public void run() {
        try {
            System.out.println("Create a ConnectionFactory for " + name);
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            /**
             * This tells RabbitMQ not to give more than one message to a worker at a time.
             * Or, in other words, don't dispatch a new message to a worker
             * until it has processed and acknowledged the previous one.
             * Instead, it will dispatch it to the next worker that is not still busy.
             */
            channel.basicQos(1);

            System.out.println("Start receiving messages ... ");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] " + name + " Received: '" + message + "'");
                consume(message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println(" [-] " + name + " Already consumed: " + (++numberConsumedMessage) + " Tasks");

            };
            CancelCallback cancelCallback = consumerTag -> { };
            boolean autoAck = false;
            String consumerTag = channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
            System.out.println("Tag for " + name + ": " + consumerTag);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void consume(String message) {
        try {
            Thread.sleep(timeToFinishATask); // simulate time to produce the data
            System.out.println(" [-] " + name + " Consumed for " + message + " in " + timeToFinishATask + " ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

App.java


package com.gpcoder;

public class App {

    public static void main(String[] args) throws InterruptedException {

        Producer producer1 = new Producer("[Producer 1]",10);
        Consumer consumer1 = new Consumer("[Consumer 1]", 100);
        Consumer consumer2 = new Consumer("[Consumer 2]", 300);

        new Thread(producer1).start();

        new Thread(consumer1).start();
        new Thread(consumer2).start();
    }
}

Output của chương trình trên:


Create a ConnectionFactory for [Producer 1]
Create a ConnectionFactory for [Consumer 2]
Create a ConnectionFactory for [Consumer 1]
Start sending messages ... 
Start receiving messages ... 
Start receiving messages ... 
 [x] [Producer 1] Sent: ' Task #1'
 [x] [Producer 1] Sent: ' Task #2'
 [x] [Producer 1] Sent: ' Task #3'
 [x] [Producer 1] Sent: ' Task #4'
 [x] [Producer 1] Sent: ' Task #5'
 [x] [Producer 1] Sent: ' Task #6'
 [x] [Producer 1] Sent: ' Task #7'
 [x] [Producer 1] Sent: ' Task #8'
 [x] [Producer 1] Sent: ' Task #9'
 [x] [Producer 1] Sent: ' Task #10'
Tag for [Consumer 2]: amq.ctag-yVYxlwJ6750wJrwcMrO0nQ
Tag for [Consumer 1]: amq.ctag-RBZ-9HjR0-B_Gf2lQ0js7g
 [x] [Consumer 2] Received: ' Task #2'
 [x] [Consumer 1] Received: ' Task #1'
Close connection and free resources
 [-] [Consumer 1] Consumed for  Task #1 in 100 ms
 [-] [Consumer 1] Already consumed: 1 Tasks
 [x] [Consumer 1] Received: ' Task #3'
 [-] [Consumer 1] Consumed for  Task #3 in 100 ms
 [-] [Consumer 1] Already consumed: 2 Tasks
 [x] [Consumer 1] Received: ' Task #4'
 [-] [Consumer 2] Consumed for  Task #2 in 300 ms
 [-] [Consumer 2] Already consumed: 1 Tasks
 [x] [Consumer 2] Received: ' Task #5'
 [-] [Consumer 1] Consumed for  Task #4 in 100 ms
 [-] [Consumer 1] Already consumed: 3 Tasks
 [x] [Consumer 1] Received: ' Task #6'
 [-] [Consumer 1] Consumed for  Task #6 in 100 ms
 [-] [Consumer 1] Already consumed: 4 Tasks
 [x] [Consumer 1] Received: ' Task #7'
 [-] [Consumer 1] Consumed for  Task #7 in 100 ms
 [-] [Consumer 1] Already consumed: 5 Tasks
 [x] [Consumer 1] Received: ' Task #8'
 [-] [Consumer 2] Consumed for  Task #5 in 300 ms
 [-] [Consumer 2] Already consumed: 2 Tasks
 [x] [Consumer 2] Received: ' Task #9'
 [-] [Consumer 1] Consumed for  Task #8 in 100 ms
 [-] [Consumer 1] Already consumed: 6 Tasks
 [x] [Consumer 1] Received: ' Task #10'
 [-] [Consumer 1] Consumed for  Task #10 in 100 ms
 [-] [Consumer 1] Already consumed: 7 Tasks
 [-] [Consumer 2] Consumed for  Task #9 in 300 ms
 [-] [Consumer 2] Already consumed: 3 Tasks

Như bạn thấy, 2 Consumer cùng xử lý các công việc được tạo bởi Producer. Tuy nhiên, thời gian xử lý mỗi Task của Consumer 1 nhanh hơn Consumer 2, nên nó có thể done nhiều task hơn: Consumer 1 có thể xử lý 7 tasks, trong khi Consumer 2 chỉ xử lý 3 tasks.

Tài liệu tham khảo:

  • https://www.rabbitmq.com/tutorials/tutorial-two-java.html
5.0
08
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Message Queue Được gắn thẻ: JMS, Message Queue, RabbitMQ

Kết nối AMQP Client với RabbitMQ Server
Sử dụng Direct Exchange trong RabbitMQ

Có thể bạn muốn xem:

  • Sử dụng publisher confirm trong RabbitMQ (16/06/2020)
  • Sử dụng Direct Exchange trong RabbitMQ (26/05/2020)
  • Kết nối AMQP Client với RabbitMQ Server (20/05/2020)
  • Cài đặt ActiveMQ (04/05/2020)
  • Sử dụng Alternate Exchange trong RabbitMQ (10/06/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (97390 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (97038 lượt xem)
  • Giới thiệu Design Patterns (86713 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (85539 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (83089 lượt xem)

Nội dung bài viết

  • 1 Work Queues
  • 2 Ví dụ

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link