Apache Flink 使用 Windows 在写入 Sink 之前引发延迟

2024-01-09

我想知道 Flink 窗口是否可能导致从数据进入管道到写入 Cassandra 中的表之间有 10 分钟的延迟。

我最初的意图是将每个事务写入 Cassandra 中的一个表,并在 Web 层使用范围键查询该表,但由于数据量很大,我正在考虑延迟写入 N 秒的选项。这意味着我的表将只包含至少 10 分钟前的数据。

下面的小图显示了每分钟滚动的 10 分钟窗口。随着时间的推移,我只想将超过 10 分钟的数据写入 Cassandra(绿色部分)。我想这对于 Flink 来说是可能的吗?

我可以创建每分钟滚动一次的 11 分钟窗口,但最终会丢弃 90% 的数据,这似乎很浪费。

最终解决方案

我创造了我自己的味道FlinkKafkaConsumer09 called DelayedKafkaConsumer这样做的主要原因是覆盖了创建KafkaFetcher

public class DelayedKafkaConsumer<T> extends FlinkKafkaConsumer09<T> {

    private ConsumerRecordFunction applyDelayAction;

    .............

    @Override
    protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
                                                  SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
                                                  SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                                                  StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode) throws Exception {
        return new DelayedKafkaFetcher<>(
            sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated,
            runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(), runtimeContext.getTaskNameWithSubtasks(),
            runtimeContext.getMetricGroup(), this.deserializer, this.properties, this.pollTimeout, useMetrics, applyDelayAction);
    }

The DelayedKafkaFetcher里面有一小段代码runFetchLoop在发出记录之前休眠 n 毫秒。

private void delayMessage(Long msgTransactTime, Long nowMinusDelay) throws InterruptedException {

        if (msgTransactTime > nowMinusDelay) {
            Long sleepTimeout = msgTransactTime - nowMinusDelay;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(format("Message with transaction time {0}ms is not older than {1}ms. Sleeping for {2}", msgTransactTime, nowMinusDelay, sleepTimeout));
            }
            TimeUnit.MILLISECONDS.sleep(sleepTimeout);
        }
    }

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Flink 使用 Windows 在写入 Sink 之前引发延迟 的相关文章

  • Spring应用中Eureka健康检查的问题

    我正在开发一个基于 Spring 的应用程序 其中包含多个微服务 我的一个微服务充当尤里卡服务器 到目前为止一切正常 在我所有其他微服务中 用 EnableEurekaClient 我想启用这样的健康检查 应用程序 yml eureka c
  • Mockito:如何通过模拟测试我的服务?

    我是模拟测试新手 我想测试我的服务方法CorrectionService correctPerson Long personId 实现尚未编写 但这就是它将执行的操作 CorrectionService将调用一个方法AddressDAO这将
  • Junit:如何测试从属性文件读取属性的方法

    嗨 我有课ReadProperty其中有一个方法ReadPropertyFile返回类型的Myclass从属性文件读取参数值并返回Myclass目的 我需要帮助来测试ReadPropertyFile方法与JUnit 如果可能的话使用模拟文件
  • 为 java 游戏创建交互式 GUI

    大家好 我正在创建一个类似于 java 中的 farmville 的游戏 我只是想知道如何实现用户通常单击以与游戏客户端交互的交互式对象 按钮 我不想使用 swing 库 通用 Windows 看起来像对象 我想为我的按钮导入自定义图像 并
  • 过滤两次 Lambda Java

    我有一个清单如下 1 2 3 4 5 6 7 和 预期结果必须是 1 2 3 4 5 6 7 我知道怎么做才能到7点 我的结果 1 2 3 4 5 6 我也想知道如何输入 7 我添加了i gt i objList size 1到我的过滤器
  • HSQL - 识别打开连接的数量

    我正在使用嵌入式 HSQL 数据库服务器 有什么方法可以识别活动打开连接的数量吗 Yes SELECT COUNT FROM INFORMATION SCHEMA SYSTEM SESSIONS
  • 如何在 Spring 中禁用使用 @Component 注释创建 bean?

    我的项目中有一些用于重构逻辑的通用接口 它看起来大约是这样的 public interface RefactorAwareEntryPoint default boolean doRefactor if EventLogService wa
  • 在 Jar 文件中运行 ANT build.xml 文件

    我需要使用存储在 jar 文件中的 build xml 文件运行 ANT 构建 该 jar 文件在类路径中可用 是否可以在不分解 jar 文件并将 build xml 保存到本地目录的情况下做到这一点 如果是的话我该怎么办呢 Update
  • 如何更改javaFX中按钮的图像?

    我正在使用javaFX 我制作了一个按钮并为此设置了图像 代码是 Image playI new Image file c Users Farhad Desktop icons play2 jpg ImageView iv1 new Ima
  • Java 公历日历更改时区

    我正在尝试设置 HOUR OF DAY 字段并更改 GregorianCalendar 日期对象的时区 GregorianCalendar date new GregorianCalendar TimeZone getTimeZone GM
  • 像 Java 这样的静态类型语言中动态方法解析背后的原因是什么

    我对 Java 中引用变量的动态 静态类型和动态方法解析的概念有点困惑 考虑 public class Types Override public boolean equals Object obj System out println i
  • tomcat 中受密码保护的应用程序

    我正在使用 JSP Servlet 开发一个Web应用程序 并且我使用了Tomcat 7 0 33 as a web container 所以我的要求是tomcat中的每个应用程序都会password像受保护的manager applica
  • 如何在谷歌地图android上显示多个标记

    我想在谷歌地图android上显示带有多个标记的位置 问题是当我运行我的应用程序时 它只显示一个位置 标记 这是我的代码 public class koordinatTask extends AsyncTask
  • 不接受任何内容也不返回任何内容的函数接口[重复]

    这个问题在这里已经有答案了 JDK中是否有一个标准的函数式接口 不接受也不返回任何内容 我找不到一个 像下面这样 FunctionalInterface interface Action void execute 可运行怎么样 Functi
  • 专门针对 JSP 的测试驱动开发

    在理解 TDD 到底是什么之前 我就已经开始编写测试驱动的代码了 在没有实现的情况下调用函数和类可以帮助我以更快 更有效的方式理解和构建我的应用程序 所以我非常习惯编写代码 gt 编译它 gt 看到它失败 gt 通过构建其实现来修复它的过程
  • Android:无法使用 DbHelper 和 Contract 类将数据插入 SQLite

    public class Main2Activity extends AppCompatActivity private EditText editText1 editText2 editText3 editText4 private Bu
  • 我如何在java中读取二进制数据文件

    因此 我正在为学校做一个项目 我需要读取二进制数据文件并使用它来生成角色的统计数据 例如力量和智慧 它的设置是让前 8 位组成一个统计数据 我想知道执行此操作的实际语法是什么 是不是就像读文本文件一样 这样 File file new Fi
  • 找不到符号 NOTIFICATION_SERVICE?

    package com test app import android app Notification import android app NotificationManager import android app PendingIn
  • CamcorderProfile.videoCodec 返回错误值

    根据docs https developer android com reference android media CamcorderProfile html 您可以使用CamcorderProfile获取设备默认视频编解码格式 然后将其
  • 如果没有抽象成员,基类是否应该标记为抽象?

    如果一个类没有抽象成员 可以将其标记为抽象吗 即使没有实际理由直接实例化它 除了单元测试 是的 将不应该实例化的基类显式标记为抽象是合理且有益的 即使在没有抽象方法的情况下也是如此 它强制执行通用准则来使非叶类抽象 它阻止其他程序员创建该类

随机推荐