如何在 Palantir Foundry 中解析 xml 文档?

2023-12-03

我有一套.xml我想要解析的文档。

我以前曾尝试使用获取文件内容并将它们转储到单个单元格中的方法来解析它们,但是我注意到这在实践中不起作用,因为我看到运行时间越来越慢,通常只有一项任务需要执行运行数十小时:

我的第一个变换是.xml内容并将其放入单个单元格中,第二个转换采用该字符串并使用 Pythonxml将字符串解析为文档的库。然后我可以从该文档中提取属性并返回 DataFrame。

我正在使用一个UDF执行将字符串内容映射到我想要的字段的过程。

我怎样才能让这个更快/更好地处理大型.xml files?


对于这个问题,我们将结合几种不同的技术来使该代码既可测试又具有高度可扩展性。

Theory

解析原始文件时,您可以考虑以下几个选项:

  1. ❌ You can write your own parser to read bytes from files and convert them into data Spark can understand.
    • 由于工程时间和不可扩展的架构,尽可能不鼓励这样做。当您执行此操作时,它不会利用分布式计算,因为您必须将整个原始文件引入解析方法才能使用它。这不是对您资源的有效利用。
  2. ⚠ You can use your own parser library not made for Spark, such as the XML Python library mentioned in the question
    • 虽然这比编写自己的解析器更容易实现,但它仍然没有利用 Spark 中的分布式计算。让某些东西运行起来更容易,但最终会达到性能极限,因为它没有利用仅在编写 Spark 库时公开的低级 Spark 功能。
  3. ✅ You can use a Spark-native raw file parser
    • 在所有情况下,这都是首选选项,因为它利用低级 Spark 功能,并且不需要您编写自己的代码。如果存在低级 Spark 解析器,则应该使用它。

在我们的例子中,我们可以使用 Databricks 解析器来达到很好的效果。

一般来说,您还应该避免使用.udf方法,因为它可能正在使用,而不是 Spark API 中已有的良好功能。 UDF 的性能不如本机方法,仅应在没有其他选项可用时使用。

UDF 掩盖隐藏问题的一个很好的例子是列内容的字符串操作;当你在技术上can使用 UDF 来做诸如分割和修剪字符串之类的事情,这些事情已经存在于火花API并且将比您自己的代码快几个数量级。

Design

我们的设计将使用以下内容:

  1. 通过以下方式完成低级 Spark 优化文件解析Databricks XML 解析器
  2. 测试驱动的原始文件解析如所解释的here

连接解析器

首先,我们需要添加.jar to our spark_session在 Transforms 中可用。由于最近的改进,此参数在配置后将允许您使用.jar在预览/测试和完整构建时。以前,这需要完整的构建,但现在不需要了。

我们需要去我们的transforms-python/build.gradle文件并添加 2 个配置块:

  1. 启用pytest plugin
  2. 启用condaJars论证并声明.jar依赖性

My /transforms-python/build.gradle现在看起来如下所示:

buildscript {
    repositories {
       // some other things
    }

    dependencies {
        classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"
    }
}

apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'

dependencies {
    condaJars "com.databricks:spark-xml_2.13:0.14.0"
}

// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'

// ... some other awesome features you should enable

应用此配置后,您需要通过单击底部功能区并点击来重新启动代码辅助会话Refresh

Refresh

刷新代码辅助后,我们现在可以使用低级功能来解析我们的代码.xml文件,现在我们需要测试它!

测试解析器

如果我们采用与以下相同的测试驱动开发风格here,我们最终得到/transforms-python/src/myproject/datasets/xml_parse_transform.py包含以下内容:

from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('xml').options(rowTag="tag").load(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    the_output=Output("my.awesome.output"),
    the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)

...示例文件/transforms-python/test/myproject/datasets/sample.xml内容:

<tag>
<field1>
my_value
</field1>
</tag>

和一个测试文件/transforms-python/test/myproject/datasets/test_xml_parse_transform.py:

from myproject.datasets import xml_parse_transform
from pkg_resources import resource_filename


def test_parse_xml(spark_session):
    file_path = resource_filename(__name__, "sample.xml")
    parsed_df = xml_parse_transform.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"field1"}

我们现在有:

  1. 分布式计算、低级.xml高度可扩展的解析器
  2. 测试驱动的设置,我们可以快速迭代以获得正确的功能

Cheers

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Palantir Foundry 中解析 xml 文档? 的相关文章

随机推荐