Flink学习20:算子介绍reduce

2023-11-06

1.reduce简介

按照指定的方式,把每个元素进行累计执行。比如实现累加计算

 

示例:

import keyByNameTest.StockPrice
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object reduceTest {

  //defined the dataSource's data type
  case class StockPrice(stockId:String, timestamp: Long, price:Double)

  def main(args: Array[String]): Unit = {

    //create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //create ds

    val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))

    val ds = env.fromCollection(pricesList)

    //transformation

//update the stock's new time, and accumulate the price
    val reducedDs = ds.keyBy(0).reduce((t1, t2) => StockPrice(t1.stockId, t2.timestamp, t1.price + t2.price))

    reducedDs.print()

    env.execute()

  }

}


输出结果:

自定义reduce func

核心步骤:

1.继承 ReduceFunction 类

2.重写reduce 方法

示例:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


object myReduceTest {


  //defined the dataSource's data type
  case class StockPrice(stockId:String, timestamp: Long, price:Double)

  //define my reduce func
  update the stock's new time, and accumulate the price
  class MyReduceFunc extends ReduceFunction[StockPrice] {
    override def reduce(t: StockPrice, t1: StockPrice): StockPrice = {

      //update the stock's new time, and accumulate the price
      StockPrice(t.stockId, t1.timestamp, t.price + t1.price)

    }
  }

  def main(args: Array[String]): Unit = {

    //create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //create ds
    val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))

    val ds = env.fromCollection(pricesList)

    //transformation
    val keyByedDs = ds.keyBy(0)

      //use my reduce func
    val myReducedDs = keyByedDs.reduce(new MyReduceFunc)

    myReducedDs.print()

    env.execute()
  }



}

