获得 Cassandra Writes 背压的最佳方法是什么?

2023-11-22

我有一项服务以我控制的速率消耗队列中的消息。我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群。我已经设置了我的 Cassandra 集群maxRequestsPerConnection and maxConnectionsPerHost。然而,在测试中我发现当我达到maxConnectionsPerHost and maxRequestsPerConnection打电话给session.executeAsync不要阻止。

我现在正在做的是使用new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)并在每个异步请求之前递增它,并在 future 返回时递减它executeAsync完成。这工作得很好,但似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。

有没有人想出更好的解决方案来解决这个问题?

需要注意的是:我希望请求在完成之前被视为未完成。这包括重试!我从集群中收到可重试失败的情况(例如等待一致性的超时)是我想要背压并停止消耗队列中的消息的主要情况。

Problem:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

目前的解决方案:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    concurrentRequestsSemaphore.acquireUninterruptibly();
    future.whenComplete((result, exception) -> concurrentRequests.release());
}

另外,有人能看出这个解决方案有什么明显的问题吗?


不杀死集群的一种可能的想法是“限制”您的调用executeAsync例如在一批 100 个(或任何最适合您的集群和工作负载的数字)之后,您将在客户端代码中进行睡眠并对所有 100 个 future 进行阻塞调用(或使用 Guava 库来转换 future 列表)进入列表的未来)

这样,在发出 100 个异步查询后,您将强制客户端应用程序等待所有查询都成功,然后再继续处理。如果调用时发现任何异常future.get(),您可以安排重试。通常,Java 驱动程序的默认 RetryStrategy 已尝试重试。

关于来自服务器的反压信号,从CQL二进制协议V3开始,有一个错误代码,通知客户端协调器正在运行超载 : https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951

从客户端,你可以得到这个超载信息有两种方式:

  • Java 驱动程序 3.0.0:新重载异常班级介绍:http://www.datastax.com/dev/blog/datastax-java-driver-3-0-0-released#misc
  • 3.0.0之前的Java驱动程序:DriverException(“主机过载”)被抛出
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

获得 Cassandra Writes 背压的最佳方法是什么? 的相关文章

