Spring批处理聚合值并写入单个值

我正在使用spring批次,我需要实现以下目标

  1. 阅读csv文件,其中包含日期和金额等详细信息
  2. 汇总同一日期的所有金额的总和
  3. 持有一个日期和总和的条目

我过去使用过批处理,我想到了以下方法。 创建一个包含2个步骤的批处理

步骤1:

  1. Reader:使用FlatFileItemReader遍历整个文件
  2. 处理器:使用Key填充地图作为日期和值作为金额。 如果存在条目,则获取该值并将其添加到新值
  3. 作家:没有作家,因为我不想写

第2步:

  1. Reader:循环遍历地图的值
  2. 作家:坚持价值观

我能够实现第一步,我填充了Map 。 此Map已使用@JobScope声明

我陷入困境,如何为step2创建只需读取值列表的阅读器。 我尝试了ListItemReader但我无法从ListItemReader访问Map

请告知解决方案,或者您是否有更好的方法来解决这个问题

谢谢

选项1:如果你的cvs已按日期排序,你可以实现一个组阅读器,它读取行直到键值发生变化。 之后,整个组可以作为一个项目传递给处理器。

这样的群组阅读器可能如下所示:

  private SingleItemPeekableItemReader reader; private ItemReader peekReaderDelegate; @Override public void afterPropertiesSet() throws Exception { Assert.notNull(peekReaderDelegate, "The 'itemReader' may not be null"); this.reader= new SingleItemPeekableItemReader(); this.reader.setDelegate(peekReaderDelegate); } @Override // GroupDTO is just a simple container. It is also possible to use // List instead of GroupDTO public GroupDTO read() throws Exception { State state = State.NEW; // a simple enum with the states NEW, READING, and COMPLETE GroupDTO group = null; I item = null; while (state != State.COMPLETE) { item = reader.read(); switch (state) { case NEW: { if (item == null) { // end reached state = State.COMPLETE; break; } group = new GroupDTO(); group.addItem(item); state = State.READING; I nextItem = reader.peek(); // isGroupBreak returns true, if 'item' and 'nextItem' do NOT belong to the same group if (nextItem == null || getGroupBreakStrategy.isGroupBreak(item, nextItem)) { state = State.COMPLETE; } break; } case READING: { group.addItem(item); // peek and check if there the peeked entry has a new date I nextItem = peekEntry(); // isGroupBreak returns true, if 'item' and 'nextItem' do NOT belong to the same group if (nextItem == null || getGroupBreakStrategy.isGroupBreak(item, nextItem)) { state = State.COMPLETE; } break; } default: { throw new org.springframework.expression.ParseException(groupCounter, "ParsingError: Reader is in an invalid state"); } } } return group; } 

您需要一个SingleItemPeekableItemReader,以便预读下一个元素。 这个包装了你的普通读者。

选项2:第一步是你提出的,但只是为第2步编写一个tasklet。不需要使用reader-process-writer方法,而是可以使用一个简单的tasklet将地图的内容写入文件。

选项3:如果您真的想在步骤2中使用读取器 – 处理器 – 写入器方法,请编写自己的读取器,迭代您的地图。

类似的东西(我没有测试那段代码):

 public class MapReader implements ItemReader { private MapContainer container; private Iterator mapIterator; @PostConstruct public void afterPropertiesSet() { Assert.notNull(container); iterator = container.getMap().entry().iterator; } public void setMapContainer(MapContainer container) { this.container = container; } public Map.Entry read() { if (iterator.hasNext()) { return iterator.next(); } return null; } } @Component public class MapContainer { private Map data = new Hashmap<>(); public Map getMap() { return data; } // add modifier method as needed for step 1 } 

所以,你为Container创建一个spring-bean实例,将它注入你的步骤2的处理器,填充它,也将它注入上面的阅读器。