kafka flink timestamp 事件时间和水印

2023-12-23

我正在阅读《使用 Apache Flink 进行流处理》一书,其中指出“从版本 0.10.0 开始,Kafka 支持消息时间戳。当从 Kafka 0.10 或更高版本读取时,如果应用程序在事件时间模式下运行,消费者将自动提取消息时间戳作为事件时间时间戳*” 所以里面一个processElement执行呼叫context.timestamp()默认情况下会返回kafka消息时间戳吗? 您能否提供一个简单的示例来说明如何实现基于消耗的kafka消息时间戳提取(并构建水印)的AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks。

如果我正在使用TimeCharacteristic.ProcessingTime,ctx.timestamp() 会返回处理时间吗?在这种情况下,它会类似于context.timerService().currentProcessingTime() .

谢谢。


Flink Kafka 消费者会为您处理这个问题,并将时间戳放在需要的位置。在 Flink 1.11 中,您可以简单地依赖于此,尽管您仍然需要提供一个 WatermarkStrategy 来指定无序性(或断言时间戳是有序的):

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.
        .forBoundedOutOfOrderness(Duration.ofSeconds(20)));

在 Flink 的早期版本中,您必须提供时间戳分配器的实现,如下所示:

public long extractTimestamp(Long element, long previousElementTimestamp) {
    return previousElementTimestamp;
}

这个版本的extractTimestamp方法将 StreamRecord 中存在的时间戳的当前值传递为previousElementTimestamp,在本例中将是 Flink Kafka 消费者放置的时间戳。

Flink 1.11 文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
Flink 1.10 文档 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010

至于返回的是什么ctx.timestamp()使用时TimeCharacteristic.ProcessingTime,在这种情况下此方法返回 NULL。 (从语义上讲,是的,时间戳就好像是当前处理时间,但这不是它的实现方式。)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka flink timestamp 事件时间和水印 的相关文章

