我想知道 Flink 窗口是否可能导致从数据进入管道到写入 Cassandra 中的表之间有 10 分钟的延迟。
我最初的意图是将每个事务写入 Cassandra 中的一个表,并在 Web 层使用范围键查询该表,但由于数据量很大,我正在考虑延迟写入 N 秒的选项。这意味着我的表将只包含至少 10 分钟前的数据。
下面的小图显示了每分钟滚动的 10 分钟窗口。随着时间的推移,我只想将超过 10 分钟的数据写入 Cassandra(绿色部分)。我想这对于 Flink 来说是可能的吗?
我可以创建每分钟滚动一次的 11 分钟窗口,但最终会丢弃 90% 的数据,这似乎很浪费。
最终解决方案
我创造了我自己的味道FlinkKafkaConsumer09
called DelayedKafkaConsumer
这样做的主要原因是覆盖了创建KafkaFetcher
public class DelayedKafkaConsumer<T> extends FlinkKafkaConsumer09<T> {
private ConsumerRecordFunction applyDelayAction;
.............
@Override
protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode) throws Exception {
return new DelayedKafkaFetcher<>(
sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated,
runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(), runtimeContext.getTaskNameWithSubtasks(),
runtimeContext.getMetricGroup(), this.deserializer, this.properties, this.pollTimeout, useMetrics, applyDelayAction);
}
The DelayedKafkaFetcher
里面有一小段代码runFetchLoop
在发出记录之前休眠 n 毫秒。
private void delayMessage(Long msgTransactTime, Long nowMinusDelay) throws InterruptedException {
if (msgTransactTime > nowMinusDelay) {
Long sleepTimeout = msgTransactTime - nowMinusDelay;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Message with transaction time {0}ms is not older than {1}ms. Sleeping for {2}", msgTransactTime, nowMinusDelay, sleepTimeout));
}
TimeUnit.MILLISECONDS.sleep(sleepTimeout);
}
}