您的位置 首页 java

并发编程 | 5.3 fork/join 合并任务的执行结果

Fork /join 框架可以执行任务并返回结果。这类任务必须实现RecursiveTask类,该类继承了ForkJoinTask类,并实现了 Executor 框架的Future接口。

在该任务内部,开发者需要使用 java API 文档推荐的格式:

 if (problem size > size){
  tasks=Divide(task);
  execute(tasks);
  joinResults()
  return result;
} else {
  resolve problem;
  return result;
}  

如果该任务需要解决的问题规模超过了预定义的大小,则需要分割任务为更多个子任务,并用 fork/join 框架执行它们。任务执行完毕后,初始任务会收集全部子任务生成的返回结果,并组成最终返回结果。最后,当池中的初始任务执行完毕时,可以获取到初始任务的执行结果,即整个问题的返回结果。

本节将会通过开发一个在文档中查找单词的应用,来了解如何用 fork/join 框架解决这类问题。需要实现如下两类任务。

  • 文档任务,在一个文档的一个行集合中查找一个单词。
  • 行任务,在文档的一部分中查找一个单词。

所有任务都返回给定单词在文档或者行中出现的次数。本节将使用 Java 并发 API 提供的默认的 fork/join 池开发。

 import java.util.Random;

/**
 * @Author wj
 * @Description 该类通过生成一个 字符串 二维数组 来模拟一份文档
 * @Date 15:05 2022/5/30
 **/public class DocumentMock {
    //创建一个由多个单词组成的字符串型数组。该数组用于生成字符串型二维数组
     private  String words[] = {"the", "hello", "goodbye", "packt",
            "java", "thread", "pool", "random",
            "class", "main"};

    /**
     * @return java.lang.String[][]
     * @Author wj
     * @Description 该方法接收行数、每行的单词数和要搜索的单词作为参数,并返回一个字符串型二维数组
     * @Date 15:06 2022/5/30
     * @Param [numLines, numWords, word]
     **/    public String[][] generateDocument(int numLines, int numWords,
                                       String word) {
        int counter = 0;
        String document[][] = new String[numLines][numWords];
        Random random = new Random();
        for (int i = 0; i < numLines; i++) {
            for (int j = 0; j < numWords; j++) {
                int index = random.nextInt(words.length);
                document[i][j] = words[index];
                if (document[i][j].equals(word)) {
                    counter++;
                }
            }
        }
        System.out.println("DocumentMock: The word appears " + counter + " times in the document");
        return document;
    }
}  
 import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Execution Exception ;
import java.util.concurrent.RecursiveTask;

//该类实现了查询指定单词在行集合中出现次数的任务
public class DocumentTask  extends  RecursiveTask<Integer> {
    private String document[][];
    private int start, end;
    private String word;

