我有一项服务以我控制的速率消耗队列中的消息。我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群。我已经设置了我的 Cassandra 集群maxRequestsPerConnection
and maxConnectionsPerHost
。然而,在测试中我发现当我达到maxConnectionsPerHost
and maxRequestsPerConnection
打电话给session.executeAsync
不要阻止。
我现在正在做的是使用new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)
并在每个异步请求之前递增它,并在 future 返回时递减它executeAsync
完成。这工作得很好,但似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。
有没有人想出更好的解决方案来解决这个问题?
需要注意的是:我希望请求在完成之前被视为未完成。这包括重试!我从集群中收到可重试失败的情况(例如等待一致性的超时)是我想要背压并停止消耗队列中的消息的主要情况。
Problem:
// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
// this appears to return immediately even if I have exhausted connections/requests
session.executeAsync(preparedStatement.bind(...));
}
目前的解决方案:
constructor() {
this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}
processMessage(message) {
ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
concurrentRequestsSemaphore.acquireUninterruptibly();
future.whenComplete((result, exception) -> concurrentRequests.release());
}
另外,有人能看出这个解决方案有什么明显的问题吗?
不杀死集群的一种可能的想法是“限制”您的调用executeAsync
例如在一批 100 个(或任何最适合您的集群和工作负载的数字)之后,您将在客户端代码中进行睡眠并对所有 100 个 future 进行阻塞调用(或使用 Guava 库来转换 future 列表)进入列表的未来)
这样,在发出 100 个异步查询后,您将强制客户端应用程序等待所有查询都成功,然后再继续处理。如果调用时发现任何异常future.get()
,您可以安排重试。通常,Java 驱动程序的默认 RetryStrategy 已尝试重试。
关于来自服务器的反压信号,从CQL二进制协议V3开始,有一个错误代码,通知客户端协调器正在运行超载 : https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951
从客户端,你可以得到这个超载信息有两种方式:
- Java 驱动程序 3.0.0:新重载异常班级介绍:http://www.datastax.com/dev/blog/datastax-java-driver-3-0-0-released#misc
- 3.0.0之前的Java驱动程序:DriverException(“主机过载”)被抛出
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)