您的位置 首页 java

Java高并发系列-异步的开始future

1. 概述

在本文中,我们将了解Future,一个自 Java 1.5 以来就存在的接口,在处理异步调用和并发处理时非常有用。

2. 创建Futures

简单地说,Future类代表了一个异步计算的未来结果—处理完成后最终会出现在Future 中的结果。

让我们看看如何编写创建和返回Future实例的方法。

长时间运行的方法非常适合异步处理和Future接口,这使我们能够在等待Future封装的任务完成时执行一些其他进程。

利用Future的异步性质的一些操作示例是:

  • 计算密集型过程(数学和科学计算)
  • 操作大数据结构(大数据)
  • 远程方法调用(下载文件、HTML 抓取、Web 服务)。

2.1. 使用FutureTask实现Futures

对于我们的示例,我们将创建一个非常简单的类来计算 Integer 的平方,这绝对不适合“长时间运行”方法类别,但我们将对其进行Thread.sleep()调用以使其持续 1 秒完成:

 public class SquareCalculator {    
    
    private ExecutorService executor 
      = Executors.newSingleThreadExecutor();
    
    public Future<Integer> calculate(Integer input) {        
        return executor.submit(() -> {
             Thread .sleep(1000);
            return input * input;
        });
    }
}  

实际执行计算的代码包含在call()方法中,以 lambda 表达式的形式提供,如您所见,除了前面提到的sleep()调用之外,它没有什么特别之处。

当我们将注意力集中在Callable 和ExecutorService的使用上时,它会变得更有趣。

Callable是一个接口,表示返回结果并具有单个call()方法的任务。在这里,我们使用 lambda 表达式创建了它的一个实例。

创建Callable的实例,我们仍然需要将这个实例传递给一个执行程序,该执行程序将负责在新线程中启动该任务并将有价值的Future对象返回给我们,这就是ExecutorService 的用武之地。

有几种方法可以获取ExecutorService实例,其中大多数是由实用程序类Executors的静态工厂方法提供的。在这个例子中,我们使用了基本的newSingleThreadExecutor(),它为我们提供了一个能够一次处理一个线程的ExecutorService。

一旦我们有了一个ExecutorService对象,我们只需要调用submit()将我们的Callable作为参数传递,submit()将负责启动任务并返回一个FutureTask 对象,它是Future接口的实现。

3. 消费Futures

到目前为止,我们已经学习了如何创建Future的实例。

在本节中,我们将通过探索属于Future API 的所有方法来学习如何使用此实例。

3.1. 使用isDone()和get()获取结果

现在我们需要调用calculate()并使用返回的Future来获取结果Integer,Future API 中的两个方法将帮助我们完成这项任务。

Future.isDone()告诉我们 执行器 是否已完成处理任务,如果任务完成,则返回true ,否则返回false。

返回实际计算结果的方法是Future.get(),请注意,此方法会在任务完成之前阻止执行,但在我们的示例中,这不会成为问题,因为我们将首先通过调用isDone() 来检查任务是否已完成。

通过使用这两种方法,我们可以在等待主任务完成时运行一些其他代码:

 Future<Integer> future = new SquareCalculator().calculate(10);

while(!future.isDone()) {
    System.out.println("Calculating...");
    Thread.sleep(300);
}

Integer result = future.get();  

在这个例子中,我们在输出上写了一条简单的消息,让用户知道程序正在执行计算。

get()方法将阻止执行,直到任务完成。但是我们不必担心这一点,因为我们的示例只会在确保任务完成后调用get()。因此,在这种情况下,future.get()将始终立即返回。

值得一提的是get()有一个重载版本,它以超时和TimeUnit作为参数:

 Integer result = future.get(500, TimeUnit.MILLISECONDS);  

get(long, TimeUnit)和get()之间的区别在于,如果任务在指定的超时期限之前没有返回,则前者将抛出TimeoutException。

3.2. 用cancel()方法取消Future

假设我们触发了一个任务,但由于某种原因,我们不再关心结果,我们可以使用Future.cancel(boolean)告诉 executor 停止操作并中断其底层线程:

 Future<Integer> future = new SquareCalculator().calculate(4);

boolean canceled = future.cancel(true);  

我们上面代码中的Future实例永远不会完成它的操作,事实上,如果我们尝试从该实例调用get(),在调用cancel() 之后,结果将是CancellationException。Future.isCancelled()会告诉我们一个Future是否已经被取消。这对于避免获得CancellationException非常有用。

对cancel()的调用可能会失败。在这种情况下,它的返回值将为false。请注意,cancel()将一个布尔值作为参数——这控制着执行此任务的线程是否应该被中断。

在进行客户端和服务器的远程 RPC 交互中,我们在服务器端经常使用时间轮来取消在客户端已经超时的服务器请求,就用到了cancel的方法,如果感兴趣的话,可以参看netty的 TimeWheel实现。

