Dataproc:使用 PySpark 从 BigQuery 读取和写入数据时出现错误

2024-04-28

我正在尝试读取一些 BigQuery 数据(ID:my-project.mydatabase.mytable[原始名称受保护])来自用户管理的 Jupyter Notebook 实例,内部Dataproc https://cloud.google.com/dataproc?hl=es工作台。我正在尝试的灵感来自于this https://cloud.google.com/dataproc-serverless/docs/guides/bigquery-connector-spark-example?hl=en#submit_a_pyspark_wordcount_batch_workload,更具体地说,代码是(请阅读关于代码本身的一些附加注释):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, ArrayType, StringType
from google.cloud import bigquery

# UPDATE (2022-08-10): BQ conector added
spark = SparkSession.builder.appName('SpacyOverPySpark') \
                    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2') \
                    .getOrCreate()

# ------------------ IMPORTING DATA FROM BIG QUERY --------------------------

# UPDATE (2022-08-10): This line now runs...
df = spark.read.format('bigquery').option('table', 'my-project.mydatabase.mytable').load()

# But imports the whole table, which could become expensive and not optimal
print("DataFrame shape: ", (df.count(), len(df.columns)) # 109M records & 9 columns; just need 1M records and one column: "posting"

# I tried the following, BUT with NO success:
# sql = """
# SELECT `posting`
# FROM `mentor-pilot-project.indeed.indeed-data-clean`
# LIMIT 1000000
# """
# df = spark.read.format("bigquery").load(sql)
# print("DataFrame shape: ", (df.count(), len(df.columns)))

# ------- CONTINGENCY PLAN: IMPORTING DATA FROM CLOUD STORAGE ---------------

# This section WORKS (just to enable the following sections)
# HINT: This dataframe contains 1M rows of text, under a single column: "posting"
df = spark.read.csv("gs://hidden_bucket/1M_samples.csv", header=True)

# ---------------------- EXAMPLE CUSTOM PROCESSING --------------------------

# Example Python UDF Python
def split_text(text:str) -> list:
    return text.split()

# Turning Python UDF into Spark UDF
textsplitUDF = udf(lambda z: split_text(z), ArrayType(StringType()))

# "Applying" a UDF on a Spark Dataframe (THIS WORKS OK)
df.withColumn("posting_split", textsplitUDF(col("posting")))

# ------------------ EXPORTING DATA TO BIG QUERY ----------------------------

# UPDATE (2022-08-10) The code causing the error:

# df.write.format('bigquery') \
#   .option('table', 'wordcount_dataset.wordcount_output') \
#   .save()

# has been replace by a code that successfully stores data in BQ:

df.write \
  .format('bigquery') \
  .option("temporaryGcsBucket", "my_temp_bucket_name") \
  .mode("overwrite") \
  .save("my-project.mynewdatabase.mytable")

使用 SQL 查询从 BigQuery 读取数据时,触发的错误为:

Py4JJavaError: An error occurred while calling o195.load.
: com.google.cloud.spark.bigquery.repackaged.com.google.inject.ProvisionException: Unable to provision, see the following errors:

1) Error in custom provider, java.lang.IllegalArgumentException: 'dataset' not parsed or provided.
  at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:65)
  while locating com.google.cloud.spark.bigquery.SparkBigQueryConfig

1 error
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProvisionException.toProvisionException(InternalProvisionException.java:226)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl$1.get(InjectorImpl.java:1097)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1131)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelationInternal(BigQueryRelationProvider.scala:75)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:332)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:242)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:197)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: 'dataset' not parsed or provided.
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.lambda$parseTableId$2(BigQueryUtil.java:153)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.parseTableId(BigQueryUtil.java:153)
    at com.google.cloud.spark.bigquery.SparkBigQueryConfig.from(SparkBigQueryConfig.java:237)
    at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:67)
    at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule$$FastClassByGuice$$db983008.invoke(<generated>)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderMethod$FastClassProviderMethod.doProvision(ProviderMethod.java:264)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderMethod.doProvision(ProviderMethod.java:173)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProviderInstanceBindingImpl$CyclicFactory.provision(InternalProviderInstanceBindingImpl.java:185)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProviderInstanceBindingImpl$CyclicFactory.get(InternalProviderInstanceBindingImpl.java:162)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl$1.get(InjectorImpl.java:1094)
    ... 18 more

