您的位置 首页 java

异步调用如何使用是最好的方式?

「这是我参与2022首次更文挑战的第9天,活动详情查看:2022首次更文挑战」

一、 异步调用 方式分析

今天在写代码的时候,想要调用异步的操作,这里我是用的java8的流式异步调用,但是使用过程中呢,发现这个异步方式有两个方法,如下所示:

区别是一个 需要指定 线程池 一个不需要

  • 那么指定线程池有哪些好处呢?直观的说有以下两点好处:可以根据我们的服务器性能,通过池的管理更好的规划我们的 线程 数。可以对我们使用的线程自定义名称,这里也是 阿里 java开发规范所提到的。

1.1 java8异步调用默认线程池方式

当然常规使用默认的也没什么问题。我们通过源码分析下使用默认线程池的过程。

    public  static  CompletableFuture< void > runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
复制代码  

看下这个 asyncPool 是什么?

如下所示,useCommonPool如果为真,就使用 ForkJoinPool.commonPool() ,否则创建一个 new ThreadPerTaskExecutor()

      private  static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
复制代码  

看看useCommonPool 是什么?

     private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);
复制代码  
   /**
    * 公共池的目标并行度级别
    */    public static int getCommonPoolParallelism() {
        return commonParallelism;
    }
复制代码  

最终这个并行级别并没有给出默认值

 static final int commonParallelism;
复制代码  

通过找到这个常量的调用,我们看看是如何进行初始化的,在ForkJoinPool中有一个静态代码块,启动时会对commonParallelism进行初始化,我们只关注最后一句话就好了,:

     // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final int  ABASE;
    private static final int  ASHIFT;
    private static final long CTL;
    private static final long RUNSTATE;
    private static final long STEALCOUNTER;
    private static final long PARKBLOCKER;
    private static final long QTOP;
    private static final long QLOCK;
    private static final long QSCANSTATE;
    private static final long QPARKER;
    private static final long QCURRENTSTEAL;
    private static final long QCURRENTJOIN;

    static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            RUNSTATE = U.objectFieldOffset
                (k.getDeclaredField("runState"));
            STEALCOUNTER = U.objectFieldOffset
                (k.getDeclaredField("stealCounter"));
            Class<?> tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class<?> wk = WorkQueue.class;
            QTOP = U.objectFieldOffset
                (wk.getDeclaredField("top"));
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            QSCANSTATE = U.objectFieldOffset
                (wk.getDeclaredField("scanState"));
            QPARKER = U.objectFieldOffset
                (wk.getDeclaredField("parker"));
            QCURRENTSTEAL = U.objectFieldOffset
                (wk.getDeclaredField("currentSteal"));
            QCURRENTJOIN = U.objectFieldOffset
                (wk.getDeclaredField("currentJoin"));
            Class<?> ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer. number OfLeadingZeros(scale);
        } catch ( Exception  e) {
            throw new Error(e);
        }

        commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
         // 即使线程被禁用也是1,至少是个1
        int par = common.config & SMASK;
        commonParallelism = par > 0 ? par : 1;
    }
复制代码  

如下所示,默认是7:

所以接着下面的代码看:

     private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);
复制代码  

这里一定是返回true,证明当前是并行的。

     private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
复制代码  

上面会返回一个大小是七的的默认线程池

