当我在FlowableOnSubscribe类中调用onNext时,我的订阅者的onNext和onComplete函数不会运行
在使用RxJava 2的Android项目中,我在我的初始活动的onCreate
中创建了这样的Flowable:
Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER) .doOnComplete(new MyAction()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new MySubscriber());
FlowableOnSubscribe的实现是:
public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe { public static final String TAG = "XX MyFlOnSub1"; @Override public void subscribe(FlowableEmitter emitter) { Log.i(TAG, "subscribe"); emitter.onNext("hello"); emitter.onComplete(); } }
这是订户实施:
public class MySubscriber implements Subscriber { public static final String TAG = "XX MySubscriber"; @Override public void onSubscribe(Subscription s) { Log.i(TAG, "onSubscribe"); } @Override public void onComplete() { Log.i(TAG, "onComplete"); } @Override public void onError(Throwable e) { Log.i(TAG, "onError"); } @Override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } }
行动实施是:
public class MyAction implements Action { public static final String TAG = "XX MyAction"; @Override public void run() { Log.i(TAG, "run"); } }
在我的输出中,我期待来自onNext
的日志声明,但我没有看到。 相反,这是我的整个输出:
02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe 02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe 02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
这表明onNext
永远不会运行,并且onComplete
甚至都不运行。 但MyAction
成功运行。
这是当我注释掉对onNext
的调用时会发生什么:
02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe 02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe 02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run 02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete
在这种情况下onNext
当然不会运行,但至少onComplete
运行。
我预计在两种情况下我都会看到onComplete
运行,而当我调用emitter.onNext
时我会看到onNext
运行。 我在这做错了什么?
您需要手动发出请求,否则在直接扩展Subscriber
时不会发出任何数据:
@Override public void onSubscribe(Subscription s) { Log.i(TAG, "onSubscribe"); s.request(Long.MAX_VALUE); }
或者,您可以扩展DisposableSubscriber
或ResourceSubscriber
。
你是如何测试的? 是否有可能在Observable有机会发出结果之前退出主线程,因为您正在使用Schedulers.IO
作为订阅线程。 此外,您的observeOn
将不会执行任何操作,因为它仅用于其他下游操作员。
- 如何在Android中获取Spinner Selected Item
- 绘制基于图块的地图
- DefaultHttpClient GET和POST命令Java Android
- Android ksoap2通过https
- RuntimeTypeAdapterFactory表示未定义“类型”
- 带有ProgressDialog的外部AsyncTask类
- 我可以在Android中设置AlarmManager的结束时间吗?
- java.lang.IllegalArgumentException:at android.view.Surface.unlockCanvasAndPost(Native Method)
- 使用Android拍摄单色照片(黑白)