我正在阅读《使用 Apache Flink 进行流处理》一书,其中指出“从版本 0.10.0 开始,Kafka 支持消息时间戳。当从 Kafka 0.10 或更高版本读取时,如果应用程序在事件时间模式下运行,消费者将自动提取消息时间戳作为事件时间时间戳*”
所以里面一个processElement
执行呼叫context.timestamp()
默认情况下会返回kafka消息时间戳吗?
您能否提供一个简单的示例来说明如何实现基于消耗的kafka消息时间戳提取(并构建水印)的AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks。
如果我正在使用TimeCharacteristic.ProcessingTime
,ctx.timestamp() 会返回处理时间吗?在这种情况下,它会类似于context.timerService().currentProcessingTime()
.
谢谢。
Flink Kafka 消费者会为您处理这个问题,并将时间戳放在需要的位置。在 Flink 1.11 中,您可以简单地依赖于此,尽管您仍然需要提供一个 WatermarkStrategy 来指定无序性(或断言时间戳是有序的):
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.assignTimestampsAndWatermarks(
WatermarkStrategy.
.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
在 Flink 的早期版本中,您必须提供时间戳分配器的实现,如下所示:
public long extractTimestamp(Long element, long previousElementTimestamp) {
return previousElementTimestamp;
}
这个版本的extractTimestamp
方法将 StreamRecord 中存在的时间戳的当前值传递为previousElementTimestamp
,在本例中将是 Flink Kafka 消费者放置的时间戳。
Flink 1.11 文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
Flink 1.10 文档 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
至于返回的是什么ctx.timestamp()
使用时TimeCharacteristic.ProcessingTime
,在这种情况下此方法返回 NULL。 (从语义上讲,是的,时间戳就好像是当前处理时间,但这不是它的实现方式。)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)