    public DocumentTask(String document[][], int start, int end,
                        String word) {
        this.document = document;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    //实现 compute ()方法。如果end和start属性的差小于10,则该任务直接计算通过调用processLines()方法获得的行中,给定单词出现的次数:
    //否则,分割行为两组,并创建两个新的DocumentTask对象来处理这两组行,并在池中调用invokeAll()方法来执行它们
    @Override
    protected  Integer  compute() {
        Integer result = null;
        if (end - start < 10) {
            result = processLines(document, start, end, word);
        } else {
            int mid = (start + end) / 2;
            DocumentTask task1 = new DocumentTask(document, start, mid, word);
            DocumentTask task2 = new DocumentTask(document, mid, end, word);
            invokeAll(task1, task2);
            try {
                //将两个任务返回的值相加,并返回任务执行的最终结果
                result = groupResults(task1.get(), task2.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    private Integer processLines(String[][] document, int start,
                                 int end, String word) {
        List<LineTask> tasks = new ArrayList<LineTask>();
        for (int i = start; i < end; i++) {
            LineTask task = new LineTask(document[i], 0,
                    document[i].length, word);
            tasks.add(task);
        }
        invokeAll(tasks);
        int result = 0;
        for (int i = 0; i < tasks.size(); i++) {
            LineTask task = tasks.get(i);
            try {
                result = result + task.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    private Integer groupResults(Integer  number 1, Integer number2) {
        Integer result;
        result = number1 + number2;
        return result;
    }
}  
 import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

//该类将计算一行中给定单词出现的次数
public class LineTask extends RecursiveTask<Integer> {
    private String line[];
    private int start, end;
    private String word;

    public LineTask(String line[], int start, int end, String word) {
        this.line = line;
        this.start = start;
        this.end = end;
        this.word = word;
    }
    //如果end和start属性的差小于100,则该任务直接调用count()方法并在行中start和end范围内搜索给定单词
    //否则,将行分割为两组,并创建两个新的LineTask对象来处理这两组行,并在池中调用invokeAll()方法来执行它们
    @Override
    protected Integer compute() {
        Integer result = null;
        if (end - start < 100) {
            result = count(line, start, end, word);
        } else {
            int mid = (start + end) / 2;
            LineTask task1 = new LineTask(line, start, mid, word);
            LineTask task2 = new LineTask(line, mid, end, word);
            invokeAll(task1, task2);
            try {
                result = groupResults(task1.get(), task2.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    private Integer count(String[] line, int start, int end,
                          String word) {
        int counter;
        counter = 0;
        for (int i = start; i < end; i++) {
            if (line[i].equals(word)) {
                counter++;
            }
        }
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return counter;
    }
    //将两个任务返回的值相加,并返回任务的最终结果
    private Integer groupResults(Integer number1, Integer number2) {
        Integer result;
        result = number1 + number2;
        return result;
    }
}  
 import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {
    public  static   void  main(String[] args) {
        //用DocumentMock类来创建一个包含 100 行,每行有 1000 个单词的文档
        Document Mock  mock = new DocumentMock();
        String[][] document = mock.generateDocument(100, 1000, "the");
        DocumentTask task = new DocumentTask(document, 0, 100, "the");
        //调用commonPool()方法获取默认的ForkJoinPool执行器,并调用execute()方法执行该任务
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        commonPool.execute(task);
        do {
            System.out.printf("******************************************n");
            System.out.printf("Main: Active Threads: %dn",
                    commonPool.getActiveThreadCount());
            System.out.printf("Main: Task Count: %dn",
                    commonPool.getQueuedTaskCount());
            System.out.printf("Main: Steal Count: %dn",
                    commonPool.getStealCount());
            System.out.printf("******************************************n");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (!task.isDone());
        //调用shutdown()方法关闭池
        commonPool.shutdown();
        try {
            //调用awaitTermination()方法等待程序执行完成:
            commonPool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            System.out.printf("Main: The word appears %d in the document", task.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
  

结果分析

本案例实现了如下两个任务。

并发编程 | 5.3 fork/join 合并任务的执行结果

  • DocumentTask:该类根据start和end属性处理了对应文档中的行集合。如果行数小于10,则对每行创建LineTask对象,当任务执行完毕后,汇总运行结果并返回。如果任务对应的行集合大于等于10,则将集合分割为两个子集,并创建两个DocumentTask对象来处理这些新集合。当这些任务执行完毕后,汇总子任务的结果并返回。
  • LineTask:该类处理文档中一行的单词。如果单词个数小于 100,则任务直接在单词集合中搜索单词并返回单词的出现次数;否则,将单词集合分割成两个子集并创建两个LineTask对象来处理这些集合。当这些任务执行完毕后,汇总子任务的结果并返回。
  • 在Main类中,我们在DocumentTask类中执行默认的ForkJoinPool(调用 静态方法 commonPool()来获取),该DocumentTask类处理每行 1000 个单词共100行的文档。该任务会使用其他的DocumentTask对象和LineTask对象分割任务,以便这些任务执行完毕后,可以使用初始任务来获取整个文档中单词出现的次数。
  • 为了让任务返回执行结果,这些任务继承了RecursiveTask类。调用get()方法可获取任务的返回值,该方法在Future接口中声明,在RecursiveTask类中实现。
  • 运行程序时,你可以对比控制台输出的第一行和最后一行。第一行是文档生成时统计的单词出现次数,最后一行是 fork/join 任务统计的单词出现次数。

其他说明

  • ForkJoinTask类提供了另一个方法来完成执行一个任务并返回结果,即complete()方法。该方法接收一个RecursiveTask的 泛型 对象,并在调用join()方法后,返回任务执行结果的对象。
  • 因为RecursiveTask类实现了Future接口,get()方法有另一种版本的实现:
  • get(long timeout, TimeUnit unit):此版本的get()方法会在任务结果未返回时,等待一段指定的时间。如果指定时间结束后仍未返回结果,该方法返回一个 null 值。TimeUnit类对象是一个枚举类,包括DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。

文章来源:智云一二三科技

文章标题:并发编程 | 5.3 fork/join 合并任务的执行结果

文章地址:https://www.zhihuclub.com/197141.shtml

关于作者: 智云科技

热门文章

网站地图