使用 ExecuteStreamCommand 的 Python 脚本

2024-01-04

在尽我所能找到以前的问题和与此问题相关的示例后,仍然没有找到我正在寻找的答案,我想我会自己提交一个问题。

由于以下原因,ExecuteStreamCommand 对我来说似乎是完美的处理器:

  • 我能够执行任何 Python 脚本并避免使用 Jython(与 ExecuteScript 类似)。 Jython 不适合我。
  • 我可以接受 FlowFiles。这是必要的,因为我的脚本是为了消耗先前处理器的输出而制作的。此外,我喜欢将数据保存在“NiFi 管理”之下的想法。
  • 它写入对路由有用的“执行状态”。

简而言之,我试图用 ExecuteStreamCommand 做的是:

  • 摄取前一个处理器的输出(准确地说,Scrapy 蜘蛛输出包含 JSON 行的文本文件)
  • 调用 python 脚本(例如python3 my_script.py)
  • 加载在我的 python 脚本中摄取的 FlowFile。
  • 选择流文件的内容。
  • 在 python 中对 FlowFile 的内容进行操作。
  • 输出原始 FlowFile 的更新版本或创建新版本。
  • 使用更新/新的 FlowFile 继续我的 NiFi 流程。

为了清楚起见,我目前不明白:

  • 如何调用 python 脚本(从 ExecuteStreamCommand 处理器)
  • 如何从 Python 中加载 FlowFile
  • 如何从 Python 中更新或创建新的 FlowFile
  • 如何将更新后的 FlowFile 从 Python 输出回 NiFi。

我遇到过 ExecuteScript 的各种示例,但不幸的是,这些示例并不能完全转化为 ExecuteStreamCommand 的使用。

先感谢您。任何建议表示赞赏。


从你的问题来看,你说你需要调用Python脚本而不使用InvokeScriptedProcessor or ExecuteScript处理器,因为您无法使用 Jython。考虑到这一要求,您仍然应该能够实现您的目标。虽然需要对框架有一定的了解,但所有这些信息都来自ExecuteStreamCommand文档 https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.ExecuteStreamCommand/index.html.

