Airflow 1.9 - 无法将日志写入 s3

2024-05-23

我在 aws 的 kubernetes 中运行气流 1.9。我希望将日志发送到 s3,因为气流容器本身的寿命并不长。

我已经阅读了描述该过程的各种线程和文档,但我仍然无法让它工作。首先是一个测试,向我证明 s3 配置和权限是有效的。这是在我们的一个工作实例上运行的。

使用airflow写入s3文件

airflow@airflow-worker-847c66d478-lbcn2:~$ id
uid=1000(airflow) gid=1000(airflow) groups=1000(airflow)
airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
airflow@airflow-worker-847c66d478-lbcn2:~$ python
Python 3.6.4 (default, Dec 21 2017, 01:37:56)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> s3 = airflow.hooks.S3Hook('s3_logs')
/usr/local/lib/python3.6/site-packages/airflow/utils/helpers.py:351: DeprecationWarning: Importing S3Hook directly from <module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'> has been deprecated. Please import from '<module 'airflow.hooks' from '/usr/local/lib/python3.6/site-packages/airflow/hooks/__init__.py'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
>>> s3.load_string('put this in s3 file', airflow.conf.get('core', 'remote_base_log_folder') + "/airflow-test")
[2018-02-23 18:43:58,437] {{base_hook.py:80}} INFO - Using connection to: vevo-dev-us-east-1-services-airflow

现在让我们从 s3 检索文件并查看内容。我们可以看到这里一切看起来都很好。

root@4f8171d4fe47:/# aws s3 cp s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test .
download: s3://vevo-dev-us-east-1-services-airflow/logs//airflow-test to ./airflow-test
root@4f8171d4fe47:/# cat airflow-test
put this in s3 fileroot@4f8171d4fe47:/stringer#

因此,除了气流作业不使用 s3 进行日志记录之外,气流 s3 连接似乎良好。以下是我的设置,我认为有些东西是错误的,或者是我遗漏了一些东西。

正在运行的worker/scheduler/master实例的环境变量是

airflow@airflow-worker-847c66d478-lbcn2:~$ env |grep -i s3
AIRFLOW__CONN__S3_LOGS=s3://vevo-dev-us-east-1-services-airflow/logs/
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logs
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://vevo-dev-us-east-1-services-airflow/logs/
S3_BUCKET=vevo-dev-us-east-1-services-airflow

这表明airflow中存在s3_logs连接

airflow@airflow-worker-847c66d478-lbcn2:~$ airflow connections -l|grep s3
│ 's3_logs'              │ 's3'                    │ 'vevo-dev-
us-...vices-airflow' │ None   │ False          │ False                │ None                           │

我把这个文件https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py在我的 docker 镜像中。您可以在此处查看我们一位员工的示例

airflow@airflow-worker-847c66d478-lbcn2:~$ ls -al /usr/local/airflow/config/
total 32
drwxr-xr-x. 2 root    root    4096 Feb 23 00:39 .
drwxr-xr-x. 1 airflow airflow 4096 Feb 23 00:53 ..
-rw-r--r--. 1 root    root    4471 Feb 23 00:25 airflow_local_settings.py
-rw-r--r--. 1 root    root       0 Feb 16 21:35 __init__.py

我们已编辑该文件来定义 REMOTE_BASE_LOG_FOLDER 变量。这是我们的版本和上游版本之间的差异

index 899e815..897d2fd 100644
--- a/var/tmp/file
+++ b/config/airflow_local_settings.py
@@ -35,7 +35,8 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 # Storage bucket url for remote logging
 # s3 buckets should start with "s3://"
 # gcs buckets should start with "gs://"
