我有一个大型数据集存储在 BigQuery 表中,我想将其加载到 pypark RDD 中以进行 ETL 数据处理。
我意识到 BigQuery 支持 Hadoop 输入/输出格式
https://cloud.google.com/hadoop/writing-with-bigquery-connector
并且 pyspark 应该能够使用此接口,以便通过使用方法“newAPIHadoopRDD”创建 RDD。
http://spark.apache.org/docs/latest/api/python/pyspark.html
不幸的是,两端的文档似乎很少,并且超出了我对 Hadoop/Spark/BigQuery 的了解。有没有人知道如何做到这一点?
谷歌现在有一个example了解如何将 BigQuery 连接器与 Spark 结合使用。
使用 GsonBigQueryInputFormat 似乎确实存在问题,但我得到了一个简单的莎士比亚字数统计示例
import json
import pyspark
sc = pyspark.SparkContext()
hadoopConf=sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")
conf = {"mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>", "mapred.bq.input.project.id": "publicdata", "mapred.bq.input.dataset.id":"samples", "mapred.bq.input.table.id": "shakespeare" }
tableData = sc.newAPIHadoopRDD("com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", "org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject", conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"], int(x["word_count"]))).reduceByKey(lambda x,y: x+y)
print tableData.take(10)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)