您的“我目前不明白”部分:

  • 如何调用 python 脚本(从 ExecuteStreamCommand 处理器)

    • In your ExecuteStreamCommand处理器,配置命令参数 and 命令路径具有以下属性:

      • 命令参数:any flags or args, delimited by ; (i.e. /path/to/my_script.py)
      • 命令路径:/path/to/python3
  • 如何从 Python 中加载 FlowFile

    • 流文件内容将通过STDIN,因此在您的 Python 脚本中,以与通常处理方式相同的方式处理该数据STDIN.
  • How to update or create a new FlowFile from within Python
    • NiFi 处理框架中的流文件创建。 Python 脚本传递给的任何数据STDOUT将填充到传递给的结果流文件的内容中输出流的关系ExecuteStreamCommand处理器。在这种情况下,您的脚本不需要了解“流文件”。如果您使用的是ISP or ES处理器,您可以使用 NiFi 脚本 API,它会自动注入到脚本中来创建或更新流文件对象。
  • How to output the updated FlowFile from Python back to NiFi.
    • 同样,只需将所需的流文件内容写入STDOUT从您的脚本中,并且(给定返回状态代码0) NiFi 将生成包含该内容的新流文件。如果您设置输出目的地属性的财产ESC为非空值时,NiFi 将使用包含脚本输出的同名新属性来更新现有流文件。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 ExecuteStreamCommand 的 Python 脚本 的相关文章

  • Django REST序列化器:创建对象而不保存

    我已经开始使用 Django REST 框架 我想做的是使用一些 JSON 发布请求 从中创建一个 Django 模型对象 然后使用该对象而不保存它 我的 Django 模型称为 SearchRequest 我所拥有的是 api view
  • 如何在刻度标签和轴之间添加空间

    我已成功增加刻度标签的字体 但现在它们距离轴太近了 我想在刻度标签和轴之间添加一点呼吸空间 如果您不想全局更改间距 通过编辑 rcParams 并且想要更简洁的方法 请尝试以下操作 ax tick params axis both whic
  • InterfaceError:连接已关闭(使用 django + celery + Scrapy)

    当我在 Celery 任务中使用 Scrapy 解析函数 有时可能需要 10 分钟 时 我得到了这个信息 我用 姜戈 1 6 5 django celery 3 1 16 芹菜 3 1 16 psycopg2 2 5 5 我也使用了psyc
  • 如何生成给定范围内的回文数列表?

    假设范围是 1 X 120 这是我尝试过的 gt gt gt def isPalindrome s check if a number is a Palindrome s str s return s s 1 gt gt gt def ge
  • Pycharm Python 控制台不打印输出

    我有一个从 Pycharm python 控制台调用的函数 但没有显示输出 In 2 def problem1 6 for i in range 1 101 2 print i end In 3 problem1 6 In 4 另一方面 像
  • 如何在 Sublime Text 2 的 OSX 终端中显示构建结果

    我刚刚从 TextMate 切换到 Sublime Text 2 我非常喜欢它 让我困扰的一件事是默认的构建结果显示在 ST2 的底部 我的程序产生一些很长的结果 显示它的理想方式 如在 TM2 中 是并排查看它们 如何在 Mac 操作系统
  • Spark的distinct()函数是否仅对每个分区中的不同元组进行洗牌

    据我了解 distinct 哈希分区 RDD 来识别唯一键 但它是否针对仅移动每个分区的不同元组进行了优化 想象一个具有以下分区的 RDD 1 2 2 1 4 2 2 1 3 3 5 4 5 5 5 在此 RDD 上的不同键上 所有重复键
  • Python tcl 未正确安装

    我刚刚为 python 安装了graphics py 但是当我尝试运行以下代码时 from graphics import def main win GraphWin My Circle 100 100 c Circle Point 50
  • 从 scikit-learn 导入 make_blobs [重复]

    这个问题在这里已经有答案了 我收到下一个警告 D Programming Python ML venv lib site packages sklearn utils deprecation py 77 DeprecationWarning
  • IRichBolt 在storm-1.0.0 和 pyleus-0.3.0 上运行拓扑时出错

    我正在运行风暴拓扑 pyleus verbose local xyz topology jar using storm 1 0 0 pyleus 0 3 0 centos 6 6并得到错误 线程 main java lang NoClass
  • feedparser 在脚本运行期间失败,但无法在交互式 python 控制台中重现

    当我运行 eclipse 或在 iPython 中运行脚本时 它失败了 ascii codec can t decode byte 0xe2 in position 32 ordinal not in range 128 我不知道为什么 但
  • 当玩家触摸屏幕一侧时,如何让 pygame 发出警告?

    我使用 pygame 创建了一个游戏 当玩家触摸屏幕一侧时 我想让 pygame 给出类似 你不能触摸屏幕两侧 的错误 我尝试在互联网上搜索 但没有找到任何好的结果 我想过在屏幕外添加一个方块 当玩家触摸该方块时 它会发出警告 但这花了很长
  • 循环中断打破tqdm

    下面的简单代码使用tqdm https github com tqdm tqdm在循环迭代时显示进度条 import tqdm for f in tqdm tqdm range 100000000 if f gt 100000000 4 b
  • Numpy 优化

    我有一个根据条件分配值的函数 我的数据集大小通常在 30 50k 范围内 我不确定这是否是使用 numpy 的正确方法 但是当数字超过 5k 时 它会变得非常慢 有没有更好的方法让它更快 import numpy as np N 5000
  • 从 pygame 获取 numpy 数组

    我想通过 python 访问我的网络摄像头 不幸的是 由于网络摄像头的原因 openCV 无法工作 Pygame camera 使用以下代码就像魅力一样 from pygame import camera display camera in
  • 检查所有值是否作为字典中的键存在

    我有一个值列表和一本字典 我想确保列表中的每个值都作为字典中的键存在 目前我正在使用两组来确定字典中是否存在任何值 unmapped set foo set bar keys 有没有更Pythonic的方法来测试这个 感觉有点像黑客 您的方
  • 对输入求 Keras 模型的导数返回全零

    所以我有一个 Keras 模型 我想将模型的梯度应用于其输入 这就是我所做的 import tensorflow as tf from keras models import Sequential from keras layers imp
  • 使用基于正则表达式的部分匹配来选择 Pandas 数据帧的子数据帧

    我有一个 Pandas 数据框 它有两列 一列 进程参数 列 包含字符串 另一列 值 列 包含相应的浮点值 我需要过滤出部分匹配列 过程参数 中的一组键的子数据帧 并提取与这些键匹配的数据帧的两列 df pd DataFrame Proce
  • Spark.read 在 Databricks 中给出 KrbException

    我正在尝试从 databricks 笔记本连接到 SQL 数据库 以下是我的代码 jdbcDF spark read format com microsoft sqlserver jdbc spark option url jdbc sql
  • Python - 字典和列表相交

    给定以下数据结构 找出这两种数据结构共有的交集键的最有效方法是什么 dict1 2A 3A 4B list1 2A 4B Expected output 2A 4B 如果这也能产生更快的输出 我可以将列表 不是 dict1 组织到任何其他数

