pyspark_自定义udf_解析json列【附代码】

2023-11-14

一、背景:

车联网数据有很多车的时序数据,现有一套云端算法需要对每一辆车历史数据进行计算得到结果,每日将全部车算一遍存到hive数仓中

二、调研方案:

1、python脚本运行,利用pyhive拉取数据到pandas进行处理,将结果to_parquet后用hdfs_client存到数仓中
问题:数据量上亿,对内存要求极大,无法直接拉取到python脚本所在的服务器内存中运算
2、将算法内容改写成SQL或者SPARKSQL,每日调度
问题:代码改写SQL要重新梳理代码逻辑,且很多函数SQL实现复杂,有些函数不支持

三、利用Pyspark + udf自定义函数实现大数据并行计算

整体流程

1、pyspark-spark sql拉取数据到spark df
2、spark df 按 车辆唯一标识分组,执行udf自定义函数(算法),每一个分组的返回值是String类型的json字符串,执行完成后返回的是result_df, spark_df【索引(车辆唯一标识)、数据(String类型的json字符串)】
3、解析json并拼接成spark_df
4、spark_df生成临时表,将临时表数据写入hive数仓

案例代码运行结果:

案例代码运行结果

案例代码:

代码地址:

https://github.com/SeafyLiang/Python_study/blob/master/pyspark_demo/pyspark_udf_json.py

代码

from pyspark.sql import SparkSession  # SparkConf、SparkContext 和 SQLContext 都已经被封装在 SparkSession
from pyspark.sql import functions as F
import pandas as pd
from pyspark.sql import types as T  # spark df的数据类型
from pyspark.sql.functions import array, from_json, col, explode
import sys


def get_auc(id, date, vol):
    temp_df = pd.DataFrame({
        'id': id,
        'date': date,
        'vol': vol
    })
    temp_df['date'] = temp_df['date'].apply(lambda x: x + 'aaa')
    temp_df_json = temp_df.to_json(orient='records')  # orient='records'是关键,可以把json转成array<json>
    return temp_df_json


if __name__ == '__main__':
    spark = SparkSession.builder.appName('test_sklearn_pyspark') \
        .config("spark.sql.warehouse.dir", "hdfs://nameservice1/user/hive/warehouse") \
        .config("hive.exec.dynamici.partition", True) \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("spark.sql.crossJoin.enabled", "true"). \
        config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .enableHiveSupport() \
        .getOrCreate()
    print(spark)

    temp_dict = {
        'id': [1, 2, 3, 4, 1, 1],
        'date': ['2022-05-01', '2022-05-02', '2022-05-03', '2022-05-04', '2022-05-05', '2022-05-05'],
        'vol': [68.22, 45.10, 899.33, 45.11, 32.22, 99.33]
    }
    tempdf = pd.DataFrame(temp_dict)
    df = spark.createDataFrame(tempdf)

    # 自定义函数(计算AUC),并且变成UDF
    """注意:自定义函数的重点在于定义返回值的数据类型,这个返回值的数据类型必须与该函数return值的数据类型一致,否则会报错。
    该例子中,该函数return的值auc,是string类型,在将该函数定义成udf的时候,指定的返回值类型,也必须是string!!"""

    get_auc_udfs = F.udf(get_auc, returnType=T.StringType())  # 定义成udf,并且此udf的返回值类型为string

    # 分组聚合操作:分别计算每月样本量、逾期率、AUC
    """使用上面定义的UDF,结合F.collect_list(col)来实现UDAF的功能。
    F.collect_lits(col)的作用是将列col的值变成一个list返回."""

    df_result = df.groupby('id').agg(get_auc_udfs(
        F.collect_list(F.col('id').cast('int')),
        F.collect_list(F.col('date').cast('string')),
        F.collect_list(F.col('vol').cast('double'))
    ).alias('json_str'))  # 利用自定的UDF,实现指定聚合计算

    df_result.show(truncate=False)

    opn = 2
    if opn == 1:
        # 【不推荐】方式一:spark_df转成pandas_df,拼接json成pandas_all_df后再转成spark_df写入
        # 数据量大时会把大量数据拉到driver本地,导致内存溢出
        all_result_df = pd.DataFrame()
        df_result_pandas = df_result.toPandas()
        for row in df_result_pandas.itertuples():
            print(row.json_str)
            temp_df = pd.read_json(row.json_str)
            all_result_df = pd.concat([all_result_df, temp_df], ignore_index=True)
        print(all_result_df)
    elif opn == 2:
        # 【推荐】方式二:解析json成新的spark_df
        json_schema = T.ArrayType(
            T.StructType().add("id", T.IntegerType()).add("date", T.StringType()).add("vol", T.DoubleType()))
        df_result = df_result.withColumn('parsed_json', from_json(col('json_str'), json_schema))
        df_result.show()
        df_result.select('parsed_json').show(3, truncate=False)
        df_result = df_result.select(explode(col('parsed_json')).alias('parsed_json_explode'))
        df_result.show()
        df_result = df_result.select(col('parsed_json_explode.id').alias('id'),
                                     col('parsed_json_explode.date').alias('date'),
                                     col('parsed_json_explode.vol').alias('vol'))
        df_result.show()
        print('df_result:', df_result.count())
        # 写入hive表
        # dt_before1day = sys.argv[1]
        # print('dt_before1day:', dt_before1day)
        # # df 转为临时表/临时视图
        # df_result.createOrReplaceTempView("df_tmp_view")
        # # spark.sql 插入hive
        # spark.sql("""
        #         insert overwrite table table_name partition(dt='{DT}')
        #         select
        #         *
        #         from df_tmp_view
        #         """.format(DT=dt_before1day))
        # print('spark write end!')

    print('end')
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

