我正在尝试在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道,但我一直遇到此错误Failed to construct instance from factory method DataflowRunner#fromOptions(interfaceorg.apache.beam.sdk.options.PipelineOptions
我尝试运行的示例是官方文档提供的基本字数统计,https://beam.apache.org/get-started/wordcount-example/ https://beam.apache.org/get-started/wordcount-example/。问题是这个示例为每个示例使用不同的类,并且每个示例都有自己的主要功能,但我试图做的是在 Spring Boot 项目中使用实现 CommandLineRunner 的类运行该示例。
Spring引导主类:
@SpringBootApplication
public class BeamApplication {
public static void main(String[] args) {
SpringApplication.run(BeamApplication.class, args);
}}
命令行运行器:
@Component
public class Runner implements CommandLineRunner {
@Override
public void run(String[] args) throws Exception {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
runWordCount(options);
}
static void runWordCount(WordCountOptions options) throws InterruptedException {
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}}
字数统计选项:
public interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("./src/main/resources/input.txt")
String getInputFile();
void setInputFile(String value);
@Description("path of output file")
// @Validation.Required
// @Default.String("./target/ts_output/extracted_words")
@Default.String("Path of the file to write to")
String getOutput();
void setOutput(String value);
}
摘录的话:
public class ExtractWordsFn extends DoFn<String, String> {
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split(TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}}}}
计算字数:
public class CountWords extends PTransform<PCollection<String>,PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines){
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.perElement());
return wordCounts;
}}
当我使用直接运行程序时,项目按预期工作并在项目的根目录中生成文件,但是当我尝试通过传递这些参数来使用 Google 数据流运行程序时--runner=DataflowRunner --project=datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output
(当使用 java -jar 或 Intellij 时)。
我收到帖子开头提到的错误。
我正在使用 Java 11,看了这个之后无法从 beamSql、apache beam 中的工厂方法 DataflowRunner#fromOptions 构造实例 https://stackoverflow.com/questions/49227327/failed-to-construct-instance-from-factory-method-dataflowrunnerfromoptions-in-b我尝试将我的代码放入新的 Java 8 Spring boot 项目中,但错误仍然存在。
当运行 Apache Beam 文档提供的项目(具有不同电源的类)时,它在 Google 数据流上运行良好,我可以在 Google 存储桶中看到生成的输出。和我的WordCountOptions
接口与官方文档提供的接口相同。
该问题可能是由以下原因引起的CommandLineRunner
?我虽然应用程序没有收到参数,但是当我调试这一行时,
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
变量options
具有正确的价值观,即--runner=DataflowRunner --project=target-datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output
.
EDIT:
我发现错误的原因是gcloud身份验证和访问Google云存储桶的问题(Anonymous caller does not have storage.buckets.list access to project 961543751
)。我仔细检查了访问权限,它应该设置正确,因为它在 Beam 示例默认项目上运行良好。我撤销了所有访问权限并重新设置,但问题仍然存在。我看了这些https://github.com/googleapis/google-cloud-node/issues/2456 https://github.com/googleapis/google-cloud-node/issues/2456
https://github.com/googleapis/google-cloud-ruby/issues/1588 https://github.com/googleapis/google-cloud-ruby/issues/1588,我仍在尝试找出问题,但目前看来是依赖版本问题。