气流:Dag 每隔几秒安排两次

2024-04-24

我尝试每天仅运行一次 DAG00:15:00(午夜 15 分钟),然而,它被安排了两次,间隔几秒钟。

dag = DAG(
    'my_dag',
    default_args=default_args,
    start_date=airflow.utils.dates.days_ago(1) - timedelta(minutes=10),
    schedule_interval='15 0 * * * *',
    concurrency=1,
    max_active_runs=1,
    retries=3,
    catchup=False,
)

该 Dag 的主要目标是检查新电子邮件,然后检查 SFTP 目录中的新文件,然后运行“合并”任务以将这些新文件添加到数据库中。

所有作业都是 Kubernetes Pod:

email_check = KubernetesPodOperator(
    namespace='default',
    image="g.io/email-check:0d334adb",
    name="email-check",
    task_id="email-check",
    get_logs=True,
    dag=dag,
)
sftp_check = KubernetesPodOperator(
    namespace='default',
    image="g.io/sftp-check:0d334adb",
    name="sftp-check",
    task_id="sftp-check",
    get_logs=True,
    dag=dag,
)
my_runner = KubernetesPodOperator(
    namespace='default',
    image="g.io/my-runner:0d334adb",
    name="my-runner",
    task_id="my-runner",
    get_logs=True,
    dag=dag,
)
my_runner.set_upstream([sftp_check, email_check])

所以,问题是似乎有两次运行DAG 预定的相隔几秒钟。他们不run同时进行,但一旦第一个完成,第二个就开始。

这里的问题是my_runnerjob 的目的是每天只运行一次:它尝试创建一个以日期为后缀的文件,如果该文件已经存在,它会抛出异常,因此第二次运行总是会抛出异常(因为当天的文件第一次运行时已经正确创建)

由于一张(或两张)图像相当于一千个单词,因此如下:

您将看到已安排第一次运行“00:15 后 22 秒“(这很好......有时它会在这里或那里变化几秒钟)然后还有第二个似乎总是被安排好的"5800:15 UTC 后的秒数”(至少根据他们得到的名字)。因此,第一个运行正常,似乎没有其他运行......并且一旦完成运行,就进行第二次运行(安排在00:15:58)启动(并失败)。

一个“好”的:

一个“坏”的:


您可以检查计划间隔参数吗? schedule_interval='15 0 * * * *'. The cron schedule takes only 5 parameters and I see an extra star. 另外,你能固定开始日期吗? start_date: datetime(2019, 11, 10)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

