编辑 2018-01-27:
事实证明,这个问题与DirectRunner有关。如果您使用 DataflowRunner 运行相同的管道,您应该获得实际上最多 1,000 条记录的批次。 DirectRunner 在分组操作后始终创建大小为 1 的包。
原答案:
使用 Apache Beam 的 JdbcIO 写入云数据库时,我遇到了同样的问题。问题是,虽然 JdbcIO 确实支持批量写入最多 1,000 条记录,但我从未真正见过它一次写入超过 1 行(我必须承认:这总是在开发环境中使用 DirectRunner)。
因此,我向 JdbcIO 添加了一项功能,您可以通过将数据分组在一起并将每个组作为一个批次写入来自行控制批次的大小。下面是基于 Apache Beam 原始 WordCount 示例的如何使用此功能的示例。
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
与JdbcIO普通写入方法的区别在于新的方法writeIterable()
这需要一个PCollection<Iterable<RowT>>
作为输入而不是PCollection<RowT>
。每个 Iterable 都作为一批写入数据库。
添加此内容的 JdbcIO 版本可以在此处找到:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
包含上述示例的整个示例项目可以在这里找到:https://github.com/olavloite/spanner-beam-example https://github.com/olavloite/spanner-beam-example
(Apache Beam 上还有一个待处理的拉取请求,希望将其包含在项目中)