java 7中新加了ForkJoinPool,ForkJoinPool 和 ExecutorService 非常相似,但是有一点不同, ForkJoinPool使任务能够很容易地将它们的工作拆分成更小的任务,然后再提交给ForkJoinPool 。 任务可以继续将其工作拆分为更小的子任务,只要它能够拆分任务,这听起来有点抽象,下面会解释 ForkJoinPool 怎么工作,以及怎么拆分任务。
Fork/Join解释
在了解 ForkJoinPool直接,我想先解释一下fork/join得工作原理。Fork/join原理包括两个步骤,这两个步骤是递归执行的,两个步骤分别是Fork和Join.
Fork
一个任务使用Fork/ join原则可以把自己分叉(分离)成更小的可以并发执行的子任务,下面图说明了:
通过把自己分成多个子任务,每个子任务可以通过不同的CPU并发执行, 或者是同一个CPU中的不同 线程 。一个任务只会将自己拆分为子任务,前提是该任务的工作量足够大,这样做是有意义的。将任务拆分为子任务有一个开销,因此对于少量的工作,这个开销可能大于并发执行子任务所获得的加速。将任务分成子任务有意义的时间限制也称为 阈值 ,由每个任务来决定一个合理的阈值,这在很大程度上取决于所做的工作。
Join
当一个任务分成多个子任务,这个任务直到他的所有子任务完成才会完成,一旦所有子任务完成,就要合并所有子任务的结果,下图解释这一点:
当然,并不是所有的任务都会返回结果,如果没有返回结果,只需要等待子任务全部完成。
ForkJoinPool
ForkJoinPool是一个专门设计用于Fork/join的线程池. ForkJoinPool在
java.util.concurrent 包中, 全名 java.util.concurrent.ForkJoinPool.
创建ForkJoinPool
用构造函创建 ForkJoinPool,作为ForkJoinPool 构造函数 的参数,可以传递所需的指定并行级别,并行级别指示在传递给ForkJoinPool的任务上要同时处理多少线程或cpu。下面是创建 ForkJoinPool的例子:
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
这个例子创建了一个并行级别为4的ForkJoinPool 。
提交任务到ForkJoinPool
提交任务到ForkJoinPool类似于提交任务到 ExecutorService. 可以提交两种类型的任务,一种是返回任何结果 (an “action”),另一种是返回结果(a “task”). 这两种类型的代表分别是RecursiveAction和RecursiveTask。下面会讲述如何使用和提交这两种类型。
RecursiveAction
RecursiveAction是不返回任何结果的任务,他只负责做工作,例如写数据到磁盘,然后退出。RecursiveAction仍然需要将任务分成更小的分支通过不同的线程或者任务并发执行 。通过子类继承 RecursiveAction,下面是例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
public class MyRecursiveAction extends RecursiveAction {
private Long workLoad = 0;
public MyRecursiveAction(long workLoad) {
this.workLoad = workLoad;
}
@Override
protected void compute() {
//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
subtasks.addAll(createSubtasks());
for(RecursiveAction subtask : subtasks){
subtask.fork();
}
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
}
}
private List<MyRecursiveAction> createSubtasks() {
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
}
这例子非常简单,MyRecursiveAction只是将一个虚拟的workLoad 作为其构造函数的参数,如果工作负载高于某个阈值,则将工作拆分为子任务,这些子任务也被计划执行(通过子任务的.fork()方法)。如果工作负载低于某个阈值,则该工作由MyRecursiveAction本身执行。
您可以计划MyRecursiveAction执行,如下所示:
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
forkJoinPool.invoke(myRecursiveAction);
RecursiveTask
RecursiveTask是一类返回结果的任务,可以将任务分割成更小的任务,同时可以把这些结果合并成集合,分裂和合并可能发生在几个层面,下面是例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
public class MyRecursiveTask extends RecursiveTask<Long> {
private long workLoad = 0;
public MyRecursiveTask(long workLoad) {
this.workLoad = workLoad;
}
protected Long compute() {
//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveTask> subtasks =
new ArrayList<MyRecursiveTask>();
subtasks.addAll(createSubtasks());
for(MyRecursiveTask subtask : subtasks){
subtask.fork();
}
long result = 0;
for(MyRecursiveTask subtask : subtasks) {
result += subtask.join();
}
return result;
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
return workLoad * 3;
}
}
private List<MyRecursiveTask> createSubtasks() {
List<MyRecursiveTask> subtasks =
new ArrayList<MyRecursiveTask>();
MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
}
这个例子和 RecursiveAction 非常相似,除了返回结果。 MyRecursiveTask继承了 RecursiveTask<Long> 意思这个任务的返回结果是Long.
MyRecursiveTask例子同样将任务分解成子任务, 使用fork()方法计划执行这些子任务。另外,这个例子通过每个子任务调用join()方法来接受返回结果,子任务的结果合并成大的任务结果返回, 这种子任务结果的连接/合并可能在多个递归级别上递归发生。
可以像这样调度递归任务:
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
System.out.println("mergedResult = " + mergedResult);
注意怎么从最后的通知里出来的ForkJoinPool.invoke()方法调用
参考: