如何终止Observable?

我有一个Observable,如果不满足某个条件,我想终止(即如果某个网站的响应不成功),这样我就可以重新查询网站,并再次调用observable。 我该怎么做呢?

这就是我想要做的:

Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { //Perform network actions here if (!response.isSuccessful()) { //terminate this Observable so I can retrieve the token and call this observable again } } }); 

您可以使用Rx的重试运算符。 并且无需终止Observable。

定义了一个自定义exception:

 public class FailedException extends RuntimeException{ // ... } private static final int RETRY_COUNT = 3; // max retry counts Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { //Perform network actions here if (!response.isSuccessful()) { // if response is unsucceed, invoke onError method and it will be stop emit data and into retry method. subscriber.onError(new FailedException()); } } }) .retry((integer, throwable) -> { // Retry network actions when failed. // if return true, Observable will be retry to network actions emit data; // if return false, you can process in onError() method of Subscribe. return throwable instanceof FailedException && integer < RETRY_COUNT; }) 

您可以在订阅前过滤结果。 创建observable时不要处理它。

检查observable.filter函数