I'm having an issue where BashOperator does not log all of the output from wget. It only logs the first 1-5 lines of the output.

I have tried this with only wget as the bash command:

tester = BashOperator(
    task_id = 'testing',
    bash_command = "wget -N -r -nd --directory-prefix='/tmp/'",
    dag = dag)

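I have also tried this as part of a longer bash script that runs other commands after wget. Airflow does wait for the script to complete before triggering downstream tasks. Here is an example bash script: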

echo "Starting up..."
wget -N -r -nd --directory-prefix='/tmp/'
echo "Download complete..."
unzip /tmp/ -o -d /tmp/test_airflow
echo "Archive unzipped..."


[2017-04-13 18:33:34,214] {} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,214] {} INFO - Subtask: Starting attempt 1 of 1
[2017-04-13 18:33:34,215] {} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,215] {} INFO - Subtask: 
[2017-04-13 18:33:35,068] {} INFO - Subtask: [2017-04-13 18:33:35,068] {} INFO - Executing <Task(BashOperator): testing> on 2017-04-13 18:33:08
[2017-04-13 18:33:37,569] {} INFO - Subtask: [2017-04-13 18:33:37,569] {} INFO - tmp dir root location: 
[2017-04-13 18:33:37,569] {} INFO - Subtask: /tmp
[2017-04-13 18:33:37,571] {} INFO - Subtask: [2017-04-13 18:33:37,571] {} INFO - Temporary script location :/tmp/airflowtmpqZhPjB//tmp/airflowtmpqZhPjB/testingCkJgDE
[2017-04-13 18:14:54,943] {} INFO - Subtask: [2017-04-13 18:14:54,942] {} INFO - Running command: /var/www/upstream/xtractor/scripts/ 
[2017-04-13 18:14:54,951] {} INFO - Subtask: [2017-04-13 18:14:54,950] {} INFO - Output:
[2017-04-13 18:14:54,955] {} INFO - Subtask: [2017-04-13 18:14:54,954] {} INFO - Starting up...
[2017-04-13 18:14:54,958] {} INFO - Subtask: [2017-04-13 18:14:54,957] {} INFO - --2017-04-13 18:14:54--
[2017-04-13 18:14:55,106] {} INFO - Subtask: [2017-04-13 18:14:55,105] {} INFO - Resolving (
[2017-04-13 18:14:55,186] {} INFO - Subtask: [2017-04-13 18:14:55,186] {} INFO - Connecting to (||:80... connected.
[2017-04-13 18:14:55,284] {} INFO - Subtask: [2017-04-13 18:14:55,284] {} INFO - HTTP request sent, awaiting response... 200 OK
[2017-04-13 18:14:55,285] {} INFO - Subtask: [2017-04-13 18:14:55,284] {} INFO - Length: 1662639 (1.6M) [application/zip]
[2017-04-13 18:15:01,485] {} INFO - Task exited with return code 0

Edit: More testing suggests that the problem is specifically with logging the output of wget.
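To check whether the missing lines are an Airflow logging problem rather than a capture problem, one option is to run the same command outside Airflow and capture its output through a pipe. The sketch below assumes Python 3.5+ (for `subprocess.run`); `capture_bash_output` is a hypothetical helper name, and the `echo` commands are placeholders for the real wget call. It merges stderr into stdout, mirroring `stderr=STDOUT` in the BashOperator's `Popen` call, because wget writes its status messages to stderr.

```python
import subprocess


def capture_bash_output(script):
    """Run a bash snippet and return (return_code, list_of_lines).

    stderr is merged into stdout so that messages a program writes to
    stderr (as wget does by default) are captured in the same stream.
    """
    result = subprocess.run(
        ["bash", "-c", script],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # fold stderr into the captured stdout
    )
    text = result.stdout.decode("utf-8", errors="replace")
    return result.returncode, text.splitlines()


if __name__ == "__main__":
    # Placeholder command: swap in your actual wget invocation.
    code, lines = capture_bash_output("echo one; echo two 1>&2; echo three")
    print("exit code:", code, "lines captured:", len(lines))
    for line in lines:
        print(line)
```

If this prints every line but the Airflow task log does not, the loss is happening on the logging side rather than in the subprocess capture.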

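This happens because the default operator only prints the last line. Replace the code in airflow/operators/bash_operator.py, wherever your Airflow is installed, with the following. Usually you need to find where your Python is installed and then go to its site-packages directory.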

from builtins import bytes
import os
import signal
import logging
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory


class BashOperator(BaseOperator):
    """
    Execute a Bash script, command or set of commands.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :type bash_command: string
    :param xcom_push: If xcom_push is True, the full captured output
        (stdout and stderr, joined with newlines) will also be pushed to
        an XCom when the bash command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :type env: dict
    :param output_encoding: output encoding of bash command
    :type output_encoding: str
    """
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults
    def __init__(
            self,
            bash_command,
            xcom_push=False,
            env=None,
            output_encoding='utf-8',
            *args, **kwargs):

        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push_flag = xcom_push
        self.output_encoding = output_encoding

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards
        """
        bash_command = self.bash_command
        logging.info("tmp dir root location: \n" + gettempdir())
        # Collect every output line instead of keeping only the last one
        line_buffer = []
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                f.write(bytes(bash_command, 'utf_8'))
                f.flush()  # make sure bash sees the full script on disk
                fname = f.name
                script_location = tmp_dir + "/" + fname
                logging.info("Temporary script "
                             "location :{0}".format(script_location))
                logging.info("Running command: " + bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env,
                    preexec_fn=os.setsid)  # own process group, used by on_kill

                self.sp = sp

                logging.info("Output:")
                line = ''

                # Read until EOF, storing each decoded line in the buffer
                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    line_buffer.append(line)
                sp.wait()
                logging.info("Command exited with "
                             "return code {0}".format(sp.returncode))

                if sp.returncode:
                    raise AirflowException("Bash command failed")

        # Log the whole buffered output in one call
        logging.info("\n".join(line_buffer))
        if self.xcom_push_flag:
            return "\n".join(line_buffer)

    def on_kill(self):
        logging.info('Sending SIGTERM signal to bash process group')
        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
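Stripped of Airflow, the change boils down to one pattern: read the subprocess output line by line into a buffer, then emit the whole buffer as a single log record. Here is a minimal standard-library sketch of that pattern, assuming Python 3; `run_and_log_once` is a hypothetical name and `RuntimeError` stands in for `AirflowException`.

```python
import logging
import subprocess


def run_and_log_once(command, logger, encoding="utf-8"):
    """Run a bash command, buffer all of its output, then log it in one call."""
    line_buffer = []
    sp = subprocess.Popen(
        ["bash", "-c", command],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # capture stderr (e.g. wget's messages) too
    )
    # readline() returns b"" only at EOF, so iter() stops when bash closes the pipe.
    for raw in iter(sp.stdout.readline, b""):
        line_buffer.append(raw.decode(encoding).strip())
    sp.stdout.close()
    sp.wait()

    output = "\n".join(line_buffer)
    logger.info(output)  # one log record containing every captured line
    logger.info("Command exited with return code %d", sp.returncode)
    if sp.returncode:
        raise RuntimeError("Bash command failed")
    return output


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("bash_sketch")
    run_and_log_once("echo Starting up...; echo progress 1>&2; echo done", log)
```

One deliberate difference from the operator code: this sketch logs the buffered output before raising on a non-zero exit code, so the captured lines remain visible when the command fails. In the operator above, the exception is raised before the buffer is logged.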

