GP Coder

Trang chia sẻ kiến thức lập trình Java

  • Java Core
    • Basic Java
    • OOP
    • Exception Handling
    • Multi-Thread
    • Java I/O
    • Networking
    • Reflection
    • Collection
    • Java 8
  • Design pattern
    • Creational Pattern
    • Structuaral Pattern
    • Behavior Pattern
  • Web Service
    • SOAP
    • REST
  • JPA
  • Java library
    • Report
    • Json
    • Unit Test
  • Message Queue
    • ActiveMQ
    • RabbitMQ
  • All
Trang chủ Java Core Multi-Thread Lập trình đa luồng với Callable và Future trong Java

Lập trình đa luồng với Callable và Future trong Java

Đăng vào 02/03/2018 . Được đăng bởi GP Coder . 27850 Lượt xem . Toàn màn hình

Trong bài viết Lập trình đa luồng trong Java các bạn đã biết được 2 cách để tạo một Thread trong Java: tạo 1 đối tượng của lớp được extend từ class Thread hoặc implements từ interface Runnable. Trong bài viết này tôi giới thiệu với các bạn một cách khác để tạo Thread, đó là Callable trong Java với khả năng trả về kết quả Future<T> sau khi xử lý và có thể throw Exception nếu trong quá trình xử lý tác vụ có lỗi xảy ra.

Nội dung

  • 1 Callable
  • 2 Future
  • 3 Ví dụ sử dụng Callable và Future

Callable

Callable là một interface sử dụng Java Generic để định nghĩa đối tượng sẽ trả về sau khi xử lý xong tác vụ.

Callable interface cũng giống như Runnable, ngoại trừ khác ở chỗ thay vì trả về void từ run() method của Runnable thì call() method của Callable trả về đối tượng kiểu Future<T> (bất kỳ) hoặc throw Exception.

Runnable:

public abstract void run() => kiểu trả về là void

Callable:

<T> Future<T> submit(Callable<T> task) => kiểu trả về là Future<T>

<T> Future<T> submit(Runnable<T> task) => kiểu trả về là Future<T>

Future

Dùng để lấy kết quả khi thực thi một Thread, có thể lấy Future khi submit một task vào ThreadPool. Task ở đây là một object implement Runnable hay Callable.

Một số method của Future:

  • isDone() : Kiểm tra task hoàn thành hay không?
  • cancel() : Hủy task
  • isCancelled(): Kiểm tra task bị hủy trước khi hoàn thành hay không?
  • get() : Lấy kết quả trả về của task.

Ví dụ sử dụng Callable và Future

Sử dụng phương thức submit(Callable) của ExecutorService với kiểu trả về là 1 Future<T>

Ví dụ về tính tổng bình phương của 10 số, thay vì xử lý việc tính tổng này trong Thread chính của chương trình, tôi sẽ tạo mới Thread sử dụng Callable để xử lý và nhận kết quả về thông qua Future.


package com.gpcoder.threadpool.callable;

import java.util.Random;
import java.util.concurrent.Callable;

public class CallableWorker implements Callable<Integer> {

	private int num;
	private Random ran;

	public CallableWorker(int num) {
		this.num = num;
		this.ran = new Random();
	}

	public Integer call() throws Exception {
		Thread.sleep(ran.nextInt(10) * 1000);
		int result = num * num;
		System.out.println("Complete " + num);
		return result;
	}
}

Để thực thi tác vụ của Callable, chúng ta phải submit nó vào một ThreadPool sử dụng phương thức submit() của Executor Framework. Để nhận kết quả trả về chúng ta sử dụng phương thức get() của lớp Future. Ta có chương trình như bên dưới:


