并行消息使用Java8流API从令牌分隔的输入流解组
是否有可能使用Java8流API创建接口,如Message pullNext()
在分隔的输入流之上检测,逻辑步骤如下所示?
- 从字符输入流中读取分隔的标记
- 用于并行unmarshaller的令牌(一个令牌 – >一个消息)
- 邮件将重新排序以保留原始传入订单
- 消息可以使用前面提到的
pullNext()
有点破坏者和unmarshal阶段由并发池服务。 与此类似,也许(在InputStream之上实现stash
是要解决的问题):
Iterable stash = Iterables.cycle("one", "two", "three"); Iterator sink = StreamSupport.stream(stash.spliterator(), true). .parallel().map((x)->x+"-unmarshalled").iterator(); while (sink.hasNext()) process(sink.next()); // do something with decoded message
一个问题是parallelStream ForkJoinPool使用当前线程来贡献池。 这意味着没有当前线程可以自由执行此操作。
这样做的唯一现实方法是在单线程执行器中启动它以启动parallelStream执行forEach
写入固定大小的BlockingQueue,并且当前线程可以使用队列的结果。
我怀疑你最好重新编写你的代码,这样就pullNext
。 例如
Iterable stash = Iterables.cycle("one", "two", "three"); Iterator sink = StreamSupport.stream(stash.spliterator(), true). .parallel() .map(x -> x + "-unmarshalled") .forEach(x -> process(x));