foreach
and foreachPartitions
是行动。
foreach(函数): 单位
用于调用具有副作用的操作的通用函数。对于每个
RDD 中的元素,它调用传递的函数。这是
通常用于操作累加器或写入外部
商店。
注意:修改累加器之外的变量foreach()
可能会导致未定义的行为。看了解闭包更多细节。
example :
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
foreachPartition(函数):单位
如同foreach()
,而不是为每个调用函数
元素,它为每个分区调用它。该功能应该可以
接受迭代器。这比foreach()
因为
它减少了函数调用的次数(就像mapPartitions
() ).
的用法foreachPartition
例子:
- 示例 1:对于您想要使用的每个分区一个数据库连接(每个分区块内部),这是如何使用 scala 完成此操作的示例用法。
/**
* Insert in to database using foreach partition.
*
* @param sqlDatabaseConnectionString
* @param sqlTableName
*/
def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {
//numPartitions = number of simultaneous DB connections you can planning to give
datframe.repartition(numofpartitionsyouwant)
val tableHeader: String = dataFrame.columns.mkString(",")
dataFrame.foreachPartition { partition =>
// Note : Each partition one connection (more better way is to use connection pools)
val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
//Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
partition.grouped(1000).foreach {
group =>
val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
group.foreach {
record => insertString.append("('" + record.mkString(",") + "'),")
}
sqlExecutorConnection.createStatement()
.executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
+ insertString.stripSuffix(","))
}
sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
}
}
的用法foreachPartition
与 Spark Streaming (dstreams) 和 kafka Producer
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
val producer = createKafkaProducer()
partitionOfRecords.foreach { message =>
producer.send(message)
}
producer.close()
}
}
Note :如果您想避免这种每个分区创建一次生产者的方式,更好的方法是使用广播生产者sparkContext.broadcast
由于 Kafka 生产者是异步的并且
在发送之前大量缓冲数据。
累加器采样片段来玩弄它......通过它
你可以测试性能
test("Foreach - Spark") {
import spark.implicits._
var accum = sc.longAccumulator
sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
assert(accum.value == 6L)
}
test("Foreach partition - Spark") {
import spark.implicits._
var accum = sc.longAccumulator
sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
assert(accum.value == 6L)
}
结论 :
foreachPartition
对分区的操作显然是
比更好的边缘foreach
经验法则:
foreachPartition
当您访问成本高昂时应该使用
数据库连接或kafka生产者等资源,将初始化
每个分区一个而不是每个元素一个(foreach
)。当它
对于累加器,您可以通过上述测试来衡量性能
方法,在累加器的情况下也应该工作得更快。
还有……看地图与地图分区具有相似的概念,但它们是转换。