Tag: datastax java driver backpressure

获得Cassandra Writes背压的最佳方法是什么?

我有一个服务,以我控制的速率消耗队列中的消息。 我做了一些处理,然后尝试通过Datastax Java客户端写入Cassandra集群。 我已经使用maxRequestsPerConnection和maxConnectionsPerHost设置了我的Cassandra集群。 但是,在测试中我发现当我到达maxConnectionsPerHost并且对session.executeAsync maxRequestsPerConnection调用时不会阻塞。 我现在正在做的是使用new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)并在每个异步请求之前递增它,并在executeAsync返回的未来完成时递减它。 这很好用,但由于驱动程序已在内部跟踪请求和连接,因此它似乎是多余的。 有没有人想出更好的解决方案来解决这个问题? 一个警告:我希望在完成之前将其视为未完成的请求。 这包括重试 ! 我从群集中获得可重试失败的情况(例如等待一致性的超时)是我想要反压并停止消耗来自队列的消息的主要情况。 问题: // the rate at which I consume messages depends on how fast this method returns processMessage(message) { // this appears to return immediately even if I have exhausted connections/requests session.executeAsync(preparedStatement.bind(…)); } 当前解决方案 constructor() { this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost […]