Kafka以相反的顺序消费消息

2024-01-16

我使用Kafka 0.10,我有一个主题logs我的物联网设备将日志发布到其中,我的消息的关键是device-id,所以同一设备的所有日志都在同一个分区。

我有一个 API/devices/{id}/tail-logs需要显示呼叫时某台设备的最后 N 条日志。

目前,我以一种非常低效的方式(但有效)实现它,因为我从包含设备日志的分区的开头(即最旧的日志)开始,直到达到当前时间戳。

一种更有效的方法是,如果我可以获得当前的最新偏移量,然后向后消费消息(我需要过滤掉一些消息以仅保留我正在寻找的设备的消息)

可以用kafka来做吗?如果不是,如何解决这个问题? (我看到的一个更重的解决方案是将kafka-connect链接到弹性搜索,然后查询elasticsearch,但为此再增加2个组件似乎有点矫枉过正......)


由于您使用的是 0.10.2,我建议编写一个 Kafka Streams 应用程序。应用程序将是有状态的,并且状态将保存每个的最后 N 条记录/日志device-id-- 如果新数据写入输入主题,Kafka Streams 应用程序将仅更新其状态(无需重新读取整个主题)。

此外,该应用程序还可以满足您的请求(“api/devices/{id}/tail-logs" using 交互式查询 http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries特征。

因此,我不会构建一个必须重新计算每个请求的答案的无状态应用程序,而是构建一个有状态应用程序,它为所有可能的请求(即,对于所有请求)急切地计算结果(并始终自动更新结果)device-ids) 并在请求到来时返回已经计算的结果。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka以相反的顺序消费消息 的相关文章

  • Kafka Connect Confluence S3 Sink 连接器:找不到类 io.confluence.connect.avro.AvroConverter

    使用此 Kafka Connect 连接器 https www confluence io hub confluenceinc kafka connect s3 https www confluent io hub confluentinc
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk
  • 如何检测java中的消费者是否无法使用kafka代理?

    我有一个简单的 Java Kafka 消费者 如果 Kafka 代理不可用 我试图捕获异常 我需要它来中断线程 我有这样的代码 KafkaConsumer
  • 带有 spring-kafka 的 Kafka 死信队列 (DLQ)

    最好的实施方式是什么死信队列 DLQ Spring Boot 2 0 应用程序中的概念 使用 spring kafka 2 1 x 来处理无法处理的所有消息 KafkaListener某些bean发送到某些预定义的Kafka DLQ主题的方
  • Kafka 是否保证具有任何配置参数值的单个分区内的消息排序?

    如果我在 Producer 中将 Kafka 配置参数设置为 1 retries 3 2 max in flight requests per connection 5 那么一个分区内的消息很可能不按 send order 排列 Kafka
  • Kafka Streams 反序列化处理程序

    我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 当发生错误时 通过成功记录错误并继续 它可以正常工作 但是 假设我的传入消息有连续的错误流 我停止并重新启动 kafka 流应用程序 然后我看到失败并
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • JDBC Kafka Connector 可以从多个数据库中提取数据吗?

    我想设置一个 JDBC Kafka 连接器集群 并将它们配置为从同一主机上运行的多个数据库中提取数据 我一直在查看 Kafka Connect 文档 似乎在配置 JDBC 连接器后 它只能从单个数据库中提取数据 谁能证实这一点吗 根据您启动
  • 如何使用PySpark结构流+Kafka

    我尝试将 Spark 结构流与 kafka 一起使用 并且在使用 Spark 提交时遇到问题 消费者仍然从生产中接收数据 但 Spark 结构出错 请帮我找到我的代码的问题 这是我在 test py 中的代码 from kafka impo
  • 即使在 Kafka 中进行轮询后,当前也不会发生分区分配

    我有 Java 8 应用程序与 Apache Kafka 2 11 0 10 1 0 一起使用 我需要使用seek特征为poll来自分区的旧消息 然而我遇到了一个例外No current assignment for partition每次
  • 如何在kafka中定义多个序列化器?

    比如说 我发布和使用不同类型的 java 对象 对于每个对象 我必须定义自己的序列化器实现 我们如何在 serializer class 属性下提供kafka消费者 生产者属性文件中的所有实现 我们有一个类似的设置 不同主题中的不同对象 但
  • Kafka Consumer 如何(应该)应对有毒消息

    当 Kafka Consumer 无法反序列化消息时 客户端应用程序是否有责任处理有毒消息 Or Kafka是否会 增加 消息偏移并继续消费有效消息 是否有处理 Kafka 主题上的有毒消息的 最佳实践 当 Kafka 无法反序列化记录时
  • 使用来自多个 kafka 主题的消息的最佳实践是什么?

    我需要消费来自不同卡夫卡主题的消息 我是否应该为每个主题创建不同的消费者实例 然后根据分区数量启动一个新的处理线程 或者 我应该从单个消费者实例订阅所有主题 并且应该启动不同的处理线程 感谢和问候 梅加 唯一的规则是 您必须考虑 Kafka
  • 为什么kafka中的__consumer_offsets主题没有传播到所有经纪人?

    我有一个3 zk节点集群 和7 卡夫卡经纪人 nodes 因此 当我创建任何主题时 我可以使用命令行参数设置副本因子和分区数 这些分区分布到所有 7 个经纪商 但是有一个主题 即 consumer offsets 它是自动创建的 并且仅传播
  • 如何删除 Apache Kafka 中的多个主题

    假设我有许多具有相同前缀的主题 例如 giorgos topic1 giorgos topic2 giorgos topic3 用于删除单个主题的命令 例如giorgos topic1 如下 bin kafka topics sh zook