气流:Dag 每隔几秒安排两次 的相关文章

  • 编程错误:(psycopg2.errors.UndefinedColumn)关系“task_fail”的列“execution_date”不存在

    我正在尝试在气流中运行 DAG 以将数据集摄取到谷歌云存储 这是 DAG 脚本 import os from airflow import DAG from airflow utils dates import days ago from
  • 如何在特定 systemd 服务重新启动时触发自定义脚本运行

    我想知道如何安排自定义脚本在重新启动服务时运行 我的用例是 每当重新启动 Tomcat 服务时 我都必须运行多个命令 我想知道是否有一种方法可以编写脚本并安排它在重新启动 Tomcat 服务时运行 我已将 tomcat 脚本设置为 syst
  • 打印到 stdout 会导致阻塞的 goroutine 运行吗?

    作为一个愚蠢的基本线程练习 我一直在尝试实现理发师睡觉的问题 http en wikipedia org wiki Sleeping barber problem在戈兰 对于通道来说 这应该很容易 但我遇到了一个 heisenbug 也就是
  • Airflow 默认连接数过多

    我打开气流并检查连接 发现其后面运行的连接太多 关于如何杀死那些我不使用的任何想法 或者我很想知道运行它的最小 conn id 建筑学 LocalExecutor 与其他经纪人不同 Postgres 作为元数据库 但它列出了 17 个连接
  • 如何在 Zend Framework 中存储 cron 作业的脚本?

    因为 ZF 的所有 URL 都依赖于 mod 重写 所以我并不清楚应该在哪里存储用于 cron 作业的本地脚本 有人有什么建议 或者有 正式接受 的方式吗 我用模块化目录结构 http framework zend com manual e
  • 使用正则表达式模式查找 -name 并使用 cp 替换文件名

    目前我正在使用该命令cron复制 data从源到目标路径 find source path name data exec cp target path 源码结构为 source path category1 001 data source
  • 是否有一种更简单的方法可以并行运行命令,同时在 Windows PowerShell 中保持高效?

    此自我回答旨在为那些受困于 Windows PowerShell 并由于公司政策等原因而无法安装模块的用户提供一种简单且高效的并行替代方案 在 Windows PowerShell 中 built in可用的替代方案local并行调用是St
  • 在 .NET 并发线程之间传递数据的最佳方式是什么?

    我有两个线程 一个需要轮询一堆单独的静态资源以查找更新 另一种需要获取数据并将其存储在数据库中 线程1如何告诉线程2有东西要处理 如果数据块是独立的 则将数据块视为要由线程池处理的工作项 使用线程池和QueueUserWorkItem将数据
  • 查明具有特定 ID 的会话是否已过期

    我正在创建一个上传功能 将用户上传的文件存储在服务器上 并以用户的会话 ID 作为名称 现在 我只想将此文件保留在服务器上 直到该会话处于活动状态 所以 我的问题是 如何根据会话 ID 确定会话是活动的还是过期的 以便在后一种情况下我可以安
  • Cron 作业中的 PyAutoGUI

    我正在尝试运行一个程序 该程序可以通过 crontab 使用 Selenium 和 PyAutoGUI 在 python 3 6 中自动拉出一些选项卡 这是当 cron 不运行该程序时我尝试运行的脚本 import pyautogui im
  • 乐观并发:IsConcurrencyToken 和 RowVersion

    我正在创建将在我的应用程序中使用的默认并发策略 我决定采取乐观的策略 我的所有实体都映射为Table per Type TPT 使用继承 我很快了解到 在实体框架上使用带有继承的 RowVersion 类型的列时存在问题 Product I
  • Shell Crontab 不工作

    GNU nano 2 0 9 文件 tmp crontab XXXXzBQgwS 5 check phpfpm sh 5 check nginx sh 5 disk clean sh 5 loadcheck sh 按理说我的代码应该每 5
  • 如何在C中同时运行两个子进程?

    所以我开始学习并发编程 但由于某种原因我什至无法掌握基础知识 我有一个名为 fork c 的文件 其中包含一个 main 方法 在此方法中 我将 main 分叉两次 分别进入子进程 1 和 2 在孩子 1 中 我打印了字符 A 50 次 在
  • mongodb/node.js 中单文档并发读写操作的问题

    编辑 6 15我尝试运行相同的代码 在调用之前添加延迟 doSafePush 再次收到 ConcurrencyDBError 时 即执行return when resolve wait delay 35 then function doSa
  • scala.concurrent.blocking - 它实际上做了什么?

    我花了一段时间学习 Scala 执行上下文 底层线程模型和并发性的主题 你能解释一下通过什么方式吗scala concurrent blocking 调整运行时行为 and 可以提高性能或避免死锁 如中所述scaladoc http www
  • Haskell 中多核编程的现状如何?

    Haskell 中多核编程的现状如何 现在有哪些项目 工具和库可用 有哪些经验报道 2009年至2012年期间 发生了以下事件 2012 从 2012 年开始 并行 Haskell 状态更新开始出现在并行 Haskell 摘要 http w
  • 如何让BackgroundWorker返回一个对象

    我需要做RunWorkerAsync 返回一个List
  • XAMPP Windows 上的 Php Cron 作业

    嗯 我是这个词的新手CRON 据我所知 这是一个Unix安排特定操作在定义的时间间隔后执行的概念 我需要运行一个php文件 每小时更新一次数据库 但我的困惑在于安排执行 我在用XAMPP用于 Windows 7 上的本地开发测试 我发现了什
  • 当一种语言是另一种语言的平行超集时,这意味着什么?

    我正在阅读关于实时并发 C 的期刊文章 http link springer com article 10 1007 2FBF00365999 并且它在摘要中提到 因此你们中的任何人都可以通过该链接查看上下文 Concurrent C 是
  • ArrayDeque 和 LinkedBlockingDeque

    只是想知道为什么他们做了一个LinkedBlockingDeque而同一个非并发对应物是ArrayDeque它基于可调整大小的数组 LinkedBlockingQueue使用一组节点 例如LinkedList 尽管没有实施List 我知道可