输出结果:

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink学习20:算子介绍reduce 的相关文章

  • 我当年自学黑客(网络安全)的一些心得!(内附学习笔记)

    前 言 写这篇教程的初衷是很多朋友都想了解如何入门 转行网络安全 实现自己的 黑客梦 文章的宗旨是 1 指出一些自学的误区 2 提供客观可行的学习表 3 推荐我认为适合小白学习的资源 大佬绕道哈 文末有福利 一 自学网络安全学习的误区和陷阱
  • 网络安全从入门到精通(超详细)学习路线

    首先看一下学网络安全有什么好处 1 可以学习计算机方面的知识 在正式学习网络安全之前是一定要学习计算机基础知识的 只要把网络安全认真的学透了 那么计算机基础知识是没有任何问题的 操作系统 网络架构 网站容器 数据库 前端后端等等 可以说不想
  • API接口:技术、应用与实践

    随着数字化时代的到来 API接口在软件开发和数据交互中扮演着越来越重要的角色 本文深入探讨了API接口的基本概念 技术原理 设计方法 最佳实践以及在各行业的应用案例 关键词 API接口 软件开发 数据交互 技术原理 设计方法 一 引言 随着
  • The Planets:Venus

    靶场下载 The Planets Venus VulnHub 信息收集 arp scan l Interface eth0 type EN10MB MAC 00 0c 29 43 7c b1 IPv4 192 168 1 60 Starti
  • 闵氏距离在文本检索中的应用

    1 背景介绍 文本检索是现代信息处理系统中不可或缺的一部分 它的主要目标是根据用户的查询需求 从海量的文本数据中找出与查询最相关的文档 随着互联网的普及 文本数据的规模不断膨胀 这导致了传统的文本检索方法面临着巨大的挑战 为了解决这些问题
  • 慢思维大脑:SOP流程的心理学背景

    1 背景介绍 慢思维大脑 SOP流程的心理学背景 慢思维是指人类大脑在处理复杂问题 做出重要决策时所采用的思考方式 它与快速 自动的快思维相对 主要通过以下几种方式表现 深入思考 慢思维会让人类大脑深入思考问题的本质 从而找出更深层次的解决
  • 线性代数在深度学习中的角色

    1 背景介绍 深度学习是一种人工智能技术 它主要通过神经网络来学习和模拟人类大脑的思维过程 线性代数是一门数学分支 它研究的是向量和矩阵的运算 在深度学习中 线性代数起着非常重要的作用 因为它为神经网络提供了数学模型和计算方法 在这篇文章中
  • 线性代数在数据挖掘中的应用

    1 背景介绍 线性代数是数学的一个分支 主要研究的是线性方程组和向量的相关概念和方法 在数据挖掘领域 线性代数的应用非常广泛 包括数据处理 特征提取 模型训练等方面 本文将从以下几个方面进行阐述 背景介绍 核心概念与联系 核心算法原理和具体
  • 人工智能与机器学习:未来的编程范式

    1 背景介绍 人工智能 Artificial Intelligence AI 和机器学习 Machine Learning ML 是现代计算机科学的重要领域之一 它们旨在让计算机能够自主地学习 理解和进化 以解决复杂的问题 随着数据量的增加
  • 技术管理者的核心能力在哪?

    作为管理者我曾经被下属当面问过 你为什么不写代码 诚然 我最近两年 代码越写越少 会越开越多 但 存在真的合理吗 我的核心能力应该是什么 看了一篇文章 它提出一个观点 技术管理者的核心能力在于技术判断力 通过在技术领域和非技术领域的长期积累
  • 用CHAT如何写大学生会计综合模拟实训报告

    CHAT回复 标题 大学生会计综合模拟实训报告 一 前言 随着信息化时代的发展 现代会计工作不再只依赖手动运算和记录 而是更加倚重电脑软件系统的配合运用 因此 对我们大学生来说 把握会计理论知识的同时 积极掌握相关的实践应用技能变得非常重要
  • 如何利用CHAT做简单的总结体会?

    问CHAT 在测试过程中使用appium python自动化的优点和体会 CHAT回复 使用 Appium 配合 Python 进行自动化测试主要有以下几点优点 1 跨平台性 Appium 支持 iOS 和 Android 平台的应用自动化
  • 【计算机毕业设计】实验室预约管理

    身处网络时代 随着网络系统体系发展的不断成熟和完善 人们的生活也随之发生了很大的变化 人们在追求较高物质生活的同时 也在想着如何使自身的精神内涵得到提升 而读书就是人们获得精神享受非常重要的途径 为了满足人们随时随地只要有网络就可以看书的要
  • 扬帆证券投资者必知:股票配股与增发的区别你清楚吗?

    配股和增发都是股票再融资的方式 不过二者有一定的区别 1 发行对象不同 配股是向原股东发售一定量股票 一般会以低于市价的价格发售 增发是向全体社会公众发行股票 即新老股东都能获得 2 发行前是否需要公告价格 配股会事先公告配股价 配股的定价
  • 扬帆证券:突发利好!外资重大转变,A股收到多份喜报

    A股财报季 利好音讯密集传来 1月16日晚间 A股多家上市公司披露了成绩预告 其间成绩预增 扭亏等利好公告数量占比超80 其间 普瑞眼科公告 估计2023年净赢利同比添加高达1163 98 1285 51 别的 多家上市公司公告称 估计20
  • 独家 | 鸿蒙(HarmonyOS)开发详细学习笔记免费分享

    前言 华为宣布 将在1月18日 在北京 上海 杭州 南京 成都 厦门 武汉 长沙 8 大城市同时召开大会 届时将揭秘鸿蒙生态和 HarmonyOS NEXT 进阶新篇章 简单的来说就是 纯血鸿蒙系统 即将彻底揭晓 鸿蒙系统自推出来以来 就一
  • ESM10A 消除对单独 PLC 的需求

    ESM10A 消除对单独 PLC 的需求 ESM10A 可以消除对单独 PLC 的需求 该程序是在 PC 上开发的 然后使用免费提供的简单易用的 EzSQ 软件下载到逆变器 似乎这些改进还不够 日立还在 SJ700 中添加了其他新功能 例如
  • Cortex-M3与M4权威指南

    处理器类型 所有的ARM Cortex M 处理器是32位的精简指令集处理器 它们有 32位寄存器 32位内部数据路径 32位总线接口 除了32位数据 Cortex M处理器也可以有效地处理器8位和16位数据以及支持许多涉及64位数据的操作
  • 【js学习之路】遍历数组api之 `filter `和 `map`的区别

    一 前言 数组是我们在项目中经常使用的数据类型 今天我们主要简述作用于遍历数组的api filter 和 map 的区别 二 filter和map的共同点 首先 我们主要阐述一下 filter 和 map 的共同点 api的参数都是回调函数
  • 2023下半年软考「单独划线」合格标准公布

    中国计算机技术职业资格网发布了 关于2023年度下半年计算机软件资格考试单独划线地区合格标准的通告 2023下半年软考单独划线地区合格标准各科目均为42分 01 官方通告 关于2023年度下半年计算机软件资格考试单独划线地区合格标准的通告

