Pyspark dataframe:如何按组应用 scipy.optimize 函数

2024-05-26

我有一段运行良好的代码,但使用 pandas 数据帧 groupby 处理。 但是,由于文件很大(> 7000 万组),我需要转换代码以使用 PYSPARK 数据框架。 这是使用 pandas dataframe 和小示例数据的原始代码:

import pandas as pd
import numpy as np
from scipy.optimize import minimize

df = pd.DataFrame({
'y0': np.random.randn(20),
'y1': np.random.randn(20),
'x0': np.random.randn(20), 
'x1': np.random.randn(20),
'grpVar': ['a', 'b'] * 10})

# Starting values
startVal = np.ones(2)*(1/2)

#Constraint  Sum of coefficients = 0
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})

# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

# Define a function to calculate sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])  **2)

# Define a function to call minimize function 
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
    bounds=bnds, constraints = cons, args=(data))
return ResultByGrp.x

# Do the calculation by applyng the function by group:
# Create GroupBy object
grp_grpVar = df.groupby('grpVar')

Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons))

现在我尝试使用 pySpark dataframe 为了测试代码,我将 pandas 数据帧转换为 pyspark 数据帧。

sdf = sqlContext.createDataFrame(df)
type(sdf)
#  <class 'pyspark.sql.dataframe.DataFrame'>

# Create GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')

# Redefine functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)

def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                       bounds=bnds, constraints = cons, args=(data))
return ResultByGrp.x

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

udf = UserDefinedFunction(sRunMinimize , StringType())

Results = Sgrp_grpVar.agg(sRunMinimize()) 

但是,在我尝试定义用户定义函数 udf 后,出现以下错误 - 见下文。 非常感谢任何帮助纠正我的错误或建议替代方法的帮助。

udf = UserDefinedFunction(sRunMinimize , StringType()) 回溯(最近一次调用最后一次): 文件“”,第 1 行,位于 文件“/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py”,第 1760 行,位于initself._judf = self._create_judf(名称).......


您正在尝试编写一个用户定义的聚合函数,这在 pyspark 中无法完成,请参阅https://stackoverflow.com/a/40030740 https://stackoverflow.com/a/40030740.

你可以写的是UDF以列表形式收集的每组内的数据:

首先进行设置:

import pandas as pd 
import numpy as np 
from scipy.optimize import minimize
import pyspark.sql.functions as psf
from pyspark.sql.types import *

df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20), 
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})
sdf = sqlContext.createDataFrame(df)

# Starting values
startVal = np.ones(2)*(1/2)
#Constraint  Sum of coefficients = 0
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})
# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

我们将广播这些变量,因为我们需要在聚合数据帧的每一行上调用它们,它将把值复制到每个节点,这样它们就不必在驱动程序上获取它们:

sc.broadcast(startVal)
sc.broadcast(bnds)

让我们使用以下方法聚合数据collect_list,我们将更改周围数据的结构,以便我们只有一列(您可以将每一列收集到不同的列中,但随后您必须修改将数据传递给函数的方式):

Sgrp_grpVar = sdf\
    .groupby('grpVar')\
    .agg(psf.collect_list(psf.struct("y0", "y1", "x0", "x1")).alias("data"))
Sgrp_grpVar.printSchema()

    root
     |-- grpVar: string (nullable = true)
     |-- data: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- y0: double (nullable = true)
     |    |    |-- y1: double (nullable = true)
     |    |    |-- x0: double (nullable = true)
     |    |    |-- x1: double (nullable = true)

我们现在可以创建我们的UDF,返回的数据类型对于pyspark来说太复杂,numpy arrayspyspark 不支持,所以我们需要稍微改变一下:

def sSumSqDif(a, data):
    return np.sum(
        (data['y0'] - a[0]*data['x0'])**2 \
        + (data['y1'] - a[1]*data['x1'])**2)

def sRunMinimize(data, startVal=startVal, bnds=bnds, cons=cons):
    data = pd.DataFrame({k:v for k,v in zip(["y0", "y1", "x0", "x1"], data)})
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                       bounds=bnds, constraints = cons, args=(data))
    return ResultByGrp.x.tolist()

sRunMinimize_udf = lambda startVal, bnds, cons: psf.udf(
    lambda data: sRunMinimize(data, startVal, bnds, cons), 
    ArrayType(DoubleType())
)

我们现在可以将此函数应用于每组中收集的数据:

Results = Sgrp_grpVar.select(
    "grpVar", 
    sRunMinimize_udf(startVal, bnds, cons)("data").alias("res")
)
Results.show(truncate=False)

    +------+-----------------------------------------+
    |grpVar|res                                      |
    +------+-----------------------------------------+
    |b     |[0.4073139282953772, 0.5926860717046227] |
    |a     |[0.8275186444565927, 0.17248135554340727]|
    +------+-----------------------------------------+

但我不认为 pyspark 是合适的工具。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark dataframe:如何按组应用 scipy.optimize 函数 的相关文章