随机推荐

  • 时区显示值与给定时区的 GMT 偏移量?

    我想构建一个时区列表以显示给用户选择 显示名称必须类似于 GMT 5 30 India Standard Time Asia Calcutta 我正在使用所有时区TimeZone getAvailableIDs 并构建列表 我写的代码是 S
  • 初始化 C++ 向量的大小

    初始化 C 向量以及其他容器的大小有哪些优点 如果有 有什么理由不只使用默认的无参数构造函数吗 基本上 两者之间是否存在显着的性能差异 vector
  • 如何正确创建可通过http访问的SVN存储库? (在 public_html 内)?

    情况是这样的 subversion 已安装在服务器中 并且我可以访问服务器中的共享帐户之一 不是 root 并且该共享托管帐户具有 SSH 访问权限 我想创建一个存储库 我可以在其中提交我正在处理的 PHP 文件 当我提交时 它应该可以在浏
  • Fabric 消息太大

    我试图将 5MB 数据从服务传递给参与者 但收到错误 Fabric 消息太大 如何增加微服务之间可传输的最大大小 我看了以下内容page https github com Azure azure content blob master ar
  • ESS 在 Windows 上找不到 Rterm.exe

    我将 R 安装在名为 X alphaAndOmega R R 的目录中 所以Rterm exe 32位版本 位于 X alphaAndOmega R R bin i386 我知道它不是 标准 R 目录 并且 标准 R 目录 例如 R 3 0
  • 如何在没有设置器的情况下设置属性值

    我已经看到提出并回答了各种问题 我们可以使用反射来调用私有设置器 如下所示 是否可以通过反射获取属性的私有设置器 https stackoverflow com questions 9219261 is it possible to get
  • 现代 x86 硬件不能将单个字节存储到内存中吗?

    说到 C 的并发内存模型 Stroustrup 的C 编程语言 第 4 版 第 1 节 41 2 1 说 就像大多数现代硬件一样 机器无法加载或存储小于单词的任何内容 然而 我的 x86 处理器已经有几年的历史了 它可以并且确实存储小于单词
  • 将 JQuery getJSON 与包含的其他 JavaScript 一起使用会出现 ReferenceError

    我制作了一个小型 HTML 页面示例来使 JQuery 的 getJSON 方法正常工作 它如下所示 抱歉 草率了 这只是一个概念证明 稍后会添加到更大的项目中
  • 在三角形内强制图表 d3.js

    我正在研究 d3 js 力图 我有一个问题 是否可以在具有某些坐标的三角形内制作力图 这是我的代码 var width 500 var height 500 margin var marginLeft 10 var marginTop 10
  • 从矩阵中删除零行(优雅的方式)

    我有一个包含一些零行的矩阵 我想删除零行 矩阵是Nx3 我所做的很简单 我创造std vector其中每三个元素代表一行 然后我将其转换为Eigen MatrixXd 有没有一种优雅的方法来删除零行 include
  • 在 ncurses 中的指定位置添加相同符号的快捷方式是什么?

    我想添加str in ncurse屏幕 带坐标x 5 to 24 y 23 to 42 这是一个正方形 但我想不出一个简单的方法来做到这一点 我试过了 stdscr addstr range 23 42 range 5 24 但这行不通 它
  • 使用敏捷方法建造飞机? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 开发者可以从其他行业学到很多东西 作为一个思维练习 是否有可能使用敏捷技术建造一架客机 暂时忘记成本 对硬件 机身 机翼等 和软件进行迭代和增量
  • DYMOLA:opc 服务器如何使用 MATLAB 使用 dsin.txt 或 mat 文件进行初始化

    我在 DYMOLA 中创建了一个 OPC 服务器 现在我在 DYMOSIM 中有这个可以单击并初始化 使用 dsin txt 的 MAT 文件 现在我在 MATLAB 中创建了一个 GUI 文件 并获取变量的输入并创建了一个 mat 文件
  • 无法构建轮子 - 错误:无效命令“bdist_wheel”

    我已经尝试了这个非常相关的问题中的所有内容 为什么我无法在 python 中创建轮子 https stackoverflow com questions 26664102 why can i not create a wheel in py
  • postgresql自连接

    假设我有一张这样的桌子 id device cmd value id unique row ID device device identifier mac address cmd some arbitrary command value v
  • Rails:创建删除表级联迁移

    如何在 Rails 3 2 迁移中强制执行 DROP TABLE CASCADE 是否有一个选项可以传递给 drop table table name 在 Rails 4 中 您可以执行以下操作 drop table accounts fo
  • 如何使用在单击按钮上创建的用户触发图表中的放大和缩小?

    我正在构建一个角度应用程序 其中我们需要创建用于放大和缩小图表的单击按钮 我们可以使用可悬停模式栏上的按钮放大缩小图表 但这对于我们的应用程序来说不是必需的 我们希望使用通过单击按钮创建的用户来放大和缩小图表 有没有办法使用单击按钮触发可悬
  • Electron如何拦截http响应体

    有什么办法可以拦截BrowserWindow主进程中的http响应主体没有调试器 是否无法使用WebRequest类和onCompleted method 我可以使用调试器做到这一点 但由于某种原因我不能使用它 await w webCon
  • 在 Eclipse (Spring Source) 中,Grails 始终以生产模式构建

    当在 Grails 项目中使用 Eclipse 时 战争的构建似乎陷入了生产模式 如果您想部署到附加的 tcServer 您只需右键单击您的项目 然后选择 运行方式 gt 在服务器上运行 如果您将 grails 项目设置为 dev 右键单击
  • 气流:Dag 每隔几秒安排两次

    我尝试每天仅运行一次 DAG00 15 00 午夜 15 分钟 然而 它被安排了两次 间隔几秒钟 dag DAG my dag default args default args start date airflow utils dates