您的位置 首页 java

并发编程 | 5.2 如何创建一个 fork/join 池?

本节将介绍 fork /join 框架的基本功能的使用方法。其功能如下所示。

  • 创建一个ForkJoinPool对象来执行任务。
  • 在池内创建ForkJoinTask的一个子类并执行。

在本例中使用的 Fork /join 框架的主要功能如下所示。

  • 使用默认构造方法创建ForkJoinPool对象。
  • 在任务内部,使用 Java API 文档推荐的结构。
    if (problem size > default size){
     tasks=divide(task);
     execute(tasks);
   } else {
     resolve problem using another algorithm;
   }  
  • 以同步方式执行任务。当一个任务执行两个以上的子任务时,它会等待这些子任务的结束。在本案例中,执行任务的 线程 (即工作线程),会寻找其他待执行的任务,从而最大化利用执行时间。
  • 将要实现的任务不会返回任何执行结果,因此需要用RecursiveAction类作为其实现的基类。

案例实现

本节将实现一个任务来更新一组产品的价格。初始任务负责更新列表中的全部元素。如果任务更新的元素超过 10 个,则将列表分割为两部分,并创建两个任务来更新各自部分的产品价格。

 /**
 * @Author wj
 * @Description 用来存储产品的名称和价格
 * @Date 14:30 2022/5/30
 **/public class Product {
     private  String name;
    private double price;

    public String getName() {
        return name;
    }

    public  void  setName(String name) {
        this.name = name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
  
 
import java.util.ArrayList;
import java.util.List;

/**
 * @Author wj
 * @Description 用来创建一个随机产品列表
 * @Date 14:31 2022/5/30
 **/public class ProductListGenerator {
    /**
     * @return java.util.List<day04.Product>
     * @Author wj
     * @Description 根据size,并返回生成的List<Product>对象:
     * @Date 14:32 2022/5/30
     * @Param [size]
     **/    public List<Product> generate(int size) {
        List<Product> ret = new ArrayList<Product>();
        for (int i = 0; i < size; i++) {
            Product product = new Product();
            product.setName("Product " + i);
            product.setPrice(10);
            ret.add(product);
        }
        return ret;
    }
}
  
 import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class Task  extends  RecursiveAction {
    private List<Product> products;
    private int first;
    private int last;
    //存储产品价格的 涨幅 
    private double increment;

    public Task(List<Product> products, int first, int last, double increment) {
        this.products = products;
        this.first = first;
        this.last = last;
        this.increment = increment;
    }

    @Override
    protected void compute() {
        //如果last和first属性的差小于10,则用updatePrices()方法增加产品集合中的产品价格:
        if (last - first < 10) {
            updatePrices();
        } else {
            //如果last和first属性的差距大于等于 10,则创建两个Task对象(一个用来执行前一半产品,另一个用来执行另一半产品)。在ForkJoinPool中调用invokeAll()方法来执行:
            int middle = (last + first) / 2;
            System.out.printf("Task: Pending tasks:%sn",
                    getQueuedTaskCount());
            Task t1 = new Task(products, first, middle + 1, increment);
            Task t2 = new Task(products, middle + 1, last, increment);
            invokeAll(t1, t2);
        }
    }

    private void updatePrices() {
        for (int i = first; i < last; i++) {
            Product product = products.get(i);
            product.setPrice(product.getPrice() * (1 + increment));
        }
    }

    public  static  void main(String[] args) {
        //创建一个包含 10000 个产品的列表
        ProductListGenerator generator = new ProductListGenerator();
        List<Product> products = generator.generate(10000);
        Task task = new Task(products, 0, products.size(), 0.20);
        ForkJoinPool pool = new ForkJoinPool();
        //方法执行任务
        pool.execute(task);
        //每 5ms 显示一次池中信息,在任务执行结束前打印池中参数值:
        do {
            System.out.printf("Main: Thread Count:%dn",
                    pool.getActiveThreadCount());
            System.out.printf("Main: Thread Steal:%dn",
                    pool.getStealCount());
            System.out.printf("Main: Parallelism:%dn",
                    pool.getParallelism());
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (Interrupted Exception  e) {
                e.printStackTrace();
            }
        } while (!task.isDone());
        //关闭池
        pool.shutdown();
        //检查本例中的任务是否正常执行完毕,并打印日志
        if (task.isCompletedNormally()) {
            System.out.printf("Main: The process has completed normally.n");
        }
        //产品的预期价格为 12,打印出所有价格不为 12 的产品名称和价格,以检查是否全部产品价格都有正确的涨幅
        for (int i = 0; i < products.size(); i++) {
            Product product = products.get(i);
            if (product.getPrice() != 12) {
                System.out.printf("Product %s: %fn",
                        product.getName(), product.getPrice());
            }
        }
        System.out.println("Main: End of the program.n");
    }
}  

结果分析

本案例创建了一个ForkJoinPool对象和ForkJoinTask类的一个子类。其中,ForkJoinPool对象是用无参 构造方法 创建的,因此它会使用默认的配置。该对象创建了一个线程数等于处理器数的池,当ForkJoinPool对象被创建时,对应线程也被创建并在池中等待任务执行。

由于Task类继承了RecursiveAction类,因此它没有返回结果,在本节中,使用了推荐的结构来实现任务。如果任务更新超过 10 件产品,则把产品分割成两组,分别分配给两个子任务。在Task类中,first和last属性标明了需要更新的产品在产品列表中的位置。在每个任务中,使用first和last属性来获取产品列表的副本,而非另外创建一个列表。

本案例中调用invokeAll()方法创建每个任务的子任务。采用同步的方式调用该方法,且每个任务在继续执行前需要等待子任务执行完成。当任务在等待子任务执行时,工作线程会选取并执行另一个正在等待执行的任务。通过这种方式,相比Runnable或Callable来说,fork/join 框架具有更好的性能。

ForkJoinTask类中的invokeAll()方法是Executor和 fork/join 框架最主要的区别之一。在 Executor 框架中,所有任务都提交给执行器来执行。而本案例中,任务包含执行和控制池中任务的方法。调用的invoke方法来自Task类,Task类继承了RecursiveAction类,而该类继承了ForkJoinTask类。

用execute()方法提交唯一的任务到池中,从而更新列表中的全部产品。在本案例中,该方法以同步方式在主线程中调用。

此外还使用了ForkJoinPool类中的部分方法来检查运行中任务的状态和执行情况。为了更好地实现本节目的,该类还提供了其他方法,在 9.5 节中,有这些方法的完整列表。

最后,和 Executor 框架一样,调用shutdown()方法关闭ForkJoinPool。执行结果如下图所示。

从图中可以看到,任务执行完成后,输出了产品更新后的价格。

文章来源:智云一二三科技

文章标题:并发编程 | 5.2 如何创建一个 fork/join 池?

文章地址:https://www.zhihuclub.com/198434.shtml

关于作者: 智云科技

热门文章

评论已关闭

6条评论

  1. Baggott JE, Tamura T 2001 Metabolism of 10 formyldihydrofolate in humans CATEGORY 1 Clinical Scenarios for BCR After Prior Definitive Treatment with RP or RT Initial Imaging Investigation

  2. The STAR Study of Tamoxifen and Raloxifene trial, a breast cancer prevention trial, found that raloxifene currently used to prevent and treat osteoporosis in postmenopausal women works as well as tamoxifen in reducing breast cancer risk in high risk postmenopausal women Kilic- Okman T, Kucuk M, Altaner S

  3. Third International Conference and Ninth Annual Meeting of the International Society of Cancer Chemoprevention

  4. 6 LUNG DAMAGING AGENTS RELEVANT FACTORS TO CONSIDER IN MASS EXPOSURE, PRACTICAL GUIDELINES FOR DIFFERENTIAL DIAGNOSIS AND TREATMENT It will also increase the size of your prostate

  5. viagra where is merck cozaar made The overall market for HVDC technology in Europe andnorthern Africa is worth 9 billion to 13 billion euros over thecoming five years, Alstom has said, two thirds of which due tothe need to connect offshore wind farms

  6. The study, published July 25, says The results of the analysis suggest that Long Haulers COVID 19 symptoms are far more numerous than what is currently listed on the CDC s website 31 Years Old Clomid or TRT plus HCG

网站地图