4. 用线程池创建更多的线程

我们当前的ExecutorService是单线程的,因为它是通过Executors.newSingleThreadExecutor获得的。为了突出这个“单线程”,让我们同时触发两个计算:

 SquareCalculator squareCalculator = new SquareCalculator();

Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);

while (!(future1.isDone() && future2.isDone())) {
    System.out.println(
      String.format(
        "future1 is %s and future2 is %s", 
        future1.isDone() ? "done" : "not done", 
        future2.isDone() ? "done" : "not done"
      )
    );
    Thread.sleep(300);
}

Integer result1 = future1.get();
Integer result2 = future2.get();

System.out.println(result1 + " and " + result2);

squareCalculator.shutdown();  

现在让我们分析这段代码的输出:

 calculating square for: 10
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
calculating square for: 100
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000  

很明显,这个过程不是并行的,注意第二个任务是如何在第一个任务完成后才开始的,整个过程大约需要 2 秒才能完成。

为了使我们的程序真正是多线程的,我们应该使用不同风格的ExecutorService,让我们看看如果我们使用由工厂方法Executors.newFixedThreadPool()提供的线程池,我们的示例的行为如何改变:

 public class SquareCalculator {
 
    private ExecutorService executor = Executors.newFixedThreadPool(2);
    
    //...
}  

通过对SquareCalculator类的简单更改,现在我们有了一个能够使用 2 个并发线程的执行器。

如果我们再次运行完全相同的客户端代码,我们将得到以下输出:

 calculating square for: 10
calculating square for: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
100 and 10000  

这现在看起来好多了,注意 2 个任务是如何同时开始和完成运行的,整个过程大约需要 1 秒才能完成。

还有其他工厂方法可用于创建线程池,例如Executors.newCachedThreadPool()在可用时重用以前使用的Thread,以及Executors.newScheduledThreadPool() 调度命令在给定的延迟后运行。

有关ExecutorService 的更多信息,请阅读我们专门针对该主题的文章。

5. Fork JoinTask概述

ForkJoinTask是一个抽象类,它实现了Future并且能够运行由ForkJoinPool 中的少量实际线程托管的大量任务。

在本节中,我们将快速介绍ForkJoinPool的主要特性,有关该主题的综合指南,请查看我们的 Java Fork/Join 框架指南。

然后ForkJoinTask的主要特征是它通常会产生新的子任务作为完成其主要任务所需的工作的一部分,它通过调用fork()生成新任务,并使用join()收集所有结果,因此是类的名称。

有两个实现ForkJoinTask 的抽象类:RecursiveTask在完成时返回一个值,而RecursiveAction不返回任何内容。顾名思义,这些类将用于递归任务,例如文件系统导航或复杂的数学计算。

让我们扩展前面的例子来创建一个类,给定一个Integer,它将计算其所有阶乘元素的平方和。因此,例如,如果我们将数字 4 传递给我们的计算器,我们应该从 4² + 3² + 2² + 1² 的总和中得到结果,即 30。

首先,我们需要创建RecursiveTask的具体实现并实现它的compute()方法。这是我们将编写业务逻辑的地方:

 public class FactorialSquareCalculator extends RecursiveTask<Integer> {
 
    private Integer n;

    public FactorialSquareCalculator(Integer n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }

        FactorialSquareCalculator calculator 
          = new FactorialSquareCalculator(n - 1);

        calculator.fork();

        return n * n + calculator.join();
    }
}  

请注意我们如何通过在compute() 中创建FactorialSquareCalculator的新实例来实现递归。通过调用非阻塞方法fork(),我们要求ForkJoinPool启动此子任务的执行。

在加入()方法从计算,这是我们增加我们目前正在访问数的平方返回结果。

现在我们只需要创建一个ForkJoinPool来处理执行和线程管理:

 ForkJoinPool forkJoinPool = new ForkJoinPool();

FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);

forkJoinPool.execute(calculator);  

六,结论

在本文中,我们全面了解了Future接口,访问了它的所有方法,我们还学习了如何利用线程池的强大功能来触发多个并行操作,还简要介绍了ForkJoinTask类的主要方法fork()和join()。

我们还有许多其他关于 Java 并行和异步操作的优秀文章。以下是与Future接口密切相关的三个,:

  • CompletableFuture -未来有许多额外的功能用Java 8中引入
  • Java Fork/Join 框架
  • Java ExecutorService——专用于ExecutorService接口

2021-06-15 北京 刘高飞

文章来源:智云一二三科技

文章标题:Java高并发系列-异步的开始future

文章地址:https://www.zhihuclub.com/176592.shtml

关于作者: 智云科技

热门文章

网站地图