随机推荐

  • 使用slidenavigatoin动态地将xtype项目添加到面板

    所以我试图将项目动态地放置到具有slidenavigation特征 FlyoutNavigation js Ext define APN view FlyoutNavigation id flyoutNavigationPanel exte
  • 检查当前活动窗口是否为桌面

    我尝试检查当前活动窗口是否是桌面执行某些操作 我在计时器中编写了以下代码 但 GetDesktopWindow 和 GetForegroundWindow 返回的句柄值不是相同的值 if GetForegroundWindow GetDes
  • 图放置四开降价(浮动说明符)

    在 LaTeX 中 可以使用浮点说明符来控制图形位置 例如 H 将浮点精确地放置在代码中的位置 begin figure H caption Caption label label includegraphics width textwid
  • 计算两个日期之间的天数 - J2ME

    我想计算两个日期之间的天数 我在网上找到了一些解决方案 但问题出在我的 NetBeans 中公历不可用 所以无法计算天数 有人可以帮忙吗 在 Java Micro Edition 中 您没有 GregorianCalendar 因此您必须使
  • 具有重复字符的正则表达式

    我需要编写一个正则表达式 它可以检测仅包含字符 x y 和 z 的字符串 但其中的字符与其相邻字符不同 这是一个例子 xyzxzyz 通过 xyxyxyx 通过 xxyzxz 失败 重复 x zzzxxzz 失败 相邻字符重复 我认为这会起
  • React-native 0.40+ 获取自签名证书

    我找到了这个答案https stackoverflow com a 41703745 1646117 https stackoverflow com a 41703745 1646117但我无法让它与 React native 0 40 一
  • myDate.toLocaleString() 对于英语以外的任何其他语言都没有正确格式化

    我当前正在将日期发送到电子邮件模板 我需要根据收件人的语言设置这些日期的格式 我正在使用 toLocaleString 函数 它对英语很有魅力 但对我尝试过的其他语言则不然 我尝试使用 moment 或 luxon 来格式化日期 但我想我的
  • Nginx、Rails 和 Oauth。上游过早关闭连接

    我有一个奇怪的问题 只有当我使用 nginx 和 unicorn 时才会在生产环境中出现 当我在没有 nginx 的情况下使用 unicorn 时 这种情况不会发生 问题 我有一个简单的 oauth 身份验证 允许用户通过 GitHub 注
  • 内存屏障生成器

    Reading Joseph Albahari 的线程教程 http www albahari com threading part4 aspx 以下被提及为内存屏障的生成器 C s lock陈述 Monitor Enter Monitor
  • 当我使用“仅运行此应用程序所需的文件”选项发布 ASP.NET Web 项目时,为什么未复制 web.config?

    如何纠正呢 仅运行此应用程序所需的文件 是指 bin 文件夹中的任何构建输出文件 例如 DLL 和引用 以及具有内容构建操作的任何文件 由于 web config 不是其中任何一个 因此您必须将发布选项更改为 所有项目文件 或者手动复制 w
  • 使用 REGEXP_SUBSTR(AGGREGATOR,'[^;]+',1,LEVEL) 时 Oracle 查询速度变慢

    我使用查询来获取不同的行而不是分号分隔的值 该表如下所示 row id aggregator 1 12 45 2 25 使用查询我希望输出如下所示 row id aggregator 1 12 1 45 2 25 我正在使用以下查询 SEL
  • 自适应阈值参数混淆

    谁能告诉我这些自适应阈值函数中的参数是什么以及它们如何控制黑白像素 cv2 adaptiveThreshold img 255 cv2 ADAPTIVE THRESH MEAN C cv2 THRESH BINARY 11 2 th3 cv
  • 谷歌图表水平滚动条

    这是到目前为止我的图表 JSFiddle https jsfiddle net 5yv936sr 5 google charts load current packages corechart google charts setOnLoad
  • 使用 cURL 限制下载带宽

    我一直在尝试用PHP限制带宽 我无法使用 PHP 来限制下载速率 你能在这里帮忙吗 function total filesize url ch curl init curl setopt ch CURLOPT URL url curl s
  • Meteor Deps - 运行客户端函数

    我正在努力掌握 Meteor 部门 具体来说我的用例是 2 当集合改变时 我想在客户端运行一个jQuery函数 I thinkdeps 是我正在寻找的东西 但目前我只使用过 Template templateName set gt retu
  • Windows Phone 从文本文件读取

    我正在编写一个应用程序 它从文本文件读取数据并将其用作应用程序的基础 这只是一个简单的文本文件 其中包含程序所需的几行数据 我已将文本文件作为项目的一部分包含在 Visual Studio 中 但是 当我尝试运行应用程序并使用 Stream
  • 如何将 ASP.NET MVC 视图呈现为字符串?

    我想输出两个不同的视图 一个作为字符串 将作为电子邮件发送 另一个是向用户显示的页面 这在 ASP NET MVC beta 中可能吗 我尝试过多个例子 1 ASP NET MVC Beta 中的 RenderPartial 为字符串 ht
  • OverflowError:MongoDB 只能处理最多 8 字节的整数?

    过去 12 个小时我一直在网上搜索 我完全迷失了 请帮助 我正在尝试从 API 端点提取数据并将其放入 MongoDB 中 数据如下 links self href https us api battle net data sc2 ladd
  • 好的 Javascript 组合框可以替代包含大量元素的组合框吗?

    我有一个页面 其中我的组合框有数百个元素 这使得很难选择我想要的一项 有没有一个好的 Javascript 替代品可以比
  • kafka flink timestamp 事件时间和水印

    我正在阅读 使用 Apache Flink 进行流处理 一书 其中指出 从版本 0 10 0 开始 Kafka 支持消息时间戳 当从 Kafka 0 10 或更高版本读取时 如果应用程序在事件时间模式下运行 消费者将自动提取消息时间戳作为事