您的位置 首页 java

探索 java.util.concurrent 包的内容

分享不易,感谢关注转发

什么是并发编程?

在一个程序中,当多个 线程 似乎同时在运行时,我们就有了 并发性 。如果两个进程实际上同时运行,我们就有了 并行性

我们并没有深入探讨并发的概念,而是探索 java 并发包相关的内容。

内容

  1. java.util.concurrent

a) Executors & Future

b) 并发集合

  1. java.util.concurrent.locks
  2. java.util.concurrent.atomic

这个包中包含的主要组件是—— 执行器 队列 、定时、 同步 器和 并发集合

在本文中,我们将探讨在基于 Web 的应用程序中大量使用的两个组件——执行器和并发集合。

执行器框架中的类和接口及其层次结构如下图所示,

executor

ThreadPoolExecutor ScheduledThreadPoolExecutor 类之间的区别在于后者可用于调度任务执行,例如添加延迟等。在这个包中需要记住的另一个重要类是 Executors, 此类提供大量的工厂方法。

尝试使用接口并创建一个 ThreadPool 并通过从该池中获取线程来执行一些任务,用 Executors 工厂类中的方法来创建 线程池

 package com.example.concurrent.config;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {

        public  static  void main(String[] args) {
                ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());

                // submit(Runnable task)
                executor.submit(() -> {
                        System.out.println("Implemented Runnable Interface"+Thread.currentThread().getName());
                        try {
                                Thread.sleep(10);
                        } catch (Interrupted Exception  e) {
                                e.printStackTrace();
                        }
                });

                // submit(Runnable task, T result)
                executor.submit(() -> {
                        System.out.println("Implemented Runnable Interface"+Thread.currentThread().getName());
                }, 1);

                // submit(Callable<T> task)
                executor.submit(() -> {
                        System.out.println("Implemented Callable Interface"+Thread.currentThread().getName());
                        return 1;
                });
                executor.shutdown();

        }
}

//Implement ThreadFactory and give name to thread
class MyThreadFactory implements ThreadFactory {
        static int i = 1;
        public Thread new Thread (Runnable r) {
                Thread t = new Thread(r,""+i);
                i++;
                return t;
        }
}
  

在上面的示例中,创建了一个初始大小为 2 的 ThreadPool,还提供了我们自己的 ThreadFactory 实现,它是可选的。我们可以跳过我们自己的实现,执行器将采用默认的 ThreadFactory 实现,实现自己的工厂的原因是为新线程命名并确定哪个线程正在运行我们的任务。

接着,使用 ExecutorService 的 submit() 方法执行了三个任务 请注意,它们三个都有不同的签名,任务类型可以是 Callable Runnable 类型的提交。

Runnable 和 Callable 执行类似的工作。它们都在新线程中执行任务,但 Callable 可以返回结果 ,也可以抛出 检查异常

Future 是一个带有签名 接口 Future<V> 的接口。 它代表异步执行的结果。在上面的代码示例中,executor 的提交方法返回一个 Future。

concurrent 包中很多类都实现了这个接口,来看看 Future 接口最常用的实现 CompletableFuture

  1. runAsync() : 返回一个新的 CompletableFuture。接受一个 Runnable 对象。如果您想运行代码块并且不期望返回值,请使用此方法。
  2. supplyAsync() :返回一个新的 CompletableFuture 。接受一个 Supplier 对象,并返回通过调用给定的 Supplier 获得的值。如果要运行代码块并期望返回值,请使用此方法。

请注意,这两种方法都可以选择接受 executor 实例作为参数。如果我们不为这些方法提供执行程序,则使用创建新线程 Fork JoinPool.commonPool()。

我们已经实现了自己的 ThreadPool 并且可以控制默认创建的线程数,所以我更喜欢将 executor 的实例作为参数传递。

 package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {

        public static void main(String[] args) {
                ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());

                //no return value
                CompletableFuture.runAsync(() -> {
                        System.out.println("Inside Runnable " + Thread.currentThread().getName());
                }, executor);
                
                //with return value
                CompletableFuture< Boolean > cf = CompletableFuture.supplyAsync(() -> {
                        System.out.println("Inside Supplier " + Thread.currentThread().getName());
                        return true;
                        
                }, executor);
                
                //get blocks the thread and waits for the future to complete
                try {
                        System.out.println("Result from supplier " + cf.get());
                } catch (InterruptedException e) {
                        e.printStackTrace();
                } catch (ExecutionException e) {
                        e.printStackTrace();
                }

                
                // shutdown  executor manually
                executor.shutdown();
        }
}