pyspark_自定义udf_解析json列【附代码】 的相关文章

  • 如何查看Databricks中的所有数据库和表

    我想列出 Azure Databricks 中每个数据库中的所有表 所以我希望输出看起来像这样 Database Table name Database1 Table 1 Database1 Table 2 Database1 Table
  • 使用 pythonbrew 编译 Python 3.2 和 2.7 时出现问题

    我正在尝试使用构建多个版本的 python蟒蛇酿造 http pypi python org pypi pythonbrew 0 7 3 但我遇到了一些测试失败 这是在运行的虚拟机上 Ubuntu 8 04 32 位 当我使用时会发生这种情
  • 使用 psycopg2 在 python 中执行查询时出现“编程错误:语法错误位于或附近”

    我正在运行 Python v 2 7 和 psycopg2 v 2 5 我有一个 postgresql 数据库函数 它将 SQL 查询作为文本字段返回 我使用以下代码来调用该函数并从文本字段中提取查询 cur2 execute SELECT
  • OpenCV Python cv2.mixChannels()

    我试图将其从 C 转换为 Python 但它给出了不同的色调结果 In C Transform it to HSV cvtColor src hsv CV BGR2HSV Use only the Hue value hue create
  • 如何在flask中使用g.user全局

    据我了解 Flask 中的 g 变量 它应该为我提供一个全局位置来存储数据 例如登录后保存当前用户 它是否正确 我希望我的导航在登录后在整个网站上显示我的用户名 我的观点包含 from Flask import g among other
  • 为 Anaconda Python 安装 psycopg2

    我有 Anaconda Python 3 4 但是每当我运行旧代码时 我都会通过输入 source activate python2 切换到 Anaconda Python 2 7 我的问题是我为 Anaconda Python 3 4 安
  • 通过最小元素比较对 5 个元素进行排序

    我必须在 python 中使用元素之间的最小比较次数来建模对 5 个元素的列表进行排序的执行计划 除此之外 复杂性是无关紧要的 结果是一个对的列表 表示在另一时间对列表进行排序所需的比较 我知道有一种算法可以通过 7 次比较 总是在元素之间
  • Django:按钮链接

    我是一名 Django 新手用户 尝试创建一个按钮 单击该按钮会链接到我网站中的另一个页面 我尝试了一些不同的例子 但似乎没有一个对我有用 举个例子 为什么这不起作用
  • Python - StatsModels、OLS 置信区间

    在 Statsmodels 中 我可以使用以下方法拟合我的模型 import statsmodels api as sm X np array 22000 13400 47600 7400 12000 32000 28000 31000 6
  • 如何替换 pandas 数据框列中的重音符号

    我有一个数据框dataSwiss其中包含瑞士城市的信息 我想用普通字母替换带有重音符号的字母 这就是我正在做的 dataSwiss Municipality dataSwiss Municipality str encode utf 8 d
  • 以编程方式停止Python脚本的执行? [复制]

    这个问题在这里已经有答案了 是否可以使用命令在任意行停止执行 python 脚本 Like some code quit quit at this point some more code that s not executed sys e
  • Python 函数可以从作用域之外赋予新属性吗?

    我不知道你可以这样做 def tom print tom s locals locals def dick z print z name z name z guest Harry print z guest z guest print di
  • Pygame:有没有简单的方法可以找到按下的任何字母数字的字母/数字?

    我目前正在开发的游戏需要让人们以自己的名义在高分板上计时 我对如何处理按键有点熟悉 但我只处理过寻找特定的按键 有没有一种简单的方法可以按下任意键的字母 而不必执行以下操作 for event in pygame event get if
  • 使用 \r 并打印一些文本后如何清除控制台中的一行?

    对于我当前的项目 有一些代码很慢并且我无法使其更快 为了获得一些关于已完成 必须完成多少的反馈 我创建了一个进度片段 您可以在下面看到 当你看到最后一行时 sys stdout write r100 80 n I use 80覆盖最终剩余的
  • Fabric env.roledefs 未按预期运行

    On the 面料网站 http docs fabfile org en 1 10 usage execution html 给出这个例子 from fabric api import env env roledefs web hosts
  • 对年龄列进行分组/分类

    我有一个数据框说df有一个柱子 Ages gt gt gt df Age 0 22 1 38 2 26 3 35 4 35 5 1 6 54 我想对这个年龄段进行分组并创建一个像这样的新专栏 If age gt 0 age lt 2 the
  • 从列表指向字典变量

    假设你有一个清单 a 3 4 1 我想用这些信息来指向字典 b 3 4 1 现在 我需要的是一个常规 看到该值后 在 b 的位置内读写一个值 我不喜欢复制变量 我想直接改变变量b的内容 假设b是一个嵌套字典 你可以这样做 reduce di
  • Python 类继承 - 诡异的动作

    我观察到类继承有一个奇怪的效果 对于我正在处理的项目 我正在创建一个类来充当另一个模块的类的包装器 我正在使用第 3 方 aeidon 模块 用于操作字幕文件 但问题可能不太具体 以下是您通常如何使用该模块 project aeidon P
  • 导入错误:没有名为 site 的模块 - mac

    我已经有这个问题几个月了 每次我想获取一个新的 python 包并使用它时 我都会在终端中收到此错误 ImportError No module named site 我不知道为什么会出现这个错误 实际上 我无法使用任何新软件包 因为每次我
  • Python Selenium:如何在文本文件中打印网站上的值?

    我正在尝试编写一个脚本 该脚本将从 tulsaspca org 网站获取以下 6 个值并将其打印在 txt 文件中 最终输出应该是 905 4896 7105 23194 1004 42000 放置的动物 的 HTML span class

