并行消息使用Java8流API从令牌分隔的输入流解组

是否有可能使用Java8流API创建接口,如Message pullNext()在分隔的输入流之上检测,逻辑步骤如下所示?

  1. 从字符输入流中读取分隔的标记
  2. 用于并行unmarshaller的令牌(一个令牌 – >一个消息)
  3. 邮件将重新排序以保留原始传入订单
  4. 消息可以使用前面提到的pullNext()

有点破坏者和unmarshal阶段由并发池服务。 与此类似,也许(在InputStream之上实现stash是要解决的问题):

 Iterable stash = Iterables.cycle("one", "two", "three"); Iterator sink = StreamSupport.stream(stash.spliterator(), true). .parallel().map((x)->x+"-unmarshalled").iterator(); while (sink.hasNext()) process(sink.next()); // do something with decoded message 

一个问题是parallelStream ForkJoinPool使用当前线程来贡献池。 这意味着没有当前线程可以自由执行此操作。

这样做的唯一现实方法是在单线程执行器中启动它以启动parallelStream执行forEach写入固定大小的BlockingQueue,并且当前线程可以使用队列的结果。

我怀疑你最好重新编写你的代码,这样就pullNext 。 例如

 Iterable stash = Iterables.cycle("one", "two", "three"); Iterator sink = StreamSupport.stream(stash.spliterator(), true). .parallel() .map(x -> x + "-unmarshalled") .forEach(x -> process(x));