在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道

2024-05-02

我正在尝试在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道,但我一直遇到此错误Failed to construct instance from factory method DataflowRunner#fromOptions(interfaceorg.apache.beam.sdk.options.PipelineOptions

我尝试运行的示例是官方文档提供的基本字数统计,https://beam.apache.org/get-started/wordcount-example/ https://beam.apache.org/get-started/wordcount-example/。问题是这个示例为每个示例使用不同的类,并且每个示例都有自己的主要功能,但我试图做的是在 Spring Boot 项目中使用实现 CommandLineRunner 的类运行该示例。

Spring引导主类:

 @SpringBootApplication
  public class BeamApplication {
public static void main(String[] args) {
    SpringApplication.run(BeamApplication.class, args);
}}  

命令行运行器:

@Component
public class Runner implements CommandLineRunner {
@Override
public void run(String[] args) throws Exception {

    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    runWordCount(options);
}

static void runWordCount(WordCountOptions options) throws InterruptedException {

    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
            .apply(new CountWords())
            .apply(MapElements.via(new FormatAsTextFn()))
            .apply("WriteCounts", TextIO.write().to(options.getOutput()));
    p.run().waitUntilFinish();
}}

字数统计选项:

public interface WordCountOptions extends PipelineOptions {

@Description("Path of the file to read from")
@Default.String("./src/main/resources/input.txt")
String getInputFile();
void setInputFile(String value);

@Description("path of output file")
// @Validation.Required
// @Default.String("./target/ts_output/extracted_words")
@Default.String("Path of the file to write to")
String getOutput();
void setOutput(String value);
}

摘录的话:

public class ExtractWordsFn extends DoFn<String, String> {
   public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

@ProcessElement
public void processElement(ProcessContext c) {
    for (String word : c.element().split(TOKENIZER_PATTERN)) {
        if (!word.isEmpty()) {
            c.output(word);
        }}}}

计算字数:

public  class CountWords extends    PTransform<PCollection<String>,PCollection<KV<String, Long>>> {

@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines){
    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
            ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
            words.apply(Count.perElement());

    return wordCounts;
}}

当我使用直接运行程序时,项目按预期工作并在项目的根目录中生成文件,但是当我尝试通过传递这些参数来使用 Google 数据流运行程序时--runner=DataflowRunner --project=datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output(当使用 java -jar 或 Intellij 时)。 我收到帖子开头提到的错误。

我正在使用 Java 11,看了这个之后无法从 beamSql、apache beam 中的工厂方法 DataflowRunner#fromOptions 构造实例 https://stackoverflow.com/questions/49227327/failed-to-construct-instance-from-factory-method-dataflowrunnerfromoptions-in-b我尝试将我的代码放入新的 Java 8 Spring boot 项目中,但错误仍然存​​在。

当运行 Apache Beam 文档提供的项目(具有不同电源的类)时,它在 Google 数据流上运行良好,我可以在 Google 存储桶中看到生成的输出。和我的WordCountOptions接口与官方文档提供的接口相同。

该问题可能是由以下原因引起的CommandLineRunner?我虽然应用程序没有收到参数,但是当我调试这一行时,

WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class); 

变量options具有正确的价值观,即--runner=DataflowRunner --project=target-datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output .

EDIT:

