Spring Batch Multi Threading – 如何让每个线程读取唯一记录?

许多论坛已经多次询问过这个问题。 但我没有看到适合我的答案。 我试图在我的春季批处理实现中实现multithreading步骤。

  1. 有一个包含100k记录的临时表

  2. 想要在每个线程的10个提交间隔300个线程中处理它 – 所以在任何时间点都有3000个记录。

  3. 我定义了一个任务执行器,并在我想要multithreading的步骤中引用它

  4. 我的想法是,首先我将获得线程池大小(10)并使用velue(可以是1-10)更新thread_id列到每个100k记录。 在这种情况下,有10个线程和100k记录,因此10k记录将被分配一个id – 我正在尝试实现一个stagingsteplistener来执行此操作。

  5. 为这个临时表写了一个读者。 任务执行器将创建10个读者,每个读者必须读取300个不同的记录并处理它们 – 现在我如何在步骤监听器和读取器之间传递一个公共ID,以便每个线程都有自己的一组记录来处理。

截至目前,我只有一个JVM。 所以我想在Multi Threaded步骤中做这个,而不是考虑基于分区的方法。

请帮忙……

我提到了pro spring批处理书并创建了一个临时步骤监听器,它使用作业参数从作业配置xml接受运行ID,如下所示

      

我找不到的是这个? 这个“run.id”来自哪里。 我在书中的任何地方都没有看到。 我在我的spring批处理中复制了相同的实现,当我运行它时,我看到exception说run.id是不可识别的。 请帮我讲一下如何做到这一点?


  • 我找不到的是这个? 这个“run.id”来自哪里

JobParameters

这只是传递给jobParameters的参数。 通常使用每个实例的不同run.id (传统名称),因为框架无法知道JobParameters的哪些更改使其成为“下一个”作业实例。

您可以将此“run.id”传递给jobParameters:

 new JobParametersBuilder().addLong("run.id", 1L).toJobParameters() 

有关详细信息,请查看JobParametersIncrementer的文档 。


  • 如何在步骤侦听器和读取器之间传递公共ID,以便每个线程都有自己的一组记录来处理

这是一条非常危险的路线,因为Step中的许多参与者(例如读者和编写者)都是有状态的,如果状态没有被线程隔离,那么这些组件在multithreading步骤中是不可用的。 特别是Spring Batch的大多数现成的读者和作者都不是为multithreading使用而设计的。

分区

我建议使用分区 。 它比看起来简单得多,你仍然可以使用多个线程 。 看一下使用分区的样本批处理作业,它来自“Spring Batch samples”,它是:

使用PartitionHandler SPI显示multithreading步骤执行。 该示例使用TaskExecutorPartitionHandler来传播读取多个线程的一些文件的工作,每个线程执行一步。 关键组件是PartitionStep和MultiResourcePartitioner,负责划分工作。 请注意,正在进行分区的Step中的读取器和编写器是步进式的,因此它们的状态不会在执行的线程之间共享。