将参数从 Cloud 函数传递到 Dataflow

2023-12-02

我想将 Google Cloud Storage 上上传的文件的文件名从 Cloud Functions 传递到 Dataflow,以便我可以处理上传的文件。

我为云函数编写的代码是

const google = require('googleapis');

exports.goWithTheDataFlow = function(event, callback) {
 const file = event.data;
 if (file.resourceState === 'exists' && file.name) {
     console.log(file.name);
   google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       throw err;
     }

     if (authClient.createScopedRequired && authClient.createScopedRequired()) {
       authClient = authClient.createScoped([
         'https://www.googleapis.com/auth/cloud-platform',
         'https://www.googleapis.com/auth/userinfo.email'
       ]);
     }

     const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

     dataflow.projects.templates.create({
       projectId: '-------',
       resource: {
           parameters: {
           inputFile: `gs://${file.bucket}/${file.name}`
         },
         jobName: '-------',
         gcsPath: '-------'
       }
     }, function(err, response) {
       if (err) {
         console.error("problem running dataflow template, error was: ", err);
       }
       console.log("Dataflow template response: ", response);
       callback();
     });

   });
 }
};

我的管道的代码看起来像这样:

public interface FruitOptions extends PipelineOptions {
      @Description("Path of the file to read from")
      @Validation.Required
      ValueProvider<String> getInputFile();
      void setInputFile(ValueProvider<String> value);
  }


//Main Method
FruitOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(FruitOptions.class);
      Pipeline p = Pipeline.create(options);

如何创建上述 Pipeline 代码的模板?我一直在关注链接使用云函数触发数据流管道这样做,但我在运行 Maven 命令时遇到几个错误,例如:

[WARNING]
java.lang.NoClassDefFoundError: com/google/cloud/dataflow/sdk/options/PipelineOp
tions
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:281)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.google.cloud.dataflow.sdk.optio
ns.PipelineOptions
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.411 s
[INFO] Finished at: 2017-06-29T12:16:15+05:30
[INFO] Final Memory: 11M/27M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (d
efault-cli) on project Common: An exception occured while executing the Java cla
ss. com/google/cloud/dataflow/sdk/options/PipelineOptions: com.google.cloud.data
flow.sdk.options.PipelineOptions -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e swit
ch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please rea
d the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionE
xception

我运行的命令是:

mvn compile exec:java -Dexec.mainClass=Common.StarterPipeline -Dexec.args="--project=******** --stagingLocation=gs://******** --dataflowJobFile=gs://********* --runner=TemplatingDataflowPipelineRunner"

POM 文件:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>Common</groupId>
  <artifactId>Common</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <repositories>
    <repository>
      <id>ossrh.snapshots</id>
      <name>Sonatype OSS Repository Hosting</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
   <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>1.4.0</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>2.0.0</version>
    </dependency>

    <!-- slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.14</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>1.7.14</version>
    </dependency>
  </dependencies>
</project>