随机推荐

  • 向 ASP.NET WebApi 2.2 添加身份验证

    我创建了一个 WebApi 2 2 项目 来自一个空的新 ASP NET 项目 来证明一些实现概念 现在我想向其中添加身份验证 我注意到在新的 WebApi 应用程序上添加身份验证的唯一方法是使用模板之一 在我的例子中是 VS 2013 是
  • 如何在Java中从控制台读取时设置默认输入?

    如何将任何单词 文本 添加到输入流 我想读取控制台输入并预填充该行 以便用户不必从头开始输入单词 String ANY WORD TEXT System out print Enter some magic here String valu
  • ValueError:以 10 为基数的 int() 的文字无效:

    当我尝试将一些数据插入 django 模型时 我收到此值错误 我的Python脚本是 from task employeeDetails models import EmployeeDetails def dumpdata userName
  • Netbeans 和 Maven:不同的编译行为

    我正在使用 Netbeans 和 Maven 项目 在某些情况下 Netbeans 显示编译失败 红色气球 但 Maven 编译所有内容都没有任何问题 造成这种差异的原因可能是什么 我已经检查过两者都使用相同的 JDK 版本 一个可能的原因
  • Node/Express 4.0 中可以声明全局变量吗

    我有多个需要访问数据库的路线 对于开发我使用本地数据库 显然生产我使用托管数据库 唯一的问题是每次我去推送版本时我都必须手动更改数据库链接 e g var mongodb require mongojs connect urlhere Co
  • 从车把调用 Javascript 函数

    如何从车把脚本内部调用 JavaScript 函数 原因 我没能打破 each 从车把内部 所以我需要将它传递给 JavaScript 来执行逻辑 你可以在助手的帮助下做到这一点 Handlebars registerHelper prin
  • 在产品页面上显示最近浏览过的产品

    magento 当前的默认功能是在类别页面的右侧显示最近查看的产品 现在我想在产品页面底部显示相同的内容 使用的 phtml 文件在位置命名为 frontend base default template reports product v
  • 禁用 com.android.systemui 是否安全?

    我发现 Android 最近的应用程序对话框可以通过禁用来禁用 包裹com android systemui 我想在信息亭模式下运行我的 已取得 root 权限的 设备 因此长按时不要显示最近的应用程序对话框至关重要 现在 到底是什么com
  • 聪明的。 C# 中的硬盘数据

    只是试图从我的应用程序将运行的任何计算机上连接的硬盘驱动器中获取一些智能信息 我将 WMI 用于程序中的许多其他内容 并且我查看过的有关 SMART 的每个问题都引用了 Win32 DiskDrive 然而 这里的数据确实非常少 而且可能不
  • PHP中如何检查输入类型按钮是否被按下?

    isset 函数可用于检查输入类型submit被按下 但是有没有办法检查输入类型按钮是否被按下 在我的代码中 按钮什么也不做 只是在 Onclick 事件上调用一个函数 然后刷新页面并在 PHP 中创建数据库条目 并且我希望它仅在按下按钮后
  • Mvc ViewBag - 无法将 null 转换为“bool”,因为它是不可为 null 的值类型

    我想在生成某个视图时在控制器中将 bool 设置为 true 然后相应地更改视图的标题 这应该非常简单 但我得到的是 无法对空引用执行运行时绑定异常详细信息 Microsoft CSharp RuntimeBinder RuntimeBin
  • 如何在 Vim 中创建行号和文本之间的边框

    我希望在行号右侧和文本左侧有一条细边框线 您可以使用不同的颜色来突出显示LineNr 例如 hi LineNr cterm bold ctermbg gray ctermfg black gui bold guibg gray guifg
  • 安卓。 2D游戏开发[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我想开始为 Android 开发一款 2D 游戏 但我完全不知道从哪里开始 例如 愤怒的小鸟 游戏是如何开发的 我猜这是一个 2D 引
  • Highcharts 问题 - 在可缩放图表中显示标签

    我有一个缩放柱形图 xAxis 中有 200 多个类别 因此 当它处于初始状态 比例1 1 时 所有这些家伙都显示在X轴下方 即使我将它们垂直放置 也无法读取任何内容 我需要缩放图表以使标签可见 Here s screenshot of t
  • Python SQLAlchemy 用户身份验证失败

    我尝试使用 SQLAlchemy 连接 PostgreSQL 数据库 我创建了一个像这样的新角色 首先 我使用以下命令登录到 postgres 帐户 sudo i u postgres 接下来 发出命令 createuser interac
  • Cordova 构建 - 无法解析 com.android.tools.build:gradle:1.5.0

    升级我的机器 Ubuntu 15 10 的 cordova 和 npm 后 我无法使用 cordova 构建新项目 构建指责 graddle 中存在错误 我做了一些研究并找到了一些可能的解决方案 解决方案例如更改版本和 url graddl
  • checked="checked" 在 Chrome 中不起作用

  • 我的石墨中的 Logstash 指标在哪里?

    这可能是一个菜鸟问题 但我很难找到答案 所以我希望你们能在这里帮助我 我有一个running logstash实例将日志从一台服务器传送到另一台运行 Graphite 的服务器 这是我的输出配置 output stdout codec gt
  • 如何使用 Google App Engine 和 Python 创建 REST 服务?

    我想创建一个 RESTFUL Web 服务 通过访问的 URL 获取请求 然后为该客户端返回适当的文档 例如 如果它是一个天气应用程序 我想通过网络浏览器获取亚特兰大的天气 我会访问http weatherapp appspot com T
  • Pyspark dataframe:如何按组应用 scipy.optimize 函数

    我有一段运行良好的代码 但使用 pandas 数据帧 groupby 处理 但是 由于文件很大 gt 7000 万组 我需要转换代码以使用 PYSPARK 数据框架 这是使用 pandas dataframe 和小示例数据的原始代码 imp