Flink:处理数据早于应用程序水印的键控流

2024-02-25

我正在使用带有运动源和事件时间键控窗口的 F​​link。该应用程序将监听实时数据流、窗口(事件时间窗口)并处理每个键控流。我有另一个用例,我还需要能够支持某些关键流的旧数据的回填(这些将是事件时间

鉴于我正在使用水印,这会成为一个问题,因为 Flink 不支持每键水印。因此,任何用于回填的键控流最终都将被忽略,因为该流的事件时间将小于实时流维护的应用程序水印。

我已经解决了其他类似的问题,但无法找到可能的方法。 以下是我正在考虑的可能方法,但仍有一些悬而未决的问题。

可能的方法 - 1

(i) 保留一份申请副本,专门用于回填目的。回填工作很少发生(大约每月几次)。发送到应用程序副本的数据流将在流中具有开始和停止的指示符。我计划使用它来启动/重置水印。 开放问题?是否可以使用流中的指示器重置水印?我知道这不是最佳实践,但无法想出替代解决方案。

跟进:清除DataStream中的Flink水印状态 https://stackoverflow.com/questions/53163364/clear-flink-watermark-state-in-datastream#new-answer[未提供明确的解决方案。]

可能的方法 - 2 每个键都有并行实例,因为每个任务可能有不同的水印。 -> 不这样做,因为我将拥有 > 5k 键控流。

如果需要任何其他详细信息,请告诉我。


您可以通过在 BATCH 执行模式下运行回填作业来解决此问题。当 DataStream API 在批处理模式下运行时,输入是有界(有限)的,并且是预先已知的。这允许 Flink 按键和时间戳对输入进行排序,并且处理将根据事件时间正确进行,而不用担心水印或延迟事件。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink:处理数据早于应用程序水印的键控流 的相关文章

  • 如何使用openCV python从文本文档中删除水印?

    我是 OpenCV 新手 我需要帮助从该图像中删除水印 我尝试使用修复 但我想要一种更自动化的特征映射和修复方式 请帮助我 如果您的所有图像都是这样的并且具有水印 如具有浅灰色水印的问题所示 那么简单的阈值操作将起作用 import cv2
  • 如何在 ffmpeg 命令中缩放并添加正确的徽标?

    我正在尝试应用水印 并通过 ffmpeg 命令将其缩放到当前视频大小 这是我最初的无水印命令 ffmpeg v 0 vcodec h264 qsv i udp some ip 1234 fifo size 1000000 overrun n
  • Flink 日志记录限制:如何将日志记录配置传递给 Flink 作业

    我有一个 flink 作业 它使用 logback 作为日志记录框架 因为日志需要发送到logstash 而 logback 有一个 logstash 附加程序 Logstash logback appender Appender 工作正常
  • 示例 flink 作业的错误消息:无法使字段 private Final byte[] java.lang.String.value 可访问

    我正在开始使用 Apache Flink 我将发布我遇到的错误 然后复制它的步骤 这是我运行的命令 bin flink run examples streaming WordCount jar input 这是错误文本 org apache
  • ASP.NET:动态添加“水印”到图像

    我看到了有关以下方面的精彩问题和答案使用php在图像上添加水印 https stackoverflow com questions 2235152 add watermark to images with php 我也想做同样的事情 这次是
  • Apache Flink 检查点卡住

    我们正在运行一个 ListState 介于 300GB 到 400GB 之间的作业 并且有时该列表可能会增加到数千 在我们的用例中 每个项目都必须有自己的 TTL 因此我们使用 S3 上的 RocksDB 后端为此 ListState 的每
  • 是否存在一种可以抵抗图像操纵的数字图像隐写算法?

    我想知道 是否有一种针对数字图像的隐写解决方案可以抵抗图像操作 我所说的 操作 是指最标准的操作 重新压缩 JPEG 甚至完全更改文件格式 裁剪和缩放 这种方法的应用自然是为了图像版权的保护 我完全理解 图像被操纵的越多 隐写水印完好无损的
  • Apache Flink、JDBC 和 fat jar 是否存在类加载问题?

    使用 Apache Flink 1 8 并尝试运行RichAsyncFunction 我得到No Suitable Driver Found初始化 Hikari 池时出错RichAsyncFunction open 在 IDE 中它运行得很
  • 在 Flink 中,我可以在同一个槽中拥有一个算子的多个子任务吗?

    探索Apache Flink几天了 对Task Slot的概念有些疑惑 虽然有人问了几个问题 但有一点我不明白 我正在使用一个玩具应用程序进行测试 运行本地集群 我已禁用运算符链接 我从文档中知道插槽允许内存隔离而不是 CPU 隔离 阅读文
  • Flink 流顺序

    Flink 能保证流的执行顺序吗 我有两个 Kafka 主题 每个主题都有一个分区 流 1 和流 2 并使用keyBy 流由一个处理coprocess功能 在我的测试过程中 我可以看到两个流的内容并不总是按顺序执行 我可以将并行度设置为 1
  • Flink 检查点到 Google Cloud Storage

    我正在尝试为 GCS 中的 flink 作业配置检查点 如果我在本地运行测试作业 没有 docker 和任何集群设置 一切正常 但如果我使用 docker compose 或集群设置运行它并在 flink 仪表板中使用作业部署 fat ja
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • 我可以将 flink RocksDB 状态后端与本地文件系统一起使用吗?

    我正在探索使用 FlinkrocksDb 状态后端 文档似乎暗示我可以使用常规文件系统 例如 file data flink checkpoints 但代码 javadoc 仅在此处提到 hdfs 或 s3 选项 我想知道是否可以将本地文件
  • Flink中为什么DataStream不支持聚合

    我是 Flink 的新手 有时 我想在 DataStream 上进行聚合 而不需要先执行 keyBy 为什么 Flink 不支持 DataStream 上的聚合 sum min max 等 谢谢你 艾哈迈德 Flink 支持非 keyed
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • Apache Flink 中的并行度

    我可以为 Flink 程序中任务的不同部分设置不同的并行度吗 例如 Flink 如何解释以下示例代码 两个自定义实践者MyPartitioner1 MyPartitioner2 将输入数据划分为两个4和2个分区 partitionedDat
  • 我想使用 Flink 的 Streaming File Sink 写入 ORC 文件,但它无法正确写入文件

    我正在从 Kafka 读取数据并尝试将其以 ORC 格式写入 HDFS 文件系统 我使用了他们官方网站上的以下链接参考 但我可以看到Flink为所有数据写入完全相同的内容并生成这么多文件并且所有文件都可以103KB https ci apa
  • 带回形针、导轨的水印图像 4

    我一直在尝试按照中列出的答案向我的图像添加水印带有回形针的水印 https stackoverflow com questions 13517757 watermark with paperclip 水印 rb module Papercl
  • Flink任务管理器内存不足和内存配置

    我们使用 Flink 流在单个集群上运行一些作业 我们的工作是使用rocksDB 来保存状态 该集群配置为在 3 个独立的 VM 上使用单个 Jobmanager 和 3 个 Taskmanager 运行 每个 TM 均配置为运行 14GB
  • Flink:Jobmanager UI 中设置的并行度与任务槽有何关系?

    假设我有 8 个任务管理器和 16 个任务槽 如果我使用 Jobmanager UI 提交作业并将并行度设置为 8 我是否只使用 8 个任务槽 如果我有 8 个具有 8 个槽位的任务管理器 并以并行度 8 提交相同的作业 该怎么办 是完全一

随机推荐

  • 激活当前的 QNetworkInterface 并连接到互联网

    我想让当前的网络接口处于活动状态并连接到互联网 实际上 我可以检查网络是否正常以及是否不是环回网络 foreach QNetworkInterface interface QNetworkInterface allInterfaces if
  • Rails - 以可移植的方式创建临时文件

    我的 Rails 应用程序在 Ubuntu 服务器计算机上运行 我需要创建临时文件 以便将它们 馈送到 第二个独立应用程序 我将为此使用 rake 任务 以防需要此信息 我的问题是 在 Rails 应用程序上创建临时字段的最佳方法是什么 因
  • npm install:有没有办法忽略 package.json 中的特定依赖项

    我目前正在尝试为包含本地依赖项的node js 项目创建一个docker 容器 这似乎会导致 docker 出现问题 因此作为解决方法 我尝试仅复制本地依赖项文件夹并忽略 package json 文件中的依赖项条目 有没有办法指定我想忽略
  • 如何合并对象数组?

    假设我有一系列文章 每篇文章可能有也可能没有超过 1 个图像对象 现在由于 mysql 无法将对象分组在一起 所以你必须自己做 所以结果是你得到near重复的文章对象 唯一的区别是图像对象 By near重复我的意思是返回结果的唯一区别是图
  • 如何使用 jest 模拟 window.navigator.language

    我试图嘲笑window navigator language我的玩笑单元测试中浏览器中的属性 以便我可以测试页面上的内容是否使用正确的语言 我在网上发现有人使用这个 Object defineProperty window navigato
  • 自定义异常类型

    我可以在 JavaScript 中为用户定义的异常定义自定义类型吗 如果是这样 我该怎么做 From 网络参考 http webreference com programming javascript rg32 index 2 html t
  • .htaccess 将图像文件重写为php脚本

    这是我现在拥有的 htaccess这应该在未来有效 RewriteEngine On RewriteCond HTTPS on RewriteRule https SERVER NAME 1 R L RewriteCond REQUEST
  • 如何在 iOS 之外创建 Apple Music 用户令牌

    我试图让人们通过网页喜欢 Apple Music 上的曲目 专辑或播放列表 我理解此页上的手册 https developer apple com library content documentation NetworkingIntern
  • Flask中的链式下拉菜单,从sqlite数据库获取数据

    我正在尝试获取不同县的 html 选择标签 从数据库获取 当用户选择一个县时 我想要另一个选择标签来启用并显示该县的城市 我在 sqlite 数据库中有数据 其中县 ID 位于城市数据库中 我在 Pycharm 中使用 python 和 F
  • 适用于 Windows 的便携式 Ruby on Rails 环境

    有人问了同样的问题 https stackoverflow com questions 258801 portable ruby on rails environment大约两年前 当时的答案是InstantRails 但 InstantR
  • 如何在 Objective-C 中测试相等性?

    比较顶视图时 第一个 无法执行 相等测试失败 In init i self setCurrentPuzzleView p1 后来 我 if self currentPuzzleView p1 NSLog Removing P1 from S
  • Google Cloud ML Engine 错误 429 内存不足

    我将模型上传到 ML engine 当尝试进行预测时 我收到以下错误 ERROR gcloud ml engine predict HTTP request failed Response error code 429 message Pr
  • 将数组的内容打印为字符串

    我正在尝试编写一个小函数 它接受整数数据并将其转换为 char 数组内的十六进制值 我想做的是 取int数组数据 将其转换为另一个数组中的十六进制 将十六进制数组的内容转换为字符串 打印字符串的内容 int main int data 40
  • phpmailer如何向两个不同的用户发送两条不同的消息

    我正在向两个不同的人发送邮件 两封不同的消息 一封发送给用户 一封发送给管理员 message1 hello user message2 hello admin email email protected cdn cgi l email p
  • Clojure 测试框架的优势?

    您更喜欢哪一个 为什么 各自的优点和缺点是什么 在什么情况下 每个人都比其他人更胜一筹 我对 midje 与 clojure test 特别感兴趣 但也可以随意提出其他 Clojure 测试框架 也可以看看Clojure 的最佳单元测试框架
  • 在静态方法中在 UIView 上用清晰的颜色绘制(切一个洞)

    我有一个iPhone应用程序 我需要实现以下方法 UITextView textView UITextView withCuttedRect CGRect r 此方法必须剪切 填充 UIColor clearColor 矩形r from U
  • 在同一套接字上并行调用 send/recv 是否有效?

    我们可以在同一个套接字上从一个线程调用 send 并从另一个线程调用 receive 吗 我们可以从同一套接字上的不同线程并行调用多个发送吗 我知道好的设计应该避免这种情况 但我不清楚这些系统 API 的行为方式 我也找不到同样的好的文档
  • 表单不提交动态生成的输入(jQuery)

    您好 我正在尝试为我的表单动态生成一些输入 但它没有发布生成的新输入 到目前为止我一直在四处搜索 我唯一能找到的就是使表单成为body 标签 并且在我的应用程序的设计中这是不可能的 所以有人可能知道会发生什么以及如何解决它 不 它不适用于
  • 如何在 Visual Studio Code 中显示 Jupyter Notebook 中的所有输出?

    在 VS Code 中的 Jupyter Notebook 中 当我运行在某个时刻打印大量输出的代码时 剩余的输出将被抑制并显示一条消息 显示更多 在文本编辑器中打开原始输出数据 如何使所有输出可见 我认为你在这里使用内部构建是正确的设置
  • Flink:处理数据早于应用程序水印的键控流

    我正在使用带有运动源和事件时间键控窗口的 F link 该应用程序将监听实时数据流 窗口 事件时间窗口 并处理每个键控流 我有另一个用例 我还需要能够支持某些关键流的旧数据的回填 这些将是事件时间 鉴于我正在使用水印 这会成为一个问题 因为