随机推荐

  • R 相当于 Python 的 re.findall

    我试图从字符串中获取 RegExp 的所有匹配项 但显然在 R 中这并不容易 或者我忽略了一些东西 说实话 这真的很令人困惑 我发现自己在所有选项中迷失了 str extract str match str match all regexe
  • 如何更改 GNU Make 中的当前目录

    我想将源目录与目标目录分开 看来从 Makefile 更改当前工作目录应该是最简单的解决方案 由于以下缺点 目标的显式路径是不够的 Makefile 中的冗余代码 因为对目标的每个引用都应以变量为前缀 更复杂的命令行来构建特定的中间目标 对
  • Oracle驱动程序内存泄漏-Tomcat

    我们使用的是 tomcat 7 0 33 Spring 3 0 1 和 JPA 使用 tomcat JNDI 数据源 Oracle 10g在后端使用ojdbc6 jar 最新 当我们尝试取消部署应用程序时 一些 Oracle 类似乎正在泄漏
  • 为什么我的 Google 应用引擎的域名为“my-project.df.r.appspot.com”?

    我在 Google Cloud 项目中启用了 Google App Enginemy project App Engine 的 URL 是my project appspot com默认情况下 这很好 然而 我发现有时它会变成my proj
  • 如何判断用户的电子邮件地址是否已使用 Django、allauth、rest-auth 和自定义用户进行验证

    我正在使用 Django 2 0 10 以及 Rest framework rest auth 和 allauth 我有一个自定义用户模型 我已经使用 allauth 视图进行了电子邮件验证 用户注册时会发送验证电子邮件 如果我单击电子邮件
  • 重置继承的WPF样式?

    在我的应用程序的 App xaml 部分中 我有一个ResourceDictionary目标类似的元素DataGridColumnHeader and DataGridCell并对它们应用自定义样式 这些定义是全局的 因为它们不使用 x k
  • AFNetworking 2.0 从失败块中的代码 400 获取 JSON

    我在用着AFHTTPRequestOperationManager for a POST要求 现在我故意输入不正确的信息来处理400错误代码 现在 Web 服务实际上返回了一个JSON并向用户解释他们做错了什么的消息 我非常想得到这个JSO
  • Java集合binarySearch无法正常工作

    我只是尝试使用本机 Java 二进制搜索 希望它总能找到第一个出现的位置 但它并不总是返回第一次出现 我在这里做错了什么 import java util class BinarySearchWithComparator public st
  • 不带参数抛出失败信号

    直接打电话就可以吗throw 如果出现问题 您不知道如何恢复 这个想法是让应用程序因转储而崩溃 因为状态未知 或者你应该总是指定一个参数 从MSDN我只发现如果没有参数它会重新抛出 但不知道如果没有初始异常要重新抛出会发生什么 No thr
  • 当从另一个表中删除行时,如何使 PostgreSQL 将行插入到表中?

    我们有一个应用程序 它将根据用户请求从表中删除一行 我无法更改应用程序代码 但是 我想将一行插入到另一个表 有点像日志日志 中 其中包含来自其他几个表的信息 基于要删除的行的信息 我如何在 PostgreSQL 中实现这一目标 写一个触发函
  • Nuget 找不到更新的依赖项

    我刚刚在 ASP 5 MVC 6 beta8 中创建了一个新项目和一个用于测试的兼容类库 问题出现在我打算用于测试的这个新的 Web 类库 项目中 这是我的project json 的样子 version 1 0 0 description
  • Grails hasOne 与belongsTo

    要在 Grails 中创建一对一关系 我可以这样做 class Person static hasOne address Address 在这种情况下 地址表拥有其个人的密钥 我还可以这样做 class Address static bel
  • 将焦点设置在android中listview的任何项目上

    我有一个列表视图 其中包含文本视图作为其元素 现在我希望在启动应用程序时自动聚焦列表的第一项 当我单击其他视图 例如按钮 时 如何将焦点设置在列表中的任何项目上 设置选择和设置焦点是两个不同的事情 如果您只想将选择设置为某个项目 那么您可以
  • 由于 CORS 限制,无法使用 firebase 进行本地测试

    我当前的用例很简单 我只需要向我本地开发的云函数发出post请求 问题是 当我开火时 firebase serve 托管部署在本地主机 5000 并且云功能部署在本地主机 5001 由于端口不同 这两者来自不同的来源 因此 当浏览器发送初始
  • 如何处理对数图中的零

    问题 我想使用 ggplot2 将数据绘制在 y 轴上具有对数刻度的折线图中 不幸的是 我的一些价值观一路下降到零 数据表示依赖于某些参数的特征的相对出现 当在样本中没有观察到该特征时 值为零 这意味着它很少出现 或者实际上从未出现 这些零
  • Android游戏RPG库存系统

    我使用 ArrayList 作为我的 库存 我无法找到一种方法来添加多个相同的物品而不占用 库存 中的位置 例如 我在库存中添加了一瓶药水 现在我添加了另一种药水 但这次不是在库存中添加另一种药水 而是应该显示我有 药水 x 2 同时只占用
  • 获取 Urllib2.Request 的请求标头?

    有没有办法从使用 Urllib2 创建的请求中获取标头或确认使用 urllib2 urlopen 发送的 HTTP 标头 查看请求 和响应标头 的一种简单方法是启用调试输出 opener urllib2 build opener urlli
  • llvm JIT 将库添加到模块

    我正在开发一个使用 LLVM 的 JIT 该语言有一个用 C 编写的小型运行时 我使用 clang 将其编译为 LLVM IR clang runtime cu cuda gpu arch sm 50 c emit llvm 然后加载 bc
  • Hadoop 流 - 从减速器输出中删除尾随选项卡

    我有一个 hadoop 流作业 其输出不包含键 值对 您可以将其视为仅值对或仅键对 我的流式减速器 一个 php 脚本 正在输出由换行符分隔的记录 Hadoop 流处理将此视为没有值的键 并在换行符之前插入一个制表符 这个额外的选项卡是不需
  • 获得 Cassandra Writes 背压的最佳方法是什么?

    我有一项服务以我控制的速率消耗队列中的消息 我做了一些处理 然后尝试通过 Datastax Java 客户端写入 Cassandra 集群 我已经设置了我的 Cassandra 集群maxRequestsPerConnection and