如何使用 MongoDB 中过滤的记录构建 Spark 数据框架?

2024-02-21

我的应用程序是利用 MongoDB 作为平台构建的。 DB中的一个集合数据量很大,选择了apache Spark来检索并通过计算生成分析数据。 我已经配置了MongoDB 的 Spark 连接器 https://docs.mongodb.com/spark-connector/getting-started/与 MongoDB 通信。 我需要使用查询 MongoDB 集合pyspark并构建一个由 mongodb 查询结果集组成的数据框。 请建议我一个合适的解决方案。


您可以将数据直接加载到数据框中,如下所示:

# Create the dataframe
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/mydb.mycoll").load()

# Filter the data via the api
df.filter(people.age > 30)

# Filter via sql
df.registerTempTable("people")
over_thirty = sqlContext.sql("SELECT name, age FROM people WHERE age > 30")

有关更多信息,请参阅 Mongo Spark 连接器Python API https://docs.mongodb.com/spark-connector/python-api/部分或简介.py https://github.com/mongodb/mongo-spark/blob/master/examples/src/test/python/introduction.py。 SQL 查询被转换并传回连接器,以便数据可以在发送到 Spark 集群之前在 MongoDB 中查询。

您也可以提供自己的聚合管道 https://docs.mongodb.com/manual/aggregation/#aggregation-pipeline在将结果返回到 Spark 之前应用于集合:

dfr = sqlContext.read.option("pipeline", "[{ $match: { name: { $exists: true } } }]")
df = dfr.option("uri", ...).format("com.mongodb.spark.sql.DefaultSource").load()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 MongoDB 中过滤的记录构建 Spark 数据框架? 的相关文章

随机推荐

  • 如何删除 ASP.NET 网站上的 cookie

    在我的网站中 当用户单击 注销 按钮时 Logout aspx 页面会加载代码Session Clear 在 ASP NET C 中 这是否会清除所有 cookie 或者是否需要添加任何其他代码来删除我网站的所有 cookie 尝试这样的事
  • 如何快速创建新行

    有没有办法像java中的 n 一样在swift中创建一个新行 var example String Hello World n This is a new line 你应该能够使用 n在 Swift 字符串中 它应该按预期工作 创建一个换行
  • gganimate:结合transition_layers和geom_smooth

    如何将geom smooth method lm 函数与gganimate 的transition layers 结合起来 以便当各个条向上漂移 增长时 geom smooth 的线性线出现 如下所示 geom smooth 线所需外观的示
  • 在Python中,f.readlines()和list(f)有什么区别

    来自两者Python2教程 https docs python org 2 tutorial inputoutput html methods of file objects and Python3教程 https docs python
  • 将列表<>、对象和原语混合传递到 ASP MVC 控制器操作的方法

    我对 C 相当陌生 正在创建我的第一个 MVC 项目 并且很难弄清楚将 3 个不同类型的参数传递给控制器 操作的方法 这是我的控制器方法 public ActionResult Create Notification notificatio
  • 将 Lambda 输出映射到 API Gateway 标头

    我正在尝试将 Lambda 函数的 JSON 输出映射到 API 网关调用的标头 作为此处的示例 我们可以考虑重定向场景 所以我想添加一个Location我的 API 网关调用的响应的标头 Lambda 函数的 JSON 输出 Locati
  • 当月最后一天?

    我必须得到本月的最后一天 我怎样才能得到 SQLite sqlite3 计算当月的最后一天 SELECT date now start of month 1 month 1 day 如果您使用 sqlite3 也请查看此链接sqlite 日
  • 在 Android Studio 中方法自动完成后禁用左括号

    I am using Android Studio based on IDEA not Eclipse and every time I select a method from the autocomplete popup with Ta
  • Swift 泛型和向上转型

    我有一个关于 Swift 泛型的快速问题 问题是我试图存储一个以泛型作为参数的变量 但无法将其转换为它所限制的类型 最好用一个简短的例子来解释 class Foo class Thing
  • Mean.JS 并添加外部依赖项

    我正在尝试将外部依赖项 ui codemirror 添加到我的 Mean JS 0 4 2 应用程序中 我的理解来自here https stackoverflow com questions 25832660 angularjs how
  • python 中的二进制数组

    如何在 python 中创建大数组 创建效率如何 在 C C 中 byte data byte memalloc 10000 or byte data new byte 10000 在蟒蛇 看看阵列模块 http docs python o
  • 什么是 setContentView(R.layout.main)?

    我知道这与应用程序布局有关 但我什么时候必须使用它 我试图寻找解释此方法的链接 但找不到 先感谢您 在 Android 中 视觉设计存储在 XML 文件中 每个文件Activity https developer android com g
  • TeamCity + MSTest - 仅运行失败的测试?

    有没有办法只运行 TeamCity 中先前测试运行中失败的测试 更好的是 有没有办法以某种方式只运行选定的测试 就像我们只想运行某些测试类一样 我们是否可以通过其他机制来实现这一目标 也许在后续测试运行中将某种配置文件传递给 MSTest
  • 用于任意和可变深度的嵌套列表的正确 C++ 类型?

    我正在尝试将一些代码从 Python 移植到 C Python代码有一个函数foo可以采用具有可变列表深度的嵌套整数列表 例如 这些是对 foo 的合法函数调用 foo foo 1 foo 1 2 3 4 5 6 7 8 9 10 对于可以
  • iOS 上的 UIGraphicsBeginImageContext 与 CGBitmapContextCreate

    这可能是一个非常愚蠢的问题 但是有人可以告诉我使用 UIGraphicsBeginImageContext 创建 CGContextRef 和使用 CGBitmapContextCreate 绘制图像之间的区别吗 特别是现在 由于 UIKi
  • 自定义类加载/覆盖 Android 原生类

    主要目标是用我自己的实现覆盖 Android 系统类 Activity View 等 http android developers blogspot com 2011 07 custom class loading in dalvik h
  • 如何从 C:\cygdrive\c\ 更改 git 路径

    如何将 git 路径从 C cygdrive c 更改为 C 或只是 cygdrive c 我已将 notepad 设置为 git 的编辑器 当我尝试执行交互式变基 git rebase i 时 记事本打开 但由于这个奇怪的路径 包括 C
  • 为什么 javascript 语音识别 api 在没有互联网的情况下无法工作?

    我正在使用 javascript 语音识别 api new webkitSpeechRecognition 我很惊讶为什么它在没有互联网的情况下无法工作 因为它是 javascript 代码 所以它应该可以离线工作 我检查了chrome开发
  • AMP 项目 - Google Analytics - 内容分组 amp-analytics 代码

    我正在尝试将内容分组变量分配给我的 AMP 标记中的 Google Analytics 代码 我的 AMP 分析代码
  • 如何使用 MongoDB 中过滤的记录构建 Spark 数据框架?

    我的应用程序是利用 MongoDB 作为平台构建的 DB中的一个集合数据量很大 选择了apache Spark来检索并通过计算生成分析数据 我已经配置了MongoDB 的 Spark 连接器 https docs mongodb com s