将参数从 Cloud 函数传递到 Dataflow


我想将 Google Cloud Storage 上上传的文件的文件名从 Cloud Functions 传递到 Dataflow,以便我可以处理上传的文件。


const google = require('googleapis');

exports.goWithTheDataFlow = function(event, callback) {
 const file = event.data;
 if (file.resourceState === 'exists' && file.name) {
   google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       throw err;

     if (authClient.createScopedRequired && authClient.createScopedRequired()) {
       authClient = authClient.createScoped([

     const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

       projectId: '-------',
       resource: {
           parameters: {
           inputFile: `gs://${file.bucket}/${file.name}`
         jobName: '-------',
         gcsPath: '-------'
     }, function(err, response) {
       if (err) {
         console.error("problem running dataflow template, error was: ", err);
       console.log("Dataflow template response: ", response);



public interface FruitOptions extends PipelineOptions {
      @Description("Path of the file to read from")
      ValueProvider<String> getInputFile();
      void setInputFile(ValueProvider<String> value);

//Main Method
FruitOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      Pipeline p = Pipeline.create(options);

如何创建上述 Pipeline 代码的模板?我一直在关注链接使用云函数触发数据流管道这样做,但我在运行 Maven 命令时遇到几个错误,例如:

java.lang.NoClassDefFoundError: com/google/cloud/dataflow/sdk/options/PipelineOp
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:281)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.google.cloud.dataflow.sdk.optio
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.411 s
[INFO] Finished at: 2017-06-29T12:16:15+05:30
[INFO] Final Memory: 11M/27M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (d
efault-cli) on project Common: An exception occured while executing the Java cla
ss. com/google/cloud/dataflow/sdk/options/PipelineOptions: com.google.cloud.data
flow.sdk.options.PipelineOptions -> [Help 1]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e swit
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] For more information about the errors and possible solutions, please rea
d the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionE


mvn compile exec:java -Dexec.mainClass=Common.StarterPipeline -Dexec.args="--project=******** --stagingLocation=gs://******** --dataflowJobFile=gs://********* --runner=TemplatingDataflowPipelineRunner"

POM 文件:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">



      <name>Sonatype OSS Repository Hosting</name>




    <!-- slf4j API frontend binding with JUL backend -->