随机推荐

  • 原生HTML跳转页面传递和接收参数方法

    传递参数 window location href order html info info 2 接受参数 在window nl ad function 里写下面的代码 var url location search 获取url中 符后的字
  • 过压保护电路(OVP)

    作者 AirCity 2020 2 4 Aircity007 sina com 本文所有权归作者Aircity所有 1 示例一 当VBUS 5V时 各点电压电流如图所示 Q1接近截止 输出信号是5V 当VBUS 5 4V开始 输出信号开始下
  • 《项目管理基础》学考笔记

    控制的主要成分是信息 每个人都应该进行状态和流程总结 条形图尾巴长短表示浮动时间 改善流程是每个项目经理的职责 项目控制分2方面 维持 改善质量 项目总结会议包括3类 设计 流程或经验教训 状态总结 计划时间不应超过4到6周 箭线图对于分析
  • 【平衡小车制作】(一)硬件原理图讲解(超详解)

    大家好 我是小政 之后的一系列文章我将介绍我玩平衡小车的过程以及遇到的一些问题 将这些内容记录下来分享给大家 也让大家少走一些弯路 接下来我将从硬件框架选择 软件编程 PID算法 PID调参 蓝牙遥控这五个部分向大家讲解平衡小车的制作过程
  • java 二进制转换为十进制_二进制转换十进制 算法解析

    java里面是有进制间互换现成的方法的 public class十进制与各进制的相互转换 public static voidmain String args java已经实现的机制 十进制转换为二进制 int decimal 10 Sys
  • php截取百度搜索结果

    简单的通过file URL 获取远程网页数据 用implode 函数把数组合并成string 再根据自己的需要 对string进行截取 过滤等个性化处理 基于此思想 可以进一步拓展 估计就是采集器的雏形了
  • 【华为OD机试真题2023 JS】统一限载货物数最小值

    华为OD机试真题 2023年度机试题库全覆盖 刷题指南点这里 统一限载货物数最小值 知识点二分查找 时间限制 1s 空间限制 64MB 限定语言 不限 题目描述 火车站附近的货物中转站负责将到站货物运往仓库 小明在中转站负责调度2K辆中转车
  • JeeSite快速开发平台 JNPF快速开发平台3.4.6版本 框架源码部署文档入门说明

    JeeSite快速开发平台 JeeSite 快速开发平台 不仅仅是一个后台开发框架 它是一个企业级快速开发解决方案 后端基于经典组合 Spring Boot Shiro MyBatis 前端采用 Beetl Bootstrap AdminL
  • x264编码h264

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 x264介绍 二 x264中主要的编码接口以及主要数据结构介绍 1 void x264 param default x264 param t 2 int
  • 发现一个好用的层级多项目管理工具

    市面上项目管理工具蛮多的 但大多仅支持单层多项目管理 而我们公司有多条产品线 如果没有层级组织用于分类 使用起来就非常麻烦 最近 我们试用了下Topo项目管理软件 它可以根据我们的组织架构进行层级搭建 实际使用效果不错 看图 进系统 进设置
  • 【Linux】网络编程套接字(下)

    Linux 博客主页 一起去看日落吗 分享博主的在Linux中学习到的知识和遇到的问题 博主的能力有限 出现错误希望大家不吝赐教 分享给大家一句我很喜欢的话 看似不起波澜的日复一日 一定会在某一天让你看见坚持的意义 祝我们都能在鸡零狗碎里找
  • 第八天字符串

    344 反转字符串 力扣题目链接 opens new window 编写一个函数 其作用是将输入的字符串反转过来 输入字符串以字符数组 char 的形式给出 不要给另外的数组分配额外的空间 你必须原地修改输入数组 使用 O 1 的额外空间解
  • 基于单片机超声波测距语音播放

    一 系统方案 本设计采用52单片机作为主控器 HC SR04测距 液晶1602显示 按键设置报警阀值 语音报警 二 硬件设计 原理图如下 三 单片机软件设计 1 首先是系统初始化 uint dist 保存超声波模块测量到的结果 Trig P
  • pandas 数据导出

    1 导出到csv文件 1 1 DataFrame数据导出 index 0 忽略索引 header 0 忽略表头 mode a 可追加 df to csv data output path index 0 header 0 sep t flo
  • 循环控制结构小题1

    include
  • mapbox-gl支持多种坐标系

    文章目录 前言 效果 总结 前言 mapbox默认的投影是3857 但是实际应用中我们经常会使用高德 百度 天地图的服务 原生mapbox是不支持的 需要我们修改源码以支持以上坐标系 参考 支持百度 高德坐标系 mapboxgl 纠偏百度地
  • vue 项目中 zip 压缩包文件下载

    vue 项目中 zip 压缩包文件下载 参考文章 胡新fa 文件下载流程 参考文章 Mr 裴 压缩包下载打不开问题 參考文章 sqwu 注意 一定要在接口中配置 responseType blob 该属性 headers 根据需求添加 re
  • URL 地址栏锚点 window location hash 使用方法

    location是javascript里边管理地址栏的内置对象 比如location href就管理页面的url 用location href url就可以直接将页面重定向url 本文转自米扑博客 URL 地址栏锚点 window loca
  • ULN2003芯片控制直流电机学习

    ULN2003 双极型线性集成电路 达林顿晶体管阵列 ULN2003是一个单片高电压 高电流的达林顿晶体管阵列集成 电路 它是由7对NPN达林顿管组成的 它的高电压输出特性和阴 极箝位二极管可以转换感应负载 单个达林顿对的集电极电流是 50
  • pyspark_自定义udf_解析json列【附代码】

    pyspark 自定义udf 解析json列 附代码 一 背景 二 调研方案 三 利用Pyspark udf自定义函数实现大数据并行计算 整体流程 案例代码运行结果 案例代码 代码地址 代码 一 背景 车联网数据有很多车的时序数据 现有一套