随机推荐

  • QWidget、QDialog及QMainWindow的区别与联系

    QWidget类是所有用户界面对象的基类 QMainWindow和QDialog都是QWidget的子类 一般来说 如果需要嵌入到其他窗体中 则基于QWidget创建 如果是顶级对话框 则基于QDialog创建 如果是主窗体 则基于QMai
  • H5如何直接跳转小程序?

    1 云开发方式 不推荐 不推荐理由 1 要钱 2 麻烦 需要兼容 参考链接 https developers weixin qq com miniprogram dev wxcloud guide staticstorage jump mi
  • 稀疏重构算法详解

    引入 在室内环境中 多径信号具有天然的空间稀疏性 根据压缩感知理论可知 如果信号是可压缩的或者在某个变换域是稀疏的 可以采用一个随机测量矩阵将高维信号映射到一个低维空间上 通过求解优化问题 以很高的概率重构出原始信号 因此 在该理论框架下
  • 带分数

    标题 带分数 100 可以表示为带分数的形式 100 3 69258 714 还可以表示为 100 82 3546 197 注意特征 带分数中 数字1 9分别出现且只出现一次 不包含0 类似这样的带分数 100 有 11 种表示法 题目要求
  • 小心踩雷,一次Java内存泄漏排查实战

    问题出现 晚上七点多开始 我就开始不停地收到报警邮件 邮件显示探测的几个接口有超时情况 多数执行栈都在 java io BufferedReader readLine BufferReader java 389 java io Buffer
  • c++ 变量常量指针练习题

    Q1 在win32 x86模式下 int p int pp double q 请说明p pp q各占几个字节的内存单元 p 占 4 个字节 pp 占 4 个字节 q 占 4 个字节 Q2常量1 1 0 1 的数据类型是什么 1 是 整形 i
  • YOLOv7中的数据集处理【代码分析】

    本文章主要是针对yolov7中数据集处理部分代码进行解析 和yolov5是一样的 也是可以更好的理解训练中送入的数据集到底是什么样子的 数据集的处理离不开两个类 一个是Dataset from torch utils data import
  • python3 -sorted函数 对所有可迭代的对象进行排序操作 sorted(corr_list,key=lambda x: -abs(x[0]))

    sorted 函数对所有可迭代的对象进行排序操作 返回重新排序的列表 sort 与 sorted 区别 sort 是应用在 list 上的方法 sorted 可以对所有可迭代的对象进行排序操 作 list 的 sort 方法返回的是对已经存
  • linux export 的作用

    功能说明 设置或显示环境变量 语 法 export fnp 变量名称 变量设置值 补充说明 在shell中执行程序时 shell会提供一组环境变量 export可新增 修改或删除环境变量 供后续执行的程序使用 export的效力仅及于该此登
  • 我在腾讯做测10年,总结的7条生存经验

    简单做个自我介绍 我是一名测试工程师 从15年毕业到现在工作了6年 一路走过来 觉得自己很幸运遇到了很多伯乐 教会了我很多道理和职场经验 也非常荣幸在阿里工作过4年 搭建过蚂蚁金服的platuo测试框架 thrift测试框架 自动化测试平台
  • React源码分析(一)=> scheduler分析

    文章目录 1 前言 2 getCurrentTime 3 unstable scheduleCallback函数 4 scheduleHostCallbackIfNeeded 4 1 flushWork 4 2 flushFirstCall
  • 学习笔记实操手册

    https note youdao com s KP25iMDf https note youdao com s GAmVO7V 使用yum安装php72 https www cnblogs com JahanGu p 10439472 h
  • 编写一个使用指针的C函数,交换数组a和数组b的对应元素

    编写一个使用指针的C函数 交换数组a和数组b的对应元素 int a 5 1 2 3 4 5 int b 5 10 20 30 40 50 输出格式要求 a d 2d b d 2d 程序运行示例如下 a 0 10 a 1 20 a 2 30
  • QT应用部署流程

    参考链接 https www shuzhiduo com A LPdo07AGz3 1 Windows系统 Windows下使用QT自带工具windeployqt exe部署 windows gt command 切换到QT的工具目录 在c
  • signature=a195252fc5196d0fb82cccccc68b06b3,Gene signatures in wound tissue as evidenced by molecular...

    Wound induction in the chicken CAM Chick embryos were cultured for 10 days and CAMs were inflicted by parallel scalpel s
  • linux 数组里面是json,将JSON解析为shell脚本中的数组

    小编典典 如果您确实无法使用适当的JSON解析器 例如 1 请尝试 基于的解决方案 jq awk Bash 4 x readarray t values lt 3 print 4 myfile json Bash 3 x IFS n rea
  • lua 3.0 中 普通方法延时

    local delayTime cc DelayTime create 1 local callFunND cc CallFunc create function self pushjoystick end local seq cc Seq
  • 微信企业付款至零钱,状态处理中,status=PROCESSING的解决办法

    前段时间腾讯因为支付系统异常 更新了一些东西 然后就开始出现了这个问题 时不时的就会有一个两个状态为 处理中 的交易 但文档中并没有给出解决办法 尝试咨询了客服 给出了两个解决方案 1 把该笔交易当做失败处理 但以后这笔订单就不要再去折腾它
  • ESP8266 RTOS SDK 移植 u8g2 移植代码

    LED屏驱动ssd1306 屏幕128x64大小 1 移植代码核心 方法1 port c define SCL Pin GPIO SCL define SDA Pin GPIO SDA void delay us uint32 t time
  • Flink学习20:算子介绍reduce

    1 reduce简介 按照指定的方式 把每个元素进行累计执行 比如实现累加计算 示例 import keyByNameTest StockPrice import org apache flink api scala createTypeI