Spring Batch – Looping a reader/processor/writer step

Answer

Based on the accepted answer's code, the following adjustment to that code worked for me:

```java
// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(steps.size());

    Flow[] flows = new Flow[steps.size()];
    for (int i = 0; i < steps.size(); i++) {
        flows[i] = new FlowBuilder<Flow>(steps.get(i).getName()).start(steps.get(i)).build();
    }

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
            .split(taskExecutor)
            .add(flows)
            .build();
}
```
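For reference, the flow returned by this helper plugs into a job like any other flow (a sketch, assuming a `jobBuilderFactory` and a prepared `List<Step> steps` are in scope, as in the full example further below):

```java
// Sketch: wiring the parallel flow into a job.
// Assumes jobBuilderFactory and steps are available in the configuration class.
Job job = jobBuilderFactory.get("subsetJob")
        .start(createParallelFlow(steps))
        .end()
        .build();
```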

Edit

I have updated the question to a version that loops correctly, but since the application will scale, being able to process in parallel is important, and I still don't know how to do that dynamically at runtime with a javaconfig ...

Refined question: how do I create a reader-processor-writer dynamically at runtime for, say, 5 different situations (5 queries means a loop of 5, as it is configured now)?

My LoopDecider looks like this:

```java
public class LoopDecider implements JobExecutionDecider {

    private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
    private static final String COMPLETED = "COMPLETED";
    private static final String CONTINUE = "CONTINUE";
    private static final String ALL = "queries";
    private static final String COUNT = "count";

    private int currentQuery;
    private int limit;

    @SuppressWarnings("unchecked")
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
        this.limit = allQueries.size();
        jobExecution.getExecutionContext().put(COUNT, currentQuery);
        if (++currentQuery >= limit) {
            return new FlowExecutionStatus(COMPLETED);
        } else {
            LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
            return new FlowExecutionStatus(CONTINUE);
        }
    }
}
```

Based on a list of queries (HQL queries), I want a reader-processor-writer for each query. My current configuration looks like this:

Job

```java
@Bean
public Job subsetJob() throws Exception {
    LoopDecider loopDecider = new LoopDecider();
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
    Flow flow = flowBuilder
            .start(createHQL())
            .next(extractData())
            .next(loopDecider)
            .on("CONTINUE")
            .to(extractData())
            .from(loopDecider)
            .on("COMPLETED")
            .end()
            .build();

    return jobBuilderFactory.get("subsetJob")
            .start(flow)
            .end()
            .build();
}
```

```java
public Step extractData() {
    return stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}
```

Reader

```java
public HibernateCursorItemReader reader() {
    CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());
    reader.setUseStatelessSession(false);
    return reader;
}
```

Processor

```java
public DynamicRecordProcessor processor() {
    return new DynamicRecordProcessor();
}
```

Writer

```java
public FlatFileItemWriter writer() {
    CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();
    writer.setLineAggregator(new DelimitedLineAggregator() {{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
    }});
    return writer;
}
```

Currently the process works fine for a single query. However, I actually have a list of queries.

My initial idea was to loop over the steps, pass the list of queries to the step, and read-process-write for each query. This would also be ideal for parallel chunking.

However, when I add the list of queries as a parameter to the extractData step and create a step for each query, a list of steps is returned instead of the expected single step. The job starts complaining that it expects a single step instead of a list of steps.

Another idea was to create a custom MultiHibernateCursorItemReader, along the same idea as the MultiResourceItemReader, but I am really looking for a more out-of-the-box solution.

```java
@Bean
public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries) {
    List<Step> steps = new ArrayList<>();
    for (String query : queries) {
        steps.add(stepBuilderFactory.get("extractData")
                .chunk(100_000)
                .reader(reader(query))
                .processor(processor())
                .writer(writer(query))
                .build());
    }
    return steps;
}
```


How can I loop the step and integrate it into the job?

Don't instantiate your steps, readers, processors and writers as Spring beans. There is no need to do it. Only your job instance has to be a Spring bean.

So just remove the @Bean and @StepScope configuration from your step, reader, writer and processor creator methods and instantiate them where needed.

There is only one catch: you have to call afterPropertiesSet() manually. E.g.:

```java
// @Bean -> delete
// @StepScope -> delete
public FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName) {
    FlatFileItemWriter writer = new FlatFileItemWriter();
    writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));
    writer.setLineAggregator(new DelimitedLineAggregator() {{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
    }});
    // ------- ADD!!
    writer.afterPropertiesSet();
    return writer;
}
```
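A reader built per query follows the same pattern (a sketch; `createQueryReader` is a hypothetical helper name, and `setQueryString` is the standard `HibernateCursorItemReader` setter — the question's `CustomHibernateCursorItemReader` may differ):

```java
// Hypothetical helper: builds one cursor reader per HQL query.
// Assumes a Hibernate SessionFactory is reachable via HibernateUtil, as in the question.
private HibernateCursorItemReader createQueryReader(String hql) throws Exception {
    HibernateCursorItemReader reader = new HibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());
    reader.setUseStatelessSession(false);
    reader.setQueryString(hql);          // one reader per query
    reader.afterPropertiesSet();         // required when not managed as a Spring bean
    return reader;
}
```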

This way, your step, reader and writer instances are automatically "step scoped", because you instantiate them explicitly for every step.
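Applied to the question's setup, one step per query could then be built like this (a sketch; `createExtractStep` is a hypothetical helper, and `reader(query)`, `processor()` and `writer(query)` are the question's factory methods with @Bean removed — note the step name must be unique per query):

```java
// Sketch: build one step per HQL query with fresh, non-bean component instances.
private Step createExtractStep(String query) throws Exception {
    return stepBuilderFactory.get("extractDataFor" + query.hashCode()) // unique step name
            .chunk(100_000)
            .reader(reader(query))
            .processor(processor())
            .writer(writer(query))
            .build();
}
```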

Let me know if my answer is not clear enough. I will then add a more detailed example.

Edit

A simple example:

```java
@Configuration
public class MyJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    List<String> filenames = Arrays.asList("file1.txt", "file2.txt");

    @Bean
    public Job myJob() {
        List<Step> steps = filenames.stream()
                .map(filename -> createStep(filename))
                .collect(Collectors.toList());

        return jobBuilderFactory.get("subsetJob")
                .start(createParallelFlow(steps))
                .end()
                .build();
    }

    // helper method to create a step
    private Step createStep(String filename) {
        return stepBuilderFactory.get("convertStepFor" + filename) // !!! Stepname has to be unique
                .chunk(100_000)
                .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper()))
                .processor(new YourConversionProcessor())
                .writer(createFileWriter(new FileSystemResource(new File("converted_" + filename)), new YourOutputLineAggregator()))
                .build();
    }

    // helper method to create a split flow out of a List of steps
    private static Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(steps.size());

        List<Flow> flows = steps.stream() // we have to convert the steps to flows
                .map(step -> new FlowBuilder<Flow>("flow_" + step.getName())
                        .start(step)
                        .build())
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
                .split(taskExecutor)
                .add(flows.toArray(new Flow[flows.size()]))
                .build();
    }

    // helper methods to create filereader and filewriters
    public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception {
        FlatFileItemReader<T> reader = new FlatFileItemReader<>();
        reader.setEncoding("UTF-8");
        reader.setResource(source);
        reader.setLineMapper(lineMapper);
        reader.afterPropertiesSet();
        return reader;
    }

    public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {
        FlatFileItemWriter<T> writer = new FlatFileItemWriter<>();
        writer.setEncoding("UTF-8");
        writer.setResource(target);
        writer.setLineAggregator(aggregator);
        writer.afterPropertiesSet();
        return writer;
    }
}
```