RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!"

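I'm using RxJava to calculate the normalized autocorrelation of some sensor data on Android. Strangely, my code throws an exception ("java.lang.IllegalStateException: Only one subscriber allowed!") and I'm not sure what to make of it: I know that GroupedObservables can throw this exception when more than one subscriber subscribes to them, but I don't think I'm using anything like that anywhere.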

Below is the method that (most likely) triggers the exception:

    // mean(), standardDeviation() and flatten() are helper methods not shown here.
    public Observable<Float> normalizedAutoCorrelation(Observable<Float> observable, final int lag) {
        Observable<Float> laggedObservable = observable.skip(lag);

        Observable<Float> meanObservable = mean(observable, lag);
        Observable<Float> laggedMeanObservable = mean(laggedObservable, lag);

        Observable<Float> standardDeviationObservable = standardDeviation(observable, meanObservable, lag);
        Observable<Float> laggedStandardDeviationObservable = standardDeviation(laggedObservable, laggedMeanObservable, lag);

        Observable<Float> deviation = observable.zipWith(meanObservable, new Func2<Float, Float, Float>() {
            @Override
            public Float call(Float value, Float mean) {
                return value - mean;
            }
        });

        Observable<Float> laggedDeviation = observable.zipWith(laggedMeanObservable, new Func2<Float, Float, Float>() {
            @Override
            public Float call(Float value, Float mean) {
                return value - mean;
            }
        });

        Observable<Float> autoCorrelationPartObservable = deviation.zipWith(laggedDeviation, new Func2<Float, Float, Float>() {
            @Override
            public Float call(Float value, Float laggedValue) {
                return value * laggedValue;
            }
        });

        // Sum the overlapping windows element-wise by zipping each window into the accumulator.
        Observable<Float> autoCorrelationObservable = flatten(autoCorrelationPartObservable.window(lag, 1).scan(new Func2<Observable<Float>, Observable<Float>, Observable<Float>>() {
            @Override
            public Observable<Float> call(Observable<Float> memoObservable, Observable<Float> observable) {
                if (memoObservable == null) return observable;

                return memoObservable.zipWith(observable, new Func2<Float, Float, Float>() {
                    @Override
                    public Float call(Float memo, Float value) {
                        return memo + value;
                    }
                });
            }
        }));

        Observable<Float> normalizationObservable = standardDeviationObservable.zipWith(laggedStandardDeviationObservable, new Func2<Float, Float, Float>() {
            @Override
            public Float call(Float standardDeviation, Float laggedStandardDeviation) {
                return lag * standardDeviation * laggedStandardDeviation;
            }
        });

        return autoCorrelationObservable.zipWith(normalizationObservable, new Func2<Float, Float, Float>() {
            @Override
            public Float call(Float autoCorrelation, Float normalization) {
                return autoCorrelation / normalization;
            }
        });
    }

This is the stack trace I get:

    java.lang.IllegalStateException: Only one subscriber allowed!
        at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
        at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
        at rx.Observable.unsafeSubscribe(Observable.java:7304)
        at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
        at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
        at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable.unsafeSubscribe(Observable.java:7304)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110)
        at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:173)
        at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
        at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
        at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
        at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:98)
        at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
        at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
        at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
        at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:161)
        at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:183)
        at rx.internal.operators.OperatorSkip$1.onNext(OperatorSkip.java:58)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
        at com.github.joopaue.smartphonesensing.SensorService$3.onSensorChanged(SensorService.java:102)
        at android.hardware.SystemSensorManager$SensorEventQueue.dispatchSensorEvent(SystemSensorManager.java:418)
        at android.os.MessageQueue.nativePollOnce(Native Method)
        at android.os.MessageQueue.next(MessageQueue.java:138)
        at android.os.Looper.loop(Looper.java:123)
        at android.app.ActivityThread.main(ActivityThread.java:5146)
        at java.lang.reflect.Method.invokeNative(Native Method)
        at java.lang.reflect.Method.invoke(Method.java:515)
        at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:732)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:566)
        at dalvik.system.NativeStart.main(Native Method)

I don't think I'm doing anything strange here: just some zips, reduces, scans and flatMaps.

Am I missing something completely obvious, is there some hidden rule I'm breaking here, or is this a bug in RxJava? Thanks!

PS. If you need more code to draw a conclusion, just ask and I'll post it!

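In RxJava, the operators groupBy and window return observables that can be subscribed to only once. When subscribed, they replay their accumulated contents to that sole subscriber and then switch to "hot" mode.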
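To make the "buffer, replay once, then go hot" behavior concrete, here is a minimal single-threaded sketch in plain Java. It is not RxJava's implementation: the class name `SingleSubscriberBuffer` and its methods are hypothetical and only model the behavior described above, reusing the exception message from the stack trace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of a "cold-then-hot" source:
 * values are buffered until the one allowed subscriber arrives,
 * replayed to it, and delivered live from then on.
 * Assumption: single-threaded use; no completion or error signals.
 */
final class SingleSubscriberBuffer<T> {
    private final List<T> buffered = new ArrayList<>();
    private Consumer<T> subscriber; // null until the single subscriber attaches

    /** Buffer the value if nobody is listening yet, otherwise deliver it directly. */
    void onNext(T value) {
        if (subscriber == null) {
            buffered.add(value);
        } else {
            subscriber.accept(value);
        }
    }

    /** Attach the only subscriber: replay the backlog, then switch to live delivery. */
    void subscribe(Consumer<T> s) {
        if (subscriber != null) {
            // A second subscription is rejected, as in the stack trace above.
            throw new IllegalStateException("Only one subscriber allowed!");
        }
        subscriber = s;
        for (T v : buffered) {
            s.accept(v);
        }
        buffered.clear(); // nothing is retained once it has been replayed
    }
}
```

In this model, anything that subscribes to the same instance twice hits the exception on the second call, no matter how indirectly that second subscription happens (for example, through an operator that subscribes to its inputs on the caller's behalf).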

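This is a tradeoff between two alternatives: returning a fully hot observable and risking missed values, or returning an unbounded replaying observable that allows any number of subscribers but retains the accumulated contents indefinitely.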

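The middle ground, i.e., a single-subscriber, cold-then-hot observable, was considered the least surprising behavior. It also leaves the developer free to apply further operators and pick any point between the two extremes: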

    source.window(1, TimeUnit.SECONDS)
        .map(w -> w.publish())
        .doOnNext(w -> w.connect())
        .subscribe(...)

    source.window(1, TimeUnit.SECONDS)
        .map(w -> w.cache())
        .subscribe(...)
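The two extremes mentioned above can also be modeled in plain Java to show what each one costs. This is only a sketch under simplifying assumptions (single-threaded, no completion or errors); `HotSource` and `ReplaySource` are hypothetical names, not RxJava classes, standing in roughly for the publish-and-connect variant and the cache variant respectively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical "fully hot" source: nothing is buffered, so late subscribers miss earlier values. */
final class HotSource<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> s) {
        subscribers.add(s);
    }

    void onNext(T value) {
        for (Consumer<T> s : subscribers) {
            s.accept(value);
        }
    }
}

/** Hypothetical "unbounded replay" source: every value is kept and replayed to each new subscriber. */
final class ReplaySource<T> {
    private final List<T> history = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> s) {
        for (T v : history) {
            s.accept(v); // replay everything seen so far
        }
        subscribers.add(s);
    }

    void onNext(T value) {
        history.add(value); // retained indefinitely: this is the memory cost
        for (Consumer<T> s : subscribers) {
            s.accept(value);
        }
    }
}
```

With `HotSource`, a subscriber that attaches after the first value only sees what arrives later. With `ReplaySource`, any number of subscribers see the full sequence, but the history list grows for as long as the source emits.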