如何使用Akka HTTP从多个actor / web处理程序正确调用单个服务器?

我有一个服务(让我们称之为服务A),它使用Akka Server HTTP来处理传入的请求。 我还有第三方应用程序(服务B),它提供了几个Web服务。 服务A的目的是转换客户端请求,调用服务B的一个或多个Web服务,合并/转换结果并将其提供给客户端。

我在某些部分使用Actors,而在其他部分使用Future。 要调用服务B,我使用Akka HTTP客户端。

Http.get(actorSystem).singleRequest(HttpRequest.create() .withUri("http://127.0.0.1:8082/test"), materializer) .onComplete(...) 

问题是,每个Service A请求都会创建一个新流,如果有多个并发连接,则会产生akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error

我已经问过这个问题,并建议使用单个Flow 如何正确调用Akka HTTP客户端以获取多个(10k-100k)请求?

虽然它适用于来自单个地方的一批请求,但我不知道如何使用来自所有并发请求处理程序的单个Flow。

这样做的正确“Akka-way”是什么?

我想你可以使用Source.queue来缓冲你的请求。 下面的代码假设您需要从第三方服务获得答案,因此非常欢迎拥有Future[HttpResponse] 。 这样,您还可以提供溢出策略以防止资源不足。

 import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{ActorMaterializer, OverflowStrategy} import scala.concurrent.duration._ import scala.concurrent.{Await, Future, Promise} import scala.util.{Failure, Success} import scala.concurrent.ExecutionContext.Implicits.global implicit val system = ActorSystem("main") implicit val materializer = ActorMaterializer() val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80) val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew) .via(pool) .toMat(Sink.foreach({ case ((Success(resp), p)) => p.success(resp) case ((Failure(e), p)) => p.failure(e) }))(Keep.left) .run val promise = Promise[HttpResponse] val request = HttpRequest(uri = "/") -> promise val response = queue.offer(request).flatMap(buffered => { if (buffered) promise.future else Future.failed(new RuntimeException()) }) Await.ready(response, 3 seconds) 

(从我的博文中复制的代码)

这是接受答案的 Java版本

 final Flow< Pair>, Pair, Promise>, NotUsed> flow = Http.get(actorSystem).superPool(materializer); final SourceQueue>> queue = Source.>> queue(BUFFER_SIZE, OverflowStrategy.dropNew()) .via(flow) .toMat(Sink.foreach(p -> p.second().complete(p.first())), Keep.left()) .run(materializer); ... public CompletionStage request(HttpRequest request) { log.debug("Making request {}", request); Promise promise = Futures.promise(); return queue.offer(Pair.create(request, promise)) .thenCompose(buffered -> { if (buffered instanceof QueueOfferResult.Enqueued$) { return FutureConverters.toJava(promise.future()) .thenApply(resp -> { if (log.isDebugEnabled()) { log.debug("Got response {} {}", resp.status(), resp.getHeaders()); } return resp; }); } else { log.error("Could not buffer request {}", request); return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE)); } }); } 

您需要做的就是在服务A代码中设置HostConnectionPool到Service B. 这将为您提供一个可以添加到服务A流的流,以使用连接池而不是每个流的新连接将请求从A发送到B. 从文档:

与连接级客户端API相反,主机级API使您无需手动管理各个HTTP连接。 它自动管理与一个特定目标端点(即主机/端口组合)的可配置连接池。

在不同的流中,此Flow的每个实现都将从此底层连接池中获取:

获取连接池到给定目标端点的最佳方法是Http.get(system).cachedHostConnectionPool(...)方法,该方法返回可以“烘焙”到应用程序级流设置中的Flow 。 此流程也称为“池客户端流程”。