- Sometimes you need to balance database write performance by turning a large number of single-row inserts into batch inserts, which reduces the database write bottleneck.
- The example below is based on Spring. The idea: if 1000 records accumulate in memory within 5 seconds, write those 1000 records to the database in one batch; if fewer than 1000 records have arrived but 5 seconds have elapsed, also perform one write. The flush condition therefore covers both "a single insert would be too large" and "data has been waiting too long to be written". A hedged sketch of the actual database write follows the sample class.
package com.example.springdemo.batchsave;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
/**
 * Description: simulate batch persistence to the database
 *
 * @author zonda
 * @version 1.0
 * @date 2022/9/15 20:48
 * @since JDK11
 */
public class BatchSaveSample {

    private final Logger logger = LoggerFactory.getLogger(BatchSaveSample.class);

    private final LongAdder batchNumber = new LongAdder();

    /**
     * Number of records saved per batch
     */
    private final static int batchSize = 1000;

    /**
     * Single-threaded executor that runs the save loop
     */
    private final ExecutorService executorService =
            Executors.newSingleThreadExecutor(new CustomizableThreadFactory("xx-batch-save-"));

    /**
     * In-memory queue buffering the data to be persisted
     */
    public static final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(2000);
    /**
     * Consume the queue and save in batches
     */
    private void batchSave() {
        int curIdx = 0;
        long start = System.currentTimeMillis();
        List<Object> temp = new ArrayList<>();
        while (keep()) {
            try {
                final Object poll = queue.poll(5, TimeUnit.SECONDS);
                if (poll != null) {
                    temp.add(poll);
                    curIdx++;
                }
                if (curIdx == 0) {
                    continue;
                }
                // flush when the batch is full or more than 5s have passed since the last flush
                if (curIdx == batchSize || (System.currentTimeMillis() - start) > 5000) {
                    doSaveAndClear(temp);
                    curIdx = 0;
                    start = System.currentTimeMillis();
                }
            } catch (InterruptedException e) {
                // log the error
                logger.error("batch save interrupted", e);
                // flush whatever is buffered so it is not lost
                if (!temp.isEmpty()) {
                    doSaveAndClear(temp);
                    curIdx = 0;
                    start = System.currentTimeMillis();
                }
            }
        }
    }
    /**
     * the loop condition (always true in this sample)
     */
    private boolean keep() {
        return true;
    }

    /**
     * save to db and clear the collection
     */
    private void doSaveAndClear(Collection<Object> data) {
        // save to db here (this sample only counts and logs the batch)
        batchNumber.increment();
        logger.info("-- batch {}: saved {} records", batchNumber.longValue(), data.size());
        // clear collection
        data.clear();
    }

    /**
     * Start the consumer task
     */
    private void start() {
        executorService.execute(this::batchSave);
    }
    // test
    public static void main(String[] args) {
        BatchSaveSample batchSaveSample = new BatchSaveSample();
        // mock producer: adds one item at a random interval of up to 10 seconds
        new Thread(() -> {
            do {
                queue.add(new Object());
                try {
                    Thread.sleep((long) (Math.random() * 10000));
                } catch (InterruptedException e) {
                    // restore the interrupt flag so the do-while condition can see it
                    Thread.currentThread().interrupt();
                }
            } while (!Thread.currentThread().isInterrupted());
        }).start();
        // start the consumer
        batchSaveSample.start();
    }
}
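- In the sample above, doSaveAndClear only increments a counter and logs; the real database write is left out. The sketch below shows, under stated assumptions, what that write could look like with Spring's JdbcTemplate.batchUpdate (spring-jdbc assumed to be on the classpath); the LogBatchWriter and LogRecord classes, the t_log table and its content/create_time columns are hypothetical names introduced only for illustration, not part of the original sample.

import org.springframework.jdbc.core.JdbcTemplate;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class LogBatchWriter {

    /** Hypothetical payload type; in BatchSaveSample the queue holds plain Objects. */
    public static class LogRecord {
        final String content;
        final java.sql.Timestamp createTime;

        public LogRecord(String content, java.sql.Timestamp createTime) {
            this.content = content;
            this.createTime = createTime;
        }
    }

    private final JdbcTemplate jdbcTemplate;

    public LogBatchWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Writes the whole batch in one JDBC round trip instead of one INSERT per record.
     */
    public void saveBatch(Collection<LogRecord> batch) {
        if (batch.isEmpty()) {
            return;
        }
        // t_log and its columns are placeholders for this example
        String sql = "INSERT INTO t_log (content, create_time) VALUES (?, ?)";
        List<Object[]> args = new ArrayList<>(batch.size());
        for (LogRecord r : batch) {
            args.add(new Object[]{r.content, r.createTime});
        }
        jdbcTemplate.batchUpdate(sql, args);
    }
}

- Sending one JDBC batch per flush already relieves the single-insert bottleneck; on MySQL, adding rewriteBatchedStatements=true to the JDBC URL can additionally let the driver rewrite the batch into multi-value INSERT statements.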