正如您所猜测的,关键是要加载包,以便 PySpark 将在 Jupyter 的上下文中使用它。
使用常规导入启动您的笔记本:
import pandas as pd
from pyspark.sql import SparkSession
import os
在实例化会话之前,请执行以下操作:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.12.0 pyspark-shell'
Notes:
- 包版本的第一部分必须与构建 Spark 所用的 Scala 版本相匹配 - 您可以通过从命令行执行 Spark-submit --version 来找到这一点。例如
$ spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.2
/_/
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user centos on 2021-02-16T06:09:22Z
Revision 648457905c4ea7d00e3d88048c63f360045f0714
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
软件包版本的第二部分必须是为给定版本的 Scala 提供的内容 - 您可以在这里找到:https://github.com/databricks/spark-xml https://github.com/databricks/spark-xml- 所以就我而言,由于我使用 Scala 2.12 构建了 Spark,所以我需要的包是 com.databricks:spark-xml_2.12:0.12.0
现在实例化您的会话:
# Creates a session on a local master
sparkSesh = SparkSession.builder.appName("XML_Import") \
.master("local[*]").getOrCreate()
找到一个简单的 .xml 文件,其结构您知道 - 在我的例子中,我使用了 nmap 输出的 XML 版本
thisXML = "simple.xml"
这样做的原因是您可以为下面的“rootTag”和“rowTag”提供适当的值:
someXSDF = sparkSesh.read.format('xml') \
.option('rootTag', 'nmaprun') \
.option('rowTag', 'host') \
.load(thisXML)
如果文件足够小,您可以执行 .toPandas() 来查看它:
someXSDF.toPandas()[["address", "ports"]][:5]
然后关闭会话。
sparkSesh.stop()
结束语:
- 如果你想在 Jupyter 之外测试这个,只需进入命令行并执行
pyspark --packages com.databricks:spark-xml_2.12:0.12.0
您应该会看到它在 PySpark shell 中正确加载
- 如果包版本与 scala 版本不匹配,您可能会收到以下错误:
"Exception: Java gateway process exited before sending its port number"
这是一个非常有趣的方式来解释包版本号是错误的
- 如果您为用于构建 Spark 的 Scala 版本加载了错误的包,那么当您尝试读取 XML 时,您可能会收到此错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o43.load. : java.lang.NoClassDefFoundError: scala/Product$class
- 如果读取似乎有效,但您得到一个空数据框,则您可能指定了错误的根标签和/或行标签
- 如果您需要支持多种读取类型(假设您还需要能够读取同一笔记本中的 Avro 文件),您可以列出多个包,并用逗号(无空格)分隔它们,如下所示:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.12.0,org.apache.spark:spark-avro_2.12:3.1.2 pyspark-shell'
- 我的版本信息:Python 3.6.9,Spark 3.0.2