基于流的应用程序中的受控/手动错误/恢复处理

2024-04-06

我正在开发一个基于的应用程序Apache Flink,它利用Apache Kafka用于输入和输出。该应用程序可能会被移植到Apache Spark,所以我也将其添加为标签,问题仍然相同。

我要求通过 kafka 接收的所有传入消息必须按顺序处理,并安全地存储在持久层(数据库)中,并且不得丢失任何消息。

此应用程序中的流部分相当琐碎/小,因为主要逻辑将归结为以下内容:

environment.addSource(consumer)    // 1) DataStream[Option[Elem]]
  .filter(_.isDefined)             // 2) discard unparsable messages
  .map(_.get)                      // 3) unwrap Option
  .map(InputEvent.fromXml(_))      // 4) convert from XML to internal representation
  .keyBy(_.id)                     // 5) assure in-order processing on logical-key level
  .map(new DBFunction)             // 6) database lookup, store of update and additional enrichment
  .map(InputEvent.toXml(_))        // 7) convert back to XML
  .addSink(producer)               // 8) attach kafka producer sink

现在,在这个管道中,出现了几种错误情况could occur:

  • 数据库变得不可用(关闭、表空间已满……)
  • 由于逻辑错误(来自列格式),无法存储更改
  • 由于代理不可用,kafka 生产者无法发送消息

可能还有其他情况。

现在我的问题是,how在这些情况下,我能否保证上述的一致性,而实际上我必须这样做:

  1. Stream-Operator 6) 检测到问题(数据库不可用)
  2. 的 DB 连接DBFunction必须恢复对象,这可能只会在几分钟后成功
  3. 这意味着整个处理必须暂停,最好是整个管道,以便传入的消息被大量加载到内存中
  4. 数据库恢复后恢复处理。处理必须准确地恢复到 1) 中遇到问题的消息

现在我知道至少有两种关于故障处理的工具:

  1. kafka消费者抵消
  2. apache flink 检查点

然而,在搜索文档时,我没有看到如何在单个运算符内的流处理过程中使用其中任何一个。

那么,在流应用程序中进行细粒度错误处理和恢复的推荐策略是什么?


几点:

keyBy 不会帮助确保按顺序处理。如果有的话,它可能会交错来自不同 Kafka 分区的事件(这些分区在每个分区内可能是有序的),从而在以前不存在的地方造成无序。如果不了解您打算使用多少个 FlinkKafkaConsumer 实例、每个实例将从多少个分区中消费、密钥如何跨 Kafka 分区分布以及您为什么这么想,就很难更具体地评论如何保证按顺序处理keyBy 是必要的——但是如果你设置正确,保留顺序可能是可以实现的。重新解释为KeyedStream https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/experimental.html在这里可能会有帮助,但这个功能很难理解,而且正确使用也很棘手。

你可以使用 Flink 的异步函数 https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html以容错、一次性的方式管理与外部数据库的连接。

Flink 不支持系统化的细粒度恢复——它的检查点是整个分布式集群状态的全局快照,并且被设计为在恢复期间作为整体、自洽的快照使用。如果您的作业失败,通常唯一的办法是从检查点重新启动,这将涉及回滚输入队列(到检查点中存储的偏移量),重播自这些偏移量以来的事件,重新发出数据库查找(异步函数会自动完成),并使用kafka事务来实现端到端的Exactly Once语义。然而,在并行作业令人尴尬的情况下,有时可以利用细粒度恢复 https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于流的应用程序中的受控/手动错误/恢复处理 的相关文章

