Spring Batch Processor

我在Spring Batch中有一个要求,我有一个文件,其中有数千条记录按排序顺序排列。关键字段是产品代码。 该文件可能具有相同产品代码的多个记录。要求是我必须将具有相同产品代码的记录分组(即列表),然后将它们发送到方法即validateProductCodes(List prodCodeList)。 我正在寻找最好的方法。我想到的方法是读取处理器中的每条记录,然后在处理器中为相同的产品代码构建一组记录。如果处理器中的任何一点,如果记录中的产品代码不同于暗示productCode分组已完成,并且可以使用相同的产品代码为该组记录调用validateProductCodes()。我也使用Step.So不会自动意味着进程是multithreading的?含义具有相同productCode的记录组将以multithreading方式处理。请注意。

谢谢

您的问题有两个问题:第一,您想知道如何将项目分组在一起,以及第二个如何处理它们。

为了对它们进行分组,您可以像Luca建议的那样创建一个组阅读器,例如:

public class GroupReader implements ItemReader>{ private SingleItemPeekableItemReader reader; private ItemReader peekReaderDelegate; public void setReader(ItemReader reader) { peekReaderDelegate = reader; } @Override public void afterPropertiesSet() throws Exception { Assert.notNull(peekReaderDelegate, "The 'itemReader' may not be null"); this.reader= new SingleItemPeekableItemReader(); this.reader.setDelegate(delegateReader); } @Override public List read() throws Exception { State state = State.NEW; List group = null; I item = null; while (state != State.COMPLETE) { item = reader.read(); switch (state) { case NEW: { if (item == null) { // end reached state = State.COMPLETE; break; } group = new ArrayList(); group.add(item); state = State.READING; I nextItem = reader.peek(); if (isItAKeyChange(item, nextItem)) { state = State.COMPLETE; } break; } case READING: { group.add(item); // peek and check if there the peeked entry has a new date I nextItem = peekEntry(); if (isItAKeyChange(item, nextItem)) { state = State.COMPLETE; } break; } default: { throw new org.springframework.expression.ParseException(groupCounter, "ParsingError: Reader is in an invalid state"); } } } return group; } } 

对于每个键,此阅读器将返回一个列表,其中包含与此键匹配的所有元素。 因此,分组直接在读者中完成。 正如您所描述的那样,您无法使用处理器执行此操作。

关于multithreading的第二个问题。 现在,使用步骤并不一定意味着,使用多个线程处理该步骤。

为此,您需要设置AsyncTaskExecutor,并且必须设置限制。

但是,如果您这样做,您的读者必须是线程安全的,否则您的分组将无法正常工作。 你可以通过简单地将上面的read方法定义为synchronized来实现。

另一种方法是编写一个小的SynchronizedWrapperReader,如这个问题所示: Parellel Processing Spring Batch StaxEventItemReader

请注意,根据您要写入的目标,您可能还必须同步编写器,并在必要时重新排序结果。