Apache Spark - foreach 与 foreachPartition 何时使用 什么?

2023-12-08

我想知道是否foreachPartitionforeach方法考虑到我正在流过的情况RDD为了对累加器变量执行一些求和。


foreach and foreachPartitions是行动。

foreach(函数): 单位

用于调用具有副作用的操作的通用函数。对于每个 RDD 中的元素,它调用传递的函数。这是 通常用于操作累加器或写入外部 商店。

注意:修改累加器之外的变量foreach()可能会导致未定义的行为。看了解闭包更多细节。

example :

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

foreachPartition(函数):单位

如同foreach(),而不是为每个调用函数 元素,它为每个分区调用它。该功能应该可以 接受迭代器。这比foreach()因为 它减少了函数调用的次数(就像mapPartitions() ).

的用法foreachPartition例子:


  • 示例 1:对于您想要使用的每个分区一个数据库连接(每个分区块内部),这是如何使用 scala 完成此操作的示例用法。


/**
    * Insert in to database using foreach partition.
    *
    * @param sqlDatabaseConnectionString
    * @param sqlTableName
    */
  def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

    //numPartitions = number of simultaneous DB connections you can planning to give

datframe.repartition(numofpartitionsyouwant)

    val tableHeader: String = dataFrame.columns.mkString(",")
    dataFrame.foreachPartition { partition =>
      // Note : Each partition one connection (more better way is to use connection pools)
      val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
      //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
      partition.grouped(1000).foreach {
        group =>
          val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
          group.foreach {
            record => insertString.append("('" + record.mkString(",") + "'),")
          }

          sqlExecutorConnection.createStatement()
            .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
              + insertString.stripSuffix(","))
      }


      sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
    }
  }
  
  • 示例2:

的用法foreachPartition与 Spark Streaming (dstreams) 和 kafka Producer

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

Note :如果您想避免这种每个分区创建一次生产者的方式,更好的方法是使用广播生产者sparkContext.broadcast由于 Kafka 生产者是异步的并且 在发送之前大量缓冲数据。


累加器采样片段来玩弄它......通过它 你可以测试性能



     test("Foreach - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
        assert(accum.value == 6L)
      }

      test("Foreach partition - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
        assert(accum.value == 6L)
      }
  

结论 :

foreachPartition对分区的操作显然是 比更好的边缘foreach

经验法则:

foreachPartition当您访问成本高昂时应该使用 数据库连接或kafka生产者等资源,将初始化 每个分区一个而不是每个元素一个(foreach)。当它 对于累加器,您可以通过上述测试来衡量性能 方法,在累加器的情况下也应该工作得更快。

还有……看地图与地图分区具有相似的概念,但它们是转换。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Spark - foreach 与 foreachPartition 何时使用 什么? 的相关文章

  • HSQL - 识别打开连接的数量

    我正在使用嵌入式 HSQL 数据库服务器 有什么方法可以识别活动打开连接的数量吗 Yes SELECT COUNT FROM INFORMATION SCHEMA SYSTEM SESSIONS
  • 如何在 Spring 中禁用使用 @Component 注释创建 bean?

    我的项目中有一些用于重构逻辑的通用接口 它看起来大约是这样的 public interface RefactorAwareEntryPoint default boolean doRefactor if EventLogService wa
  • Java 集合的并集或交集

    建立并集或交集的最简单方法是什么Set在 Java 中 我见过这个简单问题的一些奇怪的解决方案 例如手动迭代这两个集合 最简单的单行解决方案是这样的 set1 addAll set2 Union set1 retainAll set2 In
  • 没有 Spring 的自定义 Prometheus 指标

    我需要为 Web 应用程序提供自定义指标 问题是我不能使用 Spring 但我必须使用 jax rs 端点 要求非常简单 想象一下 您有一个包含键值对的映射 其中键是指标名称 值是一个简单的整数 它是一个计数器 代码会是这样的 public
  • Hibernate 的 PersistentSet 不使用 hashCode/equals 的自定义实现

    所以我有一本实体书 public class Book private String id private String name private String description private Image coverImage pr
  • 像 Java 这样的静态类型语言中动态方法解析背后的原因是什么

    我对 Java 中引用变量的动态 静态类型和动态方法解析的概念有点困惑 考虑 public class Types Override public boolean equals Object obj System out println i
  • tomcat 中受密码保护的应用程序

    我正在使用 JSP Servlet 开发一个Web应用程序 并且我使用了Tomcat 7 0 33 as a web container 所以我的要求是tomcat中的每个应用程序都会password像受保护的manager applica
  • 如何在谷歌地图android上显示多个标记

    我想在谷歌地图android上显示带有多个标记的位置 问题是当我运行我的应用程序时 它只显示一个位置 标记 这是我的代码 public class koordinatTask extends AsyncTask
  • java for windows 中的文件图标叠加

    我正在尝试像 Tortoise SVN 或 Dropbox 一样在文件和文件夹上实现图标叠加 我在网上查了很多资料 但没有找到Java的解决方案 Can anyone help me with this 很抱歉确认您的担忧 但这无法在 Ja
  • java.io.Serialized 在 C/C++ 中的等价物是什么?

    C C 的等价物是什么java io Serialized https docs oracle com javase 7 docs api java io Serializable html 有对序列化库的引用 用 C 序列化数据结构 ht
  • 专门针对 JSP 的测试驱动开发

    在理解 TDD 到底是什么之前 我就已经开始编写测试驱动的代码了 在没有实现的情况下调用函数和类可以帮助我以更快 更有效的方式理解和构建我的应用程序 所以我非常习惯编写代码 gt 编译它 gt 看到它失败 gt 通过构建其实现来修复它的过程
  • META-INF/服务应该在 sbt 中的哪里

    META INF 目录应该放在哪里 以便 sbt 获取自定义配置 我在尝试使用 ServiceLoader 时遇到了这个问题 并且我试图在 META INF services 中创建自定义服务 如果您将 META INF 文件夹放在 src
  • Spark 3 KryoSerializer 问题 - 无法找到类:org.apache.spark.util.collection.OpenHashMap

    我正在将 Spark 2 4 项目升级到 Spark 3 x 我们遇到了一些现有 Spark ml 代码的问题 var stringIndexers Array StringIndexer for featureColumn lt FEAT
  • 我如何在java中读取二进制数据文件

    因此 我正在为学校做一个项目 我需要读取二进制数据文件并使用它来生成角色的统计数据 例如力量和智慧 它的设置是让前 8 位组成一个统计数据 我想知道执行此操作的实际语法是什么 是不是就像读文本文件一样 这样 File file new Fi
  • 包 javax.el 不存在

    我正在使用 jre6 eclipse 并导入 javax el 错误 包 javax el 不存在 javac 导入 javax el 过来 这不应该是java的一部分吗 谁能告诉我为什么会这样 谢谢 米 EL 统一表达语言 是 Java
  • 从 scala 的 Type 获取 ParameterizedType?

    有用的是 scala 的 Universe typeOf 保留了类的类型参数 import scala reflect runtime universe case class X T TypeTag val t typeOf T e g S
  • 使用反射覆盖最终静态字段是否有限制?

    在我的一些单元测试中 我在最终静态字段上的反射中遇到了奇怪的行为 下面是说明我的问题的示例 我有一个基本的 Singleton 类 其中包含一个 Integer public class BasicHolder private static
  • CamcorderProfile.videoCodec 返回错误值

    根据docs https developer android com reference android media CamcorderProfile html 您可以使用CamcorderProfile获取设备默认视频编解码格式 然后将其
  • 使用 svn 1.8.x、subclise 1.10 的 m2e-subclipse 连接器在哪里?

    我读到 m2e 的生产商已经停止生产 svn 1 7 以外的任何版本的 m2e 连接器 Tigris 显然已经填补了维护 m2e subclipse 连接器的空缺 Q1 我的问题是 使用 svn 1 8 x 的 eclipse 更新 url
  • 双枢轴快速排序和快速排序有什么区别?

    我以前从未见过双枢轴快速排序 是快速排序的升级版吗 双枢轴快速排序和快速排序有什么区别 我在 Java 文档中找到了这个 排序算法是双枢轴快速排序 作者 弗拉基米尔 雅罗斯拉夫斯基 乔恩 本特利和约书亚 布洛赫 这个算法 在许多数据集上提供

