您的位置 首页 java

线程池-打开方式

为什么不推荐使用Executors

底层确实是通过LinkedBlockingQueue实现的,默认不设置队列大小的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,而创建这么多线程,必然就有可能导致OOM,报错如下:

Exception in thread "main"  java .lang.OutOfMemoryError: GC overhead limit exceeded
 at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
 at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
 

创建 线程池 的正确姿势

public class ExecutorsDemo {
 private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
 .setNameFormat("demo-pool-%d").build();
 
 private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
 
 public static void main(String[] args) {
 for (int i = 0; i < Integer.MAX_VALUE; i++) {
 pool.execute(new SubThread());
 }
 }
}
 

关闭 线程

long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
 pool.execute(new Job());
}
 pool.shutdown();
 while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
 LOGGER.info("线程还在执行。。。");
}
 long end = System.currentTimeMillis();
 LOGGER.info("一共处理了【{}】", (end - start));
 

SpringBoot 使用线程池

//线程池配置
@Configuration
public class TreadPoolConfig {
 /**
 * 消费队列线程
 * @return
 */ @Bean(value = "consumerQueueThreadPool")
 public ExecutorService buildConsumerQueueThreadPool(){
 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
 .setNameFormat("consumer-queue-thread-%d").build();
 ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
 new ArrayBlockingQueue<Runnable>(5),namedThreadFactory,new ThreadPoolExecutor.AbortPolicy());
 return pool ;
 }
}
//线程池使用
@Resource(name = "consumerQueueThreadPool")
 private ExecutorService consumerQueueThreadPool;
 @Override
 public void execute() {
 //消费队列
 for (int i = 0; i < 5; i++) {
 consumerQueueThreadPool.execute(new ConsumerQueueThread());
 }
 }
 

线程池隔离

hystrix 隔离

首先需要定义两个线程池,分别用于执行订单、处理用户。

/**
 *  Function :订单服务
 *
 * @author crossoverJie
 * Date: 2018/7/28 16:43
 * @since JDK 1.8
 */public class CommandOrder extends HystrixCommand<String> {
 private final static Logger LOGGER = Logger Factory .getLogger(CommandOrder.class);
 private String orderName;
 public CommandOrder(String orderName) {
 super(Setter.withGroupKey(
 //服务分组
 HystrixCommandGroupKey.Factory.asKey("OrderGroup"))
 //线程分组
 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("OrderPool"))
 //线程池配置
 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
 .withCoreSize(10)
 .withKeepAliveTimeMinutes(5)
 .withMaxQueueSize(10)
 .withQueueSizeRejectionThreshold(10000))
 .andCommandPropertiesDefaults(
 HystrixCommandProperties.Setter()
 .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
 )
 ;
 this.orderName = orderName;
 }
 @Override
 public String run() throws Exception {
 LOGGER.info("orderName=[{}]", orderName);
 TimeUnit.MILLISECONDS.sleep(100);
 return "OrderName=" + orderName;
 }
}
/**
 * Function:用户服务
 *
 * @author crossoverJie
 * Date: 2018/7/28 16:43
 * @since JDK 1.8
 */public class CommandUser extends HystrixCommand<String> {
 private final static Logger LOGGER = LoggerFactory.getLogger(CommandUser.class);
 private String userName;
 public CommandUser(String userName) {
 super(Setter.withGroupKey(
 //服务分组
 HystrixCommandGroupKey.Factory.asKey("UserGroup"))
 //线程分组
 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserPool"))
 //线程池配置
 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
 .withCoreSize(10)
 .withKeepAliveTimeMinutes(5)
 .withMaxQueueSize(10)
 .withQueueSizeRejectionThreshold(10000))
 //线程池隔离
 .andCommandPropertiesDefaults(
 HystrixCommandProperties.Setter()
 .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
 )
 ;
 this.userName = userName;
 }
 @Override
 public String run() throws Exception {
 LOGGER.info("userName=[{}]", userName);
 TimeUnit.MILLISECONDS.sleep(100);
 return "userName=" + userName;
 }
}
 

模拟运行:

public static void main(String[] args) throws Exception {
 CommandOrder commandPhone = new CommandOrder("手机");
 CommandOrder command = new CommandOrder("电视");
 //阻塞方式执行
 String execute = commandPhone.execute();
 LOGGER.info("execute=[{}]", execute);
 //异步非阻塞方式
 Future<String> queue = command.queue();
 String value = queue.get(200, TimeUnit.MILLISECONDS);
 LOGGER.info("value=[{}]", value);
 CommandUser commandUser = new CommandUser("张三");
 String name = commandUser.execute();
 LOGGER.info("name=[{}]", name);
 }
 

原理: 利用一个 Map 来存放不同业务对应的线程池。

注意: 自定义的 Command 并不是一个单例,每次执行需要 new 一个实例,不然会报 This instance can only be executed once. Please instantiate a new instance.


文章来源:智云一二三科技

文章标题:线程池-打开方式

文章地址:https://www.zhihuclub.com/187918.shtml

关于作者: 智云科技

热门文章

网站地图