如何处理 AWS Glue 中映射函数中的错误?

2023-12-22

我正在使用mapDynamicFrame 的方法(或者等效地,Map.apply方法)。我注意到我传递给这些函数的函数中的任何错误都会被默默地忽略,并导致返回的 DynamicFrame 为空。

假设我有一个这样的作业脚本:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *

glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database="radixdemo", table_name="census_csv")

def my_mapper(rec):
    import logging
    logging.error("[RADIX] An error-log from in the mapper!")
    print "[RADIX] from in the mapper!"
    raise Exception("[RADIX] A bug!")
dyF = dyF.map(my_mapper, 'my_mapper')

print "Count:  ", dyF.count()
dyF.printSchema()
dyF.toDF().show()

如果我在 Glue Dev Endpoint 中运行此脚本gluepython,我得到这样的输出:

[glue@ip-172-31-83-196 ~]$ gluepython gluejob.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/05/23 20:56:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Count:   0
root

++
||
++
++

关于此输出的注释:

  • 我没有看到结果print声明或logging.error陈述。
  • 没有迹象表明my_mapper提出了一个例外。
  • The printSchema调用显示生成的 DynamicFrame 上没有架构元数据
  • the show方法也不会产生任何输出,表明所有行都消失了。

同样,当我将此脚本保存为 AWS Glue 控制台中的作业并运行它时,该作业不会指示发生任何错误 - 作业状态为“成功”。值得注意的是,我do得到print声明和logging.error调用输出到作业日志,但仅限于常规“日志”,而不是“错误日志”。

我想要的是能够表明我的工作失败了,并且能够轻松地找到这些错误日志。最重要的是仅仅表明它已经失败了。

有没有一种方法可以在映射函数中记录错误,以便 Glue 将其作为“错误日志”(并将其放入单独的 AWS CloudWatch Logs 路径中)?如果发生这种情况,它会自动将整个作业标记为失败吗?或者是否有其他方法可以从映射函数中显式地使作业失败?

(我的计划,如果有办法记录错误和/或将作业标记为失败,是创建一个装饰器或其他实用函数,它将自动捕获映射函数中的异常,并确保它们被记录并标记为失败)。


我发现使 Glue 作业显示为“失败”的唯一方法是从主脚本中引发异常(not在映射器或过滤器函数内部,因为它们似乎被旋转到数据处理单元)。

幸运的是,有is一种检测映射或过滤函数内部是否发生异常的方法:使用DynamicFrame.stageErrorsCount()方法。它将返回一个数字,指示运行最近的转换时引发了多少个异常。

所以解决所有问题的正确方法是:

  • 确保您的地图或变换功能明确地记录其中发生的任何异常。最好通过使用装饰器函数或通过其他一些可重用机制来完成,而不是依赖于 puttry/except您编写的每个函数中的语句。
  • 在您想要捕获错误的每个转换之后,调用stageErrorsCount()方法并检查它是否大于 0。如果您想中止作业,只需引发异常即可。

例如:

import logging

def log_errors(inner):
    def wrapper(*args, **kwargs):
        try:
            return inner(*args, **kwargs)
        except Exception as e:
            logging.exception('Error in function: {}'.format(inner))
            raise
    return wrapper

@log_errors
def foo(record):
    1 / 0

然后,在你的工作中,你会做类似的事情:

df = df.map(foo, "foo")
if df.stageErrorsCount() > 0:
    raise Exception("Error in job! See the log!")

请注意,即使调用logging.exception从映射器函数内部仍然不会将日志写入error由于某种原因登录 AWS CloudWatch Logs。它被写入常规成功日志中。但是,通过这种技术,您至少会看到作业失败并能够在日志中找到信息。另一个警告:开发端点似乎没有显示来自映射器或过滤器功能的任何日志。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何处理 AWS Glue 中映射函数中的错误? 的相关文章

