解决多线程批量插入中的事物一致性问题

您所在的位置:网站首页 如何进行数据入库 解决多线程批量插入中的事物一致性问题

解决多线程批量插入中的事物一致性问题

2024-07-13 19:35| 来源: 网络整理| 查看: 265

在开发当中我们,我们有时为了加快执行速度,会使用到多线程!尤其在大量数据插入数据库时,我们需要使用多线程进行批量插入,加快程序的执行效率!

但是由于多线程中每个线程的事物是不一致的导致程序不一致!为了保证多个线程执行插入时,事物的一致性!我们需要保证所有事务统一操作。

实现思路:

1、编程式事务,通过代码控制事务的提交和回滚

2、获取每个线程的执行结果

3、根据每个线程执行结果判断是否需要回滚!

4、捕获到异常时也回滚

具体实现代码 @Service @Slf4j @DS(DynamicDatasourceConstants.DMB_DB) public class TestManyThreadInsert { private final AbnormalNotPresentDao abnormalNotPresentDao; private final SqlSession sqlSession; public TestManyThreadInsert(AbnormalNotPresentDao abnormalNotPresentDao, SqlSession sqlSession) { this.abnormalNotPresentDao = abnormalNotPresentDao; this.sqlSession = sqlSession; } /** * 多线程处理数据 * @param dataList 插入数据库的数据 */ @DS(DynamicDatasourceConstants.DMB_DB) public void saveBatch(List dataList) { // 获取数据库连接,获取会话(内部自有事务) final String push = DynamicDataSourceContextHolder.push(DynamicDatasourceConstants.DMB_DB); Connection connection = null; try { connection = sqlSession.getConnection(); final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNamePrefix( "algorithmParamConfig-threadName").build(); int threadNum = 5; final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadNum, threadNum, 10000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); List splitList = averageAssign(dataList, threadNum); //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭 CountDownLatch countDownLatch = new CountDownLatch(splitList.size()); AtomicBoolean atomicBoolean = new AtomicBoolean(true); // 设置手动提交 connection.setAutoCommit(false); abnormalNotPresentDao.removeById(BigInteger.valueOf(1619972395408429056L)); List futures = new ArrayList(); for (int i = 0; i < threadNum; i++) { if (i == splitList.size() - 1) { atomicBoolean.set(false); } List list = splitList.get(i); Future future = threadPoolExecutor.submit(new Callable() { @Override public Object call() throws Exception { try { //最后一个线程抛出异常 if (!atomicBoolean.get()) { throw new BizException(1000255, "出现异常"); } //批量添加,mybatisPlus中自带的batch方法 return abnormalNotPresentDao.saveBatch(list); } finally { countDownLatch.countDown(); } } }); futures.add(future); } //执行子线程 for (Future future : futures) { if (!Boolean.TRUE.equals(future.get())) { connection.rollback(); return; } } connection.commit(); //当子线程执行完毕时,主线程再往下执行 countDownLatch.await(); log.info("执行主线程"); } catch (Exception e) { try { connection.rollback(); } catch (SQLException ex) { throw new RuntimeException(ex); } log.info("线程{}====插入数据错误", Thread.currentThread().getName()); e.printStackTrace(); } finally { try { connection.close(); } catch (SQLException e) { throw new RuntimeException(e); } } } /** * 平均拆分list方法. * * @param source * @param n * @param * @return */ public static List averageAssign(List source, int n) { List result = new ArrayList(); int remaider = source.size() % n; int number = source.size() / n; int offset = 0;//偏移量 for (int i = 0; i < n; i++) { List value = null; if (remaider > 0) { value = source.subList(i * number + offset, (i + 1) * number + offset + 1); remaider--; offset++; } else { value = source.subList(i * number + offset, (i + 1) * number + offset); } result.add(value); } return result; } }

上面是主要代码,其他代码和往常crud一样!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3