如何计算 Spark RDD 的平均值?

2024-01-12

我有 Spark Scala 的问题,我想计算 Rdd 数据的平均值,我创建一个像这样的新 RDD,

[(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]

我想这样数它们

[(2,(110+130+120)/3),(3,(200+206+206)/3),(4,(150+160+170)/3)]

然后,得到这样的结果,

   [(2,120),(3,204),(4,160)]

我如何使用 RDD 中的 scala 来做到这一点? 我用的是spark 1.6版本


您可以使用aggregateByKey。

val rdd = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))
val agg_rdd = rdd.aggregateByKey((0,0))((acc, value) => (acc._1 + value, acc._2 + 1),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val sum = agg_rdd.mapValues(x => (x._1/x._2))
sum.collect
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何计算 Spark RDD 的平均值? 的相关文章

  • 了解如何使用 apply 和 unappy

    我试图更好地理解 的正确用法apply and unapply方法 考虑到我们想要序列化和反序列化的对象 这是正确的用法吗 即斯卡拉方式 的使用apply and unapply case class Foo object Foo appl
  • Scala:如何编写将类型化为接收者的实现类型的对象返回的方法

    我知道 Scala 中不推荐使用案例类继承 但为了简单起见 我在以下示例中使用了它 scala gt case class Foo val f String def foo g String Foo this copy f g define
  • Sparklyr - 在 Apache Spark Join 中包含空值

    问题在 Apache Spark Join 中包含空值 https stackoverflow com questions 41728762 including null values in an apache spark join有 Sc
  • 重塑案例类构造函数?

    试图找到一种方法来 重塑 案例构造函数以填充某些默认值 以下情况可能吗 def reshape T R1 lt HList R2 lt HList h R1 R2 gt T example case class MyClass a Doub
  • Spark/Yarn:HDFS 上不存在文件

    我在 AWS 上设置了 Hadoop Yarn 集群 有 1 个主服务器和 3 个从服务器 我已经验证我有 3 个活动节点在端口 50070 和 8088 上运行 我在客户端部署模式下测试了 Spark 作业 一切正常 当我尝试使用 Spa
  • Scala 集合不一致

    为什么 Scala Collections API 中的集合和列表之间缺乏一致性 例如 有不可变的 Set 但也有可变的 Set 如果我想使用后者 我可以简单地这样做 val set Set A set new A 但是 本身不存在可变列表
  • 将 Scala 库转换为 DLL (.NET)

    我正在尝试从 scala 类创建一个 Dll 我将 IntelliJ 与 SBT 一起使用 我已经找到了一种使用 ikvm converter 将 jar 文件转换为 Dll 的方法 现在的问题是 当我在 SBT 下使用 package 从
  • 有没有办法捕获 Spark 中使用通配符读取的多个 parquet 文件的输入文件名?

    我使用 Spark 将多个 parquet 文件读取到单个 RDD 中 并使用标准通配符路径约定 换句话说 我正在做这样的事情 val myRdd spark read parquet s3 my bucket my folder parq
  • 使用 Akka 玩 2.5 - 找不到参数超时的隐式值:akka.util.Timeout

    我正在尝试使用 Play 2 5 测试 Akka 但遇到了一个似乎无法解决的编译错误 我正在关注 Play 文档中的此页面 https playframework com documentation 2 5 x ScalaAkka http
  • 如何发现 Scala 远程 Actor 已死亡?

    在 Scala 中 当另一个 远程 actor 终止时 可以通过设置 trapExit 标志并以第二个 actor 作为参数调用 link 方法来通知一个 actor 在这种情况下 当远程参与者通过调用 exit 结束其工作时 第一个参与者
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • PySpark Yarn 应用程序在 groupBy 上失败

    我正在尝试在 Yarn 模式下运行一个处理大量数据的作业 2TB 从谷歌云存储读取 管道可以总结如下 sc textFile gs path json map lambda row json loads row map toKvPair g
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 为什么 Spark 比 Hadoop MapReduce 更快

    有人可以使用字数统计示例解释一下为什么 Spark 比 MapReduce 更快吗 bafna的答案提供了故事的记忆方面 但我想补充另外两个重要事实 DAG和生态系统 Spark 使用 惰性求值 来形成连续计算阶段的有向无环图 DAG 通过
  • 错误:协变类型 A 出现在逆变位置

    我试图写一个不可变的Matrix A 班级 我希望该类是协变的A但是当我把 在 前面A编译器开始抱怨类中的某些操作 以下是我的相关子集Matrix类 实际类比以下子集大 5 倍左右 class Matrix A private val co
  • Spark Scala 将列从一个数据帧复制到另一个数据帧

    我有一个原始数据框的修改版本 我在其上进行了聚类 现在我想将预测列恢复为原始 DF 索引没问题 因此匹配 我该怎么做 使用这段代码我得到一个错误 println Predicted dfWithOutput show println Ori
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • 使用spark phoenix从表中读取rdd分区号为1

    当我运行我的火花代码时 val sqlContext spark sqlContext val noact table primaryDataProcessor getTableData sqlContext zookeeper table
  • 类型级编程有哪些示例? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我不明白 类型级编程 是什么意思 也无法使用Google找到合适的解释 有人可以提供一个演示类型级编程的示例吗 范式的解释和 或定义将

随机推荐

  • C++ LibVLC 从帧/图像创建流

    我想使用 LibVLC 从图像创建视频 目前我还没有使用 LibVLC 的经验 我已经实施了一个像这里这样的测试项目 使用 libvlc 播放 mp3 的简单 C 程序 https stackoverflow com questions 1
  • Google Chrome 自动填充背景颜色在版本 72.0 中是否已更改?

    我最近注意到 在 Google Chrome 中 所有自动填充值的输入元素都显示蓝色背景 是的 Google 将自动填充预览的背景颜色更改为GoogleBlue50 您可以在这里找到问题 https bugs chromium org p
  • 如何使用表值参数插入多行,然后返回它们的 ID?

    在我的应用程序中 我需要将大量 100 行插入到数据库中 将它们插入数据库后 我需要插入它们的子项 这些子项具有指向子项的外键引用 我想知道是否有一种方法可以编写一个存储过程来插入所有这些行并将它们的 ID 返回到我的应用程序 您已使用表值
  • 向 JavaFX 2.2 添加其他视频编解码器/DVD 支持

    更新 由于 JFX 的媒体端已经开源 我自己对此进行了研究 这确实是可能的 但需要更改和重建 JFX 源 Java 和 C 部分 描述了该过程here http berry120 blogspot co uk 2014 03 expandi
  • TortoiseGit Log Graph 中分支线上的方形图标是什么意思?

    我在 TortoiseGit 日志图表的分支线上有方形和圆形图标 方形图标的含义是什么 Square 合并提交 回合 承诺
  • 如何将twitter、facebook集成到iPhone应用程序中?

    我想将 facebook twitter flickr 放入我的 iPhone 应用程序中 以便用户可以轻松登录并从我的应用程序发布消息 把问题放在这里就是为了得到一些线索或者现有的作品来达到这个目的 经过搜索 我在这里找到了一些相关作品h
  • 导入与命名空间同名的类

    我正在使用带有单独 types 定义的第三方库 如下所示 declare namespace Foo declare class Foo export Foo 如何在我的代码中导入 Foo 类 另外 这种奇怪的名称重复在 Typescrip
  • 查看 Visual Studio 中数据流 SSIS 元素的完整工具提示错误消息

    当我将光标悬停在 SSIS 数据流图中的一个元素上时 工具提示中会显示错误 Where我可以看到全文这个的错误信息 显示此工具提示的元素是标记为 规格 的元素 我正在使用 Microsoft Visual Studio 2008 The m
  • 插入初始数据JPA

    是否可以在我的数据库中加载初始数据 我正在使用 JPA 并使用 删除并创建 策略 这样每次我想测试我的应用程序时 数据库都会首先被清除 我将不得不重新创建所有内容 我知道在其他语言中 Php gt Doctrine datafixtures
  • JPA 使用 ElementCollection 映射多行

    我正在尝试遵循JPA教程 http en wikibooks org wiki Java Persistence ElementCollection并使用ElementCollection记录员工电话号码 PHONE table OWNER
  • 将 python 请求与 javascript 页面结合使用

    我正在尝试将 Requests 框架与 python 一起使用 http docs python requests org en latest http docs python requests org en latest 但是我试图访问的
  • CanvasRenderingContext2D 中的透明度组

    有没有一种方法可以将多个绘制操作组合到 2d 画布渲染上下文中 从而使它们各自独立combined结果被组合到画布的先前内容上 而不是每个绘图操作都由其本身组合 一个应用程序 我想绘制一条带有箭头的半透明线 并且我想避免线和箭头重叠的那些区
  • 为什么“for”循环条件失败? [复制]

    这个问题在这里已经有答案了 在下面所示的代码中 没有打印任何内容 这意味着for循环失败 可能是什么原因 我想知道因为当我打印时TOTAL ELEMENTS单独地 它给出5 所以自然这一定是5 2 3 gt 1 lt 3 所以它应该打印一些
  • Java 原始类型值分配给泛型类型运行时 getClass() 方法错误

    public class Box
  • CLR 定义的方法(例如 [delegate].BeginInvoke)记录在哪里? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 编辑 完全改写 看来我的问题措辞确实不好 而且也没有得到很好的回应 所以我希望这个完整的改写能有所帮助
  • AppFabric 缓存内存非常密集

    问题 我做错了什么吗 配置设置不正确 AppFabric 显示的内存使用量是否低于正常值 问题 我正在将数据从 SQL 数据库表读取到 AppFabric 缓存内存中 似乎 AppFabric 对于一个相当小的对象使用了大量的内存 我无法理
  • 在Python中将一个16位整数拆分为两个8位整数

    我必须将给定的 16 位整数转换为两个 8 位整数 然后将其用作输出 它们的标题是获取两个 8 位整数并将它们重新组合为 16 位输入 不幸的是 这超出了我的控制范围 我的解决方案有效 但感觉不干净 对于粗略数字 我对原始数字进行了位移位
  • 为什么我看不到 Microsoft Forms 2.0 对象库?

    我想使用这个库来处理剪贴板 我希望看到它如下面的屏幕截图所示 但我在参考库列表中找不到它 我怎样才能让它出现 如果您将用户窗体添加到项目中 则会自动添加该库 如果您不需要用户窗体 以后可以随时将其删除
  • Django 模型 __unicode__ 在记录时引发异常

    我有一个模型类 如下所示 class Address models Model taking length of address city fields from existing UserProfile model address 1 m
  • 如何计算 Spark RDD 的平均值?

    我有 Spark Scala 的问题 我想计算 Rdd 数据的平均值 我创建一个像这样的新 RDD 2 110 2 130 2 120 3 200 3 206 3 206 4 150 4 160 4 170 我想这样数它们 2 110 13