Apache Flink:如何在摄取时间模式下获取事件的时间戳?

2023-12-11

我想知道是否可以使用Flink的摄取时间模式来获取记录的时间戳。考虑以下 flink 代码示例(https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala),

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val grades = WindowJoinSampleData.getGradeSource(env, rate)
val salaries = WindowJoinSampleData.getSalarySource(env, rate)

val joined = joinStreams(grades, salaries, windowSize)

...
case class Grade(name: String, level: Int) 
case class Salary(name: String, salary: Int)

默认情况下,等级和工资均不包含时间戳字段。然而,由于Flink允许使用“ingestionTime”将挂钟时间戳分配给数据流中的记录,那么是否可以在运行时获取这样的时间戳?例如,这就是我正在尝试做的事情:

val oldDatastream = env.addSource...  // Using ingestion time
val newDatastream = oldDatastream.map{record =>   
    val ts = getRecordTimestamp(record)
    // do some thing with ts
    }

谢谢你的帮助。


Use ProcessFunction这给你一个Context,您可以使用它来获取元素的时间戳(无论是摄取时间、处理时间还是事件时间)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Flink:如何在摄取时间模式下获取事件的时间戳? 的相关文章

  • 计算流数据的直方图 - 在线直方图计算

    我正在寻找一种算法来生成大量流数据的直方图 最大值和最小值事先未知 但标准差和平均值在特定范围内 我很欣赏你的想法 Cheers 我刚刚找到了一个解决方案 秒 从流式并行决策树算法构建在线直方图 论文的 2 2 该算法由 Hive 项目中的
  • node.js - 将两个可读流写入同一个可写流

    我想知道如果您同时将两个不同的读取流传输到同一目的地 node js 会如何操作 例如 var a fs createReadStream a var b fs createReadStream b var c fs createWrite
  • nginx server_name 在流块内可能吗?

    目前设置如下 stream server listen 9987 udp server name subdomain EXAMPLE com this line is resulting in an error proxy pass loc
  • 是否可以像 html 视频元素一样将流作为源添加到 html canvas 元素?

    根据MDN The HTMLMediaElement https developer mozilla org en US docs Web API HTMLMediaElement界面添加到HTMLElement属性 支持基本媒体相关功能所
  • .Net StreamWriter.BaseStream,这个定义是什么意思? “获取与后备存储接口的底层流。”

    我正在读关于StreamWriter今天 偶然发现了这个楼盘 BaseStream 我正在寻找定义并发现了这个 获取与后备存储接口的底层流 从这里MSDN StreamWriter BaseStream http msdn microsof
  • Apache Flink 动态设置 JVM_OPT env.java.opts

    是否可以设置自定义 JVM 选项env java opts提交作业时未在作业中指定conf flink conf yaml file 我问的原因是我想在 log4j 中使用一些自定义变量 我也在 YARN 上运行我的工作 我已经使用 CLI
  • 如何使用 ffmpeg 设置默认流

    我有一些 m4v 文件 我想用 ffmpeg 添加字幕 我知道我需要映射流以将它们放入输出文件中 但如何确保此字幕流将是默认流 字幕是 srt 人们似乎说它们与 mp4 容器不兼容 我需要先将字幕转换为什么 另外 各种流的顺序重要吗 视频流
  • 在 Windows 上以 QML 播放 RTSP 视频

    我正在尝试将 QML 中的 RTSP 流播放到视频标签中 如下所示 Repeater model 8 Video Layout fillWidth true Layout fillHeight true fillMode VideoOutp
  • 使用 node.js 上的 knox 将八位字节流从请求流式传输到 S3

    我正在尝试使用以下命令将八位字节流直接流式传输到 S3knox https github com LearnBoost knox 在 Node js 上 octet stream 是从浏览器上传的 XHR 文件 我以为我可以将请求流式传输到
  • 如何使用 Spring Boot 传输音频

    我想让用户能够播放声音 我的实现在 Firefox 上运行良好 在 Safari 上 不播放声音 我验证了音频控制可以在 Safari 中与其他网站一起使用 所以 我认为我必须更改控制器中的某些内容 控制器 RequestMapping v
  • 在 C# 中使用流读取大文本文件

    我有一项可爱的任务 就是研究如何处理加载到我们应用程序的脚本编辑器中的大文件 就像VBA http en wikipedia org wiki Visual Basic for Applications用于我们内部产品的快速宏 大多数文件约
  • 如何在 Java 中读取/转换 InputStream 为字符串?

    如果你有一个java io InputStream对象 您应该如何处理该对象并生成一个String 假设我有一个InputStream包含文本数据 我想将其转换为String 例如我可以将其写入日志文件 最简单的方法是什么InputStre
  • Flink - 无法从检查点恢复

    我使用一个作业管理器和两个任务管理器在 kubernetes 上运行集群 我通过在作业运行时杀死一个任务管理器 Pod 来测试检查点机制 我在作业管理器和重新启动的任务管理器上遇到以下异常 工作经理例外 java lang Exceptio
  • 流多播 - 读取一次流,但以不同的方式处理它,并使用最少的缓冲

    为了可扩展性和节省资源 最好避免将整个输入流读入内存 而是尝试将其作为流处理 一次读取小块 当您想要对数据执行一件事 例如从 Web 请求中读取数据并将其保存到文件中 时 这在 NET 中很容易实现 简单的例子 input CopyTo o
  • Flink Kafka - 如何使应用程序并行运行?

    我正在 Flink 中创建一个应用程序 读取某个主题的消息 对其进行一些简单的处理 将结果写入不同的主题 我的代码确实有效 然而它不并行运行我怎么做 看来我的代码只在一个线程 块上运行 在 Flink Web 仪表板上 应用程序进入运行状态
  • Flink 在 Kubernetes 上的部署和 Native Kubernetes 有什么不同

    黑白的主要区别是什么原生 Kubernetes https ci apache org projects flink flink docs stable ops deployment native kubernetes html and 库
  • HttpRequest PUT内容到poco库中

    我想使用 HTTP PUT 请求将一些数据从 C 应用程序发送到服务器 我在用poco http pocoproject org我的应用程序中的网络库 我正在使用这个代码片段 HTTPClientSession session uri ge
  • 在 C# 中使用(IDisposable obj = new ...) 在流中写入代码块(例如 XML)

    我已经开始使用实现 IDisposable 的类通过 using 语句在流中写入块 这有助于保持正确的嵌套并避免丢失或错误放置开始 结束部件 基本上 构造函数写入块的开头 例如打开 XML 标签 Dispose 写入结束 例如关闭 XML
  • 这是 Box API v2 获取事件时的错误吗

    使用 BOX API v2 从 Box 获取事件时 我发现了一个奇怪的行为 我得到的场景如下 使用 box API v1 将 14 个文件上传到 box net 使用stream position now 获取事件 这给出了流位置 例如12
  • Streamjs和linqjs有什么关系

    读完SICP后 我最近发现streamjs https github com dionyziz stream js 开发商参考linqjs http linqjs codeplex com 作为具有不同语法的替代实现 但我无法建立连接 St

