GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Java Core Multi-Thread Vấn đề Nhà sản xuất (Producer) – Người tiêu dùng (Consumer) và đồng bộ hóa các luồng trong Java

Vấn đề Nhà sản xuất (Producer) – Người tiêu dùng (Consumer) và đồng bộ hóa các luồng trong Java

Đăng vào 23/09/2019 . Được đăng bởi GP Coder . 14771 Lượt xem . Toàn màn hình

Producer/ Consumer là một ví dụ kinh điển về vấn đề đồng hóa các luồng (multi-threading synchronization). Trong bài này tôi sẽ giới thiệu với các bạn vấn đề này và cách giải quyết để giúp các bạn hiểu rõ hơn về Java concurrency và mutli-threading.

Nội dung

  • 1 Mô tả vấn đề Producer/ Consumer
  • 2 Giải pháp để giải quyết vấn đề Producer/ Consumer
  • 3 Tại sao vấn đề Producer/ Consumer lại quan trọng?

Mô tả vấn đề Producer/ Consumer

Vấn đề mô tả hai đối tượng nhà sản xuất (Producer) và người tiêu dùng (Consumer), cả hai cùng chia sẻ một bộ đệm có kích thước cố định được sử dụng như một hàng đợi (queue).

Producer: công việc của nhà sản xuất là tạo dữ liệu, đưa nó vào bộ đệm và bắt đầu lại.

Consumer: công việc người tiêu dùng là tiêu thụ dữ liệu (nghĩa là loại bỏ nó khỏi bộ đệm), từng phần một và xử lý nó. Consumer và Producer hoạt động song song với nhau.

Vấn đề là đảm bảo rằng nhà sản xuất không thể thêm dữ liệu vào bộ đệm nếu nó đầy và người tiêu dùng không thể xóa dữ liệu khỏi bộ đệm trống, đồng thời đảm bảo an toàn cho luồng (thread-safe).

Giải pháp để giải quyết vấn đề Producer/ Consumer

Ý tưởng:

  • Giải pháp cho nhà sản xuất là đi ngủ (wait) nếu bộ đệm đầy. Lần tiếp theo người tiêu dùng xóa một mục khỏi bộ đệm, nó đánh thức (notify) nhà sản xuất, bắt đầu đưa dữ liệu vào bộ đệm.
  • Theo cách tương tự, người tiêu dùng có thể đi ngủ (wait) nếu thấy bộ đệm trống. Lần tiếp theo nhà sản xuất đưa dữ liệu vào bộ đệm, nó đánh thức (notify) người tiêu dùng đang ngủ (wait).
  • Trong khi làm tất cả điều này, phải đảm bảo an toàn cho luồng (thread safe).

Có nhiều giải pháp cho vấn đề Producer Consumer trong Java:

  • Sử dụng synchronized.
  • Sử dụng BlockingQueue.
  • Sử dụng Semaphore.
  • Sử dụng JMS (Java Messaging Service): JMS là một implementation của vấn đề Producer Consumer. Nhiều nhà sản xuất và nhiều người tiêu dùng có thể kết nối với JMS và phân phối công việc. Tôi sẽ giới thiệu với các bạn JMS ở một bài viết khác.

Trong bài này, tôi sẽ giới thiệu với các bạn cách sử dụng BlockingQueue và Semaphore.

Sử dụng BlockingQueue

Để giải quyết vấn đề, chúng ta sẽ cần 3 class:

  • Blocking Queue :
    • Sử dụng synchronized để đảm bảo thread-safe.
    • put() : sử dụng wait() để chờ khi queue đã đầy (full), và notifyAll() để thông báo khi thêm data mới vào queue.
    • take() : sử dụng wait() để chờ khi queue rỗng (empty), và notifyAll() để thông báo khi lấy data ra khỏi queue.
  • Producer Thread(s) : các thread mô phỏng các nhà sản xuất.
  • Consumer Thread(s) : các thread mô phỏng các người tiêu dùng.

Các bạn có thể sử dụng BlockingQueue có sẵn trong package java.util.concurrent . Trong bài này, tôi sẽ tự tạo một BlockingQueue để mô phỏng cơ chế hoạt động của Blocking Queue cho các bạn dễ hiểu.

BlockingQueue.java


package com.gpcoder.producerconsumer;

import java.util.LinkedList;

public class BlockingQueue<T> {
	private static final int compacity = 10;
	private final LinkedList<T> items = new LinkedList<>();

	public synchronized void put(T value) throws InterruptedException {
		while (items.size() == compacity) {
			System.out.println("Queue is full");
			wait();
		}
		items.addLast(value);
		notifyAll();
	}

	public synchronized T take() throws InterruptedException {
		while (items.size() == 0) {
			System.out.println("Queue is empty");
			wait();
		}
		notifyAll();
		return items.removeFirst();
	}
	
