我的应用程序是利用 MongoDB 作为平台构建的。
DB中的一个集合数据量很大,选择了apache Spark来检索并通过计算生成分析数据。
我已经配置了MongoDB 的 Spark 连接器 https://docs.mongodb.com/spark-connector/getting-started/与 MongoDB 通信。
我需要使用查询 MongoDB 集合pyspark并构建一个由 mongodb 查询结果集组成的数据框。
请建议我一个合适的解决方案。
您可以将数据直接加载到数据框中,如下所示:
# Create the dataframe
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/mydb.mycoll").load()
# Filter the data via the api
df.filter(people.age > 30)
# Filter via sql
df.registerTempTable("people")
over_thirty = sqlContext.sql("SELECT name, age FROM people WHERE age > 30")
有关更多信息,请参阅 Mongo Spark 连接器Python API https://docs.mongodb.com/spark-connector/python-api/部分或简介.py https://github.com/mongodb/mongo-spark/blob/master/examples/src/test/python/introduction.py。 SQL 查询被转换并传回连接器,以便数据可以在发送到 Spark 集群之前在 MongoDB 中查询。
您也可以提供自己的聚合管道 https://docs.mongodb.com/manual/aggregation/#aggregation-pipeline在将结果返回到 Spark 之前应用于集合:
dfr = sqlContext.read.option("pipeline", "[{ $match: { name: { $exists: true } } }]")
df = dfr.option("uri", ...).format("com.mongodb.spark.sql.DefaultSource").load()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)