随机推荐

  • 初始化 C 结构的双花括号的含义是什么?

    我目前正在处理遗产C 代码 用gcc 2 9 X成功编译 我被要求将此遗留代码移植到 gcc 3 4 X 大多数错误都很容易纠正 但这个特殊的错误让我感到困惑 上下文 struct TMessage THeader header TData
  • C# 程序员的 C++ [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我对 Java 和 C 的 OO 有很好的理解 而且我很幸运在我的工程课程中接触到了汇编程序和 C 的
  • 控制器中的@Transactional

    首先我想说 我完全同意只让服务层具有事务性 但有时世界并不完美 而现在我正处于这种情况之中 基本上我被分配到一个很棒的项目 遗留代码已经有 4 年多了 问题是 开发人员没有遵循任何引入业务逻辑的模式 因此您可以对来自控制器的多个服务调用进行
  • Tomcat 7 连接被拒绝

    我有一个在 Jelastic 上运行的 Tomcat 实例 并且有两个已部署的应用程序 用于 foo 上下文和 bar 上下文 在处理对 foo 的请求期间 我们向 bar 发出 HTTP 请求 用于授权 并且这里总是出现异常 Connec
  • 如何使用 Castle Core 或其他库(只是免费库)编写拦截器(AOP)以解决交叉问题

    我想要一个像这样的属性来处理横切关注点 例如 Logging Exception public class MyService Log Interception AOP ExceptionHandler Interception AOP p
  • 如何通过 CLI 快速重命名 macOS 或 Linux 上的文件?

    这是我的源文件 e2f9eb91 645f 408a 9241 66490b61a617 file module 1 txt d20f06a8 4de1 4da0 8175 93e9b2d81c42 file module 2 txt 67
  • 在 Windows 7 上通过 VPN 使用时 Git 无响应

    这是关于通过 VPN 处理本地存储库时简单 git 命令无响应的问题 我的 Windows 用户帐户 管理员角色 是我用来登录的域帐户 我有一些从 github 源克隆的本地存储库 场景 1 在没有 VPN 的 Windows 上使用本地存
  • xamarin.forms 处理 WebView 上的 Clicked 事件

    我想处理 WebView 控件上的单击 点击事件 我已经尝试过 GestureRecognizers 但没有任何反应 我认为 WebView 可能有某种方式使事件处理为 true
  • 设置 Angular-UI Select2 多重指令的初始值

    我有一个 select2 指令 用于多个选择的国家 地区 并使用自定义查询来获取数据 Directive
  • PHP 使用 sqlsrv 一次检索多行流内容

    这是一种后续行动这个问题 https stackoverflow com questions 67998821 create file system file from file stored in microsoft sql databa
  • 检查视频是否正在流式传输

    我有这个流来自http www tpai tv live http www tpai tv live但正如您现在所看到的 它已关闭 我需要检查流是否正在播放 如果不播放其他内容 这是流式传输代码
  • MimeMessage.saveChanges 真的很慢

    由于包含以下内容 以下测试的执行时间约为 5 秒m saveChanges import org junit Before import org junit Test import javax mail MessagingException
  • 哪些 C99 功能被认为是有害的或不受支持的 [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我通常在 C89 中编写 C 代码 现在使用 C99 的一些功能 例如intxx t or VA ARGS or snprintf 非常有用 甚
  • PHP 的 require 和 include 有什么区别?

    我知道PHP require require一次 include和includeonce的基本用法 但我对何时应该使用它们感到困惑 示例 我有 3 个文件 例如 settings php database php 和 index php 在
  • Cocoapods 不工作 Xcode 9.2

    Cocoapods 已安装但无法工作 Xcode 9 2 我尝试了这些解决方案https github com CocoaPods CocoaPods issues 3777 https github com CocoaPods Cocoa
  • 在调试中禁用应用程序洞察

    如何在使用调试配置时自动禁用应用程序洞察并仅在发布时启用它 是否可以在不创建另一个仅用于调试的仪器密钥的情况下执行此操作 I have trackevent语句分散在代码中 将它们包含在调试预处理器检查中并不是理想的解决方案 我当前的解决方
  • 如何告诉 find 命令转义文件名中的空格字符?

    我有一个单行 find 命令 它递归地检查并打印出在特定时间范围内创建的特定文件类型的大小 所有者和名称 但在结果中 给出文件名列 直到目录或文件名中的第一个空格字符为止 有没有办法在这个单一命令中解决这个问题 而无需在 bash 中编写任
  • 在android中使用OpenGL在按钮上单击绘制形状

    作为 android OpenGL 部分的新手 我已经下载了现场给出的示例使用 OpenGL ES 显示图形 http developer android com training graphics opengl index html so
  • Chrome 中的 Javascript 执行跟踪 - 如何进行?

    我在网站上加载了约 100 200 个 JavaScript 函数 我想确定当我单击 Google Chrome 中的一项或多项时执行什么 JavaScript 函数 我如何使用 Chrome Web 开发人员工具来做到这一点 谢谢 一种简
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序