Spring Batch Multiple Threads

I am writing a Spring Batch job and want to be able to scale it when needed. My ApplicationContext looks like this:

```java
@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@ComponentScan(basePackages = "in.springbatch")
@PropertySource(value = {"classpath:springbatch.properties"})
public class ApplicationConfig {

    @Autowired
    Environment environment;

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() throws Exception {
        return jobs.get("spring_batch")
                .flow(step()).end()
                .build();
    }

    @Bean(name = "dataSource", destroyMethod = "close")
    public DataSource dataSource() {
        BasicDataSource basicDataSource = new BasicDataSource();
        return basicDataSource;
    }

    @Bean
    public JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setTransactionManager(transactionManager());
        jobRepositoryFactoryBean.setDataSource(dataSource());
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean(name = "batchstep")
    public Step step() throws Exception {
        return stepBuilderFactory.get("batchstep").allowStartIfComplete(true)
                .transactionManager(transactionManager())
                .chunk(2).reader(batchReader()).processor(processor()).writer(writer())
                .build();
    }

    @Bean
    ItemReader batchReader() throws Exception {
        System.out.println(Thread.currentThread().getName() + "reader");
        HibernateCursorItemReader hibernateCursorItemReader = new HibernateCursorItemReader();
        hibernateCursorItemReader.setQueryString("from Source");
        hibernateCursorItemReader.setFetchSize(2);
        hibernateCursorItemReader.setSessionFactory(sessionFactory().getObject());
        hibernateCursorItemReader.close();
        return hibernateCursorItemReader;
    }

    @Bean
    public ItemProcessor processor() {
        return new BatchProcessor();
    }

    @Bean
    public ItemWriter writer() {
        return new BatchWriter();
    }

    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor("spring_batch");
        asyncTaskExecutor.setConcurrencyLimit(5);
        return asyncTaskExecutor;
    }

    @Bean
    public LocalSessionFactoryBean sessionFactory() {
        LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean();
        sessionFactory.setDataSource(dataSource());
        sessionFactory.setPackagesToScan(new String[]{"in.springbatch.entity"});
        sessionFactory.setHibernateProperties(hibernateProperties());
        return sessionFactory;
    }

    @Bean
    public PersistenceExceptionTranslationPostProcessor exceptionTranslation() {
        return new PersistenceExceptionTranslationPostProcessor();
    }

    @Bean
    @Autowired
    public HibernateTransactionManager transactionManager() {
        HibernateTransactionManager txManager = new HibernateTransactionManager();
        txManager.setSessionFactory(sessionFactory().getObject());
        return txManager;
    }

    Properties hibernateProperties() {
        return new Properties() {
            {
                setProperty("hibernate.hbm2ddl.auto", environment.getProperty("hibernate.hbm2ddl.auto"));
                setProperty("hibernate.dialect", environment.getProperty("hibernate.dialect"));
                setProperty("hibernate.globally_quoted_identifiers", "false");
            }
        };
    }
}
```

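  1. With the configuration above, I can read from the DB, process the data, and write it to the DB.
  2. I use a chunk size of 2 and a HibernateCursorItemReader that reads 2 records at a time from the cursor. The query I use to read from the DB selects the current day's records based on their date.
  3. So far I have achieved the desired behavior, including restart functionality: on a rerun, the job picks up only the records that were not processed because a previous run failed.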
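The chunk-oriented flow described in points 1 and 2 (a cursor reader, `chunk(2)`, then the processor and writer) can be modelled without Spring. The minimal sketch below assumes an `Iterator` stands in for the cursor reader, a `Function` for `BatchProcessor`, and a `Consumer` for `BatchWriter`; the class name `ChunkLoop` is hypothetical. Like Spring Batch's default chunk handling, it reads a full chunk first, then processes each item, then hands the whole chunk to the writer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical JDK-only model of a chunk-oriented step (not Spring Batch code).
final class ChunkLoop {
    private ChunkLoop() {}

    /** Runs read -> process -> write until the reader is exhausted; returns the number of chunks written. */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            // 1. read: pull up to chunkSize items from the cursor
            List<I> items = new ArrayList<>(chunkSize);
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2. process: transform each item (Source -> DestinationDto in the question)
            List<O> outputs = new ArrayList<>(items.size());
            for (I item : items) {
                outputs.add(processor.apply(item));
            }
            // 3. write: the writer receives the whole chunk at once
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }
}
```

With five input rows and a chunk size of 2, the writer is called three times, with chunks of 2, 2 and 1 items. In the real step, each of those writes is committed in its own transaction.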

Now my requirement is for the batch to use multiple threads to process the data and write it to the database.

My processor and writer look like this:

```java
@Component
public class BatchProcessor implements ItemProcessor<Source, DestinationDto> {

    @Override
    public DestinationDto process(Source source) throws Exception {
        System.out.println(Thread.currentThread().getName() + ":" + source);
        DestinationDto destination = new DestinationDto();
        destination.setName(source.getName());
        destination.setValue(source.getValue());
        destination.setSourceId(source.getSourceId().toString());
        return destination;
    }
}

@Component
public class BatchWriter implements ItemWriter {

    @Autowired
    IBatchDao batchDao;

    @Override
    public void write(List list) throws Exception {
        System.out.println(Thread.currentThread().getName() + ":" + list);
        batchDao.saveToDestination((List) list);
    }
}
```

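I updated my step and added a task executor (the `taskExecutor()` method in the configuration above, which returns a `SimpleAsyncTaskExecutor` limited to 5 concurrent threads), as follows: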

```java
@Bean(name = "batchstep")
public Step step() throws Exception {
    return stepBuilderFactory.get("batchstep").allowStartIfComplete(true)
            .transactionManager(transactionManager())
            .chunk(1).reader(batchReader())
            .processor(processor()).writer(writer())
            .taskExecutor(taskExecutor())
            .build();
}
```

After this change, my processor is called by multiple threads, but with the same source data. Is there anything else I need to do?
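A likely cause of the symptom above is that all worker threads of a multi-threaded step call `read()` on the same reader bean, and `HibernateCursorItemReader` is documented as not thread-safe: two threads can read the cursor position before either advances it. The JDK-only sketch below models that situation and the usual fix of serializing `read()`. `CursorReader`, `SynchronizedReader` and `MultiThreadedReadDemo` are hypothetical names. In Spring Batch itself (assuming version 3.0 or later), the analogous fix is wrapping the reader in `SynchronizedItemStreamReader` via `setDelegate`, typically with `setSaveState(false)` on the delegate, because a single saved cursor position is not meaningful when chunks finish out of order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a forward-only cursor reader. NOT thread-safe. */
class CursorReader<T> {
    private final List<T> rows;
    private int position = 0;

    CursorReader(List<T> rows) { this.rows = rows; }

    T read() {
        if (position >= rows.size()) return null; // null means "no more input", as in ItemReader
        T row = rows.get(position);               // two threads can both reach this with the same position
        position++;                               // non-atomic: rows can be duplicated or skipped
        return row;
    }
}

/** Serializes read() so each row goes to exactly one thread (the idea behind SynchronizedItemStreamReader). */
class SynchronizedReader<T> {
    private final CursorReader<T> delegate;

    SynchronizedReader(CursorReader<T> delegate) { this.delegate = delegate; }

    synchronized T read() { return delegate.read(); }
}

final class MultiThreadedReadDemo {
    private MultiThreadedReadDemo() {}

    /** Starts `threads` workers that pull rows until the shared reader is exhausted; returns every row read. */
    static List<Integer> readAll(int rowCount, int threads) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < rowCount; i++) rows.add(i);
        SynchronizedReader<Integer> reader = new SynchronizedReader<>(new CursorReader<>(rows));
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer row;
                while ((row = reader.read()) != null) {
                    seen.add(row); // a real worker would process and write the row here
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }
}
```

With the synchronized wrapper every row is read exactly once. Using the bare `CursorReader` in `readAll` instead can produce duplicated or missing rows, which matches the behavior described in the question, although such races depend on timing and may not appear on every run.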

That is a big question.

  1. Your best bet for a good answer is to read the Scaling and Parallel Processing chapter of the Spring Batch documentation (here).

  2. There may be some multithreading samples among the Spring Batch samples (here).

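  3. A simple way to thread a Spring Batch job is to create a Future processor: you put all of the processing logic in a Future object, and the Spring processor class only adds objects to the future. Your writer class then waits for the futures to complete before it performs the write. Sorry, I don't have an example to point you to, but if you have specific questions, I can try to answer them!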
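The Future-processor idea in point 3 can be sketched with plain JDK concurrency. In this minimal model, the hypothetical `FutureProcessor` submits the real processing work to an executor and immediately returns a `Future`, and the hypothetical `FutureWriter` waits for every future in the chunk before writing the unwrapped results. A useful property of this design is that the reader stays single-threaded, so the shared cursor is not an issue; only processing runs in parallel. For a real job, the `spring-batch-integration` module provides `AsyncItemProcessor` and `AsyncItemWriter`, which follow this same pattern; the code below only illustrates the mechanics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical "processor" side: hands the real work to a thread pool and returns a Future at once. */
final class FutureProcessor<I, O> {
    private final ExecutorService executor;
    private final Function<I, O> delegate; // the actual processing logic (e.g. Source -> DestinationDto)

    FutureProcessor(ExecutorService executor, Function<I, O> delegate) {
        this.executor = executor;
        this.delegate = delegate;
    }

    Future<O> process(I item) {
        Callable<O> task = () -> delegate.apply(item);
        return executor.submit(task);
    }
}

/** Hypothetical "writer" side: waits for each Future in the chunk, then writes the results. */
final class FutureWriter<O> {
    List<O> write(List<Future<O>> futures) {
        List<O> results = new ArrayList<>(futures.size());
        for (Future<O> future : futures) {
            try {
                results.add(future.get()); // blocks until that item's processing has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for processing", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("processing failed", e.getCause());
            }
        }
        // a real writer would persist `results` here, e.g. through the DAO
        return results;
    }
}
```

Because the writer iterates over the futures in the order they were created, the written chunk keeps the reader's item order even though the items are processed concurrently.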