在值上迭代两次(MapReduce)

我收到一个迭代器作为参数,我想迭代两次值。

public void reduce(Pair key, Iterator values, Context context) 

可能吗 ? 怎么样 ? 签名是由我使用的框架(即Hadoop)强加的。

– 编辑 –
最后, reduce方法的真实签名是iterable 。 我被这个wiki页面误导了(这实际上是我发现的wordcount的唯一非弃用(但错误)示例)。

如果要再次迭代,我们必须缓存迭代器中的值。 至少我们可以结合第一次迭代和缓存:

 Iterator it = getIterator(); List cache = new ArrayList(); // first loop and caching while (it.hasNext()) { IntWritable value = it.next(); doSomethingWithValue(); cache.add(value); } // second loop for(IntWritable value:cache) { doSomethingElseThatCantBeDoneInFirstLoop(value); } 

(只是为了添加代码答案,知道你在自己的评论中提到了这个解决方案;))


为什么没有缓存是不可能的: Iterator是实现接口的东西,没有一个要求, Iterator对象实际存储值。 迭代两次你必须重置迭代器(不可能)或克隆它(再次:不可能)。

举一个迭代器的例子,其中克隆/重置没有任何意义:

 public class Randoms implements Iterator { private int counter = 10; @Override public boolean hasNext() { return counter > 0; } @Override public boolean next() { count--; return Math.random(); } @Override public boolean remove() { throw new UnsupportedOperationException("delete not supported"); } } 

不幸的是,如果不按照Andreas_D的回答缓存值,这是不可能的。

即使使用新的API, Reducer接收Iterable而不是Iterator ,也不能迭代两次。 尝试以下方式非常诱人:

 for (IntWritable value : values) { // first loop } for (IntWritable value : values) { // second loop } 

但这实际上并不起作用。 从Iterableiterator()方法中获得的iterator()是特殊的。 这些值可能并非都在内存中; Hadoop可能正在从磁盘流式传输它们。 它们并非真正由Collection支持,因此允许多次迭代是非常重要的。

您可以在ReducerReduceContext代码中ReduceContext

在某种类型的Collection中缓存值可能是最简单的答案,但如果您在大型数据集上操作,则可以轻松地进行堆积。 如果您可以为我们提供有关您的问题的更多细节,我们可以帮助您找到不涉及多次迭代的解决方案。

重用给定的迭代器,没有。

但是当你在第一个位置迭代它们然后迭代构造的ArrayList时,你可以将值保存在ArrayList中(或者你可以通过使用一些花哨的Collection方法直接构建它,然后直接在ArrayList两次。这是一个品味问题)。

无论如何,你确定首先通过Iterator是一件好事吗? 迭代器习惯于对集合进行线性扫描,这就是他们不公开“倒带”方法的原因。

你应该传递一些不同的东西,比如CollectionIterable ,就像在另一个答案中已经提到的那样。

迭代器只是一次遍历。 某些迭代器类型是可复制的,您可以在遍历之前克隆它,但这不是一般情况。

你应该让你的函数取代Iterable ,如果你能实现的话。

如果方法签名无法更改,那么我建议使用Apache Commons IteratorUtils将Iterator转换为ListIterator。 考虑这个示例方法,对值进行两次迭代:

 void iterateTwice(Iterator it) { ListIterator lit = IteratorUtils.toListIterator(it); System.out.println("Using ListIterator 1st pass"); while(lit.hasNext()) System.out.println(lit.next()); // move the list iterator back to start while(lit.hasPrevious()) lit.previous(); System.out.println("Using ListIterator 2nd pass"); while(lit.hasNext()) System.out.println(lit.next()); } 

使用上面的代码,我能够迭代值列表, 而无需在代码中保存List元素的副本。

如果我们尝试在Reducer中迭代两次,如下所示

 ListIterator lit = IteratorUtils.toListIterator(it); System.out.println("Using ListIterator 1st pass"); while(lit.hasNext()) System.out.println(lit.next()); // move the list iterator back to start while(lit.hasPrevious()) lit.previous(); System.out.println("Using ListIterator 2nd pass"); while(lit.hasNext()) System.out.println(lit.next()); 

我们只会输出为

 Using ListIterator 1st pass 5.3 4.9 5.3 4.6 4.6 Using ListIterator 2nd pass 5.3 5.3 5.3 5.3 5.3 

为了以正确的方式获取它,我们应该像这样循环:

 ArrayList cache = new ArrayList(); for (DoubleWritable aNum : values) { System.out.println("first iteration: " + aNum); DoubleWritable writable = new DoubleWritable(); writable.set(aNum.get()); cache.add(writable); } int size = cache.size(); for (int i = 0; i < size; ++i) { System.out.println("second iteration: " + cache.get(i)); } 

产量

 first iteration: 5.3 first iteration: 4.9 first iteration: 5.3 first iteration: 4.6 first iteration: 4.6 second iteration: 5.3 second iteration: 4.9 second iteration: 5.3 second iteration: 4.6 second iteration: 4.6 

你可以做到这一点

 MarkableIterator mitr = new MarkableIterator(values.iterator()); mitr.mark(); while (mitr.hasNext()) { //do your work } mitr.reset(); while(mitr.hasNext()) { //again do your work } 
  1. 参考链接2

  2. 参考链接2

尝试这个:

  ListIterator it = list.listIterator(); while(it.hasNext()){ while(it.hasNext()){ System.out.println("back " + it.next() +" "); } while(it.hasPrevious()){ it.previous(); } } 

如果你想随时改变值,我想最好使用listIterator然后使用它的set()方法。

 ListIterator lit = list.listIterator(); while(lit.hasNext()){ String elem = (String) lit.next(); System.out.println(elem); lit.set(elem+" modified"); } lit = null; lit = list.listIterator(); while(lit.hasNext()){ System.out.println(lit.next()); } 

我只是在同一个列表迭代器对象上获得.listIterator()的另一个实例,而不是调用.previous()。

在搜索并做了很多尝试和错误之后,我找到了一个解决方案。

  1. 声明一个新的集合(比如cache )(链表或Arraylist或其他)

  2. 在第一次迭代中,分配当前的迭代器,如下例所示:

     cache.add(new Text(current.get())) 
  3. 迭代缓存:

     for (Text count : counts) { //counts is iterable object of Type Text cache.add(new Text(count.getBytes())); } for(Text value:cache) { // your logic.. }