-REMOTE_BASE_LOG_FOLDER = ''
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'remote_base_log_folder')
+

 DEFAULT_LOGGING_CONFIG = {
     'version': 1,

在这里您可以看到我们的一位工作人员的设置是正确的。

>>> import airflow
>>> airflow.conf.get('core', 'remote_base_log_folder')
's3://vevo-dev-us-east-1-services-airflow/logs/'

基于 REMOTE_BASE_LOG_FOLDER 以“s3”开头且 REMOTE_LOGGING 为 True 的事实

>>> airflow.conf.get('core', 'remote_logging')
'True'

我期望这个块https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py#L122-L123 https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py#L122-L123评估为 true 并使日志转到 s3。

请任何在 1.9 上进行 s3 日志记录的人指出我缺少什么?我想向上游项目提交 PR 来更新文档,因为这似乎是一个非常常见的问题,并且据我所知,上游文档无效或经常被误解。

谢谢! G。


是的,仅根据文档进行设置时我也遇到了麻烦。我必须仔细检查气流的代码才能弄清楚。有很多事情你本来可以不做。

需要检查的一些事项:
1. 确保您拥有 log_config.py 文件并且它位于正确的目录中:./config/log_config.py。还要确保您没有忘记该目录中的 __init__.py 文件。
2. 确保定义了 s3.task 处理程序并将其格式化程序设置为 airflow.task
3. 确保将airflow.task和airflow.task_runner处理程序设置为s3.task

这是一个适合我的 log_config.py 文件:

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from airflow import configuration as conf

# TO DO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = 's3://your_path_to_airflow_logs'

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # When using s3 or gcs, provide a customized LOGGING_CONFIG
        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
        # for details
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        # 'gcs.task': {
        #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        #     'formatter': 'airflow.task',
        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        #     'gcs_log_folder': GCS_LOG_FOLDER,
        #     'filename_template': FILENAME_TEMPLATE,
        # },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Airflow 1.9 - 无法将日志写入 s3 的相关文章

  • 在 Python 中搜索文本文件并打印相关行?

    如何在文本文件中搜索关键短语或关键字 然后打印关键短语或关键字所在的行 searchfile open file txt r for line in searchfile if searchphrase in line print line
  • 为什么 statsmodels 和 R 的逻辑回归结果不同?

    我正在尝试比较 python 的 statsmodels 和 R 中的逻辑回归实现 Python版本 import statsmodels api as sm import pandas as pd import pylab as pl i
  • 如何将 Jupyter Notebook 的代码片段与 VSCode 结合使用?

    我已经使用 VSCode 一段时间了 目前我正在尝试设置代码片段来工作 它们似乎适用于简单的 Python py 文件 但不适用于 Jupyter Notebook ipynb 文件 有什么办法可以让他们一起工作吗 片段就在这里 Creat
  • 如何向未知用户目录读取/写入文件?

    我正在尝试从用户目录 C Users USERNAME Test Source 读取和写入文件 但我未能成功找到任何有关如何自动检测用户名的资源 其中的 USERNAME上面的例子 或者无论如何 我可以让它读取和写入目录 而不需要知道用户名
  • OpenPyXL - 如何查询单元格边框?

    python 和 openpyxl 都是新的 编写一个 py 脚本来遍历大量 Excel 工作簿 工作表 并且需要找到由边框格式标识的某些单元格 我在网上看到几个关于如何设置单元格边框的示例 但我需要阅读它们 具体来说 当表内的数据不一致但
  • Python 中的自然日/相对日

    我想要一种在 Python 中显示日期项目的自然时间的方法 类似于 Twitter 将显示 刚才 几分钟前 两小时前 三天前 等消息 Django 1 0 在 django contrib 中有一个 人性化 方法 我没有使用 Django
  • 会话cookie太大烧瓶应用程序[重复]

    这个问题在这里已经有答案了 我正在尝试使用会话 本地 加载某些数据 并且它已经工作了一段时间 但是现在我收到以下警告 并且不再加载通过会话加载的数据 b session cookie 太大 该值是 13083 字节 但是 标头需要 44 个
  • 在 Flask 中将配置文件作为字典读取

    在 instance app cfg 我已经配置 test test 在我的烧瓶文件 app py 中 with app open instance resource app cfg as f config f read print con
  • python下安装xgboost 32位msys失败

    尝试安装 xgboost 失败 Windows 和企业版版本为 Anaconda 2 1 0 64 位 我该如何继续 我一直在使用 R 似乎从 RStudio 在 R 中安装新包相当容易 但在间谍程序中则不然 因为我需要进入命令窗口来执行此
  • 计算两个节点之间的最长路径 NetworkX

    我正在尝试使用 Networkx 制作甘特图 网络中的所有节点都是完成项目所需执行的 任务 使用 Networkx 可以轻松计算项目的总时间 但是制作甘特图我需要每个节点的最新启动 NetworkX 包含一个函数 dag longest p
  • 如何在 Python 中从 C++/C# 紧密实现 ?: ?

    在 C 中 我可以轻松编写以下内容 string stringValue string IsNullOrEmpty otherString defaultString otherString 有没有一种快速的方法可以在 Python 中做同
  • Python 请求包含有值的参数和没有值的参数

    我正在为 API 编写一个 Python 包装器 该 API 支持具有值的查询参数 例如param1如下 和查询参数do not有价值观 例如param2如下 即 https example com service param1 value
  • Pythonwinsound,ASYNC 标志不起作用?

    我正在使用 python 3 5 我试图在继续执行脚本的同时播放声音 根据https docs python org 3 5 library winsound html https docs python org 3 5 library w
  • 调度算法,找到设定长度的所有非重叠区间

    我需要为我的管理应用程序实现一种算法 该算法将告诉我何时可以将任务分配给哪个用户 我实现了一个蛮力解决方案 它似乎有效 但我想知道是否有更有效的方法来做到这一点 为了简单起见 我重写了算法以对数字列表进行操作 而不是数据库查询等 下面我将尝
  • 当输入是 DataFrame 时,在seaborn中对箱线图进行分组

    我打算在一个图中绘制多个列pandas dataframe 全部按另一列分组 使用groupby inside seaborn boxplot 对于类似的问题 这里有一个很好的答案matplotlib matplotlib 分组箱线图 ht
  • 在 Django 中使用 path() 找不到 404

    我刚刚查看 django 并尝试通过视图列出书籍id作为 URL 的参数books urls py 但出现 404 页面未找到错误 当我在浏览器中输入此网址时 我没有发现网址有什么问题 http 192 168 0 106 8000 boo
  • 为什么使用 LAMP 托管时避免使用 CGI for Python?

    我已经使用 PHP 多年了 最近我在论坛上看到很多帖子说PHP 已经过时了 现代编程语言更简单 更安全等等 所以 我决定开始学习Python 由于我习惯使用 PHP 因此我刚刚开始通过上传 htaccess 文件来构建页面 addtype
  • 删除aws beanstalk上的uuid python包

    这是针对所提出问题的后续帖子 问题here https stackoverflow com questions 44421761 flask beanstalk deployment errors 以防万一对其他人有用 自从第一篇文章以来
  • 在 envoy 中使用 rm *(通配符):没有这样的文件或目录

    我正在使用 Python 和 Envoy 我需要删除目录中的所有文件 除了一些文件外 该目录是空的 在终端中 这将是 rm tmp my silly directory 常识表明 在特使中 这转化为 r envoy run rm tmp m
  • python nltk从句子中提取关键字

    我们要做的第一件事 就是杀掉所有律师 威廉 莎士比亚 鉴于上面的引用 我想退出 kill and lawyers 作为两个突出的关键词来描述句子的整体含义 我提取了以下名词 动词 POS 标签 First NNP thing NN do V

随机推荐

  • FirebaseAnimatedList 实时更改内容

    我想知道如何使用新查询 更改路径的新内容来重建 FirebaseAnimatedList new Flexible child new FirebaseAnimatedList query query sort DataSnapshot a
  • 索引在 NOT IN 或 <> 子句中起作用吗?

    我读过 至少 Oracle 数据库中的普通索引基本上是 B 树结构 因此存储处理适当根节点的记录 小于 根的记录被迭代地存储在树的左侧部分 而 大于 根的记录被存储在右侧部分 正是这种存储方法有助于通过树遍历实现更快的扫描 因为深度和广度都
  • Ubuntu systemd 自定义服务因 python 脚本而失败

    希望获得有关 Ubuntu 中的 systemd 守护进程服务的一些帮助 我写了一个 python 脚本来禁用 Dell XPS 上的触摸屏 这更像是一个问题 而不是一个有用的功能 该脚本可以工作 但我不想一直启动它 这就是为什么我想到编写
  • 由于 UTFDataFormatException 导致 Spark 中的任务无法序列化:编码字符串太长

    我在 Yarn 上运行 Spark 应用程序时遇到一些问题 我有非常广泛的集成测试 运行时没有任何问题 但是当我在 YARN 上运行应用程序时 它将抛出以下错误 17 01 06 11 22 23 ERROR yarn Applicatio
  • Excel:COUNTIF 函数将“小于”字符视为运算符

    预读说明 我使用的是 LibreOffice 而不是 Excel 但大多数功能应该适用于两者 我正在制作一个电子表格 其中有大量数据 对于每个属性 例如员工数量或姓名 我需要一个函数来计算包含每个不同值的行数 我已经提取了不同的值 现在我使
  • 保证复制省略是否适用于函数参数?

    如果我理解正确的话 从 C 17 开始 这段代码现在要求不进行任何复制 Foo myfunc void return Foo auto foo myfunc no copy 函数参数也是如此吗 下面的代码中的副本会被优化掉吗 Foo myf
  • “google cloud run”将 HOME 更改为 CMD 的 /home,其中 RUN 使用 /root

    我正在做的是在 Dockerfile 中的 RUN 命令中设置 sbcl 和 Quicklisp 然后使用 CMD 加载我的自定义代码 当我在本地计算机上使用 Docker 运行它时 一切都很好 但是当我将其推送到 google run 时
  • JOGL 异常 - 在 java.library.path 中找不到gluegen-rt

    线程 main java lang UnsatisfiedLinkError中出现异常 java library path中没有gluegen rt 在 java lang ClassLoader loadLibrary ClassLoad
  • 使用 Play Integrity API 时,Firebase 电话身份验证会出现缺少客户端标识符错误

    使用 Firebase 电话身份验证注册 登录时 身份验证流程始终会启动 reCAPTCHA 流程 并在返回应用程序后发出missing client identifier error 我的设置之前适用于设备验证 安全网络 API 除了我的
  • 使用按钮添加自定义折扣订单总计

    我的模块带有自定义折扣 没问题 配置 xml
  • 如何使用 Javascript 在 html 文件中搜索字符串?

    我有 5 个 html 文件 并且有一个搜索表单 我想用它来搜索这些 html 文件中的文本
  • IntelliJ 组织导入

    IntelliJ 是否具有类似于 Eclipse 中的组织导入功能 我拥有的是一个 Java 文件 其中多个类缺少导入 例子 package com test public class Foo public Map map public J
  • 匹配没有周围字符列表的单词列表

    我有这个正则表达式 one common word or another 除非这两个单词相邻 否则它匹配得很好 One one s more word word common word or another word more anothe
  • 如何在 iOS 6 中强制 UIViewController 为纵向

    As the ShouldAutorotateToInterfaceOrientation在 iOS 6 中已弃用 我用它来强制特定视图仅肖像 在 iOS 6 中执行此操作的正确方法是什么 这仅适用于我的应用程序的一个区域 所有其他视图都可
  • 什么是竞争条件?

    编写多线程应用程序时 最常见的问题之一是竞争条件 我向社区提出的问题是 竞赛条件是什么 你如何检测它们 你如何处理它们 最后 如何防止它们发生 当两个或多个线程可以访问共享数据并且它们试图同时更改它时 就会出现竞争条件 由于线程调度算法可以
  • 为 fill_ Between() 段的不同颜色添加图例

    我正在创建一个 事件图 目前如下所示 但是 我不知道如何为每个颜色组添加图例 这就是目前情节的创建方式 handles dict for i channel events in enumerate channel event list fo
  • Android 中的库可以有自己的意图过滤器吗?

    我想开发一个可以包含在其他 Android 应用程序中的库来拦截某些类型的意图 是否可以 我创建了一个库和一个测试项目 两者都有自己的AndroidManifest xml文件 在库的清单中 我为操作 TEST 定义了一个意图过滤器 但是
  • 哪个视图最亮?

    在Android中 哪个是轻量级视图 例如 View Textview Edittext 等 在某些情况下 我们需要使用视图来填充区域而不向用户显示视图 同时屏幕加载速度应该很快 您可以使用空间 android widget Space S
  • LinkLabel 无下划线 - Compact Framework

    我正在使用 Microsoft Compact Framework 开发 Windows CE 应用程序 我必须使用 LinkLabel 它必须是白色且没有下划线 因此 在设计器中 我将字体颜色修改为白色 并在字体对话框中取消选中 下划线
  • Airflow 1.9 - 无法将日志写入 s3

    我在 aws 的 kubernetes 中运行气流 1 9 我希望将日志发送到 s3 因为气流容器本身的寿命并不长 我已经阅读了描述该过程的各种线程和文档 但我仍然无法让它工作 首先是一个测试 向我证明 s3 配置和权限是有效的 这是在我们