我在脚本中使用 for 循环来为 size_DF(数据帧)的每个元素调用函数,但这需要很多时间。我尝试通过地图删除 for 循环,但没有得到任何输出。
size_DF 是我从表中获取的大约 300 个元素的列表。
用于:
import call_functions
newObject = call_functions.call_functions_class()
size_RDD = sc.parallelize(size_DF)
if len(size_DF) == 0:
print "No record present in the truncated list"
else:
for row in size_DF:
length = row[0]
print "length: ", length
insertDF = newObject.full_item(sc, dataBase, length, end_date)
使用地图
if len(size_DF) == 0:
print "No record present in the list"
else:
size_RDD.mapPartition(lambda l: newObject.full_item(sc, dataBase, len(l[0]), end_date))
newObject.full_item(sc, dataBase, len(l[0]), end_date)在 full_item() 中——我正在执行一些选择操作并连接 2 个表并将数据插入到表中。
请帮助我,让我知道我做错了什么。
pyspark.rdd.RDD.mapPartition
方法是惰性求值的。
通常,为了强制执行计算,您可以使用一个方法,在返回的惰性 RDD 实例上返回一个值。
有更高级别的功能负责强制评估RDD
价值观。e.g. pyspark.rdd.RDD.foreach https://github.com/apache/spark/blob/v2.3.0/python/pyspark/rdd.py#L786
由于您并不真正关心操作的结果,因此您可以使用pyspark.rdd.RDD.foreach
代替pyspark.rdd.RDD.mapPartition
.
def first_of(it):
for first in it:
return first
return []
def insert_first(it):
first = first_of(it)
item_count = len(first)
newObject.full_item(sc, dataBase, item_count, end_date)
if len(size_DF) == 0:
print('No record present in the truncated list')
else:
size_DF.forEach(insert_first)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)