我发现错误的原因是gcloud身份验证和访问Google云存储桶的问题(Anonymous caller does not have storage.buckets.list access to project 961543751)。我仔细检查了访问权限,它应该设置正确,因为它在 Beam 示例默认项目上运行良好。我撤销了所有访问权限并重新设置,但问题仍然存在。我看了这些https://github.com/googleapis/google-cloud-node/issues/2456 https://github.com/googleapis/google-cloud-node/issues/2456 https://github.com/googleapis/google-cloud-ruby/issues/1588 https://github.com/googleapis/google-cloud-ruby/issues/1588,我仍在尝试找出问题,但目前看来是依赖版本问题。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道 的相关文章

  • 如何在 Play java 中创建数据库线程池并使用该池进行数据库查询

    我目前正在使用 play java 并使用默认线程池进行数据库查询 但了解使用数据库线程池进行数据库查询可以使我的系统更加高效 目前我的代码是 import play libs Akka import scala concurrent Ex
  • 使用 Android 发送 HTTP Post 请求

    我一直在尝试从 SO 和其他网站上的大量示例中学习 但我无法弄清楚为什么我编写的示例不起作用 我正在构建一个小型概念验证应用程序 它可以识别语音并将其 文本 作为 POST 请求发送到 node js 服务器 我已确认语音识别有效 并且服务
  • Spring @RequestMapping 带有可选参数

    我的控制器在请求映射中存在可选参数的问题 请查看下面的控制器 GetMapping produces MediaType APPLICATION JSON VALUE public ResponseEntity
  • 无法解析插件 Java Spring

    我正在使用 IntelliJ IDEA 并且我尝试通过 maven 安装依赖项 但它给了我这些错误 Cannot resolve plugin org apache maven plugins maven clean plugin 3 0
  • 十进制到八进制的转换[重复]

    这个问题在这里已经有答案了 可能的重复 十进制转换错误 https stackoverflow com questions 13142977 decimal conversion error 我正在为一个类编写一个程序 并且在计算如何将八进
  • 禁止的软件包名称:java

    我尝试从数据库名称为 jaane 用户名 Hello 和密码 hello 获取数据 错误 java lang SecurityException Prohibited package name java at java lang Class
  • 在两个活动之间传输数据[重复]

    这个问题在这里已经有答案了 我正在尝试在两个不同的活动之间发送和接收数据 我在这个网站上看到了一些其他问题 但没有任何问题涉及保留头等舱的状态 例如 如果我想从 A 类发送一个整数 X 到 B 类 然后对整数 X 进行一些操作 然后将其发送
  • 如何将 pfx 文件转换为 jks,然后通过使用 wsdl 生成的类来使用它来签署传出的肥皂请求

    我正在寻找一个代码示例 该示例演示如何使用 PFX 证书通过 SSL 访问安全 Web 服务 我有证书及其密码 我首先使用下面提到的命令创建一个 KeyStore 实例 keytool importkeystore destkeystore
  • JRE 系统库 [WebSphere v6.1 JRE](未绑定)

    将项目导入 Eclipse 后 我的构建路径中出现以下错误 JRE System Library WebSphere v6 1 JRE unbound 谁知道怎么修它 右键单击项目 特性 gt Java 构建路径 gt 图书馆 gt JRE
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • AWS 无法从 START_OBJECT 中反序列化 java.lang.String 实例

    我创建了一个 Lambda 函数 我想在 API 网关的帮助下通过 URL 访问它 我已经把一切都设置好了 我还创建了一个application jsonAPI Gateway 中的正文映射模板如下所示 input input params
  • Google App Engine 如何预编译 Java?

    App Engine 对应用程序的 Java 字节码使用 预编译 过程 以增强应用程序在 Java 运行时环境中的性能 预编译代码的功能与原始字节码相同 有没有详细的信息这是做什么的 我在一个中找到了这个谷歌群组消息 http groups
  • 如何从终端运行处理应用程序

    我目前正在使用加工 http processing org对于一个小项目 但是我不喜欢它附带的文本编辑器 我使用 vim 编写所有代码 我找到了 pde 文件的位置 并且我一直在从 vim 中编辑它们 然后重新打开它们并运行它们 重新加载脚
  • 如何在桌面浏览器上使用 webdriver 移动网络

    我正在使用 selenium webdriver 进行 AUT 被测应用程序 的功能测试自动化 AUT 是响应式网络 我几乎完成了桌面浏览器的不同测试用例 现在 相同的测试用例也适用于移动浏览器 因为可以从移动浏览器访问 AUT 由于它是响
  • 声明的包“”与预期的包不匹配

    我可以编译并运行我的代码 但 VSCode 中始终显示错误 早些时候有一个弹出窗口 我不记得是什么了 我点击了 全局应用 从那以后一直是这样 Output is there but so is the error The declared
  • 如何修复 JNLP 应用程序中的“缺少代码库、权限和应用程序名称清单属性”?

    随着最近的 Java 更新 许多人都遇到了缺少 Java Web Start 应用程序的问题Codebase Permissions and Application name体现属性 尽管有资源可以帮助您完成此任务 但我找不到任何资源综合的
  • java.lang.IllegalStateException:驱动程序可执行文件的路径必须由 webdriver.chrome.driver 系统属性设置 - Similiar 不回答

    尝试学习 Selenium 我打开了类似的问题 但似乎没有任何帮助 我的代码 package seleniumPractice import org openqa selenium WebDriver import org openqa s
  • 将 List 转换为 JSON

    Hi guys 有人可以帮助我 如何将我的 HQL 查询结果转换为带有对象列表的 JSON 并通过休息服务获取它 这是我的服务方法 它返回查询结果列表 Override public List
  • 按日期对 RecyclerView 进行排序

    我正在尝试按日期对 RecyclerView 进行排序 但我尝试了太多的事情 我不知道现在该尝试什么 问题就出在这条线上适配器 notifyDataSetChanged 因为如果我不放 不会显示错误 但也不会更新 recyclerview
  • 如何在发布期间复制未版本化的测试资源:执行?

    我的问题与 Maven 在发布时不会复制未跟踪的资源 https stackoverflow com questions 10378708 maven doesnt copy untracked resources while releas

随机推荐