package com.gpcoder.threadpool.callable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableExample {
	public static void main(String[] args) {
		// create a list to hold the Future object associated with Callable
		List<Future<Integer>> list = new ArrayList<>();

		// Get ExecutorService from Executors utility class, thread pool size is 5
		ExecutorService executor = Executors.newFixedThreadPool(5);

		Callable<Integer> callable;
		Future<Integer> future;
		for (int i = 1; i <= 10; i++) {
			callable = new CallableWorker(i);

			// submit Callable tasks to be executed by thread pool
			future = executor.submit(callable);

			// add Future to the list, we can get return value using Future
			list.add(future);
		}

		// shut down the executor service now
		executor.shutdown();

		// Wait until all threads are finish
		while (!executor.isTerminated()) {
			// Running ...
		}

		int sum = 0;
		for (Future<Integer> f : list) {
			try {
				sum += f.get();
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}

		System.out.println("Finished all threads: ");
		System.out.println("Sum all = " + sum);
	}
}

Thực thi chương trình trên, ta có kết quả như sau:


Complete 5
Complete 4
Complete 7
Complete 6
Complete 3
Complete 2
Complete 9
Complete 1
Complete 8
Complete 10
Finished all threads: 
Sum all = 385

Lưu ý: Tương tự như submit(Callable), phương thức submit(Runnable) cũng đưa vào 1 Runnable và nó trả về một đối tượng Future. Đối tượng Future có thể được sử dụng để kiểm tra nếu Runnable đã hoàn tất việc thực thi.

Sử dụng phương thức get() của Future<T> với Timeout

Phương thức get() là synchronous, do đó nó sẽ blocking chương trình của chúng ta mỗi khi đợi kết quả của Callable. Để hạn chế blocking chương trình quá lâu, chúng ta có thể sử dụng phương thức get() này với một thời gian Timeout như sau:


future.get(7, TimeUnit.SECONDS);

Lưu ý: khi sử dụng phương thức get() với Timeout có thể nhận một TimeoutException nếu thời gian thực thi của task vượt quá khoảng thời gian Timeout.

Ví dụ:


package com.gpcoder.threadpool.callable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CallableExample2 {
	public static final int GET_TIME_OUT = 5;
	public static final int NUM_OF_TASK = 10;

	public static void main(String[] args) throws TimeoutException, InterruptedException {
		// create a list to hold the Future object associated with Callable
		List<Future<Integer>> list = new ArrayList<>();

		// Get ExecutorService from Executors utility class, thread pool size is 5
		ExecutorService executor = Executors.newFixedThreadPool(5);

		Callable<Integer> callable;
		Future<Integer> future;
		for (int i = 1; i <= NUM_OF_TASK; i++) {
			callable = new CallableWorker(i);

			// submit Callable tasks to be executed by thread pool
			future = executor.submit(callable);

			// add Future to the list, we can get return value using Future
			list.add(future);
		}

		int sum = 0;
		for (Future<Integer> f : list) {
			try {
				// print the return value of Future, notice the output delay in console
				// because Future.get() waits for task to get completed
				int result = f.get(GET_TIME_OUT, TimeUnit.SECONDS);
				// Throw TimeoutException if the task execute over 7s
				sum += result;
				System.out.println("Result: " + result);
				System.out.println("Is completed? : " + f.isDone());
				System.out.println("Is canceled? : " + f.isCancelled());
			} catch (TimeoutException | CancellationException | InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
			System.out.println("---");
		}

		// shut down the executor service now
		executor.shutdownNow();

		// Blocks until all tasks have completed execution after a shutdown request, or
		// the timeout occurs, or the current thread is interrupted, whichever happens
		// first.
		while (!executor.awaitTermination(GET_TIME_OUT * NUM_OF_TASK * 1000, TimeUnit.SECONDS)) {
			// Running ...
		}

		System.out.println("Finished all threads: ");
		System.out.println("Sum all = " + sum);
	}
}

Thực thi chương trình trên vài lần bạn sẽ nhận được exception và kết quả như sau:


Complete 1
Result: 1
Is completed? : true
Is canceled? : false
---
Complete 6
java.util.concurrent.TimeoutException
---
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at com.gpcoder.threadpool.callable.CallableExample2.main(CallableExample2.java:42)
Complete 2
Complete 3
Result: 9
Is completed? : true
Is canceled? : false
---
Complete 5
Complete 10
Complete 8
Complete 4
Result: 16
Is completed? : true
Is canceled? : false
---
Result: 25
Is completed? : true
Is canceled? : false
---
Result: 36
Is completed? : true
Is canceled? : false
---
Complete 7
Result: 49
Is completed? : true
Is canceled? : false
---
Result: 64
Is completed? : true
Is canceled? : false
---
java.util.concurrent.TimeoutException
---
Result: 100
Is completed? : true
Is canceled? : false
---
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at com.gpcoder.threadpool.callable.CallableExample2.main(CallableExample2.java:42)
Finished all threads: 
Sum all = 300

Sử dụng phương thức invokeAny(Collection<?> extends Callable<T> tasks)

Phương thức invokeAny() trả về một Future, nhưng trả về kết quả của một trong những đối tượng Callable. Nó không đảm bảo về kết quả bạn sẽ nhận được từ callable nào, chỉ cần một trong số chúng hoàn thành. Tức là ko cần tất cả các Thread hòan thành, chỉ cần 1 task hoàn thành phương thức get() sẽ nhận được kết quả.

Nếu 1 trong số task hoàn thành hoặc ném ra 1 ngoại lệ, phần còn lại của Callable sẽ được hủy bỏ (cancelled).

Ví dụ:


package com.gpcoder.threadpool.callable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InvokeAnyExample {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		// Get ExecutorService from Executors utility class, thread pool size is 5
		ExecutorService executor = Executors.newFixedThreadPool(5);

		List<Callable<Integer>> callables = new ArrayList<>();
		for (int i = 1; i <= 10; i++) {
			callables.add(new CallableWorker(i));
		}

		Integer result = executor.invokeAny(callables);
		System.out.println("Result = " + result);

		executor.shutdown();
		System.out.println("Finished all threads ");
	}

}

Thực thi chương trình trên sẽ in ra các kết quả được trả về từ 1 trong 10 Callable từ danh sách callables. Mỗi lần chạy sẽ nhận được một kết quả khác nhau.

Kết quả lần 1:


Complete 4
Result = 16
Finished all threads 

Kết quả lần 2:


Complete 3
Complete 6
Result = 9
Finished all threads

Sử dụng phương thức invokeAll(Collection<?> extends Callable<T> tasks)

Phương thức invokeAll() gọi tất cả đối tượng Callable trong tập hợp. Phương thức này trả về 1 danh sách các đối tượng Future<T> mà được trả về từ việc thực thi các Callables.

Ví dụ:


package com.gpcoder.threadpool.callable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllExample {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		// Get ExecutorService from Executors utility class, thread pool size is 5
		ExecutorService executor = Executors.newFixedThreadPool(5);

		List<Callable<Integer>> callables = new ArrayList<>();
		for (int i = 1; i <= 10; i++) {
			callables.add(new CallableWorker(i));
		}

		List<Future<Integer>> futures = executor.invokeAll(callables);

		int sum = 0;
		for (Future<Integer> future : futures) {
			sum += future.get();
		}
		System.out.println("Sum all = " + sum);

		executor.shutdown();
		System.out.println("Finished all threads ");
	}
}

