如何将基于回调的API转换为基于Observable的API?

我正在使用的库使用回调对象发出一系列Message对象。

 interface MessageCallback { onMessage(Message message); } 

使用一些libraryObject.setCallback(MessageCallback)调用添加回调,并使用非阻塞libraryObject.start()方法调用启动该进程。

创建将发出这些对象的Observable的最佳方法是什么?

如果libraryObject.start()阻塞怎么办?

我认为你需要这样的东西(在scala中给出的例子)

 import rx.lang.scala.{Observable, Subscriber} case class Message(message: String) trait MessageCallback { def onMessage(message: Message) } object LibraryObject { def setCallback(callback: MessageCallback): Unit = { ??? } def removeCallback(callback: MessageCallback): Unit = { ??? } def start(): Unit = { ??? } } def messagesSource: Observable[Message] = Observable((subscriber: Subscriber[Message]) ⇒ { val callback = new MessageCallback { def onMessage(message: Message) { subscriber.onNext(message) } } LibraryObject.setCallback(callback) subscriber.add { LibraryObject.removeCallback(callback) } }) 

至于阻塞/非阻塞start() :通常基于回调的体系结构将回调订阅和进程启动分开。 在这种情况下,您可以完全独立于start()过程时创建任意数量的messageSource 。 你决定是否分叉也是决定性的。 你的建筑与此不同吗?

您还应该以某种方式处理完成过程。 最好的方法是在MessageCallback接口上添加一个onCompleted处理程序。 如果要处理错误,还要添加onError处理程序。 现在,你刚刚宣布了RxJava的基本建筑石, 观察员 🙂

1.回调调用无限次

我们可以像这样将它转换为Observable(RxJava 2的例子):

 Observable source = Observable.create(emitter -> { MessageCallback callback = message -> emitter.onNext(message); libraryObject.setCallback(callback); Schedulers.io().scheduleDirect(libraryObject::start); emitter.setCancellable(() -> libraryObject.removeCallback(callback)); }) .share(); // make it hot 

share使得这个可观察的热点 ,即多个订阅者将共享单个订阅,即最多只有一个使用libraryObject注册的回调。

我使用io调度程序来安排从后台线程start调用,因此它不会延迟第一次订阅。

2.单消息回调

这也是很常见的情况。 假设我们有以下回调式异步方法:

 libraryObject.requestDataAsync(Some parameters, MessageCallback callback); 

然后我们可以像这样将其转换为Observable(RxJava 2的示例):

 Observable makeRequest(parameters) { return Observable.create(emitter -> { libraryObject.requestDataAsync(parameters, message -> { emitter.onNext(message); emitter.onComplete(); }); }); }