您的位置 首页 java

必收藏技能!为Java多线程应用程序优化数据存储库

必收藏技能!为Java多线程应用程序优化数据存储库

Java 多线程应用程序优化数据存储库

数据存储库通常是超高要求系统的瓶颈。在这些系统中,正在执行的查询数量非常大。DelayedBatchExecutor是一个用于减少所需查询数量的组件,通过在Java多线程应用程序中对所需查询进行批处理。

1个参数的n个查询Vs. n个参数的1个查询

假设有一个对关系数据库执行查询的Java应用程序,以便在给定其唯一标识符(id)的情况下检索Product实体(row)。

查询如下所示:

SELECT * FROM PRODUCT WHERE ID = <productId>

现在,检索n个Products,有如下两种方法:

  • 执行1个参数的n个独立查询:

SELECT * FROM PRODUCT WHERE ID = <productId1>

SELECT * FROM PRODUCT WHERE ID = <productId2>

SELECT * FROM PRODUCT WHERE ID = <productIdn>

  • 使用IN运算符或ORs的串联,对n个参数执行1个查询以便同时检索n个 Products

— Example using IN OPERATOR

SELECT * FROM PRODUCT WHERE ID IN (<productId1>, <productId2>, …, <productIdn>)

后者在网络流量和 数据库服务器 资源(CPU和磁盘)方面更为有效,因为:

  • 往返数据库的次数为1,而不是n。
  • 数据库引擎优化了n个参数的数据遍历过程,即每个表格可能只需要扫描1次,而不是n次。

这不仅适用于SELECT操作,而且适用于其他操作,例如 INSERTs,UPDATEs和DELETEs。实际上,JDBC API包括上述操作的批量处理操作。

同样的情况也适用于 NoSQL 存储库,其中大多都明确提供BULK操作。

DelayedBatchExecutor

需要从数据库中检索数据的Java应用程序,如REST微服务或异步消息处理器,通常以多线程应用程序(*1)实现,其中:

  • 每个线程在其执行的某个时刻执行相同的查询(每个查询具有不同的参数)。
  • 并发线程数很高(每秒数十或数百)。

在这种场景下,数据库很可能在较短的时间间隔内多次执行相同的查询。

如前所述,如果将1个参数的n个查询替换为具有n个参数的单个等效查询,那么则应用程序将使用较少的数据库服务器和网络资源。

好消息是它可以通过timewindows(时间窗口)的机制来实现,如下所示:

第一个尝试执行查询的线程会打开一个时间窗口,因此其参数被存储在一个列表中,同时该线程被暂停。在时间窗口内执行相同查询的其余线程会将其参数添加到列表中,并且也会被暂停。此时,数据库上未执行任何查询。

时间窗口结束或列表已满(先前已定义最大容量限制)后,将使用列表中存储的所有参数执行单个查询。最后,一旦数据库提供了该查询的结果,每个线程将接收相应的结果,同时所有线程将自动恢复。

笔者构建了一个简单而轻量级的应用机制(DelayedBatchExecutor),很容易在新的或现有的应用程序中使用。它基于Reactor库,并且为参数列表使用超时的Flux缓冲发布器。

运用DelayedBatchExecutor的吞吐量和延迟分析

假设针对Products的REST微服务公开了一个端点,用于检索数据库中给定的 productId的Product数据。在没有DelayedBatchExecutor的情况下,如果每秒对端点命中200次,则数据库每秒执行200个查询。如果端点使用的DelayedBatchExecutor 配置了50毫秒的时间窗口且最大容量=10个参数,数据库每秒钟将只执行10个参数的20个查询,代价是每执行一个线程,最多在50毫秒内增加延时(*2)。

换句话说,为了将延时增加50毫秒(* 2),数据库每秒接收的查询减少了10倍,然而保持了系统的整体吞吐量。还不错!!

其他有趣的配置:

  • 窗口时间= 100毫秒,最大容量= 20个参数→20个参数的10个查询(查询减少20倍)
  • 窗口时间= 500毫秒,最大容量= 100个参数→2个查询100个参数(查询减少100倍)

执行中的DelayedBatchExecutor