随机推荐

  • 在 asp.net 中上传文件之前如何检查文件类型?

    我们如何在不使用文件扩展名的情况下检查文件类型 例如jpg等格式 上传它们使用 asp net 和 c 我正在使用 vs 2008 asp net c TELERIK 控件 RadUpload 想象一下有人将文本文件扩展名更改为 jpg 并
  • Haskell 中类型表达式的 Lambda?

    Haskell 或特定的编译器是否有类似类型级 lambda 的东西 如果这甚至是一个术语 详细说明一下 假设我有一个参数化类型Foo a b并想要Foo b成为 Functor 的一个实例 有没有什么机制可以让我做类似的事情 instan
  • 如何在安装了 goclipse 的 eclipse 中运行 GO 项目

    我已经在 eclipse 中安装了 goclipse 并创建了一个新的 go 项目 现在这就是我所拥有的 我的 hello go 看起来像这样 package main import fmt func main fmt Println He
  • 何时在 Makefile 中使用空格或制表符?

    我正在创建一个使用条件 if 和 ifneq 的 makefile 我注意到 如果我使用 if 下一行应该用空格缩进 if d d then
  • 如何在 gdb 中使用带有 FS 或 GS​​ 基址的逻辑地址?

    gdb 提供了读取或写入特定的功能线性地址 例如 gdb x 1wx 0x080483e4 0x80483e4
  • Spark 2.0 DataSets groupByKey 和 除法操作以及类型安全

    我对 Spark 2 0 DataSets 非常满意 因为它的编译时类型安全 但这里有几个我无法解决的问题 我也没有找到很好的文档 问题 1 对聚合列进行除法运算 考虑下面的代码 我有一个 DataSet MyCaseClass 我想对 c
  • 如何在android中使网格视图水平滚动而不是垂直滚动?

    我有一个动态网格视图 意味着其内容有所不同 因此 如果项目数量增加 则会进行垂直滚动 我想把它做成水平滚动 请为此提出一些解决方案
  • Python pandas / matplotlib 在条形图列上方注释标签[重复]

    这个问题在这里已经有答案了 如何添加要在条形图中的条形上方显示的值的标签 import pandas as pd import matplotlib pyplot as plt df pd DataFrame Users Bob Jim T
  • 使用“wait_variable()”时无法退出 tkinter 应用程序

    我有一个 python 代码 其中包括tkinter窗口和其他正在运行的任务 我一直在尝试绑定 WM DELETE WINDOW 当我关闭窗口但无法实现该功能时 该事件会退出我的 python 代码 这就是我尝试的 def on exit
  • 如何在 postgresql 上使用 sqlalchemy 进行正确的更新插入?

    我想使用 sqlalchemy 核心使用 postgresql 9 5 添加的 新 功能进行更新插入 虽然它已实现 但我对语法感到非常困惑 它无法适应我的需求 这是我希望能够执行的示例代码 from sqlalchemy ext decla
  • 仅在提供后才计算下载次数

    我们有以下代码可供下载 public class downloadRelease IHttpHandler public void ProcessRequest HttpContext context snip context Respon
  • Flex-wrap 具有不同高度的行

    我正在实现带有哈希标签链接的纯 CSS 选项卡 我非常非常接近 但无法完全让柔性包装正常工作 为了让一切按照我想要的方式工作 target 我之前已经使用单选按钮完成了此操作 这提供了更多的灵活性 我需要所有选项卡和所有部分都处于同一级别
  • 如何解决 AWS Cloudformation 中的循环依赖关系

    我创建了一个 AWS Cloudformation 模板 但在克服循环依赖项时遇到问题 我正在创建一个 EC2 实例和一个负载均衡器 负载均衡器依赖于 EC2 实例 因为它在其实例属性中引用它 一切都工作正常 直到我必须在 EC2 实例 I
  • 如何更新datagridview中的单元格?

    我有连接到我的数据库 访问 的 datagridview 如果我停留在任何单元格上并更改值 则会看到该值已更改 但是当我进行刷新时 我看到该值又回到了原始值 我如何更新这个单元格 没有sql查询 我将数据集绑定到 datagridview
  • 在 Spring Boot JPA 中,如何正确 POST 其实体表示与不同实体具有外键关联的对象?

    如果我有一个包含另一个类的对象的实体 例如Book其内部有一个实体Publisher关联的实体如下 ManyToOne JoinColumn name PUB CODE referencedColumnName PUB CODE priva
  • 如何反转 Groovy 集合的排序?

    我正在根据多个字段对列表进行排序 sortedList sort it getAuthor it getDate 这工作正常 但我想要逆转日期并且reverse 不起作用 如何按升序对作者排序 但按降序 反向 顺序对日期排序 我想要的示例
  • 如何以干净的方式分叉现有的 Meteorite 包?

    我正在尝试找出在项目中分叉 Atmosphere 上现有包的最佳 最干净的方法 我遇到过一些情况 现有的包需要一些修改 我被迫分叉它 据我所知 存在以下选项 不幸的是 所有这些都有自己的问题 我还没有找到完美的解决方案 我会用meteor
  • 使用水豚测试内容顺序(序列)

    我尝试过使用以下语法 page body index 姓名 但问题是 如果同一页面上有多个具有相同内容的字符串 则无法检查特定字符串的索引 对于前 页面有内容 姓名 和 电话 3次 那么如何验证具体内容的顺序 请建议我们是否可以使用 CSS
  • Chrome 不支持 getUserMedia()

    我正在尝试使用 getUserMedia 使用我自己的网站 使用我自己的 IP 地址运行 来访问我的网络摄像头 它工作正常 直到我再次尝试我的网站 我尝试过其他演示站点 给出的错误是 getUserMedia 不受支持 Chrome版本v4
  • Kafka以相反的顺序消费消息

    我使用Kafka 0 10 我有一个主题logs我的物联网设备将日志发布到其中 我的消息的关键是device id 所以同一设备的所有日志都在同一个分区 我有一个 API devices id tail logs需要显示呼叫时某台设备的最后