	public synchronized int size() {
		return items.size();
	}
}

Producer.java


package com.gpcoder.producerconsumer;

import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable {

	private final BlockingQueue<Integer> queue;

	Producer(BlockingQueue<Integer> queue) {
		this.queue = queue;
	}

	public void run() {
		try {
			while (true) {
				queue.put(produce());
				System.out.println("Produced resource - Queue size() = "  + queue.size());
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	private Integer produce() throws InterruptedException {
		Thread.sleep(50); // simulate time to produce the data
		return ThreadLocalRandom.current().nextInt(1, 100);
	}
}

Consumer.java


package com.gpcoder.producerconsumer;

import java.util.concurrent.ThreadLocalRandom;

public class Consumer implements Runnable {

	private final BlockingQueue<Integer> queue;

	Consumer(BlockingQueue<Integer> queue) {
		this.queue = queue;
	}

	public void run() {
		try {
			while (true) {
				queue.take();
				System.out.println("Consumed resource - Queue size() = " + queue.size());
				Thread.sleep(ThreadLocalRandom.current().nextInt(50, 300)); // simulate time passing
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

Main.java


package com.gpcoder.producerconsumer;

public class Main {

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<Integer> boundedBuffer = new BlockingQueue<>();

		Producer producer = new Producer(boundedBuffer);
		Consumer consumer1 = new Consumer(boundedBuffer);
		Consumer consumer2 = new Consumer(boundedBuffer);
		Consumer consumer3 = new Consumer(boundedBuffer);

		new Thread(producer).start();
		new Thread(consumer1).start();
		new Thread(consumer2).start();
		new Thread(consumer3).start();
		
		Thread.sleep(5000); // After 5s have another comsumer
		Consumer consumer4 = new Consumer(boundedBuffer);
		new Thread(consumer4).start();
	}
}

Output chương trình:


Queue is empty
Queue is empty
Queue is empty
Queue is empty
Produced resource - Queue size() = 0
Consumed resource - Queue size() = 0
Queue is empty
Produced resource - Queue size() = 0
Consumed resource - Queue size() = 0
Produced resource - Queue size() = 1
Consumed resource - Queue size() = 0
Queue is empty
Produced resource - Queue size() = 1
Consumed resource - Queue size() = 0
....
Produced resource - Queue size() = 10
Queue is full
Consumed resource - Queue size() = 9
Produced resource - Queue size() = 10
Queue is full
Consumed resource - Queue size() = 9
Produced resource - Queue size() = 10
Consumed resource - Queue size() = 9
....
Produced resource - Queue size() = 10
Consumed resource - Queue size() = 9
Consumed resource - Queue size() = 8
Produced resource - Queue size() = 9
Consumed resource - Queue size() = 8
...
Consumed resource - Queue size() = 1
Produced resource - Queue size() = 2
Consumed resource - Queue size() = 1
Consumed resource - Queue size() = 0
Queue is empty
Produced resource - Queue size() = 1
Consumed resource - Queue size() = 0
...

Sử dụng Semaphore

Tương tự như BlockingQueue ở trên, với Semaphore chúng ta cũng cần 3 class:

  • Queue : sử dụng 2 Semaphore để đồng bộ hóa dữ liệu.
    • Semaphore Producer: được set giá trị là maximum của buffer size. Giá trị này tương ứng với số lượng item có thể được thêm vào buffer.
    • Semaphore Consumer: được set giá trị là 0. Giá trị này tương ứng với số lượng item có thể được lấy ra khỏi buffer.
  • Producer Thread(s) : các thread mô phỏng các nhà sản xuất.
  • Consumer Thread(s) : các thread mô phỏng các người tiêu dùng.

Ví dụ:

Để dễ theo dõi, tôi sẽ thêm một số dòng log để xem cách hoạt động của Semaphore.


package com.gpcoder.producerconsumer.semaphore;

import java.util.Stack;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

public class ProducerConsumerSemaphore {

	private static final int BUFFER_SIZE = 4;
	private final Semaphore writePermits = new Semaphore(BUFFER_SIZE);
	private final Semaphore readPermits = new Semaphore(0);
	private final Stack<Integer> buffer = new Stack<>();

	class Producer implements Runnable {
		private String name;

		public Producer(String name) {
			this.name = name;
		}

		@Override
		public void run() {
			try {
				while (true) {
					System.out.println(name + ": acquiring lock...");
					System.out.println(name + ": Producer available Semaphore permits now: " + writePermits.availablePermits());
					writePermits.acquire();
					System.out.println(name + ": got the permit!");

					Thread.sleep(50); // simulate time to work
					int data = ThreadLocalRandom.current().nextInt(100);
					System.out.println(name + ": produced data " + buffer.push(data));

					System.out.println(name + ": releasing lock...");
					readPermits.release();
					System.out.println(name + ": Consumer available Semaphore permits now: " + readPermits.availablePermits());
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	class Consumer implements Runnable {
		private String name;

		public Consumer(String name) {
			this.name = name;
		}

		@Override
		public void run() {
			try {
				while (true) {
					System.out.println(name + ": acquiring lock...");
					System.out.println(name + ": Consumer available Semaphore permits now: " + readPermits.availablePermits());
					readPermits.acquire();

					Thread.sleep(ThreadLocalRandom.current().nextInt(50, 300)); // simulate time to work
					System.out.println(name + ": consumed data " + buffer.pop());

					System.out.println(name + ": releasing lock...");
					writePermits.release();
					System.out.println(name + ": Producer available Semaphore permits now: " + writePermits.availablePermits());
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		ProducerConsumerSemaphore obj = new ProducerConsumerSemaphore();
		Producer producer = obj.new Producer("Producer 1");
		new Thread(producer).start();
		
		for (int i = 1; i <= 3; i++) {
			Consumer consumer = obj.new Consumer("Consumer " + i);
			new Thread(consumer).start();
		}
		
		Thread.sleep(5000); // After 5s have another comsumer
		Consumer consumer = obj.new Consumer("Consumer " + 4);
		new Thread(consumer).start();
	}
}

Chạy chương trình trên, ta có kết quả sau:


Producer 1: acquiring lock...
Producer 1: Producer available Semaphore permits now: 4
Producer 1: got the permit!
Consumer 1: acquiring lock...
Consumer 1: Consumer available Semaphore permits now: 0
Consumer 2: acquiring lock...
Consumer 2: Consumer available Semaphore permits now: 0
Consumer 3: acquiring lock...
Consumer 3: Consumer available Semaphore permits now: 0
Producer 1: produced data 94
Producer 1: releasing lock...
Producer 1: Consumer available Semaphore permits now: 0
Producer 1: acquiring lock...
Producer 1: Producer available Semaphore permits now: 3
Producer 1: got the permit!
Producer 1: produced data 85
Producer 1: releasing lock...
Producer 1: Consumer available Semaphore permits now: 1
Producer 1: acquiring lock...
Producer 1: Producer available Semaphore permits now: 2
...
Consumer 2: Producer available Semaphore permits now: 1
Consumer 2: acquiring lock...
Consumer 2: Consumer available Semaphore permits now: 0
Producer 1: produced data 9
Producer 1: releasing lock...
Producer 1: Consumer available Semaphore permits now: 1
Producer 1: acquiring lock...
Producer 1: Producer available Semaphore permits now: 1
Producer 1: got the permit!
...

Tại sao vấn đề Producer/ Consumer lại quan trọng?

Có thể được sử dụng để phân phối công việc giữa các worker khác nhau, dễ dàng tăng hoặc giảm theo yêu cầu. Như bạn thấy trong ví dụ trên, ban đầu một Producer có thể sản xuất đủ cho 3 Consumer. Nhưng sau đó, có thêm một Consumer thì hệ thống sản xuất không đáp ứng được, do đó chúng ta cần tăng hiệu suất của Producer lên hoặc tạo thêm một Producer khác để đủ phục vụ cho Consumer, tránh tình trạng thiếu sản phẩm.

Producer và Consumer được kết nối thông qua BlockingQueue, nó không biết sự hiện diện của nhau, tách rời các mối quan tâm (separation of concern), giúp hệ thống thống có thiết kế tốt hơn, nên dễ dàng nâng cấp và mở rộng.

Producer và Consumer không cần phải có sẵn cùng một lúc. Consumer có thể nhận các nhiệm vụ được sản xuất bởi Producer tại một thời điểm khác nhau.

Tài liệu tham khảo:

  • https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
  • https://courses.washington.edu/cp105/06_Synchronization_Patterns/Synchronization%20Patterns.html
  • https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html
  • https://stackoverflow.com/questions/8288479/how-to-solve-the-producer-consumer-using-semaphores
5.0
26
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Java Core, Multi-Thread Được gắn thẻ: Multithreading

Semaphore trong Java
Refactoring Design Pattern với tính năng mới trong Java 8

Có thể bạn muốn xem:

  • Sử dụng CyclicBarrier trong Java (13/03/2018)
  • Lập trình đa luồng trong Java (Java Multi-threading) (12/02/2018)
  • Tránh lỗi NullPointerException trong Java như thế nào? (06/08/2018)
  • Sử dụng CountDownLatch trong Java (09/03/2018)
  • Marker Interface trong Java (30/03/2020)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (98059 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (97700 lượt xem)
  • Giới thiệu Design Patterns (87764 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (86434 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (83839 lượt xem)

Nội dung bài viết

  • 1 Mô tả vấn đề Producer/ Consumer
  • 2 Giải pháp để giải quyết vấn đề Producer/ Consumer
  • 3 Tại sao vấn đề Producer/ Consumer lại quan trọng?

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link