是否有一种优雅的方式来处理块中的流?
我的确切场景是批量插入数据库,所以我想累积DOM对象然后每1000个,刷新它们。
我通过将代码放入累加器来检测丰满度然后刷新来实现它,但这似乎是错误的 – 刷新控件应该来自调用者。
我可以将流转换为List然后以迭代方式使用subList,但这似乎也很笨拙。
它有一个简洁的方法来采取行动每n个元素然后继续流,而只处理流一次?
优雅是旁观者的眼睛。 如果你不介意在groupingBy
使用有状态函数,你可以这样做:
AtomicInteger counter = new AtomicInteger(); stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize)) .values() .forEach(database::flushChunk);
与原始解决方案相比,这不会赢得任何性能或内存使用点,因为它在执行任何操作之前仍会实现整个流。
如果您想避免实现列表,流API将无法帮助您。 你必须得到流的迭代器或分裂器,并做这样的事情:
Spliterator split = stream.spliterator(); int chunkSize = 1000; while(true) { List chunk = new ArrayList<>(size); for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){}; if (chunk.isEmpty()) break; database.flushChunk(chunk); }
使用库StreamEx解决方案看起来像
Stream stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15); AtomicInteger counter = new AtomicInteger(0); int chunkSize = 4; StreamEx.of(stream) .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0) .forEach(chunk -> System.out.println(chunk));
输出:
[0, 1, 2, 3] [4, 5, 6, 7] [8, 9, 10, 11] [12, 13, 14]
groupRuns
接受谓词,该谓词决定2个元素是否应该在同一个组中。
一旦找到不属于它的第一个元素,它就会生成一个组。
如果你的项目有guava依赖,你可以这样做:
StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...);
请参阅https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-
正如Misha所说的那样,优雅是旁观者的眼睛。 我个人认为一个优雅的解决方案是让插入数据库的类执行此任务。 类似于BufferedWriter
。 这样,它不依赖于您的原始数据结构,甚至可以在多个流之后使用。 我不确定这是否正是您在累加器中使用您认为错误的代码的意思。 我不认为这是错误的,因为现有的类如BufferedWriter
这种方式工作。 通过这种方式调用者可以通过在编写器上随时调用flush()
来获得一些刷新控制。
类似下面的代码。
class BufferedDatabaseWriter implements Flushable { List buffer = new LinkedList (); public void write(DomObject o) { buffer.add(o); if(buffer.length > 1000) flush(); } public void flush() { //write buffer to database and clear it } }
现在您的流处理如下:
BufferedDatabaseWriter writer = new BufferedDatabaseWriter(); stream.forEach(o -> writer.write(o)); //if you have more streams stream2.forEach(o -> writer.write(o)); writer.flush();
如果要使用multithreading,可以运行flush异步。 从流中获取不能并行进行,但我认为无论如何都无法从流并行计算1000个元素。
您还可以扩展编写器以允许在构造函数中设置缓冲区大小,或者您可以使其实现AutoCloseable
并在尝试使用AutoCloseable
等运行它。 你从BufferedWriter
获得的好东西。
您可以创建项目流和给定块大小 的块 ( List
) 流
- 按块索引对项目进行分组(元素索引/块大小)
- 按索引排序块
- 仅将地图缩减为有序元素
码:
public static Stream> chunked(Stream stream, int chunkSize) { AtomicInteger index = new AtomicInteger(0); return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize)) .entrySet().stream() .sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue); }
用法示例:
Stream stream = IntStream.range(0, 100).mapToObj(Integer::valueOf); Stream> chunked = chunked(stream, 8);