Apache Beam 数据流中的外部 API 调用

2023-12-26

我有一个用例,我读取存储在谷歌云存储中的换行 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用外部 API 来进行重复数据删除,无论该 json 元素之前是否被发现。我正在做一个ParDo with a DoFn在每个 json.

我还没有看到任何在线教程说明如何从 apache beam 调用外部 API 端点DoFn数据流。

我在用着JAVABeam的SDK。我学习的一些教程解释说,使用startBundle and FinishBundle但我不清楚如何使用它


如果您需要检查外部存储中每个 JSON 记录的重复项,那么您仍然可以使用DoFn为了那个原因。有几个注释,例如@Setup, @StartBundle, @FinishBundle等,可用于注释您的方法DoFn.

例如,如果您需要实例化客户端对象以将请求发送到外部数据库,那么您可能需要在@Setup方法(如 POJO 构造函数),然后在您的@ProcessElement method.

让我们考虑一个简单的例子:

static class MyDoFn extends DoFn<Record, Record> {

    static transient MyClient client;

    @Setup
    public void setup() {
        client = new MyClient("host");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // process your records
        Record r = c.element();
        // check record ID for duplicates
        if (!client.isRecordExist(r.id()) {
            c.output(r);
        }
    }

    @Teardown
    public void teardown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}

此外,为了避免对每条记录进行远程调用,您可以将捆绑记录批处理到内部缓冲区(将输入数据分束到捆绑中)并以批处理模式检查重复项(如果您的客户端支持此操作)。为此,您可以使用@StartBundle and @FinishBundle带注释的方法将在相应处理 Beam 束之前和之后调用。

对于更复杂的示例,我建议查看不同 Beam IO 中的 Sink 实现,例如运动IO https://github.com/apache/beam/blob/e65d457e8ccaddaf291de553bd17e8035ef7f43a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java#L603, 例如。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Beam 数据流中的外部 API 调用 的相关文章

  • 在 Java 中连接和使用 Cassandra

    我已经阅读了一些关于 Cassandra 是什么以及它可以做什么的教程 但我的问题是如何在 Java 中与 Cassandra 交互 教程会很好 如果可能的话 有人可以告诉我是否应该使用 Thrift 还是 Hector 哪一个更好以及为什
  • 为什么 i++ 不是原子的?

    Why is i Java 中不是原子的 为了更深入地了解 Java 我尝试计算线程中循环的执行频率 所以我用了一个 private static int total 0 在主课中 我有两个线程 主题 1 打印System out prin
  • 多个 Maven 配置文件激活多个 Spring 配置文件

    我想在 Maven 中构建一个环境 在其中我想根据哪些 Maven 配置文件处于活动状态来累积激活多个 spring 配置文件 目前我的 pom xml 的相关部分如下所示
  • 控制Android的前置LED灯

    我试图在用户按下某个按钮时在前面的 LED 上实现 1 秒红色闪烁 但我很难找到有关如何访问和使用前置 LED 的文档 教程甚至代码示例 我的意思是位于 自拍 相机和触摸屏附近的 LED 我已经看到了使用手电筒和相机类 已弃用 的示例 但我
  • Liferay ClassNotFoundException:DLFileEntryImpl

    在我的 6 1 0 Portal 实例上 带有使用 ServiceBuilder 和 DL Api 的 6 1 0 SDK Portlet 这一行 DynamicQuery query DynamicQueryFactoryUtil for
  • Spring Data JPA 应用排序、分页以及 where 子句

    我目前正在使用 Spring JPA 并利用此处所述的排序和分页 如何通过Spring data JPA通过排序和可分页查询数据 https stackoverflow com questions 10527124 how to query
  • 无法解析插件 Java Spring

    我正在使用 IntelliJ IDEA 并且我尝试通过 maven 安装依赖项 但它给了我这些错误 Cannot resolve plugin org apache maven plugins maven clean plugin 3 0
  • 十进制到八进制的转换[重复]

    这个问题在这里已经有答案了 可能的重复 十进制转换错误 https stackoverflow com questions 13142977 decimal conversion error 我正在为一个类编写一个程序 并且在计算如何将八进
  • 如何为俚语和表情符号构建正则表达式 (regex)

    我需要构建一个正则表达式来匹配俚语 即 lol lmao imo 等 和表情符号 即 P 等 我按照以下示例进行操作http www coderanch com t 497238 java java Regular Expression D
  • 从 127.0.0.1 到 2130706433,然后再返回