深入研究Product微服务示例。假设对于每个传入的HTTP请求,微服务的控制器都要求检索已有id的Product(Java Bean),因此将调用以下方法:

DAO 组件(ProductDAO)的public Product getProductById(IntegerproductId) .

以下分别是有和没有 DelayedBatchExecutor的DAO执行。

没有 DelayedBatchExecutor

public classProductDAO {

public Product getProductById(Integer id) {

Product product= …// execute the query SELECT * FROM PRODUCT WHERE ID=<id>

// using your favourite API: JDBC , JPA , Hibernate…

return product;

}

}

有DelayedBatchExecutor

// Singleton

publicclass ProductDAO {

DelayedBatchExecutor2<Product, Integer> delayedBatchExecutorProductById =

DelayedBatchExecutor.define(Duration.ofMillis(50), 10, this::retrieveProductsByIds);

public Product getProductById(Integer id) {

Product product = delayedBatchExecutorProductById.execute(id);

return product;

}

private List<Product> retrieveProductsByIds(List<Integer> idList) {

List<Product> productList = …// execute query:SELECT * FROM PRODUCT WHERE ID IN (idList.get(0), …, idList.get(n));

// using your favourite API: JDBC, JPA, Hibernate…

// The positions of the elements of the list to return must match the ones in the parameters list.

// For instance, the first Product of the list to be returned must be the one with

// the Id in the first position of productIdsList and so on…

// NOTE: null could be used as value, meaning that no Product exist for the given productId

return productList;

}

}

首先,必须在DAO中创建一个DelayedBatchExecutor实例,在本例中为 delayedBatchExecutorProductById。需要以下三个参数:

  • 时间窗口(在此示例中为50毫秒)
  • 参数列表的最大容量(在此示例中为10个参数)
  • 将使用参数列表调用的方法(详细信息见后文)。在此示例中,方法为retrieveProductsByIds

其次,已经重构了DAO方法 publicProduct getProductById(Integer productId),以简单调用delayedBatchExecutorProductById 实例的execute 方法。所有的“magic”都是由 DelayedBatchExecutor完成的。

之所以delayedBatchExecutorProductById是DelayedBatchExecutor2<Product,Integer>的实例,是因为其execute 方法返回一个Product实例并接收一个Integer实例作为其实际参数。因此,存在:DelayedBatchExecutor2<Product,Integer>。

如果execute方法需要接收两个参数(例如,一个 Integer和一个String)并返回Product实例,则定义为 DelayedBatchExecutor3<Product,Integer,String> 等。

最终,retrieveProductsByIds方法必须返回List<Product>并接收List<Integer>作为参数。

如果使用的是 DelayedBatchExecutor3<Product,Integer,String> ,则必须将retrieveProductsByIds设为List<Product>retrieveProductsByIds(List<Integer> productIdsList, List<String>stringList)

就是这样。

一旦运行,执行控制器逻辑的并发线程会在某时刻调用方法 getProductById(Integerid) ,并且此方法将返回对应的Product。并发线程不知自己已经被 DelayedBatchExecutor暂停并恢复了。

图源:pexels

由数据存储库延伸的“题外话”

尽管本文与数据存储库有关,但 DelayedBatchExecutor也可以用在其他地方,例如:对REST进行微服务请求。再说,用1个参数启动n个GET请求要比使用n个参数启动1个GET昂贵得多。

DelayedBatchExecutor的优化

笔者创建了 DelayedBatchExecutor并使用了一段时间,有效地解决了个人项目中并发线程启动的多个查询的执行问题。因此相信它对其他人可能也有用处,所以决定将其公开。

话虽如此,DelayedBatchExecutor改进和功能扩展的空间还很大。最有趣的是能够根据执行的特定条件动态更改DelayedBatchExecutor参数(窗口时间和最大容量)的功能,以便在利用带有n个参数的查询时最大程度地减少延时。

留言点赞关注

我们一起分享AI学习与发展的干货

如转载,请后台留言,遵守转载规范

文章来源:智云一二三科技

文章标题:必收藏技能!为Java多线程应用程序优化数据存储库

文章地址:https://www.zhihuclub.com/180765.shtml

关于作者: 智云科技

热门文章

网站地图