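Why Executors is not recommended
Pools created by Executors factory methods such as newFixedThreadPool are indeed backed by a LinkedBlockingQueue. When no capacity is specified, that queue is effectively unbounded: its maximum length is Integer.MAX_VALUE. The problem is not the number of threads but the number of queued tasks. Tasks submitted faster than the workers can run them pile up in the queue, which can eventually exhaust the heap and fail with an OutOfMemoryError like the following: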
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
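The unbounded queue can be observed directly, without waiting for the heap to fill up. The following is a minimal sketch, not code from the original article: the class name UnboundedQueueCheck is made up for illustration, and the cast assumes that newFixedThreadPool returns a plain ThreadPoolExecutor, which is the case in current JDKs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper class, used only for this illustration.
class UnboundedQueueCheck {

    /** Returns how many more tasks the work queue of a fresh fixed pool would accept. */
    static int remainingQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Assumption: the factory returns a plain ThreadPoolExecutor,
            // so the cast gives access to its work queue.
            ThreadPoolExecutor executor = (ThreadPoolExecutor) pool;
            return executor.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // An empty queue with no explicit capacity reports Integer.MAX_VALUE free slots.
        System.out.println(remainingQueueCapacity() == Integer.MAX_VALUE); // prints true
    }
}
```

Nothing in such a pool ever pushes back on the producer, so a slow consumer only shows up as growing memory usage.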
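The right way to create a thread pool
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ExecutorsDemo {

        // Give the worker threads recognizable names (ThreadFactoryBuilder comes from Guava).
        private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("demo-pool-%d").build();

        // 5 core threads, at most 200 threads, and a queue bounded to 1024 tasks.
        // AbortPolicy throws RejectedExecutionException once the pool and queue are both full.
        private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024),
                namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());

        public static void main(String[] args) {
            // SubThread is the Runnable task class; it is not shown in this listing.
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                pool.execute(new SubThread());
            }
        }
    }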
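With a bounded queue and AbortPolicy, overload surfaces as a RejectedExecutionException instead of an OutOfMemoryError. The sketch below uses only the JDK to show when the rejection happens; the class and method names are illustrative, and the tasks simply block on a latch so that the pool stays saturated.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for this illustration.
class BoundedPoolDemo {

    /** Submits blocking tasks until the pool rejects one; returns how many were accepted. */
    static int acceptedBeforeRejection(int maxThreads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the demo is over
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // All threads are busy and the queue is full: AbortPolicy rejects the task.
            return accepted;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 2 running tasks + 3 queued tasks are accepted, the 6th is rejected.
        System.out.println(acceptedBeforeRejection(2, 3)); // prints 5
    }
}
```

The caller can catch the exception and decide what to do with the rejected work, for example retry later or shed load.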
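Shutting down the thread pool
    long start = System.currentTimeMillis();
    for (int i = 0; i <= 5; i++) {
        pool.execute(new Job());
    }

    // Stop accepting new tasks; tasks that were already submitted still run to completion.
    pool.shutdown();

    // Check once per second until every task has finished.
    while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
        LOGGER.info("Threads are still running...");
    }
    long end = System.currentTimeMillis();
    LOGGER.info("Total time taken: [{}] ms", (end - start));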
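The loop above waits for as long as the tasks take. Where an upper bound on the wait is needed, a common variant is to fall back to shutdownNow() after a timeout. This is a sketch under that assumption, not part of the original article; GracefulShutdown and shutdownAndAwait are illustrative names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for this illustration.
class GracefulShutdown {

    /** Returns true if the pool terminated within the timeout without being forced. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks and discard queued ones
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("job done"));
        System.out.println(shutdownAndAwait(pool, 5, TimeUnit.SECONDS));
    }
}
```

Note that shutdownNow() only interrupts the worker threads; tasks that ignore interruption keep running.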
Using a thread pool in Spring Boot
    // Thread pool configuration
    @Configuration
    public class TreadPoolConfig {

        /**
         * Thread pool for the queue consumers.
         * @return the consumer thread pool
         */
        @Bean(value = "consumerQueueThreadPool")
        public ExecutorService buildConsumerQueueThreadPool() {
            ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                    .setNameFormat("consumer-queue-thread-%d").build();

            // Fixed size of 5 threads with a bounded queue of 5 tasks.
            ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(5), namedThreadFactory,
                    new ThreadPoolExecutor.AbortPolicy());

            return pool;
        }
    }

    // Using the thread pool
    @Resource(name = "consumerQueueThreadPool")
    private ExecutorService consumerQueueThreadPool;

    @Override
    public void execute() {
        // Consume the queue
        for (int i = 0; i < 5; i++) {
            consumerQueueThreadPool.execute(new ConsumerQueueThread());
        }
    }
Thread pool isolation
Isolation with Hystrix
First, define two thread pools: one for processing orders and one for handling users.
    /**
     * Function: order service
     *
     * @author crossoverJie
     *         Date: 2018/7/28 16:43
     * @since JDK 1.8
     */
    public class CommandOrder extends HystrixCommand<String> {

        private final static Logger LOGGER = LoggerFactory.getLogger(CommandOrder.class);

        private String orderName;

        public CommandOrder(String orderName) {
            super(Setter.withGroupKey(
                    // Command group
                    HystrixCommandGroupKey.Factory.asKey("OrderGroup"))
                    // Thread pool key
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("OrderPool"))
                    // Thread pool settings
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                            .withCoreSize(10)
                            .withKeepAliveTimeMinutes(5)
                            .withMaxQueueSize(10)
                            .withQueueSizeRejectionThreshold(10000))
                    .andCommandPropertiesDefaults(
                            HystrixCommandProperties.Setter()
                                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
            );
            this.orderName = orderName;
        }

        @Override
        public String run() throws Exception {
            LOGGER.info("orderName=[{}]", orderName);
            TimeUnit.MILLISECONDS.sleep(100);
            return "OrderName=" + orderName;
        }
    }

    /**
     * Function: user service
     *
     * @author crossoverJie
     *         Date: 2018/7/28 16:43
     * @since JDK 1.8
     */
    public class CommandUser extends HystrixCommand<String> {

        private final static Logger LOGGER = LoggerFactory.getLogger(CommandUser.class);

        private String userName;

        public CommandUser(String userName) {
            super(Setter.withGroupKey(
                    // Command group
                    HystrixCommandGroupKey.Factory.asKey("UserGroup"))
                    // Thread pool key
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserPool"))
                    // Thread pool settings
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                            .withCoreSize(10)
                            .withKeepAliveTimeMinutes(5)
                            .withMaxQueueSize(10)
                            .withQueueSizeRejectionThreshold(10000))
                    // Thread pool isolation
                    .andCommandPropertiesDefaults(
                            HystrixCommandProperties.Setter()
                                    .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD))
            );
            this.userName = userName;
        }

        @Override
        public String run() throws Exception {
            LOGGER.info("userName=[{}]", userName);
            TimeUnit.MILLISECONDS.sleep(100);
            return "userName=" + userName;
        }
    }
Simulated run:
    public static void main(String[] args) throws Exception {
        CommandOrder commandPhone = new CommandOrder("phone");
        CommandOrder command = new CommandOrder("TV");

        // Blocking (synchronous) execution
        String execute = commandPhone.execute();
        LOGGER.info("execute=[{}]", execute);

        // Asynchronous execution: queue() returns a Future immediately
        Future<String> queue = command.queue();
        String value = queue.get(200, TimeUnit.MILLISECONDS);
        LOGGER.info("value=[{}]", value);

        CommandUser commandUser = new CommandUser("Zhang San");
        String name = commandUser.execute();
        LOGGER.info("name=[{}]", name);
    }
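How it works: a Map stores the thread pool that belongs to each business area, so commands with the same thread pool key share one pool and commands with different keys never compete for threads.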
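The idea of keeping one pool per business key in a Map can be sketched with the JDK alone. This is a simplified illustration and not Hystrix's actual implementation; PoolRegistry, poolFor and the pool sizes are assumptions chosen for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical registry class: one bounded pool per business key.
class PoolRegistry {

    private final ConcurrentMap<String, ExecutorService> pools = new ConcurrentHashMap<>();

    /** Returns the pool for the given key, creating it on first use. */
    ExecutorService poolFor(String key) {
        // computeIfAbsent guarantees that each key gets exactly one pool.
        return pools.computeIfAbsent(key, k -> new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10))); // sizes are arbitrary for the sketch
    }

    /** Shuts down every pool that has been created so far. */
    void shutdownAll() {
        pools.values().forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        PoolRegistry registry = new PoolRegistry();
        ExecutorService orders = registry.poolFor("OrderPool");
        ExecutorService users = registry.poolFor("UserPool");
        // Same key, same pool; different keys, separate pools.
        System.out.println(orders == registry.poolFor("OrderPool")); // prints true
        System.out.println(orders == users);                         // prints false
        registry.shutdownAll();
    }
}
```

Because each key has its own threads and queue, a flood of slow order requests can fill only the order pool and leaves the user pool untouched.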
Note: a custom Command is not a singleton. Create a new instance for every execution; reusing an instance fails with "This instance can only be executed once. Please instantiate a new instance."