我想在 kubernetes 上运行 apache flink (1.11.1) 流应用程序。文件系统状态后端保存到 s3。 s3 的检查点正在运行
args:
- "standalone-job"
- "-s"
- "s3://BUCKET_NAME/34619f2862ce3e5fc91d80eae13a434a/chk-4/_metadata"
- "--job-classname"
- "com.abc.def.MY_JOB"
- "--kafka-broker"
- "KAFKA_HOST:9092"
所以我面临的问题是:
- 我必须手动选择以前的状态目录。有可能让它变得更好吗?
- 该作业会增加 chk 目录,但不使用检查点。意味着当我第一次看到一个事件时我抛出一个新事件并将其存储到
ListState<String>
每当我通过 Gitlab 部署应用程序的较新版本时,它都会再次引发此事件。
- 当我将 state.backend 定义到文件系统时,为什么必须在代码中显式启用检查点?
env.enableCheckpointing(Duration.ofSeconds(60).toMillis());
and env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);
- 你可能会更高兴Ververica 平台:社区版 https://www.ververica.com/getting-started,这将抽象级别提高到您不必处理此级别的细节的程度。它有一个在设计时考虑到 CI/CD 的 API。
- 我不确定我是否理解您的第二点,但您的作业在恢复期间会倒带并重新处理一些数据,这是正常的。 Flink 不保证精确一次处理,而是保证精确一次语义:每个事件都会影响 Flink 管理的状态一次。这是通过回滚到最近检查点中的偏移量,并将所有其他状态回滚到消耗完这些偏移量的所有数据后的状态来完成的。
- 拥有一个状态后端是必要的,因为它可以在作业运行时存储作业的工作状态。如果不启用检查点,则工作状态不会被检查点,并且无法恢复。但是,从 Flink 1.11 开始,您可以通过配置文件启用检查点,使用
execution.checkpointing.interval: 60000
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)