AWS Glue job - write a single Parquet file

2024-03-08

I am collecting data in JSON format in a partitioned S3 bucket.

Example:

 s3://bucket/app-events/year=2019/month=9/day=30/0001.json
 s3://bucket/app-events/year=2019/month=9/day=30/0002.json
 s3://bucket/app-events/year=2019/month=9/day=30/0003.json

A crawler runs on s3://bucket/app-events/ and creates a table.
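The year=2019/month=9/day=30 directories follow the Hive-style key=value partition convention, which is why the crawler exposes year, month and day as columns (they also appear in the job script's mapping list). The plain-Python sketch below shows how such a path breaks down into partition values. parse_partitions is a hypothetical helper for illustration, not a Glue or Spark API.

```python
from urllib.parse import urlparse


def parse_partitions(s3_uri):
    """Extract Hive-style key=value directory segments from an S3 object URI.

    Hypothetical helper: it only illustrates the naming convention.
    """
    # e.g. "/app-events/year=2019/month=9/day=30/0001.json"
    path = urlparse(s3_uri).path
    partitions = {}
    # Drop the last segment, which is the object (file) name.
    for segment in path.strip("/").split("/")[:-1]:
        key, sep, value = segment.partition("=")
        if sep:  # only key=value directories become partition columns
            partitions[key] = value
    return partitions


print(parse_partitions("s3://bucket/app-events/year=2019/month=9/day=30/0001.json"))
# {'year': '2019', 'month': '9', 'day': '30'}
```

Note that the values come out as strings, which matches the "string" type the script's mapping uses for year, month and day.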

I want to convert these JSON files into a single Parquet file, but my job creates a new Parquet file for each JSON file.

Here is my Python job script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

print("-------------- Execute Script --------------\n")

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

print("-------------- JOB_NAME: " + args['JOB_NAME'] + " --------------\n")

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("-------------- Execute Finding Sources --------------\n")

## @type: DataSource
## @args: [database = "my-db", table_name = "year_2019", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my-db", table_name = "year_2019", transformation_ctx = "datasource0")

# datasource0 = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://bucket/app-events"], 'recurse':True, 'groupFiles': 'inPartition', 'groupSize': '1048576'}, format="json")
# Print out the count of found datasources
print("-------------- Sources Found: " + str(datasource0.count()) + "--------------\n")

## @type: ApplyMapping
## @args: [mapping = [("debug", "boolean", "debug", "boolean"), ("_id", "string", "_id", "string"), ("os", "string", "os", "string"), ("`data.language`", "string", "`data.language`", "string"), ("`data.ad_id`", "string", "`data.ad_id`", "string"), ("`data.ad_contenttype`", "string", "`data.ad_contenttype`", "string"), ("`data.ad_name`", "string", "`data.ad_name`", "string"), ("`data.shop_name`", "string", "`data.shop_name`", "string"), ("`data.shop_id`", "string", "`data.shop_id`", "string"), ("device_id", "string", "device_id", "string"), ("session_id", "string", "session_id", "string"), ("os_version", "string", "os_version", "string"), ("distinct_id", "string", "distinct_id", "string"), ("shop_id", "string", "shop_id", "string"), ("page", "string", "page", "string"), ("name", "string", "name", "string"), ("start_timestamp", "string", "start_timestamp", "string"), ("id", "string", "id", "string"), ("ip_address", "string", "ip_address", "string"), ("location", "string", "location", "string"), ("city", "string", "city", "string"), ("country", "string", "country", "string"), ("start_timestamp_unix", "int", "start_timestamp_unix", "int"), ("postal", "string", "postal", "string"), ("region", "string", "region", "string"), ("`data.entity_order`", "string", "`data.entity_order`", "string"), ("`data.entity_id`", "string", "`data.entity_id`", "string"), ("`data.entity_type`", "string", "`data.entity_type`", "string"), ("`data.entity_name`", "string", "`data.entity_name`", "string"), ("`data.entity_image`", "string", "`data.entity_image`", "string"), ("`data.feedbackform_id`", "string", "`data.feedbackform_id`", "string"), ("`data.feedbackform_question_count`", "string", "`data.feedbackform_question_count`", "string"), ("`data.feedbackform_name`", "string", "`data.feedbackform_name`", "string"), ("`data.shop_pincode`", "string", "`data.shop_pincode`", "string"), ("`data.entity_quantity`", "string", "`data.entity_quantity`", "string"), ("`data.entity_choice`", "string", 
## "`data.entity_choice`", "string"), ("`data.entity_comment`", "string", "`data.entity_comment`", "string"), ("`data.entity_price`", "string", "`data.entity_price`", "string"), ("`data.old_language`", "string", "`data.old_language`", "string"), ("app", "string", "app", "string"), ("event", "string", "event", "string"), ("shop", "string", "shop", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string"), ("minute", "string", "minute", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("debug", "boolean", "debug", "boolean"), ("_id", "string", "_id", "string"), ("os", "string", "os", "string"), ("`data.language`", "string", "`data.language`", "string"), ("`data.ad_id`", "string", "`data.ad_id`", "string"), ("`data.ad_contenttype`", "string", "`data.ad_contenttype`", "string"), ("`data.ad_name`", "string", "`data.ad_name`", "string"), ("`data.shop_name`", "string", "`data.shop_name`", "string"), ("`data.shop_id`", "string", "`data.shop_id`", "string"), ("device_id", "string", "device_id", "string"), ("session_id", "string", "session_id", "string"), ("os_version", "string", "os_version", "string"), ("distinct_id", "string", "distinct_id", "string"), ("shop_id", "string", "shop_id", "string"), ("page", "string", "page", "string"), ("name", "string", "name", "string"), ("start_timestamp", "string", "start_timestamp", "string"), ("id", "string", "id", "string"), ("ip_address", "string", "ip_address", "string"), ("location", "string", "location", "string"), ("city", "string", "city", "string"), ("country", "string", "country", "string"), ("start_timestamp_unix", "int", "start_timestamp_unix", "int"), ("postal", "string", "postal", "string"), ("region", "string", "region", "string"), ("`data.entity_order`", "string", "`data.entity_order`", "string"), ("`data.entity_id`", "string", "`data.entity_id`", "string"), ("`data.entity_type`", "string", "`data.entity_type`", "string"), ("`data.entity_name`", "string", "`data.entity_name`", "string"), ("`data.entity_image`", "string", "`data.entity_image`", "string"), ("`data.feedbackform_id`", "string", "`data.feedbackform_id`", "string"), ("`data.feedbackform_question_count`", "string", "`data.feedbackform_question_count`", "string"), ("`data.feedbackform_name`", "string", "`data.feedbackform_name`", "string"), ("`data.shop_pincode`", "string", "`data.shop_pincode`", "string"), ("`data.entity_quantity`", "string", "`data.entity_quantity`", 
"string"), ("`data.entity_choice`", "string", "`data.entity_choice`", "string"), ("`data.entity_comment`", "string", "`data.entity_comment`", "string"), ("`data.entity_price`", "string", "`data.entity_price`", "string"), ("`data.old_language`", "string", "`data.old_language`", "string"), ("app", "string", "app", "string"), ("event", "string", "event", "string"), ("shop", "string", "shop", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string"), ("minute", "string", "minute", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://bucket/app-events-processed"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://bucket/app-events-processed/singles"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
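Separately from the file-count problem, every tuple in the script's ApplyMapping list maps a column to itself with the same type, so the list can be generated rather than written out by hand. ApplyMapping omits fields that are not listed, so the list must still name every column you want to keep. The sketch below is plain Python. identity_mappings is a hypothetical helper, and the column list is abbreviated.

```python
def identity_mappings(columns):
    """Build ApplyMapping tuples (source, source_type, target, target_type)
    that keep each column's name and type unchanged (hypothetical helper)."""
    return [(name, col_type, name, col_type) for name, col_type in columns]


# Abbreviated: the real list would contain every column from the script.
columns = [
    ("debug", "boolean"),
    ("_id", "string"),
    ("`data.language`", "string"),
    ("start_timestamp_unix", "int"),
]
mappings = identity_mappings(columns)
print(mappings[0])  # ('debug', 'boolean', 'debug', 'boolean')
```

In the job, the result would then be passed as ApplyMapping.apply(frame = datasource0, mappings = mappings, transformation_ctx = "applymapping1").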

How can I make the job write a single Parquet file?


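AWS Glue is based on Apache Spark, which partitions data across multiple nodes to achieve high throughput. When writing data to a file-based sink such as Amazon S3, Glue writes a separate file for each partition. To change the number of partitions in a DynamicFrame, you can first convert it to a DataFrame and then use Apache Spark's partitioning capabilities. Repartitioning to a single partition therefore produces a single output file, at the cost of having one task write all of the data.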

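from awsglue.dynamicframe import DynamicFrame

# Convert to a DataFrame and repartition it into a single partition
partitioned_dataframe = datasource0.toDF().repartition(1)

# Convert back to a DynamicFrame for further processing, e.g. pass it as
# frame = partitioned_dynamicframe to glueContext.write_dynamic_frame.from_options
partitioned_dynamicframe = DynamicFrame.fromDF(partitioned_dataframe, glueContext, "partitioned_df")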
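The partition-to-file relationship described above can be shown with a toy model in plain Python. No Spark is involved: a "DataFrame" is just a list of partitions, each a list of rows. write_partitions and repartition_to_one are hypothetical names, and the part-NNNNN file names are only loosely modeled on Spark's naming.

```python
import os
import tempfile


def write_partitions(partitions, out_dir):
    """Toy file sink: write one file per partition, return the file names."""
    os.makedirs(out_dir, exist_ok=True)
    names = []
    for i, rows in enumerate(partitions):
        name = f"part-{i:05d}.txt"
        with open(os.path.join(out_dir, name), "w") as f:
            f.writelines(f"{row}\n" for row in rows)
        names.append(name)
    return sorted(names)


def repartition_to_one(partitions):
    """Toy stand-in for repartition(1): gather every row into one partition."""
    return [[row for part in partitions for row in part]]


# Three small input files -> three partitions -> three output files.
parts = [["event-a"], ["event-b"], ["event-c"]]
with tempfile.TemporaryDirectory() as tmp:
    print(write_partitions(parts, os.path.join(tmp, "before")))
    # ['part-00000.txt', 'part-00001.txt', 'part-00002.txt']
    print(write_partitions(repartition_to_one(parts), os.path.join(tmp, "after")))
    # ['part-00000.txt']
```

The model only captures the file count. In real Spark, coalesce(1) is a common alternative to repartition(1) because it avoids a full shuffle.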

Reference: AWS Glue: How to Get Things Done https://github.com/aws-samples/aws-glue-samples/blob/master/FAQ_and_How_to.md