    使用标准 Java 库 从 IPV4 地址的点分字符串表示形式获取的最快方法是什么 127 0 0 1 到等效的整数表示 2130706433 相应地 反转所述操作的最快方法是什么 从整数开始2130706433到字符串表示形式 127 0
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • 加密 JBoss 配置中的敏感信息

    JBoss 中的标准数据源配置要求数据库用户的用户名和密码位于 xxx ds xml 文件中 如果我将数据源定义为 c3p0 mbean 我会遇到同样的问题 是否有标准方法来加密用户和密码 保存密钥的好地方是什么 这当然也与 tomcat
  • Java Integer CompareTo() - 为什么使用比较与减法?

    我发现java lang Integer实施compareTo方法如下 public int compareTo Integer anotherInteger int thisVal this value int anotherVal an
  • 如何在控制器、服务和存储库模式中使用 DTO

    我正在遵循控制器 服务和存储库模式 我只是想知道 DTO 在哪里出现 控制器应该只接收 DTO 吗 我的理解是您不希望外界了解底层域模型 从领域模型到 DTO 的转换应该发生在控制器层还是服务层 在今天使用 Spring MVC 和交互式
  • 如何从指定日期获取上周五的日期? [复制]

    这个问题在这里已经有答案了 如何找出上一个 上一个 星期五 或指定日期的任何其他日期的日期 public getDateOnDay Date date String dayName 我不会给出答案 先自己尝试一下 但是 也许这些提示可以帮助
  • simpleframework,将空元素反序列化为空字符串而不是 null

    我使用简单框架 http simple sourceforge net http simple sourceforge net 在一个项目中满足我的序列化 反序列化需求 但在处理空 空字符串值时它不能按预期工作 好吧 至少不是我所期望的 如
  • 在 Maven 依赖项中指定 jar 和 test-jar 类型

    我有一个名为 commons 的项目 其中包含运行时和测试的常见内容 在主项目中 我添加了公共资源的依赖项
  • java.lang.IllegalStateException:驱动程序可执行文件的路径必须由 webdriver.chrome.driver 系统属性设置 - Similiar 不回答

    尝试学习 Selenium 我打开了类似的问题 但似乎没有任何帮助 我的代码 package seleniumPractice import org openqa selenium WebDriver import org openqa s
  • 将 List 转换为 JSON

    Hi guys 有人可以帮助我 如何将我的 HQL 查询结果转换为带有对象列表的 JSON 并通过休息服务获取它 这是我的服务方法 它返回查询结果列表 Override public List
  • 按日期对 RecyclerView 进行排序

    我正在尝试按日期对 RecyclerView 进行排序 但我尝试了太多的事情 我不知道现在该尝试什么 问题就出在这条线上适配器 notifyDataSetChanged 因为如果我不放 不会显示错误 但也不会更新 recyclerview

随机推荐

  • 绑定字符串格式数字逗号且无小数位

    好吧 这是一个简单的问题 但我非常感谢你的帮助 因为我已经花了一个小时试图让它工作 如何更改以下内容以去掉小数位并仅显示整数 Binding Binding ANLA StringFormat n 我知道格式是这样的 0 0 0 但我无法让
  • static_cast 从 Derived* 到 void* 到 Base*

    我想将一个指向派生类成员的指针转换为void 并从那里指向基类的指针 如下例所示 include
  • 生成随机字母字符串的有效方法?

    我想要一个随机字母表中所有字符的字符串 现在 我创建一个包含 26 个字符的可变数组 使用 ExchangeObjectAtIndex 方法对它们进行打乱 然后将每个字符添加到我返回的字符串中 必须有更好的方法来做到这一点 这是我的代码 N
  • 重写 hashcode 方法时的 HashMap 性能

    In a HashMap 如果我将自定义对象作为键 如果我重写会发生什么hashCode 方法并实现它以将值传递为 1 会有任何性能影响吗 如果我改变hashCode 使用返回随机值的方法Math random 功能 性能会发生什么变化 添
  • 如何在 Python 中将 MP3 转换为 WAV

    如果我有 MP3 文件 如何将其转换为 WAV 文件 最好使用纯Python方法 我维护一个开源库 pydub http pydub com 这可以帮助您解决这个问题 from pydub import AudioSegment sound
  • C++中将一个类对象分配给另一个类对象

