如何统计Apache Flink在给定时间窗口内处理的记录数

2023-12-20

在flink中定义一个时间窗口后如下:

val lines = socket.timeWindowAll(Time.seconds(5))

如何计算该特定 5 秒窗口内的记录数?


执行计数聚合的最有效方法是ReduceFunction。然而,reduce有输入和输出类型必须相同的限制。所以你必须将输入转换为Int在应用窗口之前:

val socket: DataStream[(String)] = ???

val cnts: DataStream[Int] = socket
  .map(_ => 1)                    // convert to 1
  .timeWindowAll(Time.seconds(5)) // group into 5 second windows
  .reduce( (x, y) => x + y)       // sum 1s to count
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何统计Apache Flink在给定时间窗口内处理的记录数 的相关文章

  • SingleOutputStreamOperator#returns(TypeHint typeHint) 方法的 javadoc

    我正在阅读源代码SingleOutputStreamOperator returns 它的javadoc是 Adds a type information hint about the return type of this operato
  • Apache Flink:设置并行度的指南?

    我正在尝试获取一些简单的规则或指南来设置哪些值 操作员或工作 并行性 在我看来 它应该是一个数字 例如 假设我有 2 台任务管理器机器 每台都有 4 个任务槽 假设集群上没有运行其他作业 我会设置并行度吗 用于操作 喜欢过滤并映射到 8 如
  • Flink 中复杂拓扑(多输入)的集成测试

    我需要为 flink 流拓扑编写单元测试 这基本上是一个CoFlatMapFunction 并且它有 2 个输入 我尝试从这个页面中获得一些灵感 https ci apache org projects flink flink docs s
  • flink集群启动错误[ERROR]无法正确获取JVM参数

    bin start cluster sh Starting cluster INFO 1 instance s of standalonesession are already running on centos1 Starting sta
  • 为什么 Flink SQL 对所有表使用 100 行的基数估计?

    我不确定为什么逻辑计划没有被正确评估这个例子 https stackoverflow com questions 53601410 apache flink enable join ordering 53981000 53981000 我更
  • Flink 中的水印和触发器有什么区别?

    我读到 排序运算符必须缓冲它接收到的所有元素 然后 当它接收到水印时 它可以对时间戳低于水印的所有元素进行排序 并按排序顺序发出它们 这是正确 因为水印表明不能有更多元素到达并与已排序元素混合 https cwiki apache org
  • Flink:处理数据早于应用程序水印的键控流

    我正在使用带有运动源和事件时间键控窗口的 F link 该应用程序将监听实时数据流 窗口 事件时间窗口 并处理每个键控流 我有另一个用例 我还需要能够支持某些关键流的旧数据的回填 这些将是事件时间 鉴于我正在使用水印 这会成为一个问题 因为
  • Apache Beam 计数器/指标在 Flink WebUI 中不可用

    我正在使用 Flink 1 4 1 和 Beam 2 3 0 并且想知道是否可以在 Flink WebUI 或任何地方 中提供可用的指标 如 Dataflow WebUI 中那样 我用过类似的计数器 import org apache be
  • Apache Flink AWS S3 Sink 是否需要 Hadoop 进行本地测试?

    我对 Apache Flink 比较陌生 我正在尝试创建一个简单的项目 将文件生成到 AWS S3 存储桶 根据文档 我似乎需要安装 Hadoop 才能执行此操作 如何设置本地环境来测试此功能 我在本地安装了 Apache Flink 和
  • Flink TaskManager 超时?

    我正在运行 Flink 应用程序 通过 Yarn 似乎有时任务管理器会随机超时 这是错误 java util concurrent TimeoutException Heartbeat of TaskManager with id some
  • Flink CEP:对于不同类型的事件,使用哪种方法加入数据流?

    假设我有两种不同类型的数据流 一种提供天气数据 另一种提供车辆数据 我想使用 Flink 对数据进行复杂的事件处理 Flink 1 3 x 中哪种方法是正确的使用方法 我看到了不同的方法 如 Union Connect Window Joi
  • 在 Flink 中,我可以在同一个槽中拥有一个算子的多个子任务吗?

    探索Apache Flink几天了 对Task Slot的概念有些疑惑 虽然有人问了几个问题 但有一点我不明白 我正在使用一个玩具应用程序进行测试 运行本地集群 我已禁用运算符链接 我从文档中知道插槽允许内存隔离而不是 CPU 隔离 阅读文
  • Apache Flink 上的 zipWithIndex

    我想为我的输入的每一行分配一个id 这应该是一个数字0 to N 1 where N是输入中的行数 粗略地说 我希望能够执行以下操作 val data sc textFile textFilePath numPartitions val r
  • flink - 使用匕首注入 - 不可序列化?

    我使用 Flink 最新通过 git 从 kafka 流式传输到 cassandra 为了简化单元测试 我通过 Dagger 添加依赖注入 ObjectGraph 似乎已正确设置自身 但 内部对象 被 Flink 标记为 不可序列化 如果我
  • Cassandra Pojo Sink Flink 中的动态表名称

    我是 Apache Flink 的新手 我正在使用 Pojo Sink 将数据加载到 Cassandra 中 现在 我在以下命令的帮助下指定表和键空间名称 Table注解 现在 我想在运行时动态传递表名称和键空间名称 以便可以将数据加载到用
  • Flink中为什么DataStream不支持聚合

    我是 Flink 的新手 有时 我想在 DataStream 上进行聚合 而不需要先执行 keyBy 为什么 Flink 不支持 DataStream 上的聚合 sum min max 等 谢谢你 艾哈迈德 Flink 支持非 keyed
  • Flink从hdfs读取数据

    我是 Flink 的新生 我想知道如何从 hdfs 读取数据 有人可以给我一些建议或一些简单的例子吗 谢谢你们 如果您的文件采用文本文件格式 则可以使用 ExecutionEnvironment 对象中的 readTextFile 方法 这
  • 我想使用 Flink 的 Streaming File Sink 写入 ORC 文件,但它无法正确写入文件

    我正在从 Kafka 读取数据并尝试将其以 ORC 格式写入 HDFS 文件系统 我使用了他们官方网站上的以下链接参考 但我可以看到Flink为所有数据写入完全相同的内容并生成这么多文件并且所有文件都可以103KB https ci apa
  • Flink任务管理器内存不足和内存配置

    我们使用 Flink 流在单个集群上运行一些作业 我们的工作是使用rocksDB 来保存状态 该集群配置为在 3 个独立的 VM 上使用单个 Jobmanager 和 3 个 Taskmanager 运行 每个 TM 均配置为运行 14GB
  • Flink:Jobmanager UI 中设置的并行度与任务槽有何关系?

    假设我有 8 个任务管理器和 16 个任务槽 如果我使用 Jobmanager UI 提交作业并将并行度设置为 8 我是否只使用 8 个任务槽 如果我有 8 个具有 8 个槽位的任务管理器 并以并行度 8 提交相同的作业 该怎么办 是完全一

