我正在开发一个基于的应用程序Apache Flink
,它利用Apache Kafka
用于输入和输出。该应用程序可能会被移植到Apache Spark
,所以我也将其添加为标签,问题仍然相同。
我要求通过 kafka 接收的所有传入消息必须按顺序处理,并安全地存储在持久层(数据库)中,并且不得丢失任何消息。
此应用程序中的流部分相当琐碎/小,因为主要逻辑将归结为以下内容:
environment.addSource(consumer) // 1) DataStream[Option[Elem]]
.filter(_.isDefined) // 2) discard unparsable messages
.map(_.get) // 3) unwrap Option
.map(InputEvent.fromXml(_)) // 4) convert from XML to internal representation
.keyBy(_.id) // 5) assure in-order processing on logical-key level
.map(new DBFunction) // 6) database lookup, store of update and additional enrichment
.map(InputEvent.toXml(_)) // 7) convert back to XML
.addSink(producer) // 8) attach kafka producer sink
现在,在这个管道中,出现了几种错误情况could occur:
- 数据库变得不可用(关闭、表空间已满……)
- 由于逻辑错误(来自列格式),无法存储更改
- 由于代理不可用,kafka 生产者无法发送消息
可能还有其他情况。
现在我的问题是,how在这些情况下,我能否保证上述的一致性,而实际上我必须这样做:
- Stream-Operator 6) 检测到问题(数据库不可用)
- 的 DB 连接
DBFunction
必须恢复对象,这可能只会在几分钟后成功
- 这意味着整个处理必须暂停,最好是整个管道,以便传入的消息被大量加载到内存中
- 数据库恢复后恢复处理。处理必须准确地恢复到 1) 中遇到问题的消息
现在我知道至少有两种关于故障处理的工具:
- kafka消费者抵消
- apache flink 检查点
然而,在搜索文档时,我没有看到如何在单个运算符内的流处理过程中使用其中任何一个。
那么,在流应用程序中进行细粒度错误处理和恢复的推荐策略是什么?
几点:
keyBy 不会帮助确保按顺序处理。如果有的话,它可能会交错来自不同 Kafka 分区的事件(这些分区在每个分区内可能是有序的),从而在以前不存在的地方造成无序。如果不了解您打算使用多少个 FlinkKafkaConsumer 实例、每个实例将从多少个分区中消费、密钥如何跨 Kafka 分区分布以及您为什么这么想,就很难更具体地评论如何保证按顺序处理keyBy 是必要的——但是如果你设置正确,保留顺序可能是可以实现的。重新解释为KeyedStream https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/experimental.html在这里可能会有帮助,但这个功能很难理解,而且正确使用也很棘手。
你可以使用 Flink 的异步函数 https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html以容错、一次性的方式管理与外部数据库的连接。
Flink 不支持系统化的细粒度恢复——它的检查点是整个分布式集群状态的全局快照,并且被设计为在恢复期间作为整体、自洽的快照使用。如果您的作业失败,通常唯一的办法是从检查点重新启动,这将涉及回滚输入队列(到检查点中存储的偏移量),重播自这些偏移量以来的事件,重新发出数据库查找(异步函数会自动完成),并使用kafka事务来实现端到端的Exactly Once语义。然而,在并行作业令人尴尬的情况下,有时可以利用细粒度恢复 https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)