随机推荐

  • 将 OAuth2AuthorizedClient 解析为 Spring bean

    我有一个自动连接许多服务的控制器 这些服务是 HTTP Restful 调用 从各种数据源检索数据 但这些服务受 OAuth2 0 保护 我正在尝试使用 Spring Security 来实现客户端凭据流 该流将允许这些服务安全地从这些受保
  • CSS3 过渡事件

    元素是否会触发任何事件来检查 css3 转换是否已开始或结束 W3C CSS 过渡草案 https www w3 org TR css3 transitions transition events CSS Transition 的完成会生成
  • 我的 SQL 出了什么问题? (查找“上一条”记录)

    我的 SQL 查询应该返回以前的记录 声明 Previous 意味着它具有不同的主键 idData 相同的 SSN Number 和较早的 Received Date 问题是 Received Date 可能相等 所以我必须寻找另一列 优先
  • SQL Server 中 INET_ATON 的等价物是什么

    正如问题所说 SQL Server 相当于什么INET ATON来自 mysql 我需要这个的原因是因为我从以下位置导入了 IP 数据库http ipinfodb com ip database php http ipinfodb com
  • 对静态变量 C++ 的未定义引用

    您好 我在以下代码中收到未定义的引用错误 class Helloworld public static int x void foo void Helloworld foo Helloworld x 10 我不想要一个static foo
  • Silverlight ClientHttp Web请求超时

    我有一个 silverlight 4 应用程序 使用 ClientHttp 堆栈来创建一个提供二进制流服务的 WebRequest 然后我从这个流中读取并做一些事情 但是 我有以下问题 服务器缓冲了它发送下来的数据 因此发送过程就像发送 暂
  • c#:如何从 List 中的特定索引读取

    我有一类人员和列表集合 因为列表包含人员类的所有值 例如 列表 ilist 有 2 个值 0 firstname lastname 1 名字2 姓氏2 现在 当我迭代列表时 我可以打印列表 但我想更改列表中某些部分的值 例如在索引 1 中
  • JSF 语言切换器和 ajax 更新

    这个问题是老问题的后续JSF 2 中的语言切换器实现 https stackoverflow com questions 10204001 language switcher implementation in jsf 尽管 PrimeFa
  • jqplot - 轴标签中的上标

    正如标题所说 如何将上标字体添加到 jqplot 图表中的轴标签 我尝试使用 Javascriptsup 函数以及轴标题的实际 html 标签 但没有成功 Basically I need to display units like m3
  • 如何清除 HTML5 画布中的圆弧或圆?

    我发现有一个clearRect 方法 但找不到任何可以清除圆弧 或整圆 的方法 有什么办法可以清除画布上的弧线吗 没有clearArc但是你可以使用复合操作来实现同样的事情 context globalCompositeOperation
  • 如何使用php将base64字符串转换为二进制数组

    我有一个 Base 64 编码的字符串 看起来像这样 cuVrcYvlqYze3OZ8Y5tSqQY205mcquu0GsHkgXe4bPg 我尝试过base64 decode 输出是 r kq c R 6 w l 我想我可能做错了什么 我
  • 在Python中找到公差范围内两个矩阵的交集?

    我正在寻找找到两个不同大小矩阵的交集的最有效方法 每个矩阵具有三个变量 列 和不同数量的观测值 行 例如矩阵A a np matrix 1 5 1003 2 4 1002 4 3 1008 8 1 2005 b np matrix 7 9
  • 如何在亚马逊网络服务中从 boto3 生成 url

    我在 s3 中有一个 Bucket 我正在尝试提取其中图像的 url 我正在使用 boto3 并且 boto3 似乎没有实现生成 url 方法 他们有一个核心方法 可以生成这样的 url import botocore session se
  • 如何在数据库中存储税金?

    我需要在我的项目中添加每个省 州的税费 我正在争论是否应该在省份和税收之间添加多对多关系 或者只是向每个省份添加tax1 name tax1 rate tax2 name tax2 rate 我不认为任何地方都有超过2个税吗 我还需要存储每
  • aurelia-fetch-client 动态创建请求标头

    我正在使用 aurelia fetch client 将一些数据发送到 web api 在注册方法中 headers Headers register this headers new Headers this headers append
  • Composer 安装“无法打开流”

    每当我运行安装程序 甚至手动安装 Composer 时 我都会收到一条错误 不允许我完成安装 这很烦人 在 Laragon 中 一个程序用于使用 Composer 及其包创建许多不同的项目 它不允许我使用 Laravel 因为 Compos
  • .NET Core 实体框架存储过程

    我正在尝试将 ASP NET 4 5 应用程序移植到 NET Core 但有一个我似乎无法解决的实际问题 我现有的应用程序执行存储过程 该存储过程返回具有多个数据表的数据集 实体框架可以自动将返回的字段映射到我的实体属性 但仅适用于数据集中
  • 为什么 Chrome 开发者工具会自动阻止请求?

    我正在尝试构建一个 PWA 渐进式 Web 应用程序 但 Chrome 开发工具控制台警告说它正在阻止我的 css 和图标文件 Request was blocked by DevTools https example com styles
  • 在 Laravel 中一起使用 React.js 和 Vue.js

    我真的很想知道 是否可以在 Laravel 中同时使用 React js 和 vue js 例如 使用 ReactJs 作为管理仪表板 使用 VueJs 作为客户端仪表板 我知道使用其中之一在技术上和逻辑上更好 你怎么认为 我们可以一起使用
  • 使用 ExecuteStreamCommand 的 Python 脚本

    在尽我所能找到以前的问题和与此问题相关的示例后 仍然没有找到我正在寻找的答案 我想我会自己提交一个问题 由于以下原因 ExecuteStreamCommand 对我来说似乎是完美的处理器 我能够执行任何 Python 脚本并避免使用 Jyt