随机推荐

  • 简单的 UIView drawRect 没有被调用

    我不明白这里出了什么问题 我有一个非常简单的 UIViewController 和一个非常简单的 viewDidLoad 方法 void viewDidLoad NSLog making game view GameView v GameV
  • Python-获取目录中所有文件和子文件夹的相对路径

    我正在寻找一种获取特定文件夹内文件和 子 文件夹的相对路径的好方法 对于我目前使用的方法os walk 它正在工作 但对我来说似乎并不 Pythonic myFolder myfolder fileSet set yes I need a
  • 手动设置Session过期时间-CodeIgniter

    如何在 codeigniter 中动态设置会话过期时间 例如 如果用户登录并具有以下角色 admin 过期时间应该比没有权限的用户登录时要长admin role Thanks 您可以通过在配置文件中增加此变量来更新会话过期时间 config
  • 我应该对不透明对象使用整数 ID 还是指针?

    我正在一些图形 API DirectX9 和 DirectX11 之上编写一个抽象层 我想听听您的意见 传统上 我会为每个我想要抽象的概念创建一个基类 因此 在典型的 OO 方式中 我将拥有一个 Shader 类和 2 个子类 DX9Sha
  • Django 动态模型字段

    我正在研究一个多租户应用程序中 一些用户可以定义自己的数据字段 通过管理员 以收集表单中的附加数据并报告数据 后一点使得 JSONField 不是一个很好的选择 所以我有以下解决方案 class CustomDataField models
  • 如何使 AWS EC2 上的 Tomcat 从外部本地主机可用

    我正在尝试在 AWS Linux 服务器上运行 Tomcat 我已经安装了 Tomcat 并从命令行测试了它以确保它正常工作 但我无法从另一台计算机访问它 细节 该实例在安全组中开放了用于 HTTP 的 80 传入端口 我已经通过远程登录到
  • 重写解析表达式语法(PEG),无需左递归

    Using https github com JetBrains Grammar Kit https github com JetBrains Grammar Kit如何在没有左递归的情况下重写语法 grammar exprs exprs
  • Bison 语义谓词语法错误,杂散“#”

    我正在尝试使用 Bison 的语义谓词 https www gnu org software bison manual html node Semantic Predicates html Semantic Predicates功能 但我在
  • Google 云消息服务器的 IP 地址

    我即将在服务器上部署 GCM 的实现 并且需要通过 IP 打开适当的防火墙 有谁知道在哪里可以找到 android googleapis com 的 IP 地址范围 Thanks 我发现 android googleapis com 的 I
  • 无法在 Windows 10 上使用 Docker Toolbox 共享/挂载卷

    我正在尝试使用 docker 设置我的项目 我在 Windows 10 家庭版上使用 Docker Toolbox 我对码头工人很陌生 据我了解 我必须将文件复制到新容器并添加一个卷 以便我可以保留 gulp 所做的更改 这是我的文件夹结构
  • Mongodb count 与 findone

    我的问题是 有一组用户 我试图找到 用户是否使用 id xxx has somevalue gt 5 我想知道 使用什么会更快find count gt 0 or findOne null 或者也许还有其他更快 更好的方法 查询时间之间的差
  • 使用 guice 构建带有注入类的框架,初始化的正确方法是什么?

    我正在尝试编写一个框架 其中任意 bean 类都通过我的 API 中的类注入 并且它们可以与这两个类交互 也可以根据定义的注释触发回调 这是一个示例 bean Experiment static class TestExperiment p
  • Android NDK:您确定您的 NDK_MODULE_PATH 变量已正确定义吗?

    最近 3天前 开始学习Android Studio 我购买了一个 Eclipse 游戏项目来玩 但出现错误 当我修复该错误时 我收到一个新错误 目前的错误如下 构建命令失败 执行过程时出错 C Users user AppData Loca
  • 不考虑回到起点的旅行商问题(TSP)的问题名称是什么?

    我想知道 TSP 的问题名称是什么 不考虑返回起点的方式 以及解决这个问题的算法是什么 我研究了最短路径问题 但这不是我想要的 问题只是从 2 个指定点找到最短路径 但我要寻找的是我们给出n个点并且只输入1个起点的问题 然后 找到经过所有点
  • 在应用程序和扩展程序之间共享捆绑资源

    我的照片共享扩展计划使用相同的设计资源 用于导航和向照片添加 图章 贴纸 如应用程序沙盒设计指南中所述 沙盒应用程序组 需要共享文件和其他信息的可以请求容器 目录作为其权利的一部分 这些目录是存放的 在 Library Group Cont
  • 如何获取 XGBClassifier 的预测 p 值?

    我想知道 XGBClassifier 对它所做的每个预测的置信度如何 有可能有这样的价值吗 或者 predict proba 是否已经间接成为模型的置信度 你的直觉确实是正确的 predict proba返回每个示例属于给定类别的概率 来自
  • 读取注册表项的性能?

    我想知道通过标准 C 库从 Windows 注册表读取注册表值需要多长时间 以毫秒为单位 在这种情况下 我正在阅读一些代理设置 我应该期望什么数量级的值 有没有好的基准数据可用 我正在运行 WS2k8 R2 amd64 加分点 操作系统 s
  • Django REST框架范围过滤器

    如何在 Django REST Framework 中对日期和数字进行范围过滤 其他过滤器 lt gt 等 工作正常 我尝试了很多变体 例如 import rest framework filters as filters class Or
  • 如何在 PHP 中将查询字符串转换为斜杠 URL?

    我想将 URL 转换为 http localhost projectname api index php type login to http localhost projectname api login Convert在这里不是一个常用
  • 如何统计Apache Flink在给定时间窗口内处理的记录数

    在flink中定义一个时间窗口后如下 val lines socket timeWindowAll Time seconds 5 如何计算该特定 5 秒窗口内的记录数 执行计数聚合的最有效方法是ReduceFunction 然而 reduc