向BigQuery写入数据时,出现错误:

Py4JJavaError: An error occurred while calling o167.save.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html

UPDATE:(2022-09-10) 向BigQuery写入数据时出错的问题已经解决,请参考上面的代码以及下面的评论部分。

我究竟做错了什么?


讨论中发现的要点:

  1. 通过以下方式将 BigQuery 连接器添加为依赖项spark.jars=<gcs-uri> or spark.jars.packages=com.google.cloud.spark:spark-bigquery-with-dependencies_<scala-version>:<version>.

  2. 指定正确的表名<project>.<dataset>.<table> format.

  3. 数据帧写入器的默认模式是errorifexists。当写入不存在的表时,数据集必须存在,该表将自动创建。写入现有表时,模式需要设置为"append" or "overwrite" in df.write.mode(<mode>)...save().

  4. 写入 BQ 表时,执行以下任一操作

    a) 直接写入(自支持)0.26.0 https://mvnrepository.com/artifact/com.google.cloud.spark/spark-bigquery-with-dependencies_2.12/0.26.0)

    df.write \
      .format("bigquery") \
      .option("writeMethod", "direct") \
      .save("dataset.table")
    

    b) 或间接写

    df.write \
      .format("bigquery") \
      .option("temporaryGcsBucket","some-bucket") \
      .save("dataset.table")
    

    看到这个doc https://github.com/GoogleCloudDataproc/spark-bigquery-connector#writing-data-to-bigquery.

  5. 通过 SQL 查询从 BigQuery 读取数据时,添加强制属性viewsEnabled=true and materializationDataset=<dataset>:

    spark.conf.set("viewsEnabled","true")
    spark.conf.set("materializationDataset","<dataset>")
    
    sql = """
      SELECT tag, COUNT(*) c
      FROM (
        SELECT SPLIT(tags, '|') tags
        FROM `bigquery-public-data.stackoverflow.posts_questions` a
        WHERE EXTRACT(YEAR FROM creation_date)>=2014
      ), UNNEST(tags) tag
      GROUP BY 1
      ORDER BY 2 DESC
      LIMIT 10
      """
    df = spark.read.format("bigquery").load(sql)
    df.show()
    

    看到这个doc https://github.com/GoogleCloudDataproc/spark-bigquery-connector#reading-data-from-a-bigquery-query.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Dataproc:使用 PySpark 从 BigQuery 读取和写入数据时出现错误 的相关文章

  • 如何使用 cython 编译扩展?

    我正在尝试从示例页面编译一个简单的 cython 扩展here http docs cython org src userguide tutorial html在我安装了 Python 2 6 64 位版本的 Windows 7 64 位计
  • 在 Pandas 中按日期获取有效合约

    我在检测 pandas DataFrame 中的活动合约方面遇到了一些困难 假设每一行都是一个协商 对于每一行 我有两列 initial date 和 end date 我想知道的是按日期划分的活跃合约数量 到目前为止我做了一个非常低效的方
  • 在 Python 中使用 Selenium 处理“接受 Cookie”弹出窗口

    我一直在尝试用硒抓取这个房地产网站的一些信息 但是 当我访问该网站时 我需要接受 cookie 才能继续 这仅在机器人访问网站时发生 而不是在我手动执行时发生 当我尝试通过 xpath 或 id 查找相应的元素时 正如我在手动检查页面时找到
  • 为什么 .setGeometry() 不改变 QWidget 实例的大小?

    我想使用 QWidget 更改 QPushButton 的大小 setGeometry https doc qt io qtforpython 5 PySide2 QtWidgets QWidget html PySide2 QtWidge
  • 在 Numpy 中切片后确定结果数组的形状

    我很难理解在 numpy 中切片后如何确定结果数组的形状 例如 我使用以下简单代码 import numpy as np array np arange 27 reshape 3 3 3 slice1 array 1 2 1 slice2
  • 如何在Python中循环并存储自变量中的值

    我对 python 很陌生 所以这听起来可能很愚蠢 我进行了搜索 但没有找到解决方案 我在 python 中有一个名为 ExcRng 的函数 我可以对该函数执行什么样的 for 循环 以便将值存储在独立变量中 我不想将它们存储在列表中 而是
  • python - 是否可以扩展 xml-rpc 可以序列化的事物集?

    我看到几个问题询问如何发送numpy ndarray通过 xml rpc 调用 这不能开箱即用 因为正如 xml rpc 中所述docs https docs python org 2 library xmlrpclib html 有一组固
  • 从字符串到类型的词法转换

    最近 我尝试用Python存储和读取文件中的信息 遇到了一个小问题 我想从文本文件中读取类型信息 从 string 到 int 或 float 的类型转换非常有效 但从 string 到 type 的类型转换似乎是另一个问题 当然 我尝试了
  • 如何从 PyCharm 项目中获取我的“exe”[重复]

    这个问题在这里已经有答案了 通过 PyCharm 在 Python 上编写一些项目 我想从中获取一个exe文件 我尝试过 另存为 gt XXX exe 但是 当我尝试执行它时出现错误 此类操作系统不支持该文件 附注 我有win7 x64 它
  • 错误:permission_manager_qt.cpp(82) 不支持的权限类型:13

    我正在开发具有内置浏览器功能的 python 代码 PyQt 5 13 import sys from PyQt5 QtCore import from PyQt5 QtGui import from PyQt5 QtWidgets imp
  • 无法打开 Python。错误 0xc000007b

    我最近一直在学习 Python 3 我在我的上网本 32 位 Windows 7 上创建简单的小程序没有任何问题 当我将它安装在我的上网本上时 我没有遇到任何问题 但现在我已经开始使用它了 我想将它安装在我的台式机上 并且我有一个 我的桌面
  • 具有多个元素的数组的真值是二义性错误吗? Python

    from numpy import from pylab import from math import def TentMap a x if x gt 0 and x lt 0 5 return 2 a x elif x gt 0 5 a
  • 将输入发送到 python 子进程而不等待结果

    我正在尝试为一段代码编写一些基本测试 该代码通常通过 stdin 无休止地接受输入 直到给出特定的退出命令 我想检查程序是否在给出一些输入字符串时崩溃 经过一段时间来考虑处理 但似乎无法弄清楚如何发送数据而不是陷入等待我不知道的输出关心 我
  • Python - 如何查询定义方法的类?

    我的问题有点类似于this one https stackoverflow com questions 5520580 how do you get all classes defined in a module but not impor
  • 张量流:注册 numpy bfloat16 扩展

    正如我所见 tensorflow 中有 bfloat16 的 numpy 扩展 https github com tensorflow tensorflow blob 24ffe9f729160a095a5cab8f592392018280
  • Python组合目录中的所有csv文件并按日期时间排序

    我有 2 年的每日数据分成每月文件 我想将所有这些数据合并到一个按日期和时间排序的文件中 我正在使用的代码组合了所有文件 但不按顺序 我正在使用的代码 import pandas as pd import glob os import cs
  • Python 3.2 中 **kwargs 和 dict 有什么区别?

    看起来Python的很多方面都只是功能的重复 除了我在 Python 中的 kwargs 和 dict 中看到的冗余之外 还有什么区别吗 参数解包存在差异 许多人使用kwargs 并通过dict作为论据之一 使用参数解包 Prepare f
  • 全局变量是 None 而不是实例 - Python

    我正在处理Python 中的全局变量 代码应该可以正常工作 但是有一个问题 我必须使用全局变量作为类的实例Back 当我运行应用程序时 它说 back is None 这应该不是真的 因为第二行setup 功能 back Back Back
  • 在 Python 模块中使用 InstaLoader

    我正在尝试使用 Instaloader 下载与主题标签相关的照片以进行图像分析 我在GitHub存储库中找到了一个全面的方法 如何在终端中执行它 但是 我需要将脚本集成到Python笔记本中 这是脚本 instaloader no vide
  • 使用 Python 生成类似于 Messenger 或 kik 代码的圆形二维码

    我可以使用 Python 生成圆形 QR 码 就像 Facebook Messenger 或 kik 使用的那样吗 我访问了很多网站 但找不到这种类型的二维码 默认情况下 Python 生成方形 QR 码 但在我的项目中我想要圆形 QR 码

随机推荐