日期:2023-09-30 04:12
点击关注公众号,实用技术文章随时了解最新 前言 开发目的: 提高百万数据插入效率。 制定计划: 利用 ThreadPoolTaskExecutor 进行多线程批量插入。 所用技术: 弹簧boot2.1.1 MybatisPlus3.0.6 swagger2.5.0 Lombok1.18.4 postgresql ThreadPoolTaskExecutor 具体实施细则 application-dev.properties添加线程池配置信息 #异步线程配置#配置核心线程数async.executor.thread.core_pool_size = 30#配置最大线程数async.executor.thread.max_pool_size = 30 # 配置队列大小 async.executor.thread.queue_capacity = 99988 # 配置线程池中线程的名称前缀 async.executor.thread.name.prefix = async -导入DB- spring容器注入线程池bean对象@配置@EnableAsync@Slf4j公共 类 ExecutorConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private intmaxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int队列容量; @Value("${async.executor.thread.name.prefix}") private String namePre修复; @ Bean(名称= “asyncServiceExecutor”) public 执行器 asyncServiceExecutor() { log.warn ("start asyncServiceExecutor"); //这里修改 ThreadPoolTaskExecutor执行器= newVisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置ure 线程池中线程的名称前缀 executor.setThreadPrefixName(namePrefix); //rejection-policy : 当pool达到Max Size时,如何处理新任务 // Caller_Runs: 不执行任务在new 线程,而是调用者所在线程执行 Executor.Setrejecutionhandler ( ( new ThreadPoolExecutor.CallerRunsPolicy()); / /执行初始化 executor.initialize(); 返回执行器; } } 创建异步线程业务类@Service@Slf4j公共 类 AsyncServiceImpl 实现 AsyncService {@Override @Async(“asyncServiceExecutor”) 公共 空 executeAsync(列表 logOutputResults,LogOutputResultMapper logOutputResultMapper,CountDownLatch countDownLatch) { 尝试{ log.warn("开始执行executeAsync"); //异步线程要做的事情 logOutputResultMapper.addLogOutputResultBatch(logOutputResults); log.warn("结束executeAsync"); }最后 { countDownLatch.countDown();//很关键,无论上面的程序是否异常必须执行countDown,否则等待无法释放 } } } 创建多线程批量插入具体业务方法@Overridepublic inttestMultiThread() {列表 logOutputResults = getTestData(); //每插入100条数据就测试开一个线程 List>lists = ConvertHandler.splitList(logOutputResults, 100); CountDownLatch countDownL atch=新 CountDownLatch(lists.size()); for (List listSub:lists) { async Service.executeAsync(listSub, logOutputResultMap)每,countDownLatch); } 尝试 { Countdownlatch.await (); // 保证前面的线程都执行完毕才进入后面;获取以下所有线程执行的汇总结果 } catch(例外 e){ +e.getMessage( ); 模拟2000003条数据进行测试 多线程测试2000003需要以下时间:1.67分钟 这次开了30个帖子,截图如下: 单线程测试2000003耗时如下:需要5.75分钟 检查多线程录入的数据,是否存在重复录入的问题: 根据id分组,检查是否有id重复的数据,通过sql语句检查,没有发现重复存储问题 检查数据完整性: 通过sql语句查询,多线程数据录入完成 测试结果 使用不同数量的线程进行测试: 总结 通过上述测试用例,还导入了2,000,003条数据。多线程耗时1.67分钟,单线程耗时5.75分钟。通过测试不同的线程数量,我们发现线程并不是越多越好,而是具体数量合适。网上有一个不成文的算法:
开发目的:
提高百万数据插入效率。
制定计划:
利用 ThreadPoolTaskExecutor 进行多线程批量插入。
ThreadPoolTaskExecutor
所用技术:
application-dev.properties添加线程池配置信息
application-dev.properties
#异步线程配置#配置核心线程数async.executor.thread.core_pool_size = 30#配置最大线程数async.executor.thread.max_pool_size = 30 # 配置队列大小 async.executor.thread.queue_capacity = 99988 # 配置线程池中线程的名称前缀 async.executor.thread.name.prefix = async -导入DB-
spring容器注入线程池bean对象
@配置@EnableAsync@Slf4j公共 类 ExecutorConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private intmaxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int队列容量; @Value("${async.executor.thread.name.prefix}") private String namePre修复; @ Bean(名称= “asyncServiceExecutor”) public 执行器 asyncServiceExecutor() { log.warn ("start asyncServiceExecutor"); //这里修改 ThreadPoolTaskExecutor执行器= newVisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置ure 线程池中线程的名称前缀 executor.setThreadPrefixName(namePrefix); //rejection-policy : 当pool达到Max Size时,如何处理新任务 // Caller_Runs: 不执行任务在new 线程,而是调用者所在线程执行 Executor.Setrejecutionhandler ( ( new ThreadPoolExecutor.CallerRunsPolicy()); / /执行初始化 executor.initialize(); 返回执行器; } } 创建异步线程业务类@Service@Slf4j公共 类 AsyncServiceImpl 实现 AsyncService {@Override @Async(“asyncServiceExecutor”) 公共 空 executeAsync(列表 logOutputResults,LogOutputResultMapper logOutputResultMapper,CountDownLatch countDownLatch) { 尝试{ log.warn("开始执行executeAsync"); //异步线程要做的事情 logOutputResultMapper.addLogOutputResultBatch(logOutputResults); log.warn("结束executeAsync"); }最后 { countDownLatch.countDown();//很关键,无论上面的程序是否异常必须执行countDown,否则等待无法释放 } } } 创建多线程批量插入具体业务方法@Overridepublic inttestMultiThread() {列表 logOutputResults = getTestData(); //每插入100条数据就测试开一个线程 List>lists = ConvertHandler.splitList(logOutputResults, 100); CountDownLatch countDownL atch=新 CountDownLatch(lists.size()); for (List listSub:lists) { async Service.executeAsync(listSub, logOutputResultMap)每,countDownLatch); } 尝试 { Countdownlatch.await (); // 保证前面的线程都执行完毕才进入后面;获取以下所有线程执行的汇总结果 } catch(例外 e){ +e.getMessage( ); 模拟2000003条数据进行测试 多线程测试2000003需要以下时间:1.67分钟 这次开了30个帖子,截图如下: 单线程测试2000003耗时如下:需要5.75分钟 检查多线程录入的数据,是否存在重复录入的问题: 根据id分组,检查是否有id重复的数据,通过sql语句检查,没有发现重复存储问题 检查数据完整性: 通过sql语句查询,多线程数据录入完成 测试结果 使用不同数量的线程进行测试: 总结 通过上述测试用例,还导入了2,000,003条数据。多线程耗时1.67分钟,单线程耗时5.75分钟。通过测试不同的线程数量,我们发现线程并不是越多越好,而是具体数量合适。网上有一个不成文的算法:
创建异步线程业务类
@Service@Slf4j公共 类 AsyncServiceImpl 实现 AsyncService {@Override @Async(“asyncServiceExecutor”) 公共 空 executeAsync(列表 logOutputResults,LogOutputResultMapper logOutputResultMapper,CountDownLatch countDownLatch) { 尝试{ log.warn("开始执行executeAsync"); //异步线程要做的事情 logOutputResultMapper.addLogOutputResultBatch(logOutputResults); log.warn("结束executeAsync"); }最后 { countDownLatch.countDown();//很关键,无论上面的程序是否异常必须执行countDown,否则等待无法释放 } } }
创建多线程批量插入具体业务方法
@Overridepublic inttestMultiThread() {列表 logOutputResults = getTestData(); //每插入100条数据就测试开一个线程 List>lists = ConvertHandler.splitList(logOutputResults, 100); CountDownLatch countDownL atch=新 CountDownLatch(lists.size()); for (List listSub:lists) { async Service.executeAsync(listSub, logOutputResultMap)每,countDownLatch); }
模拟2000003条数据进行测试
多线程测试2000003需要以下时间:1.67分钟
这次开了30个帖子,截图如下:
单线程测试2000003耗时如下:需要5.75分钟
检查多线程录入的数据,是否存在重复录入的问题:
根据id分组,检查是否有id重复的数据,通过sql语句检查,没有发现重复存储问题
检查数据完整性:
通过sql语句查询,多线程数据录入完成
使用不同数量的线程进行测试:
通过上述测试用例,还导入了2,000,003条数据。多线程耗时1.67分钟,单线程耗时5.75分钟。通过测试不同的线程数量,我们发现线程并不是越多越好,而是具体数量合适。网上有一个不成文的算法:
关灯