更改 Spark 数据框中列的可为空属性

2024-01-30

我正在手动创建一个数据框进行一些测试。创建它的代码是:

case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
    input(1111,1,1001,10.00),
    input(1111,0,1002,10.00)))

所以架构看起来像这样:

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

我想为这些变量中的每一个设置“nullable = true”。如何从一开始就声明它或在创建后将其切换到新的数据帧中?


Answer

随着进口

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

您可以使用

/**
 * Set nullable property of column.
 * @param df source DataFrame
 * @param cn is the column name to change
 * @param nullable is the flag to set, such that the column is  either nullable or not
 */
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

  // get schema
  val schema = df.schema
  // modify [[StructField] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

直接地。

您还可以通过“pimp my library”库模式使该方法可用(请参阅我的 SO 帖子在 DataFrame 上定义自定义方法的最佳方法是什么? https://stackoverflow.com/questions/32585670/what-is-the-best-way-to-define-custom-methods-on-a-dataframe),这样你就可以打电话

val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )

Edit

替代解决方案1

使用稍微修改过的版本setNullableStateOfColumn

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  // get schema
  val schema = df.schema
  // modify [[StructField] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

替代方案2

明确定义架构。 (使用反射创建更通用的解决方案)

configuredUnitTest("Stackoverflow.") { sparkContext =>

  case class Input(id:Long, var1:Int, var2:Int, var3:Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._


  // use this to set the schema explicitly or
  // use refelection on the case class member to construct the schema
  val schema = StructType( Seq (
    StructField( "id", LongType, true),
    StructField( "var1", IntegerType, true),
    StructField( "var2", IntegerType, true),
    StructField( "var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001,-10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] =  sparkContext.parallelize( is )
  val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame( rowRDD, schema ) 

  inputDF.printSchema
  inputDF.show()
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

更改 Spark 数据框中列的可为空属性 的相关文章

  • Spark/Yarn:HDFS 上不存在文件

    我在 AWS 上设置了 Hadoop Yarn 集群 有 1 个主服务器和 3 个从服务器 我已经验证我有 3 个活动节点在端口 50070 和 8088 上运行 我在客户端部署模式下测试了 Spark 作业 一切正常 当我尝试使用 Spa
  • 如何使用 apply/unapply 方法重现案例类行为?

    我尝试用普通类和伴生对象替换案例类 但突然出现类型错误 编译良好的代码 综合示例 trait Elem A B def C other Elem C A Elem C B other match case Chain head tail g
  • 将 Scala 库转换为 DLL (.NET)

    我正在尝试从 scala 类创建一个 Dll 我将 IntelliJ 与 SBT 一起使用 我已经找到了一种使用 ikvm converter 将 jar 文件转换为 Dll 的方法 现在的问题是 当我在 SBT 下使用 package 从
  • 了解 Scala 中的中缀方法调用和缺点运算符(::)

    我对 Scala 编程语言相当陌生 当我遵循以下网站的讲义时 我正在尝试一些萦绕在我脑海中的东西 here http horstmann com sjsu cs152 04 closures1 html 我想我无法真正理解 cons 运算符
  • 使用原始类型模拟案例类

    考虑以下类型结构 trait HasId T def id T case class Entity id Long extends HasId Long 比方说 我们想在一些测试中模拟实体类 val entityMock mock Enti
  • 使用 Akka 玩 2.5 - 找不到参数超时的隐式值:akka.util.Timeout

    我正在尝试使用 Play 2 5 测试 Akka 但遇到了一个似乎无法解决的编译错误 我正在关注 Play 文档中的此页面 https playframework com documentation 2 5 x ScalaAkka http
  • 使用 scala 从 Spark 中的数组数组中的结构中提取值

    我正在使用 scala 将 json 数据读入 Spark 数据帧 架构如下 root metadata array nullable true element struct containsNull true playerId strin
  • 获取:导入 Spark 模块时出错:没有名为“pyspark.streaming.kafka”的模块

    我需要将从 pyspark 脚本创建的日志推送到 kafka 我正在做 POC 所以在 Windows 机器上使用 Kafka 二进制文件 我的版本是 kafka 2 4 0 spark 3 0 和 python 3 8 1 我正在使用 p
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • 通用特征的隐式转换

    我正在实现一个数据结构 并希望用户能够使用任何类型作为密钥 只要他提供一个合适的密钥类型来包装它 我有这个关键类型的特质 这个想法是进行从基类型到键类型的隐式转换 反之亦然 实际上 只使用基类型 该特征看起来像这样 trait Key T
  • Scala 如何忽略 Java 的检查异常?

    例如如果调用 JavaThread sleep这会抛出一个已检查的InterruptedException来自 Scala 源文件 然后不需要将调用包含在 Scala 中try catch Scala 如何删除将调用包围在 a 中的规则tr
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 为什么《Scala 中的函数式编程》一书的“无异常处理错误”一章中没有提到“scala.util.Try”?

    在 Scala 中的函数式编程 一书中的 无异常处理错误 一章中 作者给出 从函数体抛出异常的问题 Use Option如果我们不关心实际的异常 Use Either如果我们关心实际的异常 But scala util Try没有提到 从我
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • 使用 scala 集合 - CanBuildFrom 麻烦

    我正在尝试编写一个接受任何类型集合的方法CC 并将其映射到一个新的集合 相同的集合类型但不同的元素类型 我正在挣扎 基本上我正在尝试实施map but 不在集合本身上 问题 我正在尝试实现一个带有签名的方法 它看起来有点像 def map
  • 懒惰背景下的变革与行动

    正如 Learning Spark 闪电般快速的大数据分析 一书中提到的 由于 Spark 计算 RDD 的方式不同 转换和操作也有所不同 在对惰性进行一些解释之后 我发现转换和操作都是惰性地进行的 那么问题来了 这句话的意思是什么 对比
  • 使用 scala 在 Flink 中进行实时流预测

    弗林克版本 1 2 0斯卡拉版本 2 11 8 我想使用 DataStream 来使用 scala 中的 flink 模型进行预测 我在使用 scala 的 flink 中有一个 DataStream String 其中包含来自 kafka
  • 如何使用 SparkR 1.6.0 写入 JDBC 源?

    使用 SparkR 1 6 0 我可以使用以下代码从 JDBC 源读取数据 jdbc url lt jdbc mysql localhost 3306 dashboard user
  • Spark KMeans 无法处理大数据吗?

    KMeans 有几个参数training http spark apache org docs latest api python pyspark mllib html highlight kmeans pyspark mllib clus
  • 在 Spark 中计算逻辑回归系数的标准误差

    我知道这个问题之前已经被问过here https stackoverflow com questions 37816701 calculating standard error of estimate wald chi square sta

随机推荐

  • 从存储为节点缓冲区的字节数组创建类型化数组

    从节点docs https nodejs org api buffer html buffer buf slice start end关于从缓冲区创建类型化数组 缓冲区的内存被解释为数组 而不是字节数组 那 是 new Uint32Arra
  • 后台进程重定向到 COPROC

    在下面的测试脚本中 我运行一个基本协进程 echo内置 在后台运行 附加其标准输出 bin bash TEST 1 coproc bin sleep 100 echo gt COPROC 1 该脚本总是失败 没有明显的原因 给出输出 tes
  • 如何确定 .Net DLL 是否用于 GUI 应用程序或 Web 服务?

    如何确定 Net DLL 是否在 Windows GUI 应用程序或 Web 服务中运行 我有一个低级类 在两个应用程序之间共享 并且需要在 Web 服务中使用它时禁用消息框 Windows 应用程序有超过 200 个解决方案 我无法重构现
  • 将 !important 应用于具有多个选项的字体系列

    如何将 important 应用于以下样式 font family Trebuchet MS Verdana Helvetica Sans Serif 我已经尝试过这个 但不起作用 font family Trebuchet MS Verd
  • Prolog 程序返回命题公式中的原子

    我是序言新手 正在尝试编写一个返回atoms在一个结构良好的命题公式中 例如查询ats and q imp or p q neg p As 应该返回 p q for As 下面是我的代码 它返回的公式为As 我不知道该怎么做才能拆分sing
  • IIS7 URL从根目录重定向到子目录[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在使用带有 IIS7 的 Windows Server 2008 我需要重定向访问的用户www mysite com to wwww
  • oracle中的触发器

    触发器可以增强或阻碍性能的条件是什么 何时使用系统中的触发器 何时不使用触发器 如何使用触发器来施加复杂的约束 执行触发器总是会产生一些开销 至少 您要为导致触发器触发的每一行执行从 SQL 引擎到 PL SQL 引擎的上下文转换 虽然触发
  • JavaFX ObservableList - 添加项目导致 ConcurrentModificationException

    我有一张桌子Albums用户可以过滤和排序 这是该表的样子 正如您所看到的 这些列是可排序的 并且顶部有一个文本框 当前正在过滤其中包含字符串 cu 的专辑 一切都很完美填充专辑列表后 但是 如果我在填充专辑列表时尝试排序或过滤 我会得到一
  • c++ 不合逻辑 >= 处理 vector.size() 时的比较很可能是由于 size_type 是无符号的

    在处理 vector size 又名 size type 时 我可以使用一些帮助来澄清这种奇怪的比较 vector
  • 如何更改 jqgrid 中弹出的列选择器中的列名称?

    我有一个两列标题 Phase1 和 Phase 2 图像 1 现在在列选择器窗口中显示列名称 图 2 Name 类别 子类别 类别 子类别 我想以不同的方式展示 Name 一等奖组 一期子类别 Ph2组 Ph2子类别 注意 根据我的要求不要
  • 何时在 Ruby 中使用“self”

    这个方法 def format stations and date from station titelize if from station respond to titleize to station titleize if to st
  • 使用 Windows 身份验证对单个操作而不是整个应用程序进行身份验证

    我想在某个计算机上使用 Windows 集成身份验证进行身份验证单控制器动作而不是全局应用程序 我在网上和StackOverflow上阅读了很多文章 但没有找到答案 请注意 我正在 Web API 2 0 中进行开发 而不是 MVC 也就是
  • XMLHttpRequest 上传带有参数的文件

    我想使用 Safari 5 1 的 XMLHTTRequest 上传文件并在 POST 请求中传递参数 这如何实现 它应该是纯 JavaScript 而不使用任何 API 我这样做是因为 Safari 不支持 5 1 版本中的 FileRe
  • 为什么 Python 异常被命名为“Error”?

    为什么 Python 异常被命名为 Error 例如ZeroDivisionError NameError TypeError 而不是 例外 例如ZeroDivisionException NameException TypeExcepti
  • Google 群组和订阅代码

    我对任何领域的编程都是新手 刚开始我有一个添加订阅按钮的任务 我拥有的 Google 群组邮件列表 一个网站 我没有 Google 应用引擎 要求 非常简单 网站中的订阅按钮可以自动将人们订阅到邮件列表 语言 PHP HTML 不要问我有关
  • ReplaceReducer 导致意外的键错误

    我有一个 React 应用程序 它动态加载模块 包括模块的减速器函数 然后调用 Redux 的replaceReducer 来替换减速器 不幸的是我收到了一个错误 在传递给 createStore 的初始状态参数中发现意外的键 bookEn
  • 如何在 SQLAlchemy 中使用通配符? [复制]

    这个问题在这里已经有答案了 我正在尝试使用 SQLAlchemy 对查询使用通配符 但我得到一个空列表 My code engine create engine os getenv DATABASE URL db scoped sessio
  • 如何使 IconButton 的突出显示颜色显示在父窗口小部件上?

    当我设置包含 IconButton 的容器的颜色时 我发现 IconButton 的突出显示颜色被容器的颜色隐藏了 这就是我的意思 如何保证蓝圈出现above红方块 这是我的代码 import dart ui import package
  • Google 通过图像脚本搜索本地图像[重复]

    这个问题在这里已经有答案了 我正在搜索一个脚本来查找与本地图像相似的图像 实际上 我在 stackoverflow 上搜索过类似的主题 但找不到任何解决方案或线索来解决我的问题 以下网址中的主题与我的问题类似 但它使用文本进行搜索 pyth
  • 更改 Spark 数据框中列的可为空属性

    我正在手动创建一个数据框进行一些测试 创建它的代码是 case class input id Long var1 Int var2 Int var3 Double val inputDF sqlCtx createDataFrame Lis