Fork /join 框架可以执行任务并返回结果。这类任务必须实现RecursiveTask类,该类继承了ForkJoinTask类,并实现了 Executor 框架的Future接口。
在该任务内部,开发者需要使用 java API 文档推荐的格式:
if (problem size > size){
tasks=Divide(task);
execute(tasks);
joinResults()
return result;
} else {
resolve problem;
return result;
}
如果该任务需要解决的问题规模超过了预定义的大小,则需要分割任务为更多个子任务,并用 fork/join 框架执行它们。任务执行完毕后,初始任务会收集全部子任务生成的返回结果,并组成最终返回结果。最后,当池中的初始任务执行完毕时,可以获取到初始任务的执行结果,即整个问题的返回结果。
本节将会通过开发一个在文档中查找单词的应用,来了解如何用 fork/join 框架解决这类问题。需要实现如下两类任务。
- 文档任务,在一个文档的一个行集合中查找一个单词。
- 行任务,在文档的一部分中查找一个单词。
所有任务都返回给定单词在文档或者行中出现的次数。本节将使用 Java 并发 API 提供的默认的 fork/join 池开发。
import java.util.Random;
/**
* @Author wj
* @Description 该类通过生成一个 字符串 型 二维数组 来模拟一份文档
* @Date 15:05 2022/5/30
**/public class DocumentMock {
//创建一个由多个单词组成的字符串型数组。该数组用于生成字符串型二维数组
private String words[] = {"the", "hello", "goodbye", "packt",
"java", "thread", "pool", "random",
"class", "main"};
/**
* @return java.lang.String[][]
* @Author wj
* @Description 该方法接收行数、每行的单词数和要搜索的单词作为参数,并返回一个字符串型二维数组
* @Date 15:06 2022/5/30
* @Param [numLines, numWords, word]
**/ public String[][] generateDocument(int numLines, int numWords,
String word) {
int counter = 0;
String document[][] = new String[numLines][numWords];
Random random = new Random();
for (int i = 0; i < numLines; i++) {
for (int j = 0; j < numWords; j++) {
int index = random.nextInt(words.length);
document[i][j] = words[index];
if (document[i][j].equals(word)) {
counter++;
}
}
}
System.out.println("DocumentMock: The word appears " + counter + " times in the document");
return document;
}
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Execution Exception ;
import java.util.concurrent.RecursiveTask;
//该类实现了查询指定单词在行集合中出现次数的任务
public class DocumentTask extends RecursiveTask<Integer> {
private String document[][];
private int start, end;
private String word;
public DocumentTask(String document[][], int start, int end,
String word) {
this.document = document;
this.start = start;
this.end = end;
this.word = word;
}
//实现 compute ()方法。如果end和start属性的差小于10,则该任务直接计算通过调用processLines()方法获得的行中,给定单词出现的次数:
//否则,分割行为两组,并创建两个新的DocumentTask对象来处理这两组行,并在池中调用invokeAll()方法来执行它们
@Override
protected Integer compute() {
Integer result = null;
if (end - start < 10) {
result = processLines(document, start, end, word);
} else {
int mid = (start + end) / 2;
DocumentTask task1 = new DocumentTask(document, start, mid, word);
DocumentTask task2 = new DocumentTask(document, mid, end, word);
invokeAll(task1, task2);
try {
//将两个任务返回的值相加,并返回任务执行的最终结果
result = groupResults(task1.get(), task2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
private Integer processLines(String[][] document, int start,
int end, String word) {
List<LineTask> tasks = new ArrayList<LineTask>();
for (int i = start; i < end; i++) {
LineTask task = new LineTask(document[i], 0,
document[i].length, word);
tasks.add(task);
}
invokeAll(tasks);
int result = 0;
for (int i = 0; i < tasks.size(); i++) {
LineTask task = tasks.get(i);
try {
result = result + task.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
private Integer groupResults(Integer number 1, Integer number2) {
Integer result;
result = number1 + number2;
return result;
}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;
//该类将计算一行中给定单词出现的次数
public class LineTask extends RecursiveTask<Integer> {
private String line[];
private int start, end;
private String word;
public LineTask(String line[], int start, int end, String word) {
this.line = line;
this.start = start;
this.end = end;
this.word = word;
}
//如果end和start属性的差小于100,则该任务直接调用count()方法并在行中start和end范围内搜索给定单词
//否则,将行分割为两组,并创建两个新的LineTask对象来处理这两组行,并在池中调用invokeAll()方法来执行它们
@Override
protected Integer compute() {
Integer result = null;
if (end - start < 100) {
result = count(line, start, end, word);
} else {
int mid = (start + end) / 2;
LineTask task1 = new LineTask(line, start, mid, word);
LineTask task2 = new LineTask(line, mid, end, word);
invokeAll(task1, task2);
try {
result = groupResults(task1.get(), task2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
private Integer count(String[] line, int start, int end,
String word) {
int counter;
counter = 0;
for (int i = start; i < end; i++) {
if (line[i].equals(word)) {
counter++;
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return counter;
}
//将两个任务返回的值相加,并返回任务的最终结果
private Integer groupResults(Integer number1, Integer number2) {
Integer result;
result = number1 + number2;
return result;
}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
//用DocumentMock类来创建一个包含 100 行,每行有 1000 个单词的文档
Document Mock mock = new DocumentMock();
String[][] document = mock.generateDocument(100, 1000, "the");
DocumentTask task = new DocumentTask(document, 0, 100, "the");
//调用commonPool()方法获取默认的ForkJoinPool执行器,并调用execute()方法执行该任务
ForkJoinPool commonPool = ForkJoinPool.commonPool();
commonPool.execute(task);
do {
System.out.printf("******************************************n");
System.out.printf("Main: Active Threads: %dn",
commonPool.getActiveThreadCount());
System.out.printf("Main: Task Count: %dn",
commonPool.getQueuedTaskCount());
System.out.printf("Main: Steal Count: %dn",
commonPool.getStealCount());
System.out.printf("******************************************n");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
//调用shutdown()方法关闭池
commonPool.shutdown();
try {
//调用awaitTermination()方法等待程序执行完成:
commonPool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.printf("Main: The word appears %d in the document", task.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
结果分析
本案例实现了如下两个任务。
- DocumentTask:该类根据start和end属性处理了对应文档中的行集合。如果行数小于10,则对每行创建LineTask对象,当任务执行完毕后,汇总运行结果并返回。如果任务对应的行集合大于等于10,则将集合分割为两个子集,并创建两个DocumentTask对象来处理这些新集合。当这些任务执行完毕后,汇总子任务的结果并返回。
- LineTask:该类处理文档中一行的单词。如果单词个数小于 100,则任务直接在单词集合中搜索单词并返回单词的出现次数;否则,将单词集合分割成两个子集并创建两个LineTask对象来处理这些集合。当这些任务执行完毕后,汇总子任务的结果并返回。
- 在Main类中,我们在DocumentTask类中执行默认的ForkJoinPool(调用 静态方法 commonPool()来获取),该DocumentTask类处理每行 1000 个单词共100行的文档。该任务会使用其他的DocumentTask对象和LineTask对象分割任务,以便这些任务执行完毕后,可以使用初始任务来获取整个文档中单词出现的次数。
- 为了让任务返回执行结果,这些任务继承了RecursiveTask类。调用get()方法可获取任务的返回值,该方法在Future接口中声明,在RecursiveTask类中实现。
- 运行程序时,你可以对比控制台输出的第一行和最后一行。第一行是文档生成时统计的单词出现次数,最后一行是 fork/join 任务统计的单词出现次数。
其他说明
- ForkJoinTask类提供了另一个方法来完成执行一个任务并返回结果,即complete()方法。该方法接收一个RecursiveTask的 泛型 对象,并在调用join()方法后,返回任务执行结果的对象。
- 因为RecursiveTask类实现了Future接口,get()方法有另一种版本的实现:
- get(long timeout, TimeUnit unit):此版本的get()方法会在任务结果未返回时,等待一段指定的时间。如果指定时间结束后仍未返回结果,该方法返回一个 null 值。TimeUnit类对象是一个枚举类,包括DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。