Thực thi chương trình trên, bạn sẽ thấy tất cả các Callable đều được thực thi và kết quả được lưu vào List<Future>. Ta có kết quả như sau:


Complete 1
Complete 5
Complete 7
Complete 4
Complete 8
Complete 2
Complete 3
Complete 9
Complete 6
Complete 10
Sum all = 385
Finished all threads

Hủy bỏ một Future

Bạn có thể hủy một Future bằng cách sử dụng phương thức Future.cancel(). Khi phương thức này được gọi, nó cố gắng để hủy bỏ việc thực hiện các task và trả về true nếu nó bị hủy bỏ thành công, nếu không, nó trả về false.

Phương thức cancel() chấp nhận đối số boolean – mayInterruptIfRunning. Nếu gán giá trị true cho đối số này, Thread hiện đang thi hành task sẽ bị gián đoạn (interrupted), nếu không các task đang xử lý sẽ được phép hoàn thành.

Bạn có thể sử dụng phương thức isCancelled() để kiểm tra xem một task có bị hủy hay không. Ngoài ra, sau khi huỷ bỏ task thì phương thức isDone() sẽ luôn luôn có kết quả là true.

Ví dụ:


package com.gpcoder.threadpool.callable;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureCancelExample {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newSingleThreadExecutor();

		long startTime = System.currentTimeMillis();
		Future<Integer> future = executorService.submit(new CallableWorker(1));

		while (!future.isDone()) {
			System.out.println("Task is still working ...");
			Thread.sleep(200);

			long workingTime = (System.currentTimeMillis() - startTime);
			if (TimeUnit.SECONDS.toSeconds(workingTime) > 1000) {
				future.cancel(true);
			}
		}

		executorService.shutdown();

		if (!future.isCancelled()) {
			System.out.println("Task completed! Retrieving the result");
			System.out.println("Result = " + future.get());
		} else {
			System.out.println("Task was cancelled");
		}

		System.out.println("It will throw exception form here");
		System.out.println("Result = " + future.get());
	}

}

Thực thi chương trình trên, ta có kết quả như sau:


Task is still working ...
Task is still working ...
Task is still working ...
Task is still working ...
Task is still working ...
Task was cancelled
It will throw exception form here
Exception in thread "main" java.util.concurrent.CancellationException
	at java.util.concurrent.FutureTask.report(FutureTask.java:121)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.gpcoder.threadpool.callable.FutureCancelExample.main(FutureCancelExample.java:37)

