Spring Batch Partitioning: injecting a stepExecutionContext parameter into the itemReader

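I am trying to learn Spring Batch with a Partitioner.

The problem is that I need to set the filename dynamically from the Partitioner implementation, and I try to read it in the itemReader. But the filename comes through as null.

My Spring Batch configuration: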

    @Bean
    @StepScope
    public ItemReader itemReader(@Value("#{stepExecutionContext[filename]}") String filename)
            throws UnexpectedInputException, ParseException {
        FlatFileItemReader reader = new FlatFileItemReader();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[] tokens = { "username", "userid", "transactiondate", "amount" };
        tokenizer.setNames(tokens);
        reader.setResource(new ClassPathResource("input/" + filename));
        DefaultLineMapper lineMapper = new DefaultLineMapper();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        reader.setLinesToSkip(1);
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean(name = "partitioningJob")
    public Job partitioningJob() throws UnexpectedInputException, MalformedURLException, ParseException {
        return jobs.get("partitioningJob").listener(jobListener()).start(partitionStep()).build();
    }

    @Bean
    public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
        return steps.get("partitionStep")
                .partitioner(step2())
                .partitioner("step2", partitioner())
                .gridSize(2)
                .taskExecutor(taskExecutor)
                .build();
    }

    @Bean
    public Step step2() throws UnexpectedInputException, MalformedURLException, ParseException {
        return steps.get("step2")
                .chunk(1)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter(marshaller(), null))
                .build();
    }

    @Bean
    public TransactionPartitioner partitioner() {
        TransactionPartitioner partitioner = new TransactionPartitioner();
        return partitioner;
    }

    @Bean
    public JobListener jobListener() {
        return new JobListener();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(2);
        taskExecutor.setQueueCapacity(2);
        taskExecutor.setCorePoolSize(2);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

My TransactionPartitioner class is:

    public class TransactionPartitioner implements Partitioner {

        public Map partition(int range) {
            Map result = new HashMap();
            for (int i = 1; i <= range; i++) {
                ExecutionContext exContext = new ExecutionContext();
                exContext.put("filename", "input" + i + ".csv");
                exContext.put("name", "Thread" + i);
                result.put("partition" + i, exContext);
            }
            return result;
        }
    }

Is this not the right way to do it? Please advise.

Here is the stack trace:

    18:23:39.060 [main] DEBUG org.springframework.batch.core.job.AbstractJob - Upgrading JobExecution status: StepExecution: id=1, version=2, name=partitionStep, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
        at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
        at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392)
        at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy19.run(Unknown Source)
        at org.baeldung.spring_batch_intro.App.main(App.java:24)
    ; org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
        at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:147)
        at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
        at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
        at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139)
        at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): class path resource [input/null]
        at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251)
        at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:144)
        ... 9 more

Following @Sabir's suggestion, I checked my data. The step execution context table looks like this:

    | STEP_EXECUTION_ID | SHORT_CONTEXT | SERIALIZED_CONTEXT |
    | 1 | {"map":[{"entry":[{"string":"SimpleStepExecutionSplitter.GRID_SIZE","long":2},{"string":["batch.stepType","org.springframework.batch.core.partition.support.PartitionStep"]}]}]} | NULL |
    | 2 | {"map":[{"entry":[{"string":["filename","input2.csv"]},{"string":["name","Thread2"]}]}]} | NULL |
    | 3 | {"map":[{"entry":[{"string":["filename","input1.csv"]},{"string":["name","Thread1"]}]}]}

Here is the complete code: https://drive.google.com/file/d/0Bziay9b2ceLbUXdTRnZoSjRfR2s/view?usp=sharing

I went through your code and tried running it.

As it stands, the filename is not being bound at step-scope level.

You have two configuration files:

  1. SpringConfig: contains the Spring-related configuration beans
  2. SpringBatchConfig: contains the Spring Batch-related beans

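The first one carries the annotations @EnableBatchProcessing and @Configuration.

But the itemReader is defined in the other configuration file, which carries no annotations at all.

You should put @Configuration on the other file as well,

or

you can add both annotations to the SpringBatchConfig configuration file and omit them from SpringConfig.

Without this, the configuration is not processed properly: the itemReader is not treated as step-scoped (i.e. the @StepScope annotation has no effect), the value is not bound at step level, and so you get null.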
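A likely mechanism, assuming SpringBatchConfig was registered without @Configuration: Spring then does not intercept calls between @Bean methods, so itemReader(null) inside step2() is an ordinary Java call and the reader really is built with a null filename. The following is a framework-free sketch of that difference, not Spring's actual proxying code; the names StepScopeSketch, resourcePath and lateBound are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model: plain Java only, no Spring classes involved.
class StepScopeSketch {

    // Stand-in for the body of itemReader(...): builds the resource path.
    static String resourcePath(String filename) {
        // String concatenation turns a null reference into the text "null".
        return "input/" + filename;
    }

    // Stand-in for a step-scoped proxy: the path is only built when get() is
    // called, by which time the step's context is available.
    static Supplier<String> lateBound(Map<String, Object> stepExecutionContext) {
        return () -> resourcePath((String) stepExecutionContext.get("filename"));
    }

    public static void main(String[] args) {
        // Direct call, as when itemReader(null) is not intercepted.
        System.out.println(resourcePath(null)); // prints "input/null"

        // Late-bound call, with the value the partitioner stored.
        Map<String, Object> context = new HashMap<>();
        context.put("filename", "input1.csv");
        System.out.println(lateBound(context).get()); // prints "input/input1.csv"
    }
}
```

The first printed path matches the `class path resource [input/null]` in the stack trace, which is consistent with the reader having been created eagerly rather than per step execution.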

It is not the application code's responsibility to call the partition method, as you are doing below:

    @Bean
    public TransactionPartitioner partitioner() {
        TransactionPartitioner partitioner = new TransactionPartitioner();
        partitioner.partition(10);
        return partitioner;
    }

The framework will call the partition method for you. You only need to return the Partitioner, without explicitly calling partition(10).
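To illustrate that division of labour, here is a minimal framework-free sketch. It assumes a plain Map in place of Spring Batch's ExecutionContext, and the names PartitionerSketch, SimplePartitioner, FilePartitioner and runPartitionStep are hypothetical; in the real framework the call to partition(gridSize) happens inside the partition step.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model: plain Java only, no Spring Batch classes involved.
class PartitionerSketch {

    // Stand-in for the Partitioner contract: one named context per partition.
    interface SimplePartitioner {
        Map<String, Map<String, Object>> partition(int gridSize);
    }

    // Same logic as TransactionPartitioner, with a Map replacing ExecutionContext.
    static class FilePartitioner implements SimplePartitioner {
        @Override
        public Map<String, Map<String, Object>> partition(int gridSize) {
            Map<String, Map<String, Object>> result = new LinkedHashMap<>();
            for (int i = 1; i <= gridSize; i++) {
                Map<String, Object> context = new LinkedHashMap<>();
                context.put("filename", "input" + i + ".csv");
                context.put("name", "Thread" + i);
                result.put("partition" + i, context);
            }
            return result;
        }
    }

    // Stand-in for the framework side: it, not the bean method, calls
    // partition(...) and passes in the configured grid size.
    static Map<String, Map<String, Object>> runPartitionStep(SimplePartitioner partitioner, int gridSize) {
        return partitioner.partition(gridSize);
    }

    public static void main(String[] args) {
        // The "bean method" only constructs the partitioner and returns it.
        SimplePartitioner partitioner = new FilePartitioner();

        runPartitionStep(partitioner, 2).forEach((name, context) ->
                System.out.println(name + " -> " + context.get("filename")));
    }
}
```

With a grid size of 2 this prints `partition1 -> input1.csv` and `partition2 -> input2.csv`, the same two filenames that appear in the step execution context table in the question.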

That said, you do need to set the gridSize on the partition step, as shown below:

    @Bean
    public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
        return steps.get("partitionStep")
                .partitioner(step2())
                .partitioner("step2", partitioner())
                .gridSize(10)
                .taskExecutor(taskExecutor)
                .build();
    }

The points above may be the root cause of your problem. The rest looks fine.