使用RXJava进行缓存处理

我正在尝试用rxJava实现这个工作流程,但我确定我是在滥用还是做错了。

  • 用户要求登录
  • 如果loginResult在缓存中可用,则“发出”缓存的LoginResult
  • 实际上,如果一切都成功,则实际执行对Web服务的请求并缓存结果
  • 如果发生错误,则最多重试3次,如果有第4次,则清除缓存。

这是我完整的代码片段。

public class LoginTask extends BaseBackground { private static CachedLoginResult cachedLoginResult = new CachedLoginResult(); private XMLRPCClient xmlrpcClient; private UserCredentialsHolder userCredentialsHolder; @Inject public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) { this.xmlrpcClient = client; this.userCredentialsHolder = userCredentialsHolder; } @Override public LoginResult performRequest() throws Exception { return UserApi.login( xmlrpcClient, userCredentialsHolder.getUserName(), userCredentialsHolder.getPlainPassword()); } @Override public Observable getObservable() { return cachedLoginResult.getObservable() .onErrorResumeNext( Observable.create( ((Observable.OnSubscribe) subscriber -> { try { if (!subscriber.isUnsubscribed()) { subscriber.onNext(performRequest()); // actually performRequest } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } }) ) .doOnNext(cachedLoginResult::setLoginResult) .retry((attempts, t) -> attempts  cachedLoginResult.purgeCache()) ); } private static class CachedLoginResult { private LoginResult lr = null; private long when = 0; private CachedLoginResult() { } public boolean hasCache() { return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis(); } public void setLoginResult(LoginResult lr) { if (lr != null) { this.lr = lr; this.when = System.currentTimeMillis(); } } public void purgeCache() { this.lr = null; this.when = 0; } public Observable getObservable() { return Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { if (!subscriber.isUnsubscribed()) { if (hasCache()) { subscriber.onNext(lr); subscriber.onCompleted(); } else { subscriber.onError(new RuntimeException("No cache")); } } } }); } } } 

由于我无法找到任何类似的例子,我开始在1天前与rxjava“玩”,我不确定我的实现。

感谢您的时间。

我觉得这段代码还不错,干得好:)

你在LoginTask使用Observable.create是正确的,因为否则调用的结果可以在内部缓存,然后retry就没有多大帮助……

我认为这对CachedLoginResultObservable是不必要的。 在这里,您可以使用Observable.justObservable.error实用程序方法简化代码,例如:

 public Observable getObservable() { if (hasCache()) { return Observable.just(lr); } else { return Observable.error(new RuntimeException("No cache")); } } 

注意: just存储您告诉它在内部发出的值,以便重新订阅将始终生成此值。 这就是我上面提到的,你不应该做Observable.just(performRequest()).retry(3)例如Observable.just(performRequest()).retry(3) ,因为performRequest只会被调用一次。

如果我理解正确,您希望执行一次登录并以被动方式缓存结果? 如果是这样,这里有一个例子我将如何做到这一点:

 import java.util.concurrent.ThreadLocalRandom; import rx.*; import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; public class CachingLogin { static class LoginResult { } /** Guarded by this. */ AsyncSubject cache; public Observable login(String username, String password) { AsyncSubject c; boolean doLogin = false; synchronized (this) { if (cache == null || cache.hasThrowable()) { cache = AsyncSubject.create(); doLogin = true; } c = cache; } if (doLogin) { Observable.just(1).subscribeOn(Schedulers.io()) .map(v -> loginAPI(username, password)) .retry(3).subscribe(c); } return c; } public void purgeCache() { synchronized (this) { cache = null; } } static LoginResult loginAPI(String username, String password) { if (ThreadLocalRandom.current().nextDouble() < 0.3) { throw new RuntimeException("Failed"); } return new LoginResult(); } }