Airflow BashOperator 日志不包含完整输出

2024-02-20

我遇到一个问题,BashOperator 没有记录 wget 的所有输出。它只会记录输出的前 1-5 行。

我已经尝试过仅使用 wget 作为 bash 命令:

tester = BashOperator(
    task_id = 'testing',
    bash_command = "wget -N -r -nd --directory-prefix='/tmp/' http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip",
    dag = dag)

我还尝试过将其作为更长的 bash 脚本的一部分,该脚本具有跟随 wget 的其他命令。 Airflow 在触发下游任务之前会等待脚本完成。这是一个 bash 脚本示例:

#!/bin/bash
echo "Starting up..."
wget -N -r -nd --directory-prefix='/tmp/' http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
echo "Download complete..."
unzip /tmp/httpcomponents-client-4.5.3-src.zip -o -d /tmp/test_airflow
echo "Archive unzipped..."

日志文件的最后几行:

[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 1
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask: 
[2017-04-13 18:33:35,068] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:35,068] {models.py:1342} INFO - Executing <Task(BashOperator): testing> on 2017-04-13 18:33:08
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,569] {bash_operator.py:71} INFO - tmp dir root location: 
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: /tmp
[2017-04-13 18:33:37,571] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,571] {bash_operator.py:81} INFO - Temporary script location :/tmp/airflowtmpqZhPjB//tmp/airflowtmpqZhPjB/testingCkJgDE
[2017-04-13 18:14:54,943] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,942] {bash_operator.py:82} INFO - Running command: /var/www/upstream/xtractor/scripts/Temp_test.sh 
[2017-04-13 18:14:54,951] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,950] {bash_operator.py:91} INFO - Output:
[2017-04-13 18:14:54,955] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,954] {bash_operator.py:96} INFO - Starting up...
[2017-04-13 18:14:54,958] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,957] {bash_operator.py:96} INFO - --2017-04-13 18:14:54--  http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
[2017-04-13 18:14:55,106] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,105] {bash_operator.py:96} INFO - Resolving apache.cs.utah.edu (apache.cs.utah.edu)... 155.98.64.87
[2017-04-13 18:14:55,186] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,186] {bash_operator.py:96} INFO - Connecting to apache.cs.utah.edu (apache.cs.utah.edu)|155.98.64.87|:80... connected.
[2017-04-13 18:14:55,284] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - HTTP request sent, awaiting response... 200 OK
[2017-04-13 18:14:55,285] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - Length: 1662639 (1.6M) [application/zip]
[2017-04-13 18:15:01,485] {jobs.py:2083} INFO - Task exited with return code 0

编辑:更多测试表明记录 wget 的输出存在问题。


这是因为在默认运算符中仅打印最后一行。请将里面的代码替换为以下内容airflow/operators/bash_operator.py无论您的气流安装在哪里。通常,您需要查看您的 python 在哪里,然后转到site-packages

from builtins import bytes
import os
import signal
import logging
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory


class BashOperator(BaseOperator):
    """
    Execute a Bash script, command or set of commands.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :type bash_command: string
    :param xcom_push: If xcom_push is True, the last line written to stdout
        will also be pushed to an XCom when the bash command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :type env: dict
    :type output_encoding: output encoding of bash command
    """
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults
    def __init__(
            self,
            bash_command,
            xcom_push=False,
            env=None,
            output_encoding='utf-8',
            *args, **kwargs):

        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push_flag = xcom_push
        self.output_encoding = output_encoding

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards
        """
        bash_command = self.bash_command
        logging.info("tmp dir root location: \n" + gettempdir())
        line_buffer = []        
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                f.write(bytes(bash_command, 'utf_8'))
                f.flush()
                fname = f.name
                script_location = tmp_dir + "/" + fname
                logging.info("Temporary script "
                             "location :{0}".format(script_location))
                logging.info("Running command: " + bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env,
                    preexec_fn=os.setsid)

                self.sp = sp

                logging.info("Output:")
                line = ''

                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    line_buffer.append(line)
                    logging.info(line)
                sp.wait()
                logging.info("Command exited with "
                             "return code {0}".format(sp.returncode))

                if sp.returncode:
                    raise AirflowException("Bash command failed")
        logging.info("\n".join(line_buffer))
        if self.xcom_push_flag:
            return "\n".join(line_buffer)

    def on_kill(self):
        logging.info('Sending SIGTERM signal to bash process group')
        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Airflow BashOperator 日志不包含完整输出 的相关文章

随机推荐

  • 从字符串获取python类对象[重复]

    这个问题在这里已经有答案了 可能的重复 Python 中的动态模块导入 https stackoverflow com questions 301134 dynamic module import in python 可能是一个简单的问题
  • 运行 Maven 安装时如何跳过许可证检查?

    I ran a mvn clean install在我从事的一个大型 Java 项目中 由于某些文件没有正确的许可证头 该项目一直失败 好吧 这不是我现在关心的问题 我该如何跳过呢 我看到的实际错误是 Failed to execute g
  • 计算 Java 函数的签名

    有没有办法computeJava 类的方法的签名 一个签名 like Ljava lang String V表示一个函数 它采用String 作为论据并返回void 什么是rule计算签名 它始终是一组括号 其中包含参数的类型指示符 一个接
  • 如何使用 Grand Central Dispatch 并行化数独求解器?

    作为编程练习 我刚刚编写了一个使用回溯算法的数独求解器 请参阅维基百科 http en wikipedia org wiki Algorithmics of sudoku Example of a brute force Sudoku so
  • python中如何检查变量是否为空?

    我想知道python是否有任何函数 例如php空函数 http php net manual en function empty php 它检查变量是否为空并符合以下条件 an empty string 0 0 as an integer
  • Git 在提交之前存储特定文件

    我不确定我正在寻找的是不是git stash但这就是我想做的 我有为本地使用定制的配置文件 这些文件已经存在于 Git 中 现在 如果我添加新功能 更改其他文件 我想存储我的配置并点击提交 并且仅提交与我的新功能相关的文件 如果我使用 gi
  • ASP.NET Core如何执行Linux shell命令?

    我在 Linux 上有一个 ASP NET Core Web 应用程序 我想执行 shell 命令并从命令中获取结果 有没有办法从 ASP NET Core 应用程序中执行 Linux shell 命令并将值返回到变量中 string Ru
  • 我可以将变量设置为未定义或将未定义作为参数传递吗?

    我对 JavaScript 有点困惑undefined and null values 什么是if testvar 实际上呢 它是否测试undefined and null要不就undefined 一旦定义了变量 我可以将其清除回undef
  • 过滤后的 CollectionView 给出错误的计数

    根据文档 http msdn microsoft com en us library system windows data collectionview count aspx 过滤后的 CollectionView 的 Count 应该只
  • jQuery 将输入类型=文本更改为文本区域

    我有一个隐藏字段 在放置事件之后 它需要转换为 文本区域 This excerpt parent find excerpt attr type textarea excerpt val textarea 产生的 属性无法更改 error 这
  • 如何创建自定义魔法文件数据库

    Unixfile命令使用 神奇 文件数据库来确定文件包含的数据类型 而与文件名或扩展名无关 我需要制作自定义魔法数据库用于测试目的 但我无法找到如何创建一个数据库 You can man magic有关如何创建您自己的魔法文件的说明 然后使
  • 与 shell 通配符和正则表达式的混淆

    发起人为reply https stackoverflow com questions 1320721 postgres regex and nested queries something like unix pipes 1322144
  • 无法将类型“System.Drawing.Image”隐式转换为“System.Drawing.Bitmap”`

    声明了一个位图 private Bitmap img1 null private Bitmap img2 null 选择图像后将被放置打开文件对话框 选定的图像被放置在一个数组中 imgName openFD FileNames 然后按钮1
  • firestore:权限缺失或不足

    我在登录时使用角色 允许读取 写入 if request auth uid null 我获取数据没问题 但是当我注销用户时 我收到错误 缺少权限或权限不足 首先我认为这是因为我没有取消订阅我尝试过的 Observable rxjs oper
  • Android Renderscript Allocation.USAGE_SHARED 崩溃

    我在运行使用渲染脚本的应用程序时发生崩溃 不幸的是 logcat 没有给出任何具体细节 b Bitmap createBitmap ib getWidth ib getHeight ib getConfig Allocation mInAl
  • Prettier 弄乱了 jsx 片段

    function App return lt gt lt Navbar gt lt Users gt gt 按 ctrl 将更改保存到 function App return lt gt lt Navbar gt lt Users gt l
  • 将 Python 解释器历史记录导出到文件?

    很多时候 在实际写入文件之前 我会使用 Python 解释器来检查变量并逐步执行命令 然而到最后 我的解释器中有大约 30 个命令 并且必须将它们复制 粘贴到文件中才能运行 有没有办法可以将 Python 解释器历史记录导出 写入到文件中
  • 在 SwiftUI 中,只有当用户将手指放在屏幕上时,什么手势才能执行代码?

    什么手势只有当用户将手指放在屏幕上时才能执行代码 运行某些代码不应该是原因 手势 的影响 我想要的是当用户将手指放在屏幕上时运行某些代码 如果用户将手指移开 代码就会停止运行 例如 some view unknownGesture runn
  • 将假设中的 ~exists 转换为 forall

    我陷入了假设的境地 exists k k lt n 1 f k f n 2 并希望将其转换为等效的 我希望如此 假设forall k k lt n 1 gt f k lt gt f n 2 这是一个小例子 Require Import Co
  • Airflow BashOperator 日志不包含完整输出

    我遇到一个问题 BashOperator 没有记录 wget 的所有输出 它只会记录输出的前 1 5 行 我已经尝试过仅使用 wget 作为 bash 命令 tester BashOperator task id testing bash