您的位置 首页 java

JAVA并发-Fork/Join使用ForkJoinPool

java 7中新加了ForkJoinPool,ForkJoinPool 和 ExecutorService 非常相似,但是有一点不同, ForkJoinPool使任务能够很容易地将它们的工作拆分成更小的任务,然后再提交给ForkJoinPool 任务可以继续将其工作拆分为更小的子任务,只要它能够拆分任务,这听起来有点抽象,下面会解释 ForkJoinPool 怎么工作,以及怎么拆分任务。

Fork/Join解释

在了解 ForkJoinPool直接,我想先解释一下fork/join得工作原理。Fork/join原理包括两个步骤,这两个步骤是递归执行的,两个步骤分别是Fork和Join.

Fork

一个任务使用Fork/ join原则可以把自己分叉(分离)成更小的可以并发执行的子任务,下面图说明了:

通过把自己分成多个子任务,每个子任务可以通过不同的CPU并发执行, 或者是同一个CPU中的不同 线程 。一个任务只会将自己拆分为子任务,前提是该任务的工作量足够大,这样做是有意义的。将任务拆分为子任务有一个开销,因此对于少量的工作,这个开销可能大于并发执行子任务所获得的加速。将任务分成子任务有意义的时间限制也称为 阈值 ,由每个任务来决定一个合理的阈值,这在很大程度上取决于所做的工作。

Join

当一个任务分成多个子任务,这个任务直到他的所有子任务完成才会完成,一旦所有子任务完成,就要合并所有子任务的结果,下图解释这一点:

当然,并不是所有的任务都会返回结果,如果没有返回结果,只需要等待子任务全部完成。

ForkJoinPool

ForkJoinPool是一个专门设计用于Fork/join的线程池. ForkJoinPool在

java.util.concurrent 包中, 全名 java.util.concurrent.ForkJoinPool.

创建ForkJoinPool

用构造函创建 ForkJoinPool,作为ForkJoinPool 构造函数 的参数,可以传递所需的指定并行级别,并行级别指示在传递给ForkJoinPool的任务上要同时处理多少线程或cpu。下面是创建 ForkJoinPool的例子:

 ForkJoinPool forkJoinPool = new ForkJoinPool(4);  

这个例子创建了一个并行级别为4的ForkJoinPool 。

提交任务到ForkJoinPool

提交任务到ForkJoinPool类似于提交任务到 ExecutorService. 可以提交两种类型的任务,一种是返回任何结果 (an “action”),另一种是返回结果(a “task”). 这两种类型的代表分别是RecursiveAction和RecursiveTask。下面会讲述如何使用和提交这两种类型。

RecursiveAction

RecursiveAction是不返回任何结果的任务,他只负责做工作,例如写数据到磁盘,然后退出。RecursiveAction仍然需要将任务分成更小的分支通过不同的线程或者任务并发执行 。通过子类继承 RecursiveAction,下面是例子:

 import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
 
public class MyRecursiveAction extends RecursiveAction {
 
    private  Long  workLoad = 0;
 
    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }
 
    @Override
    protected void compute() {
 
        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);
 
            List<MyRecursiveAction> subtasks =
                new ArrayList<MyRecursiveAction>();
 
            subtasks.addAll(createSubtasks());
 
            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }
 
        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
        }
    }
 
    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
            new ArrayList<MyRecursiveAction>();
 
        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
 
        subtasks.add(subtask1);
        subtasks.add(subtask2);
 
        return subtasks;
    }
 
}  

这例子非常简单,MyRecursiveAction只是将一个虚拟的workLoad 作为其构造函数的参数,如果工作负载高于某个阈值,则将工作拆分为子任务,这些子任务也被计划执行(通过子任务的.fork()方法)。如果工作负载低于某个阈值,则该工作由MyRecursiveAction本身执行。

您可以计划MyRecursiveAction执行,如下所示:

 MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
 
forkJoinPool.invoke(myRecursiveAction);  

RecursiveTask

RecursiveTask是一类返回结果的任务,可以将任务分割成更小的任务,同时可以把这些结果合并成集合,分裂和合并可能发生在几个层面,下面是例子:

 import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
    
    
public class MyRecursiveTask extends RecursiveTask<Long> {
 
    private long workLoad = 0;
 
    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }
 
    protected Long compute() {
 
        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);
 
            List<MyRecursiveTask> subtasks =
                new ArrayList<MyRecursiveTask>();
            subtasks.addAll(createSubtasks());
 
            for(MyRecursiveTask subtask : subtasks){
                subtask.fork();
            }
 
            long result = 0;
            for(MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
            }
            return result;
 
        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }
 
    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
        new ArrayList<MyRecursiveTask>();
 
        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
 
 
        subtasks.add(subtask1);
        subtasks.add(subtask2);
 
        return subtasks;
    }
}  

这个例子和 RecursiveAction 非常相似,除了返回结果。 MyRecursiveTask继承了 RecursiveTask<Long> 意思这个任务的返回结果是Long.

MyRecursiveTask例子同样将任务分解成子任务, 使用fork()方法计划执行这些子任务。另外,这个例子通过每个子任务调用join()方法来接受返回结果,子任务的结果合并成大的任务结果返回, 这种子任务结果的连接/合并可能在多个递归级别上递归发生。

可以像这样调度递归任务:

 MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
 
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
 
System.out.println("mergedResult = " + mergedResult);      

注意怎么从最后的通知里出来的ForkJoinPool.invoke()方法调用

参考:

文章来源:智云一二三科技

文章标题:JAVA并发-Fork/Join使用ForkJoinPool

文章地址:https://www.zhihuclub.com/198333.shtml

关于作者: 智云科技

热门文章

网站地图