Như bạn thấy chương trình trên nó sẽ ném một ngoại lệ (exception), bởi vì phương thức future.get() sẽ ném CancellationException nếu task được hủy bỏ. Chúng ta có thể xử lý sự kiện này bằng cách kiểm tra Future có bị hủy bỏ hay không trước khi lấy kết quả thông qua phương thức future.isCancelled().

Trên đây là toàn bộ những kiến thức cơ bản về việc sử dụng Callable và Future với ExecutorService Framework. Hy vọng bài viết này giúp ích cho các bạn, hẹn gặp lại ở các bài viết tiếp theo.

4.8
26
Nếu bạn thấy hay thì hãy chia sẻ bài viết cho mọi người nhé! Và Donate tác giả

Shares

Chuyên mục: Multi-Thread Được gắn thẻ: Callable, Future, Multithreading

Hướng dẫn tạo và sử dụng ThreadPool trong Java
Sử dụng Fork/Join Framework với ForkJoinPool trong Java

Có thể bạn muốn xem:

  • Sử dụng CyclicBarrier trong Java (13/03/2018)
  • Vấn đề Nhà sản xuất (Producer) – Người tiêu dùng (Consumer) và đồng bộ hóa các luồng trong Java (23/09/2019)
  • Thực thi nhiều tác vụ cùng lúc như thế nào trong Java? (01/02/2019)
  • Semaphore trong Java (18/09/2019)
  • Sử dụng CountDownLatch trong Java (09/03/2018)

Bình luận

bình luận

Tìm kiếm

Bài viết mới

  • Clean code 13/01/2024
  • Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud 02/10/2020
  • Kết nối RabbitMQ sử dụng Web STOMP Plugin 19/06/2020
  • Sử dụng publisher confirm trong RabbitMQ 16/06/2020
  • Sử dụng Dead Letter Exchange trong RabbitMQ 13/06/2020

Xem nhiều

  • Hướng dẫn Java Design Pattern – Factory Method (97351 lượt xem)
  • Hướng dẫn Java Design Pattern – Singleton (96987 lượt xem)
  • Giới thiệu Design Patterns (86648 lượt xem)
  • Lập trình đa luồng trong Java (Java Multi-threading) (85486 lượt xem)
  • Giới thiệu về Stream API trong Java 8 (83024 lượt xem)

Nội dung bài viết

  • 1 Callable
  • 2 Future
  • 3 Ví dụ sử dụng Callable và Future

Lưu trữ

Thẻ đánh dấu

Annotation Authentication Basic Java Behavior Pattern Collection Creational Design Pattern Cấu trúc điều khiển Database Dependency Injection Design pattern Eclipse Exception Executor Service Google Guice Gson Hibernate How to Interceptor IO Jackson Java 8 Java Core JDBC JDK Jersey JMS JPA json JUnit JWT Message Queue Mockito Multithreading OOP PowerMockito RabbitMQ Reflection Report REST SOAP Structuaral Pattern Swagger Thread Pool Unit Test Webservice

Liên kết

  • Clean Code
  • JavaTpoint
  • Refactoring Guru
  • Source Making
  • TutorialsPoint
  • W3Schools Online Web Tutorials

Giới thiệu

GP Coder là trang web cá nhân, được thành lập với mục đích lưu trữ, chia sẽ kiến thức đã học và làm việc của tôi. Các bài viết trên trang này chủ yếu về ngôn ngữ Java và các công nghệ có liên quan đến Java như: Spring, JSF, Web Services, Unit Test, Hibernate, SQL, ...
Hi vọng góp được chút ít công sức cho sự phát triển cộng đồng Coder Việt.

Donate tác giả

Tìm kiếm các bài viết của GP Coder với Google Search

Liên hệ

Các bạn có thể liên hệ với tôi thông qua:
  • Trang liên hệ
  • Linkedin: gpcoder
  • Email: contact@gpcoder.com
  • Skype: ptgiang56it

Follow me

Copyright 2025 © GP Coder · All Rights Reserved · Giới thiệu · Chính sách · Điều khoản · Liên hệ ·

Share

Blogger
Delicious
Digg
Email
Facebook
Facebook messenger
Flipboard
Google
Hacker News
Line
LinkedIn
Mastodon
Mix
Odnoklassniki
PDF
Pinterest
Pocket
Print
Reddit
Renren
Short link
SMS
Skype
Telegram
Tumblr
Twitter
VKontakte
wechat
Weibo
WhatsApp
X
Xing
Yahoo! Mail

Copy short link

Copy link