我想问一下 Spark 中输入的可能性。我可以看到从http://spark.apache.org/docs/latest/programming-guide.html http://spark.apache.org/docs/latest/programming-guide.html,我可以使用sc.textFile()
用于将文本文件读取到 RDD,但我想在分发到 RDD 之前进行一些预处理,例如我的文件可能是 JSON 格式,例如{id:123, text:"...", value:6}
我只想使用 JSON 的某些字段进行进一步处理。
我的想法是是否有可能以某种方式使用 Python 生成器作为 SparkContext 的输入?
或者 Spark 中是否有一些更自然的方式如何处理自定义文件,而不是 Spark 的纯文本文件?
EDIT:
看来接受的答案应该有效,但它让我想到了更实际的以下问题Spark 和 Python 尝试使用 gensim 解析维基百科 https://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim
最快的方法可能是按原样加载文本文件并进行处理以在生成的 RDD 上选择所需的字段。这可以跨集群并行工作,并且比在单台机器上进行任何预处理更有效地扩展。
对于 JSON(甚至 XML),我认为您不需要自定义输入格式。由于 PySpark 在 Python 环境中执行,因此您可以使用 Python 中常用的函数来反序列化 JSON 并提取所需的字段。
例如:
import json
raw = sc.textFile("/path/to/file.json")
deserialized = raw.map(lambda x: json.loads(x))
desired_fields = deserialized.map(lambda x: x['key1'])
desired_fields
现在是下面所有值的 RDDkey1
在原始 JSON 文件中。
您可以使用此模式来提取字段的组合,通过空格或其他方式分割它们。
desired_fields = deserialized.map(lambda x: (x['key1'] + x['key2']).split(' '))
如果这变得太复杂,您可以替换lambda
使用常规的 Python 函数来完成您想要的所有预处理并只需调用deserialized.map(my_preprocessing_func)
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)