异步迭代器

我有以下代码:

while(slowIterator.hasNext()) { performLengthTask(slowIterator.next()); } 

因为迭代器和任务都很慢,所以将它们放入单独的线程中是有意义的。 以下是Iterator包装器的快速而脏的尝试:

 class AsyncIterator implements Iterator { private final BlockingQueue queue = new ArrayBlockingQueue(100); private AsyncIterator(final Iterator delegate) { new Thread() { @Override public void run() { while(delegate.hasNext()) { queue.put(delegate.next()); // try/catch removed for brevity } } }.start(); } @Override public boolean hasNext() { return true; } @Override public T next() { return queue.take(); // try/catch removed for brevity } // ... remove() throws UnsupportedOperationException } 

但是,这种实现缺乏对“hasNext()”的支持。 当然可以阻止hasNext()方法阻塞,直到它知道是否返回true。 我可以在我的AsyncIterator中有一个peek对象,我可以更改hasNext()从队列中获取一个对象并让next()返回此窥视。 但是如果已达到委托迭代器的结尾,这将导致hasNext()无限期地阻塞。

我可以自己做线程通信,而不是使用ArrayBlockingQueue:

 private static class AsyncIterator implements Iterator { private final Queue queue = new LinkedList(); private boolean delegateDone = false; private AsyncIterator(final Iterator delegate) { new Thread() { @Override public void run() { while (delegate.hasNext()) { final T next = delegate.next(); synchronized (AsyncIterator.this) { queue.add(next); AsyncIterator.this.notify(); } } synchronized (AsyncIterator.this) { delegateDone = true; AsyncIterator.this.notify(); } } }.start(); } @Override public boolean hasNext() { synchronized (this) { while (queue.size() == 0 && !delegateDone) { try { wait(); } catch (InterruptedException e) { throw new Error(e); } } } return queue.size() > 0; } @Override public T next() { return queue.remove(); } @Override public void remove() { throw new UnsupportedOperationException(); } } 

然而,所有额外的同步,等待和通知并没有真正使代码更具可读性,并且很容易在某处隐藏竞争条件。

还有更好的想法?

更新

是的我知道常见的观察者/可观察的模式。 但是,通常的实现并不预见数据流的结束,它们不是迭代器。

我特别想要一个迭代器,因为实际上上面提到的循环存在于一个外部库中,它需要一个迭代器。

这是一个棘手的问题,但我想这次我得到了正确答案。 (我删除了我的第一个答案。)

答案是使用哨兵。 我没有测试过这段代码,为了清楚起见,我删除了try / catches:

 public class AsyncIterator implements Iterator { private BlockingQueue queue = new ArrayBlockingQueue(100); private T sentinel = (T) new Object(); private T next; private AsyncIterator(final Iterator delegate) { new Thread() { @Override public void run() { while (delegate.hasNext()) { queue.put(delegate.next()); } queue.put(sentinel); } }.start(); } @Override public boolean hasNext() { if (next != null) { return true; } next = queue.take(); // blocks if necessary if (next == sentinel) { return false; } return true; } @Override public T next() { T tmp = next; next = null; return tmp; } } 

这里的见解是hasNext()需要阻塞,直到下一个项目准备好。 它还需要某种退出条件,并且由于线程问题,它不能使用空队列或布尔标志。 哨兵在没有任何锁定或同步的情况下解决问题。

编辑:缓存“下一步”,因此可以多次调用hasNext()。

或者让自己避免头痛并使用RxJava:

 import java.util.Iterator; import rx.Observable; import rx.Scheduler; import rx.observables.BlockingObservable; import rx.schedulers.Schedulers; public class RxAsyncIteratorExample { public static void main(String[] args) throws InterruptedException { final Iterator slowIterator = new SlowIntegerIterator(3, 7300); // the scheduler you use here will depend on what behaviour you // want but io is probably what you want Iterator async = asyncIterator(slowIterator, Schedulers.io()); while (async.hasNext()) { performLengthTask(async.next()); } } public static  Iterator asyncIterator( final Iterator slowIterator, Scheduler scheduler) { final Observable tObservable = Observable.from(new Iterable() { @Override public Iterator iterator() { return slowIterator; } }).subscribeOn(scheduler); return BlockingObservable.from(tObservable).getIterator(); } /** * Uninteresting implementations... */ public static void performLengthTask(Integer integer) throws InterruptedException { log("Running task for " + integer); Thread.sleep(10000l); log("Finished task for " + integer); } private static class SlowIntegerIterator implements Iterator { private int count; private final long delay; public SlowIntegerIterator(int count, long delay) { this.count = count; this.delay = delay; } @Override public boolean hasNext() { return count > 0; } @Override public Integer next() { try { log("Starting long production " + count); Thread.sleep(delay); log("Finished long production " + count); } catch (InterruptedException e) { throw new IllegalStateException(e); } return count--; } @Override public void remove() { throw new UnsupportedOperationException(); } } private static final long startTime = System.currentTimeMillis(); private static void log(String s) { double time = ((System.currentTimeMillis() - startTime) / 1000d); System.out.println(time + ": " + s); } } 

给我:

 0.031: Starting long production 3 7.332: Finished long production 3 7.332: Starting long production 2 7.333: Running task for 3 14.633: Finished long production 2 14.633: Starting long production 1 17.333: Finished task for 3 17.333: Running task for 2 21.934: Finished long production 1 27.334: Finished task for 2 27.334: Running task for 1 37.335: Finished task for 1