如何映射Java流中的RuntimeExceptions以从无效流元素中“恢复”

想象一下,我正在构建一个库,它将接收一个整数流,并且所有库代码需要做的是返回一个字符串流,其中包含数字的字符串表示。

public Stream convertToString(Stream input) { return input.map(n -> String.valueOf(n)); } 

但是,想象某人决定使用以下代码调用我的函数:

 Stream input = Stream.of(1,2,3) .map(n -> { if (n %3 ) { throw new RuntimeException("3s are not welcome here!"); } else { return n; } } Stream output = convertToString(input); 

由于我希望我的API协议能够处理这种情况并且仍然返回一个字符串流,我想知道如何更改我的代码,以便它检测到流中存在exception并从exception中“恢复”通过将其映射到特殊的"NaN"值。

最后,我希望我的输出流为"1","2","NaN"

笔记:

  • 我不希望我的API必须为输入定义一个特殊的包装器。
  • 由于依赖性的影响,我不想依赖第三方库。
  • 我不知道流的大小或生成的速度有多大,所以我不能使用任何类型的缓存/预加载所有值。

这可能与Java 8 Streams一起使用吗?

有了Iterator,我想我可以做到:

 public class SafeIntegerToStringIterator implements Iterator { Iterator innerIterator; ... public String next() throws NoSuchElementException { try { return String.valueOf(this.innerIterator.next()); } catch (RuntimeException e) { return "NaN"; } } } ... public Iterator convertToString(Iterator input) { return new SafeIntegerToStringIterator(input); } 

谢谢

注意:请参阅本文末尾的编辑,修复了原始答案中的错误。 无论如何,我正在离开原来的答案,因为它对许多情况仍然有用,我认为它有助于解决OP的问题,至少有一些限制。


您使用Iterator方法是正确的方向。 可以按如下方式起草解决方案:将流转换为迭代器,按照您已经完成的方式包装迭代器,然后从包装器迭代器创建流,除了您应该使用Spliterator 。 这是代码:

 private static  Stream asNonThrowingStream( Stream stream, Supplier valueOnException) { // Get spliterator from original stream Spliterator spliterator = stream.spliterator(); // Return new stream from wrapper spliterator return StreamSupport.stream( // Extending AbstractSpliterator is enough for our purpose new Spliterators.AbstractSpliterator( spliterator.estimateSize(), spliterator.characteristics()) { // We only need to implement tryAdvance @Override public boolean tryAdvance(Consumer action) { try { return spliterator.tryAdvance(action); } catch (RuntimeException e) { action.accept(valueOnException.get()); return true; } } }, stream.isParallel()); } 

我们正在扩展AbstractSpliterator来包装原始流返回的spliterator。 我们只需要实现tryAdvance方法,该方法可以委托给原始的spliterator的tryAdvance方法,也可以捕获RuntimeException并使用提供的valueOnException值调用该操作。

Spliterator的契约指定如果操作被占用,则tryAdvance的返回值必须为true ,因此如果捕获了RuntimeException ,则意味着原始的spliterator已从其自己的tryAdvance方法中抛出它。 因此,在这种情况下我们返回true ,这意味着该元素无论如何都被消耗了。

通过将这些值作为参数传递给AbstractSpliterator的构造函数来保留原始spliterator的估计大小和特征。

最后,我们通过StreamSupport.stream方法从新的spliterator创建一个新流。 如果原始流也是并行的,则新流是并行的。

以下是如何使用上述方法:

 public Stream convertToString(Stream input) { return asNonThrowingStream(input.map(String::valueOf), () -> "NaN"); } 

编辑

根据Holger在下面的评论 , 用户holi-java提供了一个解决方案,避免了Holger指出的陷阱。

这是代码:

  Stream exceptionally(Stream source, BiConsumer> handler) { class ExceptionallySpliterator extends AbstractSpliterator implements Consumer { private Spliterator source; private T value; private long fence; ExceptionallySpliterator(Spliterator source) { super(source.estimateSize(), source.characteristics()); this.fence = source.getExactSizeIfKnown(); this.source = source; } @Override public Spliterator trySplit() { Spliterator it = source.trySplit(); return it == null ? null : new ExceptionallySpliterator(it); } @Override public boolean tryAdvance(Consumer action) { return fence != 0 && consuming(action); } private boolean consuming(Consumer action) { Boolean state = tryConsuming(action); if (state == null) { return true; } if (state) { action.accept(value); value = null; return true; } return false; } private Boolean tryConsuming(Consumer action) { fence--; try { return source.tryAdvance(this); } catch (Exception ex) { handler.accept(ex, action); return null; } } @Override public void accept(T value) { this.value = value; } } return stream(new ExceptionallySpliterator(source.spliterator()), source.isParallel()).onClose(source::close); } 

如果您想进一步了解此解决方案,请参阅测试 。

流中间操作中出现错误,像解决问题一样聪明的方法是使用代理设计模式 。 对于使用流api,您只需要通过StreamSupport#stream & Spliterators#spliterator(Iterator,long,int)将Iterator从源Stream 代理到另一个Stream ,例如:

 Stream result = convertToString(Stream.of("1", "bad", "2") .map(Integer::parseInt)); public Stream convertToString(Stream input) { return exceptionally(input, (e, action) -> action.accept(null)) .map(it -> String.format("%s", it == null ? "NaN" : it)); } 

当前版本Stream基于Iterator修复了Stream.of(T)错误,有关详细信息,请参阅我的问题 。

  Stream exceptionally(Stream source, BiConsumer> handler) { Spliterator s = source.spliterator(); return StreamSupport.stream( spliterator( exceptionally(s, handler), s.estimateSize(), s.characteristics() ), source.isParallel() ).onClose(source::close); } //Don't worried the thread-safe & robust since it is invisible for anyone private  Iterator exceptionally(Spliterator spliterator, BiConsumer> handler) { class ExceptionallyIterator implements Iterator, Consumer { private Iterator source = Spliterators.iterator(spliterator); private T value; private boolean valueInReady = false; private boolean stop = false; @Override public boolean hasNext() { while (true) { if (valueInReady) return true; if (stop) return false; try { return source.hasNext(); } catch (Exception ex) { stop = shouldStopTraversing(ex); handler.accept(ex, this); } } } @Override public T next() { return valueInReady ? dump() : source.next(); } private T dump() { T result = value; valueInReady = false; value = null; return result; } @Override public void accept(T value) { this.value = value; this.valueInReady = true; } } return new ExceptionallyIterator(); } 

 static final String BUG_CLASS = "java.util.stream.Streams$StreamBuilderImpl"; public static boolean shouldStopTraversing(Exception ex) { for (StackTraceElement element : ex.getStackTrace()) { if (BUG_CLASS.equals(element.getClassName())) { return true; } } return false; }