其实这个默认值是当前cpu的核心数,我的电脑是八核,在代码中默认会将核心数减一,所以显示是七个线程。

         if (parallelism < 0 && //默认是1,小于核心数
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
复制代码  

下面我们写个main方法测试一下,10个线程,每个阻塞10秒,看结果:

     public static void main(String[] args) {
        // 创建10个任务,每个任务阻塞10秒
        for (int i = 0; i < 10; i++) {
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(10000);
                    System.out.println(new Date() + ":" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
复制代码  

结果如下所示,前面七个任务先完成,另外三个任务被阻塞10秒后,才完成:

 Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-5
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-4
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-2
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-7
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-3
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-6
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-1
-----------------------------------------------------------  
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-2
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-5
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-4
复制代码  

二、使用自定义的线程池

上面说到如果是 IO密集型 的场景,在异步调用时还是使用 自定义线程池 比较好。

  • 针对开篇提到的两个显而易见的好处,此处新增一条:可以根据我们的服务器性能,通过池的管理更好的规划我们的线程数。可以对我们使用的线程自定义名称,这里也是阿里java开发规范所提到的。不会因为阻塞导致使用共享线程池的其他线程阻塞甚至异常。

我们自定义下面的线程池:

 /**
 * @description: 全局通用线程池
 * @author:weirx
 * @date:2021/9/9 18:09
 * @version:3.0
 */@Slf4j
public class GlobalThreadPool {

    /**
     * 核心线程数
     */    public final static int CORE_POOL_SIZE = 10;

    /**
     * 最大线程数
     */    public final static int MAX_NUM_POOL_SIZE = 20;

    /**
     * 任务队列大小
     */    public final static int BLOCKING_QUEUE_SIZE = 30;

    /**
     * 线程池实例
     */    private final static ThreadPoolExecutor instance =  getInstance ();


    /**
     * description: 初始化线程池
     *
     * @return:  java .util.concurrent.ThreadPoolExecutor
     * @author: weirx
     * @time: 2021/9/10 9:49
     */    private  synchronized  static ThreadPoolExecutor getInstance() {
        // 生成线程池
         ThreadPoolExecutor  executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_NUM_POOL_SIZE,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE),
                new NamedThreadFactory("Thread-wjbgn-", false));
        return executor;
    }

    private GlobalThreadPool() {
    }

    public static ThreadPoolExecutor getExecutor() {
        return instance;
    }
}
复制代码  

调用:

     public static void main(String[] args) {
        // 创建10个任务,每个任务阻塞10秒
        for (int i = 0; i < 10; i++) {
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(10000);
                    System.out.println(new Date() + ":" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },GlobalThreadPool.getExecutor());
        }

        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
复制代码  

输出我们指定线程名称的线程:

 Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-1
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-10
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-2
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-9
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-5
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-6
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-3
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-7
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-8
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-4
复制代码  

三、题外话,动态线程池

3.1 什么是动态线程池?

在我们使用线程池的时候,是否有的时候很纠结,到底设置多大的线程池参数是最合适的呢?如果不够用了怎么办,要改代码重新部署吗?

其实是不需要的,记得当初看过 美团 的一篇文章,真的让人茅塞顿开啊, 动态线程池

ThreadPoolExecutor 这个类其实是提供对于线程池的属性进行修改的,支持我们动态修改以下的属性:

从上至下分别是:

  • 线程工厂(用于指定线程名称)
  • 核心线程数
  • 最大线程数
  • 活跃时间
  • 拒绝策略。

在美团的文章当中呢,是监控服务器线程的使用率,当达到 阈值 就进行告警,然后通过配置中心去动态修改这些数值。

我们也可以这么做,使用 @RefreshScope nacos 就可以实现了。

3.2 实践

我写了一个定时任务,监控当前服务的线程使用率,小了就扩容,一段时间后占用率下降,就恢复初始值。

其实还有很多地方需要改进的,请大家多提意见,监控的是文章前面的线程池 GlobalThreadPool ,下面调度任务的代码:

 /**
 * @description: 全局线程池守护进程
 * @author:weirx
 * @date:2021/9/10 16:32
 * @version:3.0
 */@Slf4j
@Component
public class DaemonThreadTask {

    /**
     * 服务支持最大线程数
     */    public final static int SERVER_MAX_SIZE = 50;

    /**
     * 最大阈值Maximum threshold,百分比
     */    private final static int MAXIMUM_THRESHOLD = 8;

    /**
     * 每次递增最大线程数
     */    private final static int INCREMENTAL_MAX_NUM = 10;

    /**
     * 每次递增核心线程数
     */    private final static int INCREMENTAL_CORE_NUM = 5;

    /**
     * 当前线程数
     */    private static int currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;

    /**
     * 当前核心线程数
     */    private static int currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;

    @Scheduled(cron = "0 */5 * * * ?")
    public static void execute() {
        threadMonitor();
    }


    /**
     * description: 动态监控并设置线程参数
     *
     * @return: void
     * @author: weirx
     * @time: 2021/9/10 13:20
     */    private static void threadMonitor() {
        ThreadPoolExecutor instance = GlobalThreadPool.getExecutor();
        int activeCount = instance.getActiveCount();
        int size = instance.getQueue().size();
        log.info("GlobalThreadPool: the active thread count is {}", activeCount);
        // 线程数不足,增加线程
        if (activeCount > GlobalThreadPool.MAX_NUM_POOL_SIZE % MAXIMUM_THRESHOLD
                && size >= GlobalThreadPool.BLOCKING_QUEUE_SIZE) {
            currentSize = currentSize + INCREMENTAL_MAX_NUM;
            currentCoreSize = currentCoreSize + INCREMENTAL_CORE_NUM;
            //当前设置最大线程数小于服务最大支持线程数才可以继续增加线程
            if (currentSize <= SERVER_MAX_SIZE) {
                instance.setMaximumPoolSize(currentSize);
                instance.setCorePoolSize(currentCoreSize);
                log.info("this max thread size is {}", currentSize);
            } else {
                log.info("current size is more than server max size, can not add");
            }
        }
        // 线程数足够,降低线程数,当前活跃数小于默认核心线程数
        if (activeCount < GlobalThreadPool.MAX_NUM_POOL_SIZE
                && size == 0
                && currentSize > GlobalThreadPool.MAX_NUM_POOL_SIZE) {
            currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;
            currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;
            instance.setMaximumPoolSize(currentSize);
            instance.setCorePoolSize(currentCoreSize);
        }
    }
}
复制代码  

3.3 动态线程池有什么意义?

有的朋友其实问过我,我直接把线程池设置大一点不就好了,这种动态线程池有什么意义呢?

其实这是一个好问题。在以前的传统软件当中,单机部署,硬件部署,确实,我们能使用的线程数取决于服务器的核心线程数,而且基本没有其他服务来争抢这些线程。

文章来源:智云一二三科技

文章标题:异步调用如何使用是最好的方式?

文章地址:https://www.zhihuclub.com/187890.shtml

关于作者: 智云科技

热门文章

评论已关闭

3条评论

  1. Pathologic stage was calculated according to the American Joint Committee on Cancer 6th Edition of the Cancer Staging Manual

  2. Reproduced with permission from Patel DR, Greydanus DE, Baker RJ Pediatric Practice Sports Medicine

网站地图