随机推荐

  • UIView 隐藏/显示动画

    我的简单目标是淡出动画隐藏和显示功能 Button hidden YES 够简单的 然而 有可能让它淡出而不是消失吗 这样看起来相当不专业 在 iOS 4 及更高版本中 有一种方法只需使用 UIView 转换方法即可实现此目的 而无需导入
  • 使用 WinForms GeckoFX 控件从 C# 调用 javascript 函数的推荐方法是什么?

    问题说明了一切 我已经把所有东西都连接好了 并且知道如何将消息从浏览器 html 发送到 c 但不知道如何以其他方式发送消息 我应该能够做类似的事情 browserControl JSCall myFunction Dave Smith 以
  • iOS App提交的Bundle ID

    我的应用程序商店中已有一个应用程序 我想将新应用程序上传到应用程序商店 但它要求提供捆绑包 ID 我可以使用与第一个应用程序相同的 ID 还是每个新应用程序的新捆绑包 ID 谢谢你 您将需要创建一个新的捆绑包 ID 请参阅 Apple 文档
  • Xamarin.forms 中的垂直滑块?

    我正在尝试在 Xamarin forms 中实现垂直滑块 我知道我需要分别在 ios 和 android 中创建渲染类 对于 ios 我的渲染器似乎工作正常 对于 Android 我正在点击链接https forums xamarin co
  • git stash -- 删除未跟踪的文件

    今天在工作中 我的一位同事注意到 在提供文件路径的同时执行 git stash 会删除未跟踪的文件 git stash src 此命令会删除未跟踪的文件 并且在 stash pop 后不会恢复它们 然而 git 存储 这不 我对我来说就像一
  • 在 jqplot 中打开和关闭 pointLabels

    我正在尝试以编程方式打开和关闭 pointLabels 我认为它会像这样工作 var data 1 1 2 5 4 9 var graph jqplot id graph data series pointLabels show true
  • 是否允许在 ASP.NET Core 控制器中使用 Task.Run?

    场景 我有一个带有 删除 ASP NET Core 控制器操作的 Web 服务 该实现由两个步骤组成 第一步是便宜的 之后其他操作就不再可以看到已删除的数据 第二步是长时间运行的 它执行实际的删除 可以用吗Task Run对于第二个操作并且
  • C 风格语言中匿名 { } 块的用途是什么?

    C 风格语言 C C C 中匿名 块的用途是什么 例子 void function int i 0 i i 1 int k 0 k k 1 Edit 感谢所有精彩的回答 它将变量的范围限制在 内的块内
  • 线程“main”中的异常 java.lang.ArrayIndexOutOfBoundsException [重复]

    这个问题在这里已经有答案了 我是编程新手 在 eclipse 中运行一些新代码时 我遇到了这个错误 并且完全迷失了 import java util Scanner public class Lab6 public static void
  • 如何修复 gpflow 中内核长度尺度的某些尺寸?

    我有一个 2d 内核 k gpflow kernels RBF lengthscales 24 5 1e 5 m gpflow models GPR data X Y kernel k mean function None 我想修复第二维的
  • 定义调用约定的意义何在?

    例如 int WINAPI WinMain HINSTANCE instance HINSTANCE prev instance PSTR cmd line int cmd show WINAPI 是一个如下所示的定义 define WIN
  • 部署Python程序出现问题(用py2exe打包)

    我有一个问题 我的程序使用了 py2exe 它在我的计算机上运行 我用 Inno Setup 打包它 仍然可以在我的计算机上运行 但是当我将它发送到另一台计算机时 尝试运行该应用程序时出现以下错误 CreateProcess 失败 代码 1
  • utf-8字符不显示在chrome中[关闭]

    Closed 这个问题需要调试细节 目前不接受答案 html 特殊字符在 chrome 中无法正确显示的问题请参见 示例页面 我已经检查并重新保存了项目中的每个文件 因为 utf 8 确认了我的元标记减速 并确认 chrome 设置为默认为
  • 将数组放入 class.property

    我有一个具有以下属性的类 Dim pBonds as string Private Property Get Bonds As String Bonds pBonds End Property Private Property Get Bo
  • 如何检查用户输入是否是字符串

    我有两个用户输入 在第一个用户必须插入字符串类型的文本 在第二个用户必须插入 int 类型的数字 我使用了 try except ValueError 因此用户无法在需要 int 的地方插入字符串 尽管当用户在需要字符串的地方插入 int
  • 将 TensorBoard 2 中的 2 个图与 TensorFlow 2 合并

    我想使用 Tensorflow 和 Tensorboard V2 将精度和召回率合并到同一个图上 我找到了许多以前版本的示例 但没有一个适用于我的情况 我创建了一个 Keras 回调来计算精度和召回率 然后调用张量流摘要将它们记录在同一个记
  • ARKit – 在不可见平面下渲染 3D 对象

    我有一个带有隐形物体的 ARKit 场景SCNPlane plane geometry firstMaterial colorBufferWriteMask 该平面放置在地面上 用于渲染deferred shadows来自放置在场景中的其他
  • #define 变量的类型

    如果我有 define MAXLINE 5000 MAXLINE 理解为什么类型 我应该假设它是一个int 我可以以某种方式测试它吗 一般来说 如何判断某一种类型 defineed 变量 它没有类型 这是一个简单的文本替换 文本 5000
  • Terraform 远程状态 s3 存储桶创建包含在状态文件中吗?

    我正在寻找在 S3 存储桶中创建和存储状态文件的最佳实践 我应该将 S3 存储桶的创建与基础设施一起包括在内还是 为其 S3 存储桶创建一个单独的状态文件 并为资源创建一个不同的状态文件 如果它是不同的文件 我还需要存储创建的 s3 存储桶
  • Apache Spark - foreach 与 foreachPartition 何时使用 什么?

    我想知道是否foreachPartition与foreach方法考虑到我正在流过的情况RDD为了对累加器变量执行一些求和 foreach and foreachPartitions是行动 foreach 函数 单位 用于调用具有副作用的操作