How to yield pandas DataFrame rows to a Spark DataFrame

2024-01-11

Hi, I'm working on a transformation: I created a some_function(iter) generator that yields Row(id=index, api=row['api'], A=row['A'], B=row['B']) in order to turn a pandas dataframe into an RDD and then a Spark dataframe, but I get an error. (I have to use pandas to transform the data because there is a lot of legacy code.)

Input Spark DataFrame

respond_sdf.show()
    +-------------------------------------------------------------------+
    |content                                                            |
    +-------------------------------------------------------------------+
    |{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }   |
    |{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
    +-------------------------------------------------------------------+

Expected Spark DataFrame after the transformation

transform_df.show()
    +-------------------+
    |  api   |  A  |  B |
    +-------------------+
    | api_1  |  1  |  4 |
    | api_1  |  2  |  5 |
    | api_1  |  3  |  6 |
    | api_2  |  7  | 10 |
    | api_2  |  8  | 11 |
    | api_2  |  9  | 12 |
    +-------------------+

Minimal example code

#### IMPORT PYSPARK ###

import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[*]') \
    .getOrCreate()
sc = spark.sparkContext


####### INPUT DATAFRAME WITH LIST OF JSONS ########################

rdd_list = [["{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"],
            ["{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }"]]

schema = StructType([StructField('content', StringType(), True)])

jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
respond_sdf.show(truncate=False)


####### TRANSFORMATION DATAFRAME ########################

# Pandas transformation function returning pandas dataframe
def pandas_function(url_json):
    # Complex Pandas transformation
    url = url_json[0]
    json = url_json[1]
    df = pd.DataFrame(eval(json))
    return df

# Generator returning a Row for each row of the pandas dataframe
def some_function(iter):
  # Pandas generator
  pandas_df = pandas_function(iter)
  for index, row in pandas_df.iterrows():
      ## ERROR COMES FROM THIS ROW
      yield Row(id=index, api=row['api'], A=row['A'], B=row['B'])

# Creating transformation spark dataframe
schema = StructType([
  StructField('API', StringType(), True),
  StructField('A', IntegerType(), True),
  StructField('B', IntegerType(), True)
  ])


rdd = respond_sdf.rdd.map(lambda x: some_function(x))
transform_df = spark.createDataFrame(rdd,schema)
transform_df.show()

I get the following error:

raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object <generator object some_function at 0x7f69b43def90> in type <class 'generator'>

Full error:

Py4JJavaError: An error occurred while calling o462.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 37.0 failed 1 times, most recent failure: Lost task 2.0 in stage 37.0 (TID 97, dpc, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 612, in prepare
    verify_func(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1408, in verify
    verify_value(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1395, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object <generator object some_function at 0x7f69b43def90> in type <class 'generator'>

I followed the advice from this link: pySpark convert result of mapPartitions to spark DataFrame https://stackoverflow.com/questions/59262543/pyspark-convert-result-of-mappartitions-to-spark-dataframe
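
(As an aside, a minimal sketch of one way to make the generator approach itself work, not taken from the linked answer: the error occurs because rdd.map(some_function) produces one generator object per input row, and createDataFrame then tries to validate those generator objects against the StructType. Flattening the yielded rows with flatMap, and yielding plain tuples read from the single content column, avoids that.)

# Sketch: flatMap flattens the rows yielded for each input row, so
# createDataFrame receives tuples instead of generator objects.
def row_generator(spark_row):
    pandas_df = pd.DataFrame(eval(spark_row['content']))
    for _, row in pandas_df.iterrows():
        # cast numpy integers to plain ints so schema verification accepts them
        yield (row['api'], int(row['A']), int(row['B']))

transform_df = spark.createDataFrame(
    respond_sdf.rdd.flatMap(row_generator),
    "api string, A int, B int",
)
transform_df.show()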


EDIT: Spark 3.0 also has a mapInPandas function, which should be more efficient because no grouping is required.

import pyspark.sql.functions as F

def pandas_function(iterator):
    for df in iterator:
        yield pd.concat(pd.DataFrame(x) for x in df['content'].map(eval))

transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
transformed_df.show()
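
(Side note, an optional variation: since the content strings are Python dict literals rather than valid JSON, ast.literal_eval can be swapped in for eval as a safer parser that only evaluates literals. A sketch of the same transformation with that change:)

import ast

def pandas_function(iterator):
    # same logic as above, but parses the dict-literal strings with
    # ast.literal_eval, which does not execute arbitrary code
    for df in iterator:
        yield pd.concat(pd.DataFrame(x) for x in df['content'].map(ast.literal_eval))

transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
transformed_df.show()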

Another way: using pandas_udf and apply:

import pyspark.sql.functions as F

@F.pandas_udf("api string, A int, B int", F.PandasUDFType.GROUPED_MAP)
def pandas_function(url_json):
    df = pd.DataFrame(eval(url_json['content'][0]))
    return df

transformed_df = respond_sdf.groupBy(F.monotonically_increasing_id()).apply(pandas_function)
transformed_df.show()

+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_2|  7| 10|
|api_2|  8| 11|
|api_2|  9| 12|
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
+-----+---+---+
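
(Note: in Spark 3.0+ the GROUPED_MAP pandas_udf style above is deprecated in favor of groupBy(...).applyInPandas, which takes a plain function plus an output schema. A rough equivalent sketch:)

def pandas_function(pdf):
    # pdf holds all columns of one group; each group contains a single row here
    return pd.DataFrame(eval(pdf['content'].iloc[0]))

transformed_df = respond_sdf.groupBy(F.monotonically_increasing_id()) \
    .applyInPandas(pandas_function, "api string, A int, B int")
transformed_df.show()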

Old answer (less scalable, since collect() brings every partial result back to the driver before re-parallelizing...):

def pandas_function(url_json):
    df = pd.DataFrame(eval(url_json))
    return df

transformed_df = spark.createDataFrame(pd.concat(respond_sdf.rdd.map(lambda r: pandas_function(r[0])).collect()))
transformed_df.show()
+-----+---+---+
|  api|  A|  B|
+-----+---+---+
|api_1|  1|  4|
|api_1|  2|  5|
|api_1|  3|  6|
|api_2|  7| 10|
|api_2|  8| 11|
|api_2|  9| 12|
+-----+---+---+