通过 Hadoop 输入格式用于 pyspark 的 BigQuery 连接器示例

2023-11-27

我有一个大型数据集存储在 BigQuery 表中,我想将其加载到 pypark RDD 中以进行 ETL 数据处理。

我意识到 BigQuery 支持 Hadoop 输入/输出格式

https://cloud.google.com/hadoop/writing-with-bigquery-connector

并且 pyspark 应该能够使用此接口,以便通过使用方法“newAPIHadoopRDD”创建 RDD。

http://spark.apache.org/docs/latest/api/python/pyspark.html

不幸的是,两端的文档似乎很少,并且超出了我对 Hadoop/Spark/BigQuery 的了解。有没有人知道如何做到这一点?


谷歌现在有一个example了解如何将 BigQuery 连接器与 Spark 结合使用。

使用 GsonBigQueryInputFormat 似乎确实存在问题,但我得到了一个简单的莎士比亚字数统计示例

import json
import pyspark
sc = pyspark.SparkContext()

hadoopConf=sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")

conf = {"mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>", "mapred.bq.input.project.id": "publicdata", "mapred.bq.input.dataset.id":"samples", "mapred.bq.input.table.id": "shakespeare"  }

tableData = sc.newAPIHadoopRDD("com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", "org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject", conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"], int(x["word_count"]))).reduceByKey(lambda x,y: x+y)
print tableData.take(10)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

通过 Hadoop 输入格式用于 pyspark 的 BigQuery 连接器示例 的相关文章

  • Antlr 解析器运算符优先级

    考虑以下语法 我对运算符优先级有疑问 例如 res 2 a b有一个类似的解析树res 2 a b 我知道问题出在哪里 但我没有想到没有相互左递归的 漂亮 解决方案 你能帮我一点忙吗 该语法与自定义访问者一起使用 grammar Math
  • 如何通过索引访问 JSON 对象中的字段

    我知道这不是最好的方法 但我别无选择 我必须通过索引访问 JSONObject 中的项目 访问对象的标准方法是只写this objectName or this objectName 我还找到了一种获取 json 对象内所有字段的方法 fo
  • MySQL 查询计算上个月

    我想计算上个月的订单总额 我收到了从当前日期获取当月数据的查询 SELECT SUM goods total AS Total Amount FROM orders WHERE order placed date gt date sub c
  • 没有输入的 jQuery 日期选择器

    我有一个相当复杂的网络应用程序 我想向其中添加一些日期选择 UI 我遇到的问题是我无法从文档中弄清楚如何真正控制日期选择器的出现方式和时间 不涉及任何表单元素 不 我不会添加秘密表单字段 因此简单的开箱即用方法根本行不通 我希望有人可以提供
  • PrimeFaces 对话框参考父级

    我有一个 xhtml 页面 显示带有条目的数据表 我还有一个用于插入新条目的按钮 该按钮显示一个包含表单的对话框 插入表格用作
  • 类型或命名空间“MyNamespace”不存在等

    我有通常的类型或命名空间名称不存在错误 除了我引用了程序集 using 语句没有显示为不正确 并且我引用的类是公共的 事实上 我在不同的解决方案中引用并使用相同的程序集来执行相同的操作 并且效果很好 顺便说一句 这是VS2010 有人有什么
  • Pandas 与 Numpy 数据帧

    看这几行代码 df2 df copy df2 1 df 1 df 1 values 1 df2 ix 0 0 我们的教练说我们需要使用 values属性来访问底层的 numpy 数组 否则我们的代码将无法工作 我知道 pandas Data
  • Mono 应用程序在非阻塞套接字发送时冻结

    我在 debian 9 上的 mono 下运行一个服务器应用程序 大约有 1000 2000 个客户端连接 并且应用程序经常冻结 CPU 使用率达到 100 我执行 kill QUIT pid 来获取线程堆栈转储 但它总是卡在这个位置
  • php 数组中出现意外的 json 输出结构

    我正在尝试转换动态数据 如何从 PHP 获取此 JSON JSON 122240cb 253c 4046 adcd ae81266709a6 item 0 3 这就是我所做的 但它不起作用 PHP json array 122240cb 2
  • Amazon RDS for SQL Server 是否支持 SSIS?

    从谷歌搜索中读到一些相互矛盾的答案 不确定答案是是 否还是可能 我觉得读的时候已经很清楚了this http docs aws amazon com AmazonRDS latest UserGuide CHAP SQLServer htm
  • NSArrayController 无需将大型数据集加载到数组中

    我想使用 NSArrayController 向 NSTableView 提供数据 我面临的问题是我不想将所有数据预先加载到数组中 然后使用数组控制器setContent 方法 我的数据模型是一个管理数百万条记录的大型现有代码库 它包含有效
  • 一种无需 JavaScript 即可在 PHP 中确定浏览器宽度的方法?

    首先有吗 或者我必须使用javascript 我希望能够更改使用的 CSS 因此 frex 我可以为移动设备或其他设备加载较小的字体 不幸的是 仅使用 PHP 无法检测用户分辨率 如果您使用 Javascript 则可以在 cookie 中
  • 如何在 Angular 4 中翻译 mat-paginator?

    你知道如何在 Angular 中翻译 每页项目 吗mat paginator标签 这mat paginator是材料设计中的一个元素 您可以使用MatPaginatorIntl为了这 威尔 豪厄尔制作 https github com an
  • 从 mvc 控制器使用 Web api 控制器操作

    我有两个控制器 一个mvc控制器和一个api控制器 它们都在同一个项目中 HomeController Controller DataController ApiController 如果我想从 HomeController 中使用 Dat
  • 如何修复:“无法解析类型 java.lang.CharSequence。它是从所需的 .class 文件间接引用的”消息? [复制]

    这个问题在这里已经有答案了 我正在尝试使用这个字符串 amountStr amountStr replace replace replace 但我收到一条错误消息 我知道我收到的错误消息是因为我刚刚发布的字符串已过时 所以我想知道该字符串的
  • 如何在 JFreeChart 中设置多个系列的线条粗细?

    我创建了很多图表 在他们每个人中我都需要打电话 renderer setSeriesStroke i new BasicStroke 2 0f 对于每个系列 renderer is chart getXYPlot getRenderer 我
  • 使用 WGL 创建现代 OpenGL 上下文?

    我正在尝试使用 Windows 函数创建 OpenGL 上下文 现代版本 基本上代码就是 创建窗口类 注册班级 创建一个窗口 choose PIXELFORMATDESCRIPTOR并设置它 创建旧版 OpenGL 上下文 使上下文成为当前
  • 禁用允许文本选择的

    残疾人可以吗
  • PyAudio ErrNo 输入溢出 -9981

    我遇到了与用户相同的错误 Python 使用 Pyaudio 以 16000Hz 录制音频时出错 https stackoverflow com questions 12994981 python error audio recording
  • 探查器模板可以迁移到较新版本的 SQL Profiler 吗?

    是否可以将 Profiler 模板迁移到较新版本的 SQL Server 就我而言 我想将 SQL 2008 模板带到 2012 年 我尝试过 1 直接文件复制和 2 导出 导入 在这两种情况下 旧模板都会运行 但无法修改 修改后会出现以下

随机推荐

  • 将矩导入 Angular 会出现错误

    我收到以下错误 Module node modules moment moment has no exported member default 当我使用 import as moment from moment import defaul
  • 如何在 SQLAlchemy 中加载 SQLite3 扩展?

    我构建了一个 SQLite 扩展 即 so 库 我想使用 SQLAlchemy 在我的应用程序中使用它 它是一个 Flask 应用程序 但我不认为 Flask 在这里发挥作用 该扩展可以从 CLI 加载并且似乎可以工作 sqlite3 SQ
  • P0522R0如何破码?

    今天我正在阅读 clang 的 C 17 支持页面 我注意到一些奇怪的事情 特点将模板模板参数与兼容的参数相匹配 P0522R0 被标记为部分 因为它必须通过开关激活 他们的笔记says 尽管是缺陷报告的解决方案 但该功能在所有语言版本中默
  • 取消设置字中的最高有效位 (int32) [C]

    如何取消设置一个字的最高有效位 例如 0x00556844 gt 0x00156844 有一个 builtin clz在 gcc 中 但它只计算零 这对我来说是不需要的 另外 我应该如何替换 msvc 或 intel c 编译器的 buil
  • SQL 查询获取结果集最后一行中所有列值的总和以及行总和(分组依据)

    有人可以帮我编写一个查询来获取 TCS 和 TRS 吗 ID Jan Feb Mar TRS 1 4 5 6 15 2 5 5 5 15 3 1 1 1 3 TCS 10 11 12 TCS 总列总和 和TRS 总行总和 分别是新的列和行
  • setTimeout 是使用 javascript 执行异步函数的好解决方案吗?

    在网上搜索有关异步函数的信息 我发现很多文章使用 setTimeout 来完成这项工作 window setTimeout function console log second 0 console log first Output fir
  • 绘制金字塔图

    我需要画一个金字塔图 如所附的 我找到了一个使用 R 但不是 ggplot 的示例here 任何人都可以给我一些使用 ggplot 执行此操作的提示吗 谢谢 我用了一些解决方法 而不是使用 geom bar 而是使用了 geom liner
  • Pandas read_csv 指定 AWS 配置文件

    熊猫 v1 0 5 使用s3fs用于连接 AWS S3 并读取数据的库 默认情况下 s3fs 使用在以下位置找到的凭据 aws credentials文件输入default轮廓 如何指定 pandas 在从 S3 读取 CSV 时应使用哪个
  • SQLAlchemy 重复键更新

    有没有一种优雅的方式来做INSERT ON DUPLICATE KEY UPDATE在 SQLAlchemy 中 我的意思是语法类似于inserter insert execute list of dictionaries ON DUPLI
  • 是否可以创建一个特征来回答类型是否来自 std?

    After 这个问题通过利用 ADL 我们可以创建一个特征来回答传递的类型是否来自我们的命名空间 include
  • Doxygen 分组

    我对 doxygen 中的页面和组有疑问 我有一个项目 我在其中使用 defgroup 和 ingroup cmd 对类等进行分组 到目前为止这工作正常 现在我想使用 Markdown 页面向项目添加特殊文档 这些页面应出现在专用模块 组
  • 在 Cocoa 中生成随机字母数字字符串

    我想调用一个方法 向其传递长度并让它生成一个随机字母数字字符串 是否有任何实用程序库可能具有大量此类功能 这是一个快速而肮脏的实现 没有经过测试 NSString letters abcdefghijklmnopqrstuvwxyzABCD
  • Maven、javadoc:包没有源文件

    我正在写一个专家package具有目录结构 frtex pom xml frtex src main java some files java frtex src main java utils some other files java
  • 对象转储器类

    我正在寻找一个可以以类似于以下格式输出对象及其所有叶值的类 User Name Gordon Age 60 WorkAddress Street 10 Downing Street Town London Country UK HomeAd
  • C# 阻止 Adob​​e Reader 窗口在尝试打印文档时出现

    由于我现在无法进入的原因 我需要在尝试打印文档时阻止 Adob e Reader 窗口打开 在我之前从事此工作的开发人员设置了以下标志 尽管我不太确定它们的用途 if RegistryManager GetAcrobatVersion gt
  • Angular 2从API下载PDF并在视图中显示

    我正在学习 Angular 2 Beta 我想知道如何从 API 下载 PDF 文件并将其显示在我的视图中 我尝试使用以下方式发出请求 var headers new Headers headers append Accept applic
  • 我无法在 Nuxt 3 中使用动态组件

    我尝试让它发挥作用
  • Gradle 构建错误:原因:org.gradle.api.internal.ExtensibleDynamicObject

    我正在尝试导入 https code google com p android serialport api 进入 Android Studio 由于这个项目涉及到ndk 所以我按照以下链接的说明构建NDK http tools andro
  • C# 中 VB 的 Asc() 和 Chr() 函数相当于什么?

    VB 有几个用于将 char 转换为 ASCII 值 反之亦然 的本机函数 Asc 和 Chr 现在我需要在 C 中获得等效的功能 最好的办法是什么 您始终可以添加对 Microsoft VisualBasic 的引用 然后使用完全相同的方
  • 通过 Hadoop 输入格式用于 pyspark 的 BigQuery 连接器示例

    我有一个大型数据集存储在 BigQuery 表中 我想将其加载到 pypark RDD 中以进行 ETL 数据处理 我意识到 BigQuery 支持 Hadoop 输入 输出格式 https cloud google com hadoop