是否有一种优雅的方式来处理块中的流?

我的确切场景是批量插入数据库,所以我想累积DOM对象然后每1000个,刷新它们。

我通过将代码放入累加器来检测丰满度然后刷新来实现它,但这似乎是错误的 – 刷新控件应该来自调用者。

我可以将流转换为List然后以迭代方式使用subList,但这似乎也很笨拙。

它有一个简洁的方法来采取行动每n个元素然后继续流,而只处理流一次?

优雅是旁观者的眼睛。 如果你不介意在groupingBy使用有状态函数,你可以这样做:

 AtomicInteger counter = new AtomicInteger(); stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize)) .values() .forEach(database::flushChunk); 

与原始解决方案相比,这不会赢得任何性能或内存使用点,因为它在执行任何操作之前仍会实现整个流。

如果您想避免实现列表,流API将无法帮助您。 你必须得到流的迭代器或分裂器,并做这样的事情:

 Spliterator split = stream.spliterator(); int chunkSize = 1000; while(true) { List chunk = new ArrayList<>(size); for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){}; if (chunk.isEmpty()) break; database.flushChunk(chunk); } 

使用库StreamEx解决方案看起来像

 Stream stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15); AtomicInteger counter = new AtomicInteger(0); int chunkSize = 4; StreamEx.of(stream) .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0) .forEach(chunk -> System.out.println(chunk)); 

输出:

 [0, 1, 2, 3] [4, 5, 6, 7] [8, 9, 10, 11] [12, 13, 14] 

groupRuns接受谓词,该谓词决定2个元素是否应该在同一个组中。

一旦找到不属于它的第一个元素,它就会生成一个组。

如果你的项目有guava依赖,你可以这样做:

 StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...); 

请参阅https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-

正如Misha所说的那样,优雅是旁观者的眼睛。 我个人认为一个优雅的解决方案是让插入数据库的类执行此任务。 类似于BufferedWriter 。 这样,它不依赖于您的原始数据结构,甚至可以在多个流之后使用。 我不确定这是否正是您在累加器中使用您认为错误的代码的意思。 我不认为这是错误的,因为现有的类如BufferedWriter这种方式工作。 通过这种方式调用者可以通过在编写器上随时调用flush()来获得一些刷新控制。

类似下面的代码。

 class BufferedDatabaseWriter implements Flushable { List buffer = new LinkedList(); public void write(DomObject o) { buffer.add(o); if(buffer.length > 1000) flush(); } public void flush() { //write buffer to database and clear it } } 

现在您的流处理如下:

 BufferedDatabaseWriter writer = new BufferedDatabaseWriter(); stream.forEach(o -> writer.write(o)); //if you have more streams stream2.forEach(o -> writer.write(o)); writer.flush(); 

如果要使用multithreading,可以运行flush异步。 从流中获取不能并行进行,但我认为无论如何都无法从流并行计算1000个元素。

您还可以扩展编写器以允许在构造函数中设置缓冲区大小,或者您可以使其实现AutoCloseable并在尝试使用AutoCloseable等运行它。 你从BufferedWriter获得的好东西。

您可以创建项目流和给定块大小 的块List

  • 按块索引对项目进行分组(元素索引/块大小)
  • 按索引排序块
  • 仅将地图缩减为有序元素

码:

 public static  Stream> chunked(Stream stream, int chunkSize) { AtomicInteger index = new AtomicInteger(0); return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize)) .entrySet().stream() .sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue); } 

用法示例:

 Stream stream = IntStream.range(0, 100).mapToObj(Integer::valueOf); Stream> chunked = chunked(stream, 8); 
Interesting Posts