Java 8流条件处理

我有兴趣将一个流分成两个或多个子流,并以不同的方式处理这些元素。 例如,(大)文本文件可能包含类型A的行和类型B的行,在这种情况下,我想要执行以下操作:

File.lines(path) .filter(line -> isTypeA(line)) .forEachTrue(line -> processTypeA(line)) .forEachFalse(line -> processTypeB(line)) 

以前是我尝试抽象的情况。 实际上我有一个非常大的文本文件,其中每一行都是针对正则表达式进行测试的; 如果该行通过,则处理它,而如果它被拒绝,那么我想更新一个计数器。 对拒绝字符串的进一步处理是我不仅仅使用filter

有没有合理的方法用流来做这个,还是我必须回退到循环? (我希望这也是并行运行,所以溪流是我的第一选择)。

Java 8流不是为支持这种操作而设计的。 从jdk :

应该只对一个流进行操作(调用中间或终端流操作)。 例如,这排除了“分叉”流,其中相同的源提供两个或更多个管道,或者同一个流的多个遍历。

如果你可以将它存储在内存中,你可以使用Collectors.partitioningBy如果你只有两种类型并使用Map 。 否则使用Collectors.groupingBy

只需测试每个元素,并采取相应的行动。

 lines.forEach(line -> { if (isTypeA(line)) processTypeA(line); else processTypeB(line); }); 

此行为可能隐藏在辅助方法中:

 public static  Consumer branch(Predicate test, Consumer t, Consumer f) { return o -> { if (test.test(o)) t.accept(o); else f.accept(o); }; } 

那么用法看起来像这样:

 lines.forEach(branch(this::isTypeA, this::processTypeA, this::processTypeB)); 

切线说明

Files.lines()方法不会关闭底层文件,因此您必须像这样使用它:

 try (Stream lines = Files.lines(path, encoding)) { lines.forEach(...); } 

Stream类型的变量为我抛出一点红旗,所以我更喜欢直接管理BufferedReader

 try (BufferedReader lines = Files.newBufferedReader(path, encoding)) { lines.lines().forEach(...); } 

虽然不鼓励使用行为参数中的副作用,但只要不存在干扰,就不会禁止它们,因此最简单但不是最干净的解决方案是在filter中计算:

 AtomicInteger rejected=new AtomicInteger(); Files.lines(path) .filter(line -> { boolean accepted=isTypeA(line); if(!accepted) rejected.incrementAndGet(); return accepted; }) // chain processing of matched lines 

只要您处理所有项目,结果将是一致的。 只有在使用短路终端操作(并行流)时,结果才会变得不可预测。

更新primefaces变量可能不是最有效的解决方案,但在处理来自文件的行的上下文中,开销可能可以忽略不计。

如果您想要一个干净,并行友好的解决方案,一种通用的方法是实现一个Collector ,它可以根据条件组合两个收集操作的处理。 这要求您能够将下游操作表示为收集器,但大多数流操作可以表示为收集器(并且趋势可能以这种方式表达所有操作,即Java 9将添加当前缺少的filteringflatMapping

你需要一对类型来保存两个结果,所以假设一个草图

 class Pair { final A a; final B b; Pair(A a, B b) { this.a=a; this.b=b; } } 

组合收集器实现看起来像

 public static  Collector> conditional( Predicate predicate, Collector whenTrue, Collector whenFalse) { Supplier s1=whenTrue.supplier(); Supplier s2=whenFalse.supplier(); BiConsumer a1=whenTrue.accumulator(); BiConsumer a2=whenFalse.accumulator(); BinaryOperator c1=whenTrue.combiner(); BinaryOperator c2=whenFalse.combiner(); Function f1=whenTrue.finisher(); Function f2=whenFalse.finisher(); return Collector.of( ()->new Pair<>(s1.get(), s2.get()), (p,t)->{ if(predicate.test(t)) a1.accept(pa, t); else a2.accept(pb, t); }, (p1,p2)->new Pair<>(c1.apply(p1.a, p2.a), c2.apply(p1.b, p2.b)), p -> new Pair<>(f1.apply(pa), f2.apply(pb))); } 

并且可以用于例如将匹配项收集到列表中并计算不匹配,如下所示:

 Pair, Long> p = Files.lines(path) .collect(conditional(line -> isTypeA(line), Collectors.toList(), Collectors.counting())); List matching=pa; long nonMatching=pb; 

收集器是并行友好的,并允许任意复杂的委托收集器,但请注意,对于当前实现, Files.lines返回的流可能在并行处理方面表现不佳,与“读取器#行()并行化由于不可配置的批处理而严重并行化分裂者中的规模政策“ 。 Java 9发行版计划进行改进。

我处理这个问题的方法不是将它分开,而是写下来

 Files.lines(path) .map(line -> { if (condition(line)) { return doThingA(line); } else { return doThingB(line); } })... 

细节取决于您想要做什么以及您打算如何做。

好吧,你可以干脆做

 Counter counter = new Counter(); File.lines(path) .forEach(line -> { if (isTypeA(line)) { processTypeA(line); } else { counter.increment(); } }); 

不是非常function风格,但它以与您的示例类似的方式实现。 当然,如果是并行的, Counter.increment()processTypeA()都必须是线程安全的。

实际上,您确实希望处理每一行,但根据某些条件(类型)对其进行不同的处理。

我认为这或多或少是实现它的function方式:

 public static void main(String[] args) { Arrays.stream(new int[] {1,2,3,4}).map(i -> processor(i).get()).forEach(System.out::println); } static Supplier processor(int i) { return tellType(i) ? () -> processTypeA(i) : () -> processTypeB(i); } static boolean tellType(int i) { return i % 2 == 0; } static int processTypeA(int i) { return i * 100; } static int processTypeB(int i) { return i * 10; } 

这是一种方法(忽略了强制条件处理到流中的注意事项),它将谓词和使用者包装成单个谓词副作用:

 public static class StreamProc { public static  Predicate process( Predicate condition, Consumer operation ) { Predicate p = t -> { operation.accept(t); return false; }; return (t) -> condition.test(t) ? p.test(t) : true; } } 

然后过滤流:

 someStream .filter( StreamProc.process( cond1, op1 ) ) .filter( StreamProc.process( cond2, op2 ) ) ... .collect( ... ) 

流中剩余的元素尚未处理。

例如,使用外部迭代的典型文件系统遍历如下所示

 File[] files = dir.listFiles(); for ( File f : files ) { if ( f.isDirectory() ) { this.processDir( f ); } else if ( f.isFile() ) { this.processFile( f ); } else { this.processErr( f ); } } 

随着流和内部迭代,这变成了

 Arrays.stream( dir.listFiles() ) .filter( StreamProc.process( f -> f.isDirectory(), this::processDir ) ) .filter( StreamProc.process( f -> f.isFile(), this::processFile ) ) .forEach( f -> this::processErr ); 

我希望Stream直接实现流程方法。 然后我们就可以了

 Arrays.stream( dir.listFiles() ) .process( f -> f.isDirectory(), this::processDir ) ) .process( f -> f.isFile(), this::processFile ) ) .forEach( f -> this::processErr ); 

思考?