Spring Batch Partitioning在itemReader中注入stepExecutionContext参数
我正在尝试使用Partitioner学习Spring Batch。
问题是我需要从Partitioner实现动态设置文件名。 我试图在itemReader
获取它。 但它给出了文件名null 。
我的Spring Batch配置:
@Bean @StepScope public ItemReader itemReader(@Value("#{stepExecutionContext[filename]}") String filename) throws UnexpectedInputException, ParseException { FlatFileItemReader reader = new FlatFileItemReader(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = { "username", "userid", "transactiondate", "amount" }; tokenizer.setNames(tokens); reader.setResource(new ClassPathResource( "input/"+filename)); DefaultLineMapper lineMapper = new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper); return reader; } @Bean(name = "partitioningJob") public Job partitioningJob() throws UnexpectedInputException, MalformedURLException, ParseException { return jobs.get("partitioningJob").listener(jobListener()).start(partitionStep()).build(); } @Bean public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("partitionStep").partitioner(step2()).partitioner("step2", partitioner()).gridSize(2).taskExecutor(taskExecutor).build(); } @Bean public Step step2() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("step2"). chunk(1).reader(itemReader(null)).processor(itemProcessor()).writer(itemWriter(marshaller(),null)).build(); } @Bean public TransactionPartitioner partitioner() { TransactionPartitioner partitioner = new TransactionPartitioner(); return partitioner; } @Bean public JobListener jobListener() { return new JobListener(); } @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setMaxPoolSize(2); taskExecutor.setQueueCapacity(2); taskExecutor.setCorePoolSize(2); taskExecutor.afterPropertiesSet(); return taskExecutor; }
我的TransactionPartitioner
类是:
public class TransactionPartitioner implements Partitioner { public Map partition(int range) { Map result = new HashMap(); for (int i = 1; i <= range; i++) { ExecutionContext exContext = new ExecutionContext(); exContext.put("filename", "input"+i+".csv"); exContext.put("name", "Thread" + i); result.put("partition" + i, exContext); } return result; } }
这不是正确的方法吗? 请建议。
这是堆栈跟踪:
18:23:39.060 [main] DEBUG org.springframework.batch.core.job.AbstractJob - Upgrading JobExecution status: StepExecution: id=1, version=2, name=partitionStep, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392) at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) at com.sun.proxy.$Proxy19.run(Unknown Source) at org.baeldung.spring_batch_intro.App.main(App.java:24) ; org.springframework.batch.item.ItemStreamException: Failed to initialize the reader at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:147) at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96) at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310) at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197) at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139) at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): class path resource [input/null] at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251) at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:144) ... 9 more
根据@ Sabir的建议,我检查了我的数据。 步骤上下文表如下所示:
| STEP_EXECUTION_ID | SHORT_CONTEXT | SERIALIZED_CONTEXT | | 1 | {"map":[{"entry":[{"string":"SimpleStepExecutionSplitter.GRID_SIZE","long":2},{"string":["batch.stepType","org.springframework.batch.core.partition.support.PartitionStep"]}]}]} | NULL | 2 | {"map":[{"entry":[{"string":["filename","input2.csv"]},{"string":["name","Thread2"]}]}]} | NULL | | 3 | {"map":[{"entry":[{"string":["filename","input1.csv"]},{"string":["name","Thread1"]}]}]}
以下是完整代码: https : //drive.google.com/file/d/0Bziay9b2ceLbUXdTRnZoSjRfR2s/view?usp =sharing
完成代码并尝试运行它。
目前它没有在范围级别绑定文件名。
您有两个配置文件:
- SpringConfig – 包含与Spring相关的配置bean
- SpringBatchConfig – 包含Spring批次相关的bean
第一个包含注释@EnableBatchProcessing
和@Configuration
。
但是itemReader
在另一个配置文件中定义,该文件不包含任何注释。
你也应该在另一个文件上有@Configuration
。
要么
您可以将这两个注释添加到SpringBatchConfig
配置文件中,并可以在Spring
跳过它们
如果没有这个,这些配置将无法正确读取,并且itemReader
不被视为Step Scoped(即注释@StepScope
不起作用),并且不会在步骤级别绑定值,因此您将获得NULL
值。
它不是应用程序代码调用partition
方法的责任,就像你在下面做的那样,
@Bean public TransactionPartitioner partitioner() { TransactionPartitioner partitioner = new TransactionPartitioner(); partitioner.partition(10); return partitioner; }
Framework将为您调用partition
方法。 您只需要在不显式调用partition(10)
方法的情况下返回Partitioner
。
话虽如此,您需要在分区步骤中设置分区器gridSize
,如下所示,
@Bean public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("partitionStep").partitioner(step2()).partitioner("step2", partitioner()).gridSize(10).taskExecutor(taskExecutor).build(); }
以上几点可能是您的问题的根本原因。 其余的东西看起来还不错。