获得Cassandra Writes背压的最佳方法是什么?

我有一个服务,以我控制的速率消耗队列中的消息。 我做了一些处理,然后尝试通过Datastax Java客户端写入Cassandra集群。 我已经使用maxRequestsPerConnectionmaxConnectionsPerHost设置了我的Cassandra集群。 但是,在测试中我发现当我到达maxConnectionsPerHost并且对session.executeAsync maxRequestsPerConnection调用时不会阻塞。

我现在正在做的是使用new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)并在每个异步请求之前递增它,并在executeAsync返回的未来完成时递减它。 这很好用,但由于驱动程序已在内部跟踪请求和连接,因此它似乎是多余的。

有没有人想出更好的解决方案来解决这个问题?

一个警告:我希望在完成之前将其视为未完成的请求。 这包括重试 ! 我从群集中获得可重试失败的情况(例如等待一致性的超时)是我想要反压并停止消耗来自队列的消息的主要情况。

问题:

 // the rate at which I consume messages depends on how fast this method returns processMessage(message) { // this appears to return immediately even if I have exhausted connections/requests session.executeAsync(preparedStatement.bind(...)); } 

当前解决方案

 constructor() { this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection); } processMessage(message) { ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...)); CompletableFuture future = completableFromListenable(resultSetFuture); concurrentRequestsSemaphore.acquireUninterruptibly(); future.whenComplete((result, exception) -> concurrentRequests.release()); } 

此外,任何人都可以看到此解决方案的任何明显问题?

不杀死集群的一个可能的想法是“限制”你对executeAsync的调用,例如在一批100(或者对你的集群和工作负载最好的数字)之后,你将在客户端代码中hibernate并做一个阻止调用所有100个期货(或使用Guava库将未来列表转换为列表的未来)

这样,在发出100个异步查询之后,您将强制客户端应用程序在继续进行之前等待所有这些查询成功。 如果在调用future.get()时捕获到任何exception,则可以安排重试。 通常,Java驱动程序的默认RetryStrategy已尝试重试。

关于来自服务器的反压信号,从CQL二进制协议V3开始,有一个错误代码通知客户端协调器过载 : https : //github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3。规格#L951

从客户端,您可以通过两种方式获取此重载信息:

  • Java Driver 3.0.0:引入了新的OverloadedException类: http : //www.datastax.com/dev/blog/datastax-java-driver-3-0-0-released#misc
  • 3.0.0之前的Java驱动程序: 抛出DriverException(“主机重载”)

我现在正在做的是使用新的信号量(maxConnectionsPerHost * maxRequestsPerConnection)并在每个异步请求之前递增它,并在executeAsync返回的未来完成时递减它。 这很好用,但由于驱动程序已在内部跟踪请求和连接,因此它似乎是多余的。

这是一种非常合理的方法,允许新请求填写,而其他请求完成。 您可以将许可证发布到未来完成。

驱动程序本身不这样做的原因是它试图尽可能少地阻塞,而是快速失败。 不幸的是,这会给客户带来一些责任。

在通常情况下,一次将多个请求同时发送给主机是不好的。 C *具有native_transport_max_threads设置(默认为128),该设置控制一次处理请求的线程数。 最好是在每个主机的2 *那个数字上限制自己。 (请参阅: Cassandra如何处理阻塞datastax java驱动程序中的execute语句以获取更多详细信息)

我希望在完成之前将其视为未完成的请求。 这包括重试! 我从群集中获得可重试失败的情况(例如等待一致性的超时)是我想要反压并停止消耗来自队列的消息的主要情况。

在成功完成,耗尽其重试或由于某种原因失败之前,驱动程序将无法完成未来。 因此,您可以绑定释放信号量许可证,直到将来完成或失败。