如何终止Observable?
我有一个Observable,如果不满足某个条件,我想终止(即如果某个网站的响应不成功),这样我就可以重新查询网站,并再次调用observable。 我该怎么做呢?
这就是我想要做的:
Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { //Perform network actions here if (!response.isSuccessful()) { //terminate this Observable so I can retrieve the token and call this observable again } } });
您可以使用Rx的重试运算符。 并且无需终止Observable。
定义了一个自定义exception:
public class FailedException extends RuntimeException{ // ... } private static final int RETRY_COUNT = 3; // max retry counts Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber super String> subscriber) { //Perform network actions here if (!response.isSuccessful()) { // if response is unsucceed, invoke onError method and it will be stop emit data and into retry method. subscriber.onError(new FailedException()); } } }) .retry((integer, throwable) -> { // Retry network actions when failed. // if return true, Observable will be retry to network actions emit data; // if return false, you can process in onError() method of Subscribe. return throwable instanceof FailedException && integer < RETRY_COUNT; })
您可以在订阅前过滤结果。 创建observable时不要处理它。
检查observable.filter函数