随机推荐

  • Android xsl 转换空指针,但在桌面上工作

    我正在 Android 上开发 xslt users xml 文件
  • 如何通过用均值替换来处理 R 中的缺失值? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一个看起来像这样的数据框 row1 key1 10 row2 key1 12 row3 key1 NA row4 key2 2 r
  • 根据列表列连接两个 pandas 数据框

    I have 2 个数据框包含列表的列 我想join他们基于2 共同价值观在名单上 例子 ColumnA ColumnB ColumnA ColumnB id1 a b c id3 a b c x y z id2 a d e 在这种情况下我
  • 如何将 MSSQL PRINT 输出存储到变量

    在 MS SQL 2008 R2 上 我希望能够将消息输出捕获到变量中 我需要在许多数据库上运行脚本 捕获消息输出 来自 print 或 raiserror 的消息 并将其记录到表中 我需要从存储过程内部执行此操作 对于这个脚本 Decla
  • 对于 Perl 开发的理想 Vim 配置,您有何建议?

    有很多关于如何配置的线程Vim GVim http www vim org 用于 Perl 开发在 PerlMonks org 上 http www google com search q vimrc site 3Aperlmonks or
  • 进行同步 JavaScript 调用的技巧 [重复]

    这个问题在这里已经有答案了 JavaScript 在大多数现代 API 中使用异步调用来处理磁盘 IO 和网络等 慢 事物 我意识到这样做的目的是什么 但是在某些情况下确实需要进行同步调用 例如 我有一段无法重写的 JavaScript 代
  • 输入类型=“按钮”的 onClick 函数不起作用

    我有这段代码 单击 获取更多字段 按钮将创建整个 div 的副本并创建字段 但点击按钮后什么也没有发生 所有其他按钮都工作正常 不知道我做错了什么 提前感谢您的帮助 div style display none div
  • 使用 Jersey 1.x 进行自定义注释注入

    我使用的是球衣 1 9 1 我有如下休息方法 授权标头包含编码凭据 例如用户名 和密码 并在方法中解析并映射本地值 PUT Path SystemConstants REST MESSAGE SENDSMS Consumes MediaTy
  • 使用 MSAL 对 Azure 静态 Web 应用中的函数进行授权

    我正在尝试使用 Azure AD 和 MSAL 对使用 Azure 静态 Web 应用创建和公开的 Azure 函数进行身份验证和授权 如果我将应用配置为使用较旧的 AAD v1 流 但不使用 MSAL 则用户可以成功访问 API 设置 用
  • 如何用Delphi计算当前日期的日出和日落时间

    我需要在给定特定纬度和经度的情况下并使用 Delphi 我正在使用 XE2 确定当天的日出和日落时间 当地时间 我找到了本文 http en wikipedia org wiki Sunrise equation并使用了系统工具库 http
  • “无法计算表达式...”错误

    在 Visual Studio 2010 beta 2 中调试应用程序时 突然出现以下错误 无法评估表达式 因为 线程停止在某个点 垃圾收集是不可能的 可能是因为代码是 优化 监视窗口中的几乎每个属性或字段都会显示此消息 有谁知道发生了什么
  • 当我们可以解析线性回归时为什么要梯度下降

    在线性回归空间中使用梯度下降有什么好处 看起来我们可以用分析方法解决这个问题 找到最小成本函数的theta0 n 那么为什么我们仍然想使用梯度下降来做同样的事情呢 谢谢 当您使用正规方程为了分析求解成本函数 您必须计算 其中 X 是输入观测
  • LinearLayout 的边距,以 dp 编程

    是否可以以编程方式设置 LinearLayout 的边距 但不能使用像素 而是使用 dp 您可以使用显示指标 http developer android com intl de reference android util Display
  • 如何在 cakephp 中设置默认时区?

    所以我的系统已经基本完成 只是解决了一些最后的错误 我们遇到一个问题 该程序似乎永久设置在纽约时区 我在 core php 和 bootstrap php 中都有这行代码 date default timezone set Australi
  • 在 Android 中自定义 Google SignInButton

    我想在 Android 中自定义 Google 登录按钮 目前我使用以下代码有一个非常基本的默认布局
  • 使用 SQL LAG 函数计算股票收益

    我有一张股票价格表 我正在尝试计算每日回报 Ticker Date price ABC 01 01 13 100 00 ABC 01 02 13 101 50 ABC 01 03 13 99 80 ABC 01 04 13 95 50 AB
  • struts2 中的日期验证

    我正在创建一个表单 用户将在其中注册 struts2 应用程序 用户需要输入特定格式的日期 由于我不打算使用 datepicker ajax 标签 因此我使用带有日期标签的文本字段 格式如下
  • 如何在 OS X 中获取当前前台应用程序的名称或 PID?

    我需要在 OS X 中获取当前前台应用程序的名称或 PID 如何使用终端获取它 您可以使用 lsappinfo 命令从终端找到此信息 手册页详细介绍了该工具可以返回的大量信息 为了获取最前面的应用程序 您可以使用 front 参数调用 ls
  • endl 操纵器在哪里定义

    我们知道endl是操纵器 它在内部放置 n 缓冲然后刷新缓冲区 哪里endl定义 什么是endl 是宏 函数 变量 类还是对象 我该如何定义自己的endl操纵者 cout lt lt hello lt lt endl what is end
  • 如何处理 AWS Glue 中映射函数中的错误?

    我正在使用mapDynamicFrame 的方法 或者等效地 Map apply方法 我注意到我传递给这些函数的函数中的任何错误都会被默默地忽略 并导致返回的 DynamicFrame 为空 假设我有一个这样的作业脚本 import sys