在值上迭代两次(MapReduce)
我收到一个迭代器作为参数,我想迭代两次值。
public void reduce(Pair key, Iterator values, Context context)
可能吗 ? 怎么样 ? 签名是由我使用的框架(即Hadoop)强加的。
– 编辑 –
最后, reduce
方法的真实签名是iterable
。 我被这个wiki页面误导了(这实际上是我发现的wordcount的唯一非弃用(但错误)示例)。
如果要再次迭代,我们必须缓存迭代器中的值。 至少我们可以结合第一次迭代和缓存:
Iterator it = getIterator(); List cache = new ArrayList (); // first loop and caching while (it.hasNext()) { IntWritable value = it.next(); doSomethingWithValue(); cache.add(value); } // second loop for(IntWritable value:cache) { doSomethingElseThatCantBeDoneInFirstLoop(value); }
(只是为了添加代码答案,知道你在自己的评论中提到了这个解决方案;))
为什么没有缓存是不可能的: Iterator
是实现接口的东西,没有一个要求, Iterator
对象实际存储值。 迭代两次你必须重置迭代器(不可能)或克隆它(再次:不可能)。
举一个迭代器的例子,其中克隆/重置没有任何意义:
public class Randoms implements Iterator { private int counter = 10; @Override public boolean hasNext() { return counter > 0; } @Override public boolean next() { count--; return Math.random(); } @Override public boolean remove() { throw new UnsupportedOperationException("delete not supported"); } }
不幸的是,如果不按照Andreas_D的回答缓存值,这是不可能的。
即使使用新的API, Reducer
接收Iterable
而不是Iterator
,也不能迭代两次。 尝试以下方式非常诱人:
for (IntWritable value : values) { // first loop } for (IntWritable value : values) { // second loop }
但这实际上并不起作用。 从Iterable
的iterator()
方法中获得的iterator()
是特殊的。 这些值可能并非都在内存中; Hadoop可能正在从磁盘流式传输它们。 它们并非真正由Collection
支持,因此允许多次迭代是非常重要的。
您可以在Reducer
和ReduceContext
代码中ReduceContext
。
在某种类型的Collection
中缓存值可能是最简单的答案,但如果您在大型数据集上操作,则可以轻松地进行堆积。 如果您可以为我们提供有关您的问题的更多细节,我们可以帮助您找到不涉及多次迭代的解决方案。
重用给定的迭代器,没有。
但是当你在第一个位置迭代它们然后迭代构造的ArrayList时,你可以将值保存在ArrayList中(或者你可以通过使用一些花哨的Collection方法直接构建它,然后直接在ArrayList两次。这是一个品味问题)。
无论如何,你确定首先通过Iterator是一件好事吗? 迭代器习惯于对集合进行线性扫描,这就是他们不公开“倒带”方法的原因。
你应该传递一些不同的东西,比如Collection
或Iterable
,就像在另一个答案中已经提到的那样。
迭代器只是一次遍历。 某些迭代器类型是可复制的,您可以在遍历之前克隆它,但这不是一般情况。
你应该让你的函数取代Iterable
,如果你能实现的话。
如果方法签名无法更改,那么我建议使用Apache Commons IteratorUtils将Iterator转换为ListIterator。 考虑这个示例方法,对值进行两次迭代:
void iterateTwice(Iterator it) { ListIterator> lit = IteratorUtils.toListIterator(it); System.out.println("Using ListIterator 1st pass"); while(lit.hasNext()) System.out.println(lit.next()); // move the list iterator back to start while(lit.hasPrevious()) lit.previous(); System.out.println("Using ListIterator 2nd pass"); while(lit.hasNext()) System.out.println(lit.next()); }
使用上面的代码,我能够迭代值列表, 而无需在代码中保存List元素的副本。
如果我们尝试在Reducer中迭代两次,如下所示
ListIterator lit = IteratorUtils.toListIterator(it); System.out.println("Using ListIterator 1st pass"); while(lit.hasNext()) System.out.println(lit.next()); // move the list iterator back to start while(lit.hasPrevious()) lit.previous(); System.out.println("Using ListIterator 2nd pass"); while(lit.hasNext()) System.out.println(lit.next());
我们只会输出为
Using ListIterator 1st pass 5.3 4.9 5.3 4.6 4.6 Using ListIterator 2nd pass 5.3 5.3 5.3 5.3 5.3
为了以正确的方式获取它,我们应该像这样循环:
ArrayList cache = new ArrayList (); for (DoubleWritable aNum : values) { System.out.println("first iteration: " + aNum); DoubleWritable writable = new DoubleWritable(); writable.set(aNum.get()); cache.add(writable); } int size = cache.size(); for (int i = 0; i < size; ++i) { System.out.println("second iteration: " + cache.get(i)); }
产量
first iteration: 5.3 first iteration: 4.9 first iteration: 5.3 first iteration: 4.6 first iteration: 4.6 second iteration: 5.3 second iteration: 4.9 second iteration: 5.3 second iteration: 4.6 second iteration: 4.6
你可以做到这一点
MarkableIterator mitr = new MarkableIterator (values.iterator()); mitr.mark(); while (mitr.hasNext()) { //do your work } mitr.reset(); while(mitr.hasNext()) { //again do your work }
-
参考链接2
-
参考链接2
尝试这个:
ListIterator it = list.listIterator(); while(it.hasNext()){ while(it.hasNext()){ System.out.println("back " + it.next() +" "); } while(it.hasPrevious()){ it.previous(); } }
如果你想随时改变值,我想最好使用listIterator然后使用它的set()方法。
ListIterator lit = list.listIterator(); while(lit.hasNext()){ String elem = (String) lit.next(); System.out.println(elem); lit.set(elem+" modified"); } lit = null; lit = list.listIterator(); while(lit.hasNext()){ System.out.println(lit.next()); }
我只是在同一个列表迭代器对象上获得.listIterator()的另一个实例,而不是调用.previous()。
在搜索并做了很多尝试和错误之后,我找到了一个解决方案。
-
声明一个新的集合(比如
cache
)(链表或Arraylist或其他) -
在第一次迭代中,分配当前的迭代器,如下例所示:
cache.add(new Text(current.get()))
-
迭代缓存:
for (Text count : counts) { //counts is iterable object of Type Text cache.add(new Text(count.getBytes())); } for(Text value:cache) { // your logic.. }