    我想在 C 中将一个类对象分配给另一个类对象 Ex 有一个类别为 狗 另一个类别为 猫 为每个 d1 c1 创建一个实例 不想使用任何STL 我想在我的代码中使用这个语句 d1 c1 Program class dog char dc fl
  • jQuery + RGBA 彩色动画

    有谁知道 jQuery 是否可以处理如下动画 rgba 0 0 0 0 2 rgba 0 255 0 0 4 我知道有一个plugin http github com jquery jquery color blob master jque
  • 无法使用 dart:ffi 在 Flutter 中使用编译后的 C 文件

    我正在尝试使用 C 代码并在 flutter 中调用 main 方法 我不确定我们是否可以在 flutter 中执行此操作 我编译了C文件 没问题 当我在 dart 文件中使用它并使用命令运行它时 dart myDartCode dart
  • 从名称作为变量传递的表中进行选择

    我正在尝试编写一个简单的存储过程 它采用三个参数 数据库名称一 数据库名称二 和 表名称 然后 sql 将对每个数据库中定义的表执行行计数并存储它 零碎地工作我遇到了你做不到的第一个问题 select from tablename 我知道你
  • Azure 网站上的 SQLXML

    我的应用程序正在使用库 SqlXML 并且我正在尝试将此应用程序发布到 Azure Web 服务 发布后我收到此错误 Retrieving the COM class factory for component with CLSID 83D
  • 调车场算法及功能调试

    我想在调车场算法中除了运算符之外实现 函数 并根据结果算法做一个小解释器 但是 默认算法会忽略标记的语法错误使用 有没有人写过一个解释器 或不想 想帮助我 这将帮助很多陷入这个问题的人 这里列出了一些测试 shunting yard 函数忽
  • Powershell - 创建计划任务作为本地系统/服务运行

    谁能告诉我如何使用作为本地系统或本地服务运行的 powershell 创建计划任务 除了对 ITaskFolder RegisterTaskDefinition 的调用之外 一切都运行良好 如果我传入 null 或 则调用炸弹会说用户名或密
  • 如何将 activerecord 结果转换为哈希值?

    我有一个查询成功返回 ActiveRecord 中的结果 select trunc b transaction date as transaction date sum a transaction amount as transaction
  • Gradle 是否支持 Ivy 存储库的分类器?

    我正在尝试基于分类器从 Ivy 存储库 在 Artifactory 中 检索 Gradle 依赖项 以过滤包含本机代码的 DLL 以获取相关处理器架构的 DLL 我的 build gradle 看起来像这样 repositories ivy
  • 使用评分在 SQL 中查找最佳匹配

    假设我有一个数据表 例如 ID Col1 Col2 Col3 1 a b 23 2 a c 14 3 f g 11 假设我有一个 POSSIBLE MATCHES 表 例如 MatchID Col1 Col2 Col3 101 a a 11
  • 识别地址是否属于堆、堆栈或寄存器

    我有一个指向 C C 变量的指针 是否可以准确地找出该变量属于内存的哪一段 如果是 怎么办 注意 我只有这个变量的地址 如果变量是本地 全局等 则没有更多信息 查明您的体系结构是否有指向堆或堆栈区域的指针 通常有一些堆栈指针或帧指针 然后将
  • 跟踪被忽略目录中的文件

    前段时间我设置了我的 gitignore文件至not跟踪文件夹my folder with my folder 现在我只想跟踪所述文件夹内的给定文件 名为my file md 制作完成后 gitignore看起来像这样 my folder
  • Apscheduler 正在多次执行作业

    我有一个使用 uwsgi 有 10 个工作人员 ngnix 运行的 django 应用程序 我正在使用 apscheduler 进行调度 每当我安排一项作业时 它就会被执行多次 从这些答案中ans1 https stackoverflow
  • Nancy (C#):如何获取我的帖子数据?

    我正在使用 Corona SDK 将数据发布到我的 C 服务器 headers Content Type application x www form urlencoded headers Accept Language en US loc
  • Apache Beam 数据流中的外部 API 调用

    我有一个用例 我读取存储在谷歌云存储中的换行 json 元素并开始处理每个 json 在处理每个 json 时 我必须调用外部 API 来进行重复数据删除 无论该 json 元素之前是否被发现 我正在做一个ParDo with a DoFn