//Implement ThreadFactory and give name to thread
class MyThreadFactory implements ThreadFactory {
        static int i = 1;
        public Thread newThread(Runnable r) {
                Thread t = new Thread(r,""+i);
                i++;
                return t;
        }
}
  

3. thenAccept() :获取 CompletedFuture 任务的结果并将其作为参数传递给 Consumer 对象。如果需要基于 Future 对象的结果运行任务并且不希望从该任务返回结果,请使用此方法。

4. thenApply() :获取 CompletedFuture 任务的结果并将其作为参数传递给 Function 对象。如果需要基于 Future 对象 的结果运行任务并且需要从该任务返回结果,请使用此方法。

 package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {

        public static void main(String[] args) {
                ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());

                // no return value
                CompletableFuture.runAsync(() -> {
                        System.out.println("Inside Runnable " + Thread.currentThread().getName());
                }, executor);

                // with return value
                CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> {
                        System.out.println("Inside Supplier " + Thread.currentThread().getName());
                        return true;

                }, executor);

                // accept the new Consumer object once the future is completed
                cf.thenAccept(flag -> {
                        if (flag)
                                System.out.println("Inside thenAccept() ");
                });

                // apply a Function object once the future is completed and returns boolean
                CompletableFuture<Boolean> applyResult = cf.thenApply(flag -> {
                        System.out.println("Inside thenApply() ");
                        return false;
                });

                // shutdown executor manually
                executor.shutdown();
        }
}

//Implement ThreadFactory and give name to thread
class MyThreadFactory implements ThreadFactory {
        static int i = 1;

        public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "" + i);
                i++;
                return t;
        }
}
  

5. thenCompose() :如果想组合两个未来执行的结果,其中一个未来任务依赖于另一个未来任务的输出,请使用此方法。

 package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {

        public static void main(String[] args) throws InterruptedException, ExecutionException {
                ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());

                CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
                        return 1;
                }, executor);

                // result of cf is passed to another future object that invokes add method
                CompletableFuture<Integer> result = cf.thenCompose(x -> {
                        return CompletableFuture.supplyAsync(() -> {
                                return add(x, 2);
                        }, executor);
                });

                System.out.println("Result " + result.get());

                // shutdown executor manually
                executor.shutdown();
        }

        public static int add(int x, int y) {
                return x + y;
        }
}

//Implement ThreadFactory and give name to thread
class MyThreadFactory implements ThreadFactory {
        static int i = 1;

        public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "" + i);
                i++;
                return t;
        }
}
  

6. thenCombine(): 如果你想组合两个独立的未来任务的输出,那么使用这个方法。

修改上面的代码以进行以下更改,我们将产生与上面的代码相同的结果。

 CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
                        return 1;
                }, executor);

                CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
                        return 2;
                }, executor);
                
                // result of cf is passed to another future object that invokes add method
                CompletableFuture<Integer> result = cf.thenCombine(cf1, (x,y) -> {
                        return add(x,y);
                });

                System.out.println("Result " + result.get());
  

并发集合

多线程 上下文设计的集合实现主要是

ConcurrentHashMap , ConcurrentSkipListMap, ConcurrentSkipListSet, CopyOnWriteArrayList, 和CopyOnWriteArraySet,这些集合应用场景有明显区别,例如,Collections. synchronized List(new ArrayList());返回一个 同步的 ArrayList, 同步集合上的 线程安全 是通过在整个对象上应用线程锁来实现的。

CopyOnWriteArraySet 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。缺点两个 1)占用内存 2)最终一致性

文章来源:智云一二三科技

文章标题:探索 java.util.concurrent 包的内容

文章地址:https://www.zhihuclub.com/184141.shtml

关于作者: 智云科技

热门文章

网站地图