欢迎来到Introzo百科
Introzo百科
当前位置:网站首页 > 技术 > 性能爆炸:SpringBoot使用ThreadPoolTask​​Executor批量插入百万数据!

性能爆炸:SpringBoot使用ThreadPoolTask​​Executor批量插入百万数据!

日期:2023-09-30 04:12

点击关注公众号,实用技术文章随时了解最新


前言

开发目的:

提高百万数据插入效率。

制定计划:

利用 ThreadPoolTask​​Executor 进行多线程批量插入。

所用技术:

  • 弹簧boot2.1.1
  • MybatisPlus3.0.6
  • swagger2.5.0
  • Lombok1.18.4
  • postgresql
  • ThreadPoolTask​​Executor

具体实施细则

application-dev.properties添加线程池配置信息

#异步线程配置
#配置核心线程数
async.executor.thread.core_pool_size = 30
#配置最大线程数
async.executor.thread.max_pool_size = 30
# 配置队列大小
async.executor.thread.queue_capacity = 99988
# 配置线程池中线程的名称前缀
async.executor.thread.name.prefix = async -导入DB-

spring容器注入线程池bean对象

@配置
@EnableAsync
@Slf4j
公共  ExecutorConfig {
     @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
     @Value("${async.executor.thread.max_pool_size}")
    private intmaxPoolSize;
     @Value("${async.executor.thread.queue_capacity}")
    private int队列容量;
     @Value("${async.executor.thread.name.prefix}")
    private String namePre修复;
 
    @ Bean(名称= “asyncServiceExecutor”
    public 执行器 asyncServiceExecutor() 
{
        log.warn ("start asyncServiceExecutor");
        //这里修改
        ThreadPoolTask​​Executor执行器= newVisiableThreadPoolTask​​Executor();
//配置核心线程数
          executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置ure 线程池中线程的名称前缀
executor.setThreadPrefixName(namePrefix);
    //rejection-policy : 当pool达到Max Size时,如何处理新任务
// Caller_Runs: 不执行任务在new 线程,而是调用者所在线程执行
Executor.Setrejecutionhandler ( ( new ThreadPoolExecutor.CallerRunsPolicy());
/ /执行初始化
executor.initialize(); 返回执行器;
}
}

创建异步线程业务类

@Service
@Slf4j
公共  AsyncServiceImpl  实现 AsyncService {
@Override
    @Async“asyncServiceExecutor”
    公共   executeAsync(列表 logOutputResults,LogOutputResultMapper logOutputResultMapper,CountDownLatch countDownLatch) {
        尝试{
            log.warn("开始执行executeAsync");
            //异步线程要做的事情
            logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
            log.warn("结束executeAsync");
}最后 {
            countDownLatch.countDown();//很关键,无论上面的程序是否异常必须执行countDown,否则等待无法释放
        }
    }
}

创建多线程批量插入具体业务方法

@Override
public inttestMultiThread() {
列表 logOutputResults = getTestData();
//每插入100条数据就测试开一个线程
List>lists = ConvertHandler.splitList(logOutputResults, 100);
CountDownLatch countDownL atch= CountDownLatch(lists.size());
for (List listSub:lists) {
async Service.executeAsync(listSub, logOutputResultMap)每,countDownLatch);
} 尝试
{
Countdownlatch.await (); // 保证前面的线程都执行完毕才进入后面;获取以下所有线程执行的汇总结果
} catch(例外 e){
                                                                                                                                 +e.getMessage( );

模拟2000003条数据进行测试