随机推荐

  • 文本视图行 - 建议

  • 如何更改控制器中的 $model->attributes 值 - Yii

    用户主控制器代码 public function actionUpdate id model this gt loadModel id if isset POST UserMaster model gt attributes POST Us
  • arm-linux-androideabi-g++:-fuse-linker-plugin,但找不到 liblto_plugin.so

    我在ubuntu 12 04下编译Chrome V8时遇到一个问题是 arm linux androideabi g 致命错误 fuse linker plugin 但找不到 liblto plugin so ndk版本是r8b 我怎么解决
  • 了解使用 Photoshop 生成的 24 位 PNG

    具有透明度的 24 位 png 文件 可以使用以下命令生成Photoshop 真的有 24 位分布在每种颜色加上 alpha 上吗 或者 24 位仅指颜色并忽略 alpha RGBA 8888 有没有工具可以检查 PNG 文件并验证此类信息
  • 具有多个图像的 Pod

    创建一个名为 xyz 的 pod 其中包含一个容器 用于在其中运行以下每个映像 指定的映像可能在 1 到 4 个之间 nginx redis Memcached consul 问题不太清楚 但假设您希望一个 Pod 具有多个容器 下面是可以
  • 错误:结果不是以下位置的数据框:

    我正在尝试在相当大的数据框上运行拟合函数 该数据框由名为的变量分组 big group and small group 特别是 我试图获得每个的预测和 coefs 值small group代替big group 也就是说 我试图将这些新列添
  • 有没有什么好的方法来加密C#桌面应用程序[重复]

    这个问题在这里已经有答案了 可能的重复 保护 NET 代码免遭逆向工程 我们只是用C winforms开发一个应用程序 有什么好的加密方法可以帮助我们防止盗版吗 我看到有些软件可能需要硬件支持来保护其软件 如何实现 提前致谢 好吧 你在这里
  • 不读取模型[关闭]

    Closed 这个问题需要调试细节 目前不接受答案 我正在用Python编写一个程序 我想连接GPT4ALL 以便该程序像GPT聊天一样工作 仅在我的编程环境中本地运行 为此 我已经安装了 GPT4All 13B snoozy ggmlv3
  • 在 javascript 警报中编写 php

    我用以下方式在JS中编写PHP alert echo Error login 关联一个xml 用symfony翻译成两种语言 但现在不起作用 我该如何解决 您缺少引号alert call alert
  • Ruby on Rails - 将模型中的字段添加到另一个模型的表单上

    我有两个型号Contract and Addendum 合同has many addendums和附录belongs to contract 创建新合同时 将自动创建新的附录 但需要一些额外的元素来创建新的附录 如何添加字段value 这是
  • Pandas 中的顺序组内枚举

    假设我有以下数据框 date A B C D 0 2014 03 20 1 561714 0 979202 0 454935 0 629215 1 2014 03 20 0 390851 0 045697 1 683257 0 771027
  • 将引用(工具>引用)与 VBA 代码(宏)连接

    我想使用 VBA 代码以编程方式将一些引用连接到我的 VBA 项目 即无需使用 工具 gt 引用 手动设置引用 这可能吗 例如 Microsoft Office 12 0 对象库 您没有提到 Office 应用程序 在 MS Access
  • 使用 malloc 时出错

    I pass char input from main to processInExp 函数 然后我再次传递它processInExp 功能为getInput 函数在读取文件时动态分配它 Inside getInput 功能input检查时
  • 为什么 ASP.NET 网站没有 Designer.cs?

    如果我们在 Visual Studio 中创建 ASP NET Web 应用程序 我们可以看到每个 aspx 文件都会有一个关联的自动生成的 aspx designer cs 文件 但对于 ASP NET 网站 每个 aspx 文件只有一个
  • 如何屏蔽具有 RepeatVector() 层的 LSTM 自动编码器中的输入?

    我一直在尝试使用 LSTM 自动编码器获取向量序列的向量表示 以便我可以使用 SVM 或其他此类监督算法对序列进行分类 数据量阻止我使用完全连接的密集层进行分类 我的输入的最短大小是 7 个时间步长 最长的序列是 356 个时间步长 因此
  • _vscprintf 在 Android 上等效吗?

    vscprintf在 Android 上不可用 还vsprintf NULL fmt ap 不起作用 产生段错误 因此似乎无法计算 vsnprintf 成功所需的缓冲区大小 Android sources表明 android log pri
  • “304 Not Modified”到底是如何工作的?

    304 Not Modified 响应是如何生成的 浏览器如何判断HTTP请求的响应是否为304 是浏览器设置的还是服务器发送的 如果由服务器发送 服务器如何知道缓存中可用的数据 以及如何将304设置为图像 我的猜测 如果它是由浏览器生成的
  • Git pre-commit hook:使用 -a 标志提交时如何获取添加/修改的文件

    当我使用 git commit a 提交我的工作时 预提交挂钩中的 git diff diff filter ACM name only cached 无法获取 git 将添加的文件 那么对于这种情况 正确的解决方案是什么 这里的问题是gi
  • 如何禁用/启用对话框负积极按钮?

    请查看下面的自定义对话框 我在对话框上有一个编辑文本字段 如果文本字段为空 我想禁用positiveButton 我可以为文本字段获取 charListener 但我不确定如何设置positivebutton禁用或启用该侦听器 正负按钮的参
  • Apache Flink:如何在摄取时间模式下获取事件的时间戳?

    我想知道是否可以使用Flink的摄取时间模式来获取记录的时间戳 考虑以下 flink 代码示例 https github com apache flink blob master flink examples flink examples