Java / Scala Future由回调驱动

精简版:

如何创建在回调触发器上完成的Promise

长版:

我正在开发一个处理第三方SOAP服务的应用程序。 用户的请求同时委托多个SOAP服务,聚合结果并发送回用户。

系统需要是可扩展的,并且应该允许多个并发用户。 当每个用户请求最终触发大约10个Web服务调用并且每个呼叫阻塞大约1秒时,系统需要设计为具有非阻塞I / O.

我在Play Framework(Java)中使用Apache CXF用于此系统。 我已设法生成异步WS客户端代理并启用异步传输。 我无法弄清楚当我委托多个Web服务代理时如何将Future转换为Play的线程,结果将作为回调获得。

选项1:使用返回Java Future的异步方法调用。

正如java.util.concurrent.Future线程的scala.concurrent.Future包装器中所描述的那样,我们无法将Java Future转换为Scala Future。 从Future获得结果的唯一方法是执行阻止调用者的Future.get() 。 由于CXF生成的代理返回Java Future,因此排除了此选项。

选项2:使用Scala Future。

由于CXF生成代理接口,我不确定是否有任何方式可以干预并返回Scala Future(AFAIK Akka使用Scala Futures)而不是Java Future?

选项3:使用回调方法。

由CXF生成的返回Java Future的异步方法也需要一个回调对象,我想这将在结果准备好时提供回调。 要使用这种方法,我需要返回一个等待我收到回调的Future。

我认为选项3最有希望,虽然我对如何返回一个将在收到回调时完成的Promise没有任何想法。 我可能有一个线程等待一段while(true)并等待,直到结果可用。 再一次,我不知道如何在不阻塞线程的情况下进入wait

简而言之,我正在尝试构建一个系统,该系统正在进行大量的SOAP Web服务调用,其中每个调用都会占用大量时间。 在大量并发Web服务调用的情况下,系统可能很容易耗尽线程。 我正在寻找一种基于非阻塞I / O的解决方案,它可以同时允许许多正在进行的Web服务调用。

选项3看起来不错:)一些import开始……

 import scala.concurrent.{Await, Promise} import scala.concurrent.duration.Duration 

并且,为了说明这一点,这是一个采用回调的模拟CXF API:

 def fetch(url: String, callback: String => Unit) = { callback(s"results for $url") } 

创建一个promise,使用promise作为回调调用API:

 val promise = Promise[String] fetch("http://corp/api", result => promise.success(result)) 

然后你可以将promise.future作为Future一个实例加入你的Play应用程序中。

要测试它,您可以这样做:

 Await.result(promise.future, Duration.Inf) 

这将阻止等待结果,此时你应该在控制台中看到“ http:// corp / api的结果 ”。