Visualization of Java Stream parallelization

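Usually it is not very clear how a parallel stream splits its input into chunks and in what order the chunks are joined. Is there a way to visualize the whole procedure for any stream source, to better understand what's going on? Suppose I created a stream like this: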

```java
Stream<Integer> stream = IntStream.range(0, 100).boxed().parallel();
```

I'd like to see some tree-like structure:

```
             [0..99]
         ________/\______
        |                |
    [0..49]          [50..99]
    ____/\__          ___/\___
   |        |        |        |
[0..24] [25..49] [50..74] [75..99]
```

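This means that the whole input range `[0..99]` was split into the ranges `[0..49]` and `[50..99]`, which were in turn split further. Of course, such a diagram should reflect the real work of the Stream API: if I perform some real operation with such a stream, the splitting should be done in exactly the same way.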
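To get a first impression of such a tree without any framework involvement, the splitting can be driven by hand. The following sketch calls `trySplit()` recursively up to a fixed depth and labels every chunk with its first and last element. The class and method names are hypothetical. It assumes an `ArrayList` source, whose spliterator splits at the midpoint. A real parallel terminal operation decides on its own how deep to split, so this only approximates what the answers below visualize.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ManualSplitTree {
    // Hypothetical holder: first/last element of a chunk (null if empty) and its outline lines.
    static final class Chunk {
        final Integer first, last;
        final List<String> lines = new ArrayList<>();

        Chunk(Integer first, Integer last) {
            this.first = first;
            this.last = last;
        }

        String label() {
            if (first == null) return "(empty)";
            return first.equals(last) ? "[" + first + "]" : "[" + first + ".." + last + "]";
        }
    }

    // Calls trySplit() recursively up to `depth` levels, then traverses each leaf chunk.
    static Chunk split(Spliterator<Integer> sp, int depth) {
        Spliterator<Integer> prefix = depth > 0 ? sp.trySplit() : null;
        if (prefix == null) {
            List<Integer> elems = new ArrayList<>();
            sp.forEachRemaining(elems::add);
            Chunk leaf = elems.isEmpty() ? new Chunk(null, null)
                    : new Chunk(elems.get(0), elems.get(elems.size() - 1));
            leaf.lines.add(leaf.label());
            return leaf;
        }
        // trySplit() hands out the prefix; sp itself keeps covering the suffix.
        Chunk left = split(prefix, depth - 1);
        Chunk right = split(sp, depth - 1);
        Chunk node = new Chunk(left.first != null ? left.first : right.first,
                               right.last != null ? right.last : left.last);
        node.lines.add(node.label());
        left.lines.forEach(l -> node.lines.add("  " + l));
        right.lines.forEach(l -> node.lines.add("  " + l));
        return node;
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 100).boxed()
                .collect(Collectors.toCollection(ArrayList::new));
        split(data.spliterator(), 2).lines.forEach(System.out::println);
    }
}
```

With a depth of 2, this prints the seven ranges of the diagram above, from `[0..99]` down to `[75..99]`, as an indented outline.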

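I want to augment Tagir's excellent answer with a solution that monitors the splitting at the source, or even at intermediate operations (within some restrictions imposed by the current Stream API implementation):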

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public static <E> Stream<E> proxy(Stream<E> src) {
    Class<Stream<E>> sClass = (Class) Stream.class;
    Class<Spliterator<E>> spClass = (Class) Spliterator.class;
    return proxy(src, sClass, spClass, StreamSupport::stream);
}
public static IntStream proxy(IntStream src) {
    return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream);
}
public static LongStream proxy(LongStream src) {
    return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream);
}
public static DoubleStream proxy(DoubleStream src) {
    return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream);
}
static final Object EMPTY = new StringBuilder("empty");

static <E, S extends BaseStream<E, S>, Sp extends Spliterator<E>> S proxy(
        S src, Class<S> sc, Class<Sp> spc, BiFunction<Sp, Boolean, S> f) {

    final class Node<T> implements InvocationHandler, Runnable,
            Consumer<Object>, IntConsumer, LongConsumer, DoubleConsumer {
        final Class<? extends Spliterator> type;
        Spliterator<T> src;
        Object first = EMPTY, last = EMPTY;
        Node<T> left, right;
        Object currConsumer;

        public Node(Spliterator<T> src, Class<? extends Spliterator> type) {
            this.src = src;
            this.type = type;
        }
        // remember the first and the most recent element seen by this chunk
        private void value(Object t) {
            if (first == EMPTY) first = t;
            last = t;
        }
        public void accept(Object t) { value(t); ((Consumer) currConsumer).accept(t); }
        public void accept(int t)    { value(t); ((IntConsumer) currConsumer).accept(t); }
        public void accept(long t)   { value(t); ((LongConsumer) currConsumer).accept(t); }
        public void accept(double t) { value(t); ((DoubleConsumer) currConsumer).accept(t); }
        // registered via onClose(): print the recorded split tree
        public void run() {
            System.out.println();
            finish().forEach(System.out::println);
        }
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // the proxy always represents the rightmost part that has not been split off yet
            Node<T> curr = this;
            while (curr.right != null) curr = curr.right;
            if (method.getName().equals("tryAdvance") || method.getName().equals("forEachRemaining")) {
                // interpose this node as consumer to record the encountered elements
                curr.currConsumer = args[0];
                args[0] = curr;
            }
            if (method.getName().equals("trySplit")) {
                Spliterator<T> s = curr.src.trySplit();
                if (s == null) return null;
                Node<T> pfx = new Node<>(s, type);
                pfx.left = curr.left;
                curr.left = pfx;
                curr.right = new Node<>(curr.src, type);
                src = null;
                return pfx.create();
            }
            return method.invoke(curr.src, args);
        }
        Object create() {
            return Proxy.newProxyInstance(null, new Class<?>[] { type }, this);
        }
        String pad(String s, int left, int len) {
            if (len == s.length())
                return s;
            char[] result = new char[len];
            Arrays.fill(result, ' ');
            s.getChars(0, s.length(), result, left);
            return new String(result);
        }
        public List<String> finish() {
            String cur = toString();
            if (left == null) {
                return Collections.singletonList(cur);
            }
            List<String> l = left.finish();
            List<String> r = right.finish();
            int len1 = l.get(0).length();
            int len2 = r.get(0).length();
            int totalLen = len1 + len2 + 1;
            int leftAdd = 0;
            if (cur.length() < totalLen) {
                cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
            } else {
                leftAdd = (cur.length() - totalLen) / 2;
                totalLen = cur.length();
            }
            List<String> result = new ArrayList<>();
            result.add(cur);
            char[] dashes = new char[totalLen];
            Arrays.fill(dashes, ' ');
            Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1 + leftAdd, '_');
            int mid = totalLen / 2;
            dashes[mid] = '/';
            dashes[mid + 1] = '\\';
            result.add(new String(dashes));
            Arrays.fill(dashes, ' ');
            dashes[len1 / 2 + leftAdd] = '|';
            dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
            result.add(new String(dashes));
            int maxSize = Math.max(l.size(), r.size());
            for (int i = 0; i < maxSize; i++) {
                String lstr = l.size() > i ? l.get(i) : String.format("%" + len1 + "s", "");
                String rstr = r.size() > i ? r.get(i) : String.format("%" + len2 + "s", "");
                result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
            }
            return result;
        }
        private Object first() {
            if (left == null) return first;
            Object o = left.first();
            if (o == EMPTY) o = right.first();
            return o;
        }
        private Object last() {
            if (right == null) return last;
            Object o = right.last();
            if (o == EMPTY) o = left.last();
            return o;
        }
        public String toString() {
            Object o = first(), p = last();
            return o == EMPTY ? "(empty)" : "[" + o + (o != p ? ".." + p + ']' : "]");
        }
    }
    Node<E> n = new Node<>(src.spliterator(), spc);
    Sp sp = (Sp) Proxy.newProxyInstance(null, new Class<?>[] { n.type }, n);
    return f.apply(sp, true).onClose(n);
}
```

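It wraps the spliterator in a proxy that monitors the split operations and the encountered objects. The logic for handling the chunks is similar to Tagir's; in fact, I copied his result-printing routine.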
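For object streams, the same interception can be sketched without `java.lang.reflect.Proxy`, using a plain delegating `Spliterator`. The hypothetical `LoggingSpliterator` below forwards everything to the wrapped spliterator. It records each successful `trySplit()` as `size -> prefix + rest` and wraps the returned prefix so that nested splits are logged too. Unlike the proxy above, it records no elements and covers only `Spliterator<T>`, not the primitive specializations. Covering those specializations is what makes the reflective proxy convenient.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

// Hypothetical delegating wrapper: forwards every call to `delegate`, logs each
// successful trySplit(), and wraps the returned prefix so nested splits are seen too.
final class LoggingSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> delegate;
    private final List<String> log; // must be thread-safe: splits run on pool threads

    LoggingSpliterator(Spliterator<T> delegate, List<String> log) {
        this.delegate = delegate;
        this.log = log;
    }

    @Override
    public Spliterator<T> trySplit() {
        long before = delegate.estimateSize();
        Spliterator<T> prefix = delegate.trySplit();
        if (prefix == null) return null;
        log.add(before + " -> " + prefix.estimateSize() + " + " + delegate.estimateSize());
        return new LoggingSpliterator<>(prefix, log);
    }

    @Override public boolean tryAdvance(Consumer<? super T> action) { return delegate.tryAdvance(action); }
    @Override public void forEachRemaining(Consumer<? super T> action) { delegate.forEachRemaining(action); }
    @Override public long estimateSize() { return delegate.estimateSize(); }
    @Override public int characteristics() { return delegate.characteristics(); }
    @Override public Comparator<? super T> getComparator() { return delegate.getComparator(); }
}

public class SplitLogDemo {
    public static void main(String[] args) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Integer> data = IntStream.range(0, 100).boxed().collect(Collectors.toList());
        Spliterator<Integer> sp = new LoggingSpliterator<>(data.spliterator(), log);
        int sum = StreamSupport.stream(sp, true).mapToInt(Integer::intValue).sum();
        System.out.println("sum = " + sum);
        log.forEach(System.out::println); // the recorded splits depend on the machine
    }
}
```

The log is a synchronized list because the fork/join worker threads call `trySplit()` concurrently.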

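You can pass in the source of a stream or a stream that already has some operations appended. (In the latter case, you should apply `.parallel()` to the stream as early as possible.) As Tagir explained, in most cases the splitting behavior depends on the source and the configured parallelism. Therefore, monitoring an intermediate state may change the recorded values, but usually not the processed chunks: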

```java
try(IntStream is=proxy(IntStream.range(0, 100).parallel())) {
    is.filter(i -> i/20%2==0)
      .mapToObj(ix->"\""+ix+'"')
      .forEach(s->{});
}
```

will print

```
                                                                  [0..99]
                                   ___________________________________/\________________________________
                                  |                                                                     |
                              [0..49]                                                               [50..99]
                 _________________/\______________                                     _________________/\________________
                |                                 |                                   |                                   |
            [0..24]                           [25..49]                            [50..74]                            [75..99]
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______
       |               |                 |                 |                 |                 |                 |                 |
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
```

```java
try(Stream<String> s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0)
                            .mapToObj(ix->"\""+ix+'"'))) {
    s.forEach(str->{});
}
```

will print

```
                                                                                   ["0".."99"]
                                              ___________________________________________/\___________________________________________
                                             |                                                                                        |
                                       ["0".."49"]                                                                              ["50".."99"]
                         ____________________/\______________________                                           ______________________/\___________________
                        |                                            |                                         |                                           |
                  ["0".."19"]                                  ["40".."49"]                              ["50".."59"]                                ["80".."99"]
            ____________/\_________                      ____________/\______                           _______/\___________                   ____________/\________
           |                       |                    |                    |                         |                    |                 |                      |
     ["0".."11"]             ["12".."19"]            (empty)           ["40".."49"]              ["50".."59"]            (empty)        ["80".."86"]           ["87".."99"]
      _____/\___              _____/\_____           ___/\__            _____/\_____              _____/\_____           ___/\__         _____/\__              _____/\_____
     |          |            |            |         |       |          |            |            |            |         |       |       |         |            |            |
["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]
```

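As we can see here, we are monitoring the result of `.filter(…).mapToObj(…)`, but the chunks are clearly determined by the source. Depending on the filter's condition, this may produce empty chunks downstream.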

Note that we can combine source monitoring with Tagir's collector monitoring:

```java
try(IntStream s=proxy(IntStream.range(0, 100))) {
    s.parallel().filter(i -> i/20%2==0)
     .boxed().collect(parallelVisualize())
     .forEach(System.out::println);
}
```

This will print the following (note that the `collect` output is printed first):

```
                                                              [0..99]
                                  ________________________________/\_______________________________
                                 |                                                                 |
                             [0..49]                                                           [50..99]
                 ________________/\______________                                   _______________/\_______________
                |                                |                                 |                                |
            [0..19]                          [40..49]                          [50..59]                         [80..99]
        ________/\_____                  ________/\______                   _______/\_______                ________/\_____
       |               |                |                |                 |                |              |               |
   [0..11]         [12..19]          (empty)         [40..49]          [50..59]          (empty)       [80..86]        [87..99]
    ___/\_          ___/\___         ___/\__          ___/\___          ___/\___         ___/\__        ___/\_          ___/\___
   |      |        |        |       |       |        |        |        |        |       |       |      |      |        |        |
[0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99]

                                                                  [0..99]
                                   ___________________________________/\________________________________
                                  |                                                                     |
                              [0..49]                                                               [50..99]
                 _________________/\______________                                     _________________/\________________
                |                                 |                                   |                                   |
            [0..24]                           [25..49]                            [50..74]                            [75..99]
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______
       |               |                 |                 |                 |                 |                 |                 |
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
```

We can clearly see how the processed chunks match. After filtering, however, some chunks have fewer elements and some are completely empty.
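The empty chunks follow from plain arithmetic. The chunk boundaries are fixed by the source split, and the filter `i/20%2==0` only removes elements within each chunk. The sketch below (hypothetical names) copies the 16 source ranges from the bottom row of the second tree above. It applies the filter to each range independently, which reproduces the bottom row of the first tree.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

public class FilteredChunks {
    // Source chunks copied from the bottom row of the source tree above (4-core machine).
    static final int[][] SOURCE_CHUNKS = {
        {0, 5}, {6, 11}, {12, 17}, {18, 24}, {25, 30}, {31, 36}, {37, 42}, {43, 49},
        {50, 55}, {56, 61}, {62, 67}, {68, 74}, {75, 80}, {81, 86}, {87, 92}, {93, 99}
    };

    // Applies the filter inside each chunk and labels what survives, as the visualizers do.
    static List<String> labels(int[][] chunks, IntPredicate filter) {
        List<String> out = new ArrayList<>();
        for (int[] c : chunks) {
            int[] kept = IntStream.rangeClosed(c[0], c[1]).filter(filter).toArray();
            if (kept.length == 0) out.add("(empty)");
            else if (kept.length == 1) out.add("[" + kept[0] + "]");
            else out.add("[" + kept[0] + ".." + kept[kept.length - 1] + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        // prints: [0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49]
        //         [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99]
        System.out.println(String.join(" ", labels(SOURCE_CHUNKS, i -> i / 20 % 2 == 0)));
    }
}
```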

This example shows where the two ways of monitoring can make a significant difference:

```java
try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) {
    is.boxed()
      .collect(parallelVisualize())
      .forEach(System.out::println);
}
```
This prints:

```
                                                                                                [0.0..99.0]
                                                   ___________________________________________________/\________________________________________________
                                                  |                                                                                                     |
                                            [0.0..49.0]                                                                                           [50.0..99.0]
                         _________________________/\______________________                                                     _________________________/\________________________
                        |                                                 |                                                   |                                                   |
                  [0.0..24.0]                                       [25.0..49.0]                                        [50.0..74.0]                                        [75.0..99.0]
            ____________/\_________                           ____________/\___________                           ____________/\___________                           ____________/\___________
           |                       |                         |                         |                         |                         |                         |                         |
     [0.0..11.0]             [12.0..24.0]              [25.0..36.0]              [37.0..49.0]              [50.0..61.0]              [62.0..74.0]              [75.0..86.0]              [87.0..99.0]
      _____/\___              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____              _____/\_____
     |          |            |            |            |            |            |            |            |            |            |            |            |            |            |            |
[0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0]

                             [0.0..10239.0]
       _____________________________/\_____
      |                                    |
[0.0..1023.0]                      [1024.0..10239.0]
                       ____________________/\_______
                      |                             |
              [1024.0..3071.0]             [3072.0..10239.0]
                                        ____________/\______
                                       |                    |
                               [3072.0..6143.0]     [6144.0..10239.0]
                                                         ___/\_______
                                                        |            |
                                                [6144.0..10239.0] (empty)
```

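This illustrates what Tagir has already explained: streams with an unknown size split poorly. `limit(…)` would allow a good estimate (in fact, infinite + limit is theoretically predictable), but the implementation does not take any advantage of it.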

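The source is split into chunks using a batch size of 1024, increased by 1024 after each split (1024, 2048, 3072 and 4096 elements here). This creates chunks far beyond the `limit` range. We can also see how a prefix is split off each time.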
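The growing batches can be observed directly on an iterator-based spliterator. The sketch below (names hypothetical) wraps an infinite `Iterator` with `Spliterators.spliteratorUnknownSize` and splits off four prefixes. It assumes this spliterator uses the same batching scheme as the source behind `DoubleStream.iterate`, growing the batch by 1024 per split. That matches the OpenJDK sources I know, but it is an implementation detail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;

public class BatchSplitDemo {
    // Splits off up to `count` prefixes and describes each one as "[first..last] size=n".
    static List<String> prefixes(Spliterator<Integer> sp, int count) {
        List<String> result = new ArrayList<>();
        for (int k = 0; k < count; k++) {
            Spliterator<Integer> prefix = sp.trySplit();
            if (prefix == null) break;             // source exhausted
            List<Integer> chunk = new ArrayList<>();
            prefix.forEachRemaining(chunk::add);   // the prefix is array-backed
            result.add("[" + chunk.get(0) + ".." + chunk.get(chunk.size() - 1) + "] size=" + chunk.size());
        }
        return result;
    }

    public static void main(String[] args) {
        // An infinite source without a known size, comparable to DoubleStream.iterate(0, i -> i + 1).
        Iterator<Integer> naturals = Stream.iterate(0, i -> i + 1).iterator();
        Spliterator<Integer> sp = Spliterators.spliteratorUnknownSize(naturals, Spliterator.ORDERED);
        prefixes(sp, 4).forEach(System.out::println);
    }
}
```

On such a JDK the four prefixes cover `[0..1023]`, `[1024..3071]`, `[3072..6143]` and `[6144..10239]`, the same ranges as in the source tree above.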

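But when we look at the terminal split output, we can see that the excess chunks have been dropped in between and that the first chunk has been split further. This chunk is the back end of an intermediate array filled by the default implementation at the first split, so we don't notice its splitting at the source. The terminal action, however, shows that this array has been split in a well-balanced way (unsurprisingly).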

So we need both ways of monitoring to get the full picture here…

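The current Stream API implementation uses the collector's combiner to combine the intermediate results in exactly the same way as the input was split before. The splitting strategy depends on the source and the common pool parallelism level, but not on the exact reduction operation used (it is the same for `reduce`, `collect`, `forEach`, `count`, etc.). Relying on this, it is not very difficult to create a visualizing collector: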

```java
import java.util.*;
import java.util.stream.*;

public static Collector<Object, ?, List<String>> parallelVisualize() {
    class Range {
        private String first, last;
        private Range left, right;

        void accept(Object obj) {
            if (first == null)
                first = obj.toString();
            else
                last = obj.toString();
        }

        Range combine(Range that) {
            Range p = new Range();
            p.first = first == null ? that.first : first;
            p.last = Stream
                    .of(that.last, that.first, this.last, this.first)
                    .filter(Objects::nonNull).findFirst().orElse(null);
            p.left = this;
            p.right = that;
            return p;
        }

        String pad(String s, int left, int len) {
            if (len == s.length())
                return s;
            char[] result = new char[len];
            Arrays.fill(result, ' ');
            s.getChars(0, s.length(), result, left);
            return new String(result);
        }

        public List<String> finish() {
            String cur = toString();
            if (left == null) {
                return Collections.singletonList(cur);
            }
            List<String> l = left.finish();
            List<String> r = right.finish();
            int len1 = l.get(0).length();
            int len2 = r.get(0).length();
            int totalLen = len1 + len2 + 1;
            int leftAdd = 0;
            if (cur.length() < totalLen) {
                cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
            } else {
                leftAdd = (cur.length() - totalLen) / 2;
                totalLen = cur.length();
            }
            List<String> result = new ArrayList<>();
            result.add(cur);

            char[] dashes = new char[totalLen];
            Arrays.fill(dashes, ' ');
            Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1 + leftAdd, '_');
            int mid = totalLen / 2;
            dashes[mid] = '/';
            dashes[mid + 1] = '\\';
            result.add(new String(dashes));

            Arrays.fill(dashes, ' ');
            dashes[len1 / 2 + leftAdd] = '|';
            dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
            result.add(new String(dashes));

            int maxSize = Math.max(l.size(), r.size());
            for (int i = 0; i < maxSize; i++) {
                String lstr = l.size() > i ? l.get(i) : String.format("%" + len1 + "s", "");
                String rstr = r.size() > i ? r.get(i) : String.format("%" + len2 + "s", "");
                result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
            }
            return result;
        }

        public String toString() {
            if (first == null)
                return "(empty)";
            else if (last == null)
                return "[" + first + "]";
            return "[" + first + ".." + last + "]";
        }
    }
    return Collector.of(Range::new, Range::accept, Range::combine, Range::finish);
}
```
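The core idea of this collector, that every combiner call mirrors one split, can be reduced to a few lines. In the hypothetical sketch below, each leaf container only counts its elements. Every `combine` call wraps its two arguments in parentheses, so the finished string shows the shape of the split tree.

```java
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class ShapeCollector {
    // Hypothetical container: a leaf counts the elements it accumulated;
    // every combiner call creates a parent holding the two combined containers.
    static final class Shape {
        long count;
        Shape left, right;

        void accept(Object element) { count++; }

        Shape combine(Shape other) {
            Shape parent = new Shape();
            parent.left = this;
            parent.right = other;
            return parent;
        }

        @Override
        public String toString() {
            return left == null ? Long.toString(count) : "(" + left + " " + right + ")";
        }
    }

    static Collector<Object, Shape, String> shape() {
        return Collector.of(Shape::new, Shape::accept, Shape::combine, Shape::toString);
    }

    public static void main(String[] args) {
        System.out.println(IntStream.range(0, 100).boxed().collect(shape()));            // sequential
        System.out.println(IntStream.range(0, 100).boxed().parallel().collect(shape())); // machine dependent
    }
}
```

A sequential stream never calls the combiner, so it yields just `100`. The parallel result depends on the machine.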

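Here are some interesting results obtained with this collector on a 4-core machine (results will differ on machines with a different number of `availableProcessors()`).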

Splitting a simple range

```java
IntStream.range(0, 100)
         .boxed().parallel().collect(parallelVisualize())
         .forEach(System.out::println);
```

The range is split evenly into 16 tasks:

```
                                                                  [0..99]
                                   ___________________________________/\________________________________
                                  |                                                                     |
                              [0..49]                                                               [50..99]
                 _________________/\______________                                     _________________/\________________
                |                                 |                                   |                                   |
            [0..24]                           [25..49]                            [50..74]                            [75..99]
        ________/\_____                   ________/\_______                   ________/\_______                   ________/\_______
       |               |                 |                 |                 |                 |                 |                 |
   [0..11]         [12..24]          [25..36]          [37..49]          [50..61]          [62..74]          [75..86]          [87..99]
    ___/\_          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___
   |      |        |        |        |        |        |        |        |        |        |        |        |        |        |        |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
```
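Why 16 chunks? The explanation below assumes the policy I read in OpenJDK's internal `AbstractTask`, which is not a public API and may change between versions. The target leaf size is fixed once as the root estimate divided by `4 * parallelism`, and a chunk keeps splitting while its `estimateSize()` exceeds that target. The default common pool parallelism is `availableProcessors() - 1`, so it is 3 on a 4-core machine, giving a target of `100 / 12 = 8`. The hypothetical simulation below applies that rule sequentially to the range's spliterator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.IntStream;

public class LeafPrediction {
    // Assumed rule (modelled on OpenJDK's AbstractTask): target = rootEstimate / (4 * parallelism),
    // at least 1; a chunk is split while estimateSize() > target and trySplit() succeeds.
    static <T> List<String> leaves(Spliterator<T> root, int parallelism) {
        long target = Math.max(root.estimateSize() / (parallelism * 4L), 1L);
        List<String> out = new ArrayList<>();
        walk(root, target, out);
        return out;
    }

    private static <T> void walk(Spliterator<T> sp, long target, List<String> out) {
        Spliterator<T> prefix;
        if (sp.estimateSize() > target && (prefix = sp.trySplit()) != null) {
            walk(prefix, target, out); // left child: the split-off prefix
            walk(sp, target, out);     // right child: what remains in sp
            return;
        }
        List<T> elems = new ArrayList<>();
        sp.forEachRemaining(elems::add);
        if (elems.isEmpty()) out.add("(empty)");
        else if (elems.size() == 1) out.add("[" + elems.get(0) + "]");
        else out.add("[" + elems.get(0) + ".." + elems.get(elems.size() - 1) + "]");
    }

    public static void main(String[] args) {
        // parallelism 3: the default common pool parallelism on a 4-core machine
        List<String> leaves = leaves(IntStream.range(0, 100).spliterator(), 3);
        System.out.println(leaves.size() + " leaves: " + leaves);
    }
}
```

With parallelism 3 it lists the 16 ranges shown above. With parallelism 1 (target 25) it stops at the four ranges `[0..24]`, `[25..49]`, `[50..74]`, `[75..99]`.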

Splitting a concatenation of two streams

```java
IntStream
    .concat(IntStream.range(0, 10), IntStream.range(10, 100))
    .boxed().parallel().collect(parallelVisualize())
    .forEach(System.out::println);
```

As you can see, the first split un-concatenates the streams:

```
                                                                           [0..99]
       _______________________________________________________________________/\_____
      |                                                                              |
   [0..9]                                                                        [10..99]
    __/\__                                        ___________________________________/\__________________________________
   |      |                                      |                                                                       |
[0..4] [5..9]                                [10..54]                                                                [55..99]
                                _________________/\________________                                     _________________/\________________
                               |                                   |                                   |                                   |
                           [10..31]                            [32..54]                            [55..76]                            [77..99]
                       ________/\_______                   ________/\_______                   ________/\_______                   ________/\_______
                      |                 |                 |                 |                 |                 |                 |                 |
                  [10..20]          [21..31]          [32..42]          [43..54]          [55..65]          [66..76]          [77..87]          [88..99]
                   ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___          ___/\___
                  |        |        |        |        |        |        |        |        |        |        |        |        |        |        |        |
              [10..14] [15..20] [21..25] [26..31] [32..36] [37..42] [43..48] [49..54] [55..59] [60..65] [66..70] [71..76] [77..81] [82..87] [88..93] [94..99]
```
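The un-concatenation can be reproduced on the source spliterator itself. In the OpenJDK implementation I know, the spliterator of a concatenated stream hands out the complete first input on its first `trySplit()`. All later splits are delegated to the second input. The sketch below (class name hypothetical) checks this by comparing sizes.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.stream.IntStream;

public class ConcatSplitDemo {
    // Returns the sizes of: the first prefix, the second prefix, and what remains afterwards.
    static long[] splitSizes() {
        Spliterator.OfInt sp = IntStream.concat(IntStream.range(0, 10), IntStream.range(10, 100))
                                        .spliterator();
        Spliterator.OfInt first = sp.trySplit();  // presumably the whole first input
        Spliterator.OfInt second = sp.trySplit(); // presumably the first half of the second input
        return new long[] { first.estimateSize(), second.estimateSize(), sp.estimateSize() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(splitSizes()));
    }
}
```

The expected sizes are 10, 45 and 45. The 10-element part is then split once more, into `[0..4]` and `[5..9]`, because it is still larger than the target size.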

Splitting a concatenation of two streams where an intermediate operation (`boxed()`) was performed before the concatenation

```java
Stream.concat(IntStream.range(0, 50).boxed().parallel(), IntStream.range(50, 100).boxed())
      .collect(parallelVisualize())
      .forEach(System.out::println);
```

If one of the input streams was not switched to parallel mode before the concatenation, it refuses to split at all:

```
                                   [0..99]
                                   ___/\_________________________________
                                  |                                      |
                              [0..49]                                [50..99]
                 _________________/\______________
                |                                 |
            [0..24]                           [25..49]
        ________/\_____                   ________/\_______
       |               |                 |                 |
   [0..11]         [12..24]          [25..36]          [37..49]
    ___/\_          ___/\___          ___/\___          ___/\___
   |      |        |        |        |        |        |        |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49]
```
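A plausible explanation, based on my reading of the OpenJDK sources (an implementation detail): `Stream.concat` asks each input for its `spliterator()` at concatenation time. A pipeline with an intermediate operation such as `boxed()` then returns a wrapping spliterator that only splits if the pipeline was parallel at that moment. The sketch below (class name hypothetical) compares the two cases.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;

public class SequentialSplitDemo {
    // Returns whether trySplit() succeeds on the spliterator of a boxed range pipeline.
    static boolean splits(boolean parallel) {
        IntStream source = IntStream.range(50, 100);
        if (parallel) source = source.parallel();
        Spliterator<Integer> sp = source.boxed().spliterator();
        return sp.trySplit() != null;
    }

    public static void main(String[] args) {
        System.out.println("sequential pipeline splits: " + splits(false));
        System.out.println("parallel pipeline splits:   " + splits(true));
    }
}
```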

Splitting `flatMap`

```java
Stream.of(0, 50)
      .flatMap(start -> IntStream.range(start, start+50).boxed().parallel())
      .parallel().collect(parallelVisualize())
      .forEach(System.out::println);
```

`flatMap` never parallelizes inside the nested streams:

```
    [0..99]
    ____/\__
   |        |
[0..49] [50..99]
```
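One way to observe this is to record which threads process the elements of each inner stream. The hypothetical sketch below uses `peek` inside the inner streams. `flatMap` consumes each inner stream sequentially in the thread that handles the outer element. So every inner stream should report exactly one thread name, even though the inner streams were created with `.parallel()`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlatMapThreads {
    // Maps each outer element to the names of the threads that processed its inner elements.
    static Map<Integer, Set<String>> threadsPerInnerStream() {
        Map<Integer, Set<String>> seen = new ConcurrentHashMap<>();
        Stream.of(0, 50).parallel()
              .flatMap(start -> IntStream.range(start, start + 50).boxed().parallel()
                      .peek(i -> seen.computeIfAbsent(start, k -> ConcurrentHashMap.newKeySet())
                                     .add(Thread.currentThread().getName())))
              .forEach(i -> { });
        return seen;
    }

    public static void main(String[] args) {
        threadsPerInnerStream().forEach((start, threads) ->
                System.out.println("inner stream starting at " + start + ": " + threads));
    }
}
```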

A stream from an iterator of unknown size with 7000 elements (see this answer for context):

```java
StreamSupport
    .stream(Spliterators.spliteratorUnknownSize(
        IntStream.range(0, 7000).iterator(), Spliterator.ORDERED), true)
    .collect(parallelVisualize()).forEach(System.out::println);
```

The splitting is really bad; everybody waits for the biggest part, `[3072..6143]`:

```
                       [0..6999]
     _______________________/\___
    |                            |
[0..1023]                  [1024..6999]
                 ________________/\____
                |                      |
          [1024..3071]           [3072..6999]
                              _________/\_____
                             |                |
                       [3072..6143]     [6144..6999]
                                           ___/\____
                                          |         |
                                    [6144..6999] (empty)
```

An iterator source with a known size

```java
StreamSupport
    .stream(Spliterators.spliterator(IntStream.range(0, 7000)
        .iterator(), 7000, Spliterator.ORDERED), true)
    .collect(parallelVisualize()).forEach(System.out::println);
```

Providing the size unlocks further splitting:

```
                                                                                                    [0..6999]
           ______________________________________________________________________________________________/\________
          |                                                                                                        |
     [0..1023]                                                                                               [1024..6999]
     _____/\__                                 ____________________________________________________________________/\________________________
    |         |                               |                                                                                              |
[0..511] [512..1023]                    [1024..3071]                                                                                   [3072..6999]
                                  ____________/\___________                                                                  ________________/\__________________________________________________
                                 |                         |                                                                |                                                                    |
                           [1024..2047]              [2048..3071]                                                     [3072..6143]                                                         [6144..6999]
                            _____/\_____              _____/\_____                                 _________________________/\________________________                                        ___/\___________
                           |            |            |            |                               |                                                   |                                      |                |
                     [1024..1535] [1536..2047] [2048..2559] [2560..3071]                    [3072..4607]                                        [4608..6143]                           [6144..6999]        (empty)
                                                                                      ____________/\___________                           ____________/\___________                     _____/\_____
                                                                                     |                         |                         |                         |                   |            |
                                                                               [3072..3839]              [3840..4607]              [4608..5375]              [5376..6143]        [6144..6571] [6572..6999]
                                                                                _____/\_____              _____/\_____              _____/\_____              _____/\_____
                                                                               |            |            |            |            |            |            |            |
                                                                         [3072..3455] [3456..3839] [3840..4223] [4224..4607] [4608..4991] [4992..5375] [5376..5759] [5760..6143]
```
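The difference comes from the size estimate. The explanation below assumes the same target-size rule as in OpenJDK's internal `AbstractTask`: the target is the root estimate divided by `4 * parallelism`, and a chunk splits further only while its estimate exceeds the target. A source of unknown size reports `Long.MAX_VALUE`, so the target is astronomically large and even the first 1024-element batch is never split again. With the size 7000 and parallelism 3, the target is 583, so the batches are split further. The hypothetical helper below checks this for the first batch. It boxes the iterator so that both factory calls return a `Spliterator<Integer>`.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

public class IteratorSourceEstimates {
    // Hypothetical helper: splits off the first batch and reports whether it would be split
    // again under the assumed rule (target = root estimate / leafTarget, at least 1).
    static boolean firstBatchSplitsAgain(Spliterator<Integer> sp, int leafTarget) {
        long target = Math.max(sp.estimateSize() / leafTarget, 1L);
        Spliterator<Integer> firstBatch = sp.trySplit();
        return firstBatch != null && firstBatch.estimateSize() > target;
    }

    static Spliterator<Integer> unknownSize() {
        return Spliterators.spliteratorUnknownSize(
                IntStream.range(0, 7000).boxed().iterator(), Spliterator.ORDERED);
    }

    static Spliterator<Integer> knownSize() {
        return Spliterators.spliterator(
                IntStream.range(0, 7000).boxed().iterator(), 7000, Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        int leafTarget = 4 * 3; // 4 * parallelism, with parallelism 3 as on a 4-core machine
        System.out.println("unknown: estimate=" + unknownSize().estimateSize()
                + ", first batch split again: " + firstBatchSplitsAgain(unknownSize(), leafTarget));
        System.out.println("known:   estimate=" + knownSize().estimateSize()
                + ", first batch split again: " + firstBatchSplitsAgain(knownSize(), leafTarget));
    }
}
```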

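Further improvements of this collector could generate a graphical image (such as SVG), track which thread processed each node, display the number of elements in each group, and so on. Feel free to use it if you like.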