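Hi, I'm working on a transformation. I have created a generator, some_function(iter), that does
yield Row(id=index, api=row['api'], A=row['A'], B=row['B'])
to produce rows from a pandas DataFrame so they can be turned into an RDD and then into a Spark DataFrame. I'm getting an error. (I have to use pandas to transform the data because there is a large amount of legacy code.)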
Input Spark DataFrame:
respond_sdf.show()
+-------------------------------------------------------------------+
|content |
+-------------------------------------------------------------------+
|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
+-------------------------------------------------------------------+
Expected Spark DataFrame after the transformation:
transform_df.show()
+-------------------+
| api | A | B |
+-------------------+
| api_1 | 1 | 4 |
| api_1 | 2 | 5 |
| api_1 | 3 | 6 |
| api_2 | 7 | 10 |
| api_2 | 8 | 11 |
| api_2 | 9 | 12 |
+-------------------+
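The reshaping behind this expected output can be sketched without Spark or pandas. This is only a minimal sketch, assuming each `content` string is a Python-literal dict of equal-length lists; `explode_content` is a hypothetical helper name, and `ast.literal_eval` is swapped in for the `eval` call used in `pandas_function`.

```python
import ast

def explode_content(content):
    """Turn one column-oriented `content` string into (api, A, B) tuples."""
    # literal_eval only parses Python literals; it stands in for eval() here.
    columns = ast.literal_eval(content)
    # zip pairs up the i-th entry of every column, giving one tuple per row.
    return list(zip(columns["api"], columns["A"], columns["B"]))

content = "{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"
rows = explode_content(content)
print(rows[0])  # ('api_1', 1, 4)
```

Each input row therefore expands into several output rows, which is why a one-to-one mapping over the input is not enough on its own.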
Minimal example code:
#### IMPORT PYSPARK ###
import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[*]') \
    .getOrCreate()
sc = spark.sparkContext
####### INPUT DATAFRAME WITH LIST OF JSONS ########################
rdd_list = [["{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"],
            ["{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }"]]
schema = StructType([StructField('content', StringType(), True)])
jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
respond_sdf.show(truncate=False)
####### TRANSFORMATION DATAFRAME ########################
# Pandas transformation function returning a pandas DataFrame
def pandas_function(url_json):
    # Complex Pandas transformation
    url = url_json[0]
    json = url_json[1]
    df = pd.DataFrame(eval(json))
    return df
# Generator returning Rows from the pandas DataFrame
def some_function(iter):
    # Pandas generator
    pandas_df = pandas_function(iter)
    for index, row in pandas_df.iterrows():
        ## ERROR COMES FROM THIS ROW
        yield Row(id=index, api=row['api'], A=row['A'], B=row['B'])
# Creating transformation spark dataframe
schema = StructType([
    StructField('API', StringType(), True),
    StructField('A', IntegerType(), True),
    StructField('B', IntegerType(), True)
])
rdd = respond_sdf.rdd.map(lambda x: some_function(x))
transform_df = spark.createDataFrame(rdd,schema)
transform_df.show()
I get the following error:
raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object <generator object some_function at 0x7f69b43def90> in type <class 'generator'>
Full error:
Py4JJavaError: An error occurred while calling o462.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 37.0 failed 1 times, most recent failure: Lost task 2.0 in stage 37.0 (TID 97, dpc, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 612, in prepare
    verify_func(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1408, in verify
    verify_value(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object <generator object some_function at 0x7f69b43def90> in type <class 'generator'>
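The message says that a generator object itself reached the schema check, which would be consistent with `rdd.map` keeping whatever the function returns as a single element. A plain-Python sketch of that difference, assuming `map` is one-to-one while `flatMap` flattens the iterable returned for each element; `expand` and `records` are made-up names for illustration only:

```python
from itertools import chain

def expand(record):
    """Generator yielding several output items for one input record."""
    for value in record:
        yield value * 10

records = [[1, 2], [3]]

# map-like: one output element per input record, and that element is
# the generator object itself, not the values it would yield.
mapped = [expand(r) for r in records]

# flatMap-like: iterate every generator and concatenate what it yields.
flattened = list(chain.from_iterable(expand(r) for r in records))
print(flattened)  # [10, 20, 30]
```

Under that reading, using `flatMap` instead of `map` in the `rdd = ...` line is one thing to try. The `Row` built in `some_function` has four fields (`id`, `api`, `A`, `B`) while the schema declares three (`API`, `A`, `B`), so a further mismatch may surface after that change.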
I followed the advice in this question: "pySpark convert result of mapPartitions to spark DataFrame" https://stackoverflow.com/questions/59262543/pyspark-convert-result-of-mappartitions-to-spark-dataframe
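The linked question is about `mapPartitions`, where the function receives an iterator over all records of a partition rather than a single row. A minimal sketch of that calling convention in plain Python, assuming the `content` string sits at index 0 of each record (the sample input has a single column); `rows_from_partition` and the sample `partitions` are hypothetical, and `ast.literal_eval` stands in for `eval`:

```python
import ast

def rows_from_partition(partition):
    """mapPartitions-style generator: takes an iterator over a partition's
    records and yields flat (api, A, B) tuples."""
    for record in partition:
        # Each record is assumed to hold the content string at index 0.
        columns = ast.literal_eval(record[0])
        for api, a, b in zip(columns["api"], columns["A"], columns["B"]):
            yield (api, a, b)

# Two hypothetical partitions standing in for the RDD's partitions.
partitions = [
    [("{'api': ['api_1', 'api_1'], 'A': [1, 2], 'B': [4, 5]}",)],
    [("{'api': ['api_2'], 'A': [7], 'B': [10]}",)],
]

# The function is applied once per partition and the results are chained.
result = [row for part in partitions for row in rows_from_partition(iter(part))]
```

With Spark, the assumption is that such a function would be passed as `respond_sdf.rdd.mapPartitions(rows_from_partition)`, together with a schema whose fields match the yielded tuples in number and order.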