有人可以帮我解决这个问题吗?如果我做错了什么,请告诉我。谢谢


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将参数从 Cloud 函数传递到 Dataflow 的相关文章

  • 如何在 firebase 中设置重复项目? [复制]

    这个问题在这里已经有答案了 我想在 firebase 中创建一个重复的项目 这样我就不必经历添加 firebase 功能和通知等的麻烦 如果可以的话 我会删除所有身份验证用户 以便为实际的应用程序做好准备 但我无法做到这一点 那么 如何在没
  • firebase批量更新和onWrite触发同步

    我在同步两个 Firebase 云函数时遇到问题 第一个函数对多个文档执行批量更新 第二个函数由onWrite触发这些文档之一 为了便于说明 假设我有两个文档A and B 在两个单独的集合中 第一个云功能更新两个文档A and B有消防库
  • page.goto() 上的云函数超时

    我在云函数中使用 puppeteer 运行测试 如果我在本地机器上运行测试一切都很好 如果我在云函数模拟器中运行测试也没关系 但是当我将函数部署到云端时 所有测试都停留在 page goto https 并且函数因超时而失败 在我的例子中是
  • 如何使用 Cloud Functions for Firebase 更新 Firebase 实时数据库中的值

    我浏览了 firebase 文档 使用 Firebase 的 Cloud Functions 更新实时数据库中的值 但无法理解 我的数据库结构是 user KdD1f0ecmVXHZ3H3abZ email email protected
  • 对 Google Cloud Functions 使用 us-central1 以外的区域

    据我所知 Google Cloud Functions 仅在 us central1 中可用 因为选择区域的下拉菜单仅允许我选择 us central1 并且当我尝试使用与该区域不同的内容编写自己的部署脚本时像 us east4 这样的选项
  • Cloud Functions,删除Firestore SubCollections,是否需要AdminToken?

    我正在尝试构建可调用的云函数 当用户删除帖子时 它也会尝试删除评论 这是帖子的子集合 所以我看到了这个例子并像文档示例一样实现 const admin require firebase admin const firebase tools
  • 使用谷歌云数据流PubSubIO,消息的读取何时得到确认?

    是否可以延迟确认 直到成功处理子图 PubSubIO Read 下面的所有内容 例如 我们是流媒体从 google pubsub 订阅中读取数据 然后将文件写入 GCS 在另一个分支中 我们使用 BigQueryIO Write 写入 Bi
  • Firebase CLI 部署错误:“现在在 Firebase CLI 中禁用部署到 Node.js 10 以下的运行时。”

    我有一个使用 Cloud Functions for Firebase 的项目 在将 Firebase CLI 更新到版本 9 0 0 后 我收到一条错误消息 错误 函数目录中的 package json 有一个引擎字段 不受支持 有效的选
  • nuxt v2 和 firebase 函数的部署错误

    当我尝试在 firebase 函数中渲染 nuxt 时 出现 用户代码加载失败 无法确定后端规范 错误 其他功能都可以部署 但是无论模式是SSR还是SPA 只有nuxt功能失败 我查看了日志 没有发现任何有用的信息 当我查看 无法确定后端规
  • Firebase Cloud Storage - 使用元数据上传 -

    我希望从浏览器上传带有元数据的文件 以便通过云功能正确识别和处理文件 在客户端上 我的上传器代码如下所示 uploadTaskPromise async function file return new Promise resolve re
  • 云函数 onUpdate:无法读取未定义的属性“forEach”

    现在我正在尝试更新我的项目中的图片 我可以更新云火商店中的图片网址 但我也想使用 firebase 云功能从云存储中删除上一张图片 我想要实现的是 当我上传新图片时 从云存储中删除以前的图片 This is my data structur
  • 如何在欧洲使用 Cloud Dataflow 区域终端节点?

    是否可以将 Google Cloud Platform Dataflow 作业的区域更改为欧洲 我已将管道区域设置为europe west1 d但我无法更改工作本身的区域 我尝试更改管道选项中的区域 但这会导致错误 并且只有默认区域有效 p
  • 将侧输入应用于 Apache Beam 中的 BigQueryIO.read 操作

    有没有办法将侧面输入应用于 Apache Beam 中的 BigQueryIO read 操作 举例来说 我在 PCollection 中有一个值 我想在查询中使用该值从 BigQuery 表中获取数据 使用侧面输入可以吗 或者在这种情况下
  • 尝试部署 firebase 函数部署错误:无法配置触发器[关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我的云功能可以正常工作 现在我尝试部署云功能 我得到了 Error functions limitTasksPerCreatorF
  • Python Apache Beam 端输入断言错误

    我对 Apache Beam Cloud Dataflow 还很陌生 所以如果我的理解不正确 我深表歉意 我正在尝试通过管道读取大约 30 000 行长的数据文件 我的简单管道首先从 GCS 打开 csv 从数据中提取标题 通过 ParDo
  • 将 google 端点中的路径参数传递到后端不起作用

    我的设置包含 google endpoints 和 google cloud functions 作为我的后端 Google 端点是使用以下 swagger v2 yaml 定义的 swagger 2 0 info description
  • 在 npm 上使用 firebase 部署时出错 --prefix $RESOURCE_DIR run lint

    我全新安装了 firebase 工具 在此之后tutorial https firebase google com docs functions get started 我正在尝试上传我的第一个 firebase 函数 我在运行 fireb
  • 调试firebase云函数

    如何使用 Visual Studio 代码进行调试firebase 数据库 trigger功能 我尝试了模拟器 但是当我调用它时出现错误 functions debug myMethod C functions functions gt f
  • Firebase 函数,admin.database().ref(...).get() 不是函数

    我正在开发一个 Android 应用程序 并使用 firebase 作为它的后端 我正在尝试让通知系统正常工作 该系统依赖于监听数据库中的更改 但遇到问题 因为我收到以下错误 想知道是否有人能够提供帮助 可以提供任何额外的代码 Fireba
  • 谷歌云功能发送重复通知

    我有一个发送主题通知的 gcf 我从管理 Android 应用程序触发该功能 一切都按预期工作 但突然该函数发送通知两次 有时三次 谷歌云上的函数日志显示该函数已发送一条通知 它只打印一行 定制发送成功 但 Android 应用程序会收到多

随机推荐