火花 >= 2.4.0
您可以使用内置 Avro 支持 https://spark.apache.org/docs/latest/sql-data-sources-avro.html。该 API 向后兼容spark-avro
包,添加了一些内容(最值得注意的是from_avro
/ to_avro
功能)。
请注意,该模块未与标准 Spark 二进制文件捆绑在一起,必须使用spark.jars.packages
或等效机制。
也可以看看Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python https://stackoverflow.com/q/54693110/10465355
火花
您可以使用spark-avro https://github.com/databricks/spark-avro图书馆。首先让我们创建一个示例数据集:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
schema_string ='''{"namespace": "example.avro",
"type": "record",
"name": "KeyValue",
"fields": [
{"name": "key", "type": "string"},
{"name": "value", "type": ["int", "null"]}
]
}'''
schema = avro.schema.parse(schema_string)
with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
wrt.append({"key": "foo", "value": -1})
wrt.append({"key": "bar", "value": 1})
阅读它使用spark-csv
就这么简单:
df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()
## +---+-----+
## |key|value|
## +---+-----+
## |foo| -1|
## |bar| 1|
## +---+-----+