Our pipeline looks like this:
GCS (gz-compressed files) -> ParDo -> BigQuery
I want to use Flatten to feed multiple files from GCS into the pipeline as input, but it keeps failing with this error:
Workflow failed. Causes: (5001e5764f46ac2c): BigQuery creation of import job for table "Impressions_05_2015_denormalized_test" in dataset "CPT_XXXX" in project "gdfp-XXXX" failed. Causes: (5001e5764f46a1cf): Error:
Message: Load configuration must specify at least one source URI
HTTP Code: 400
Code:
PCollection<String> file1 = pipeline
    .apply(TextIO.Read
        .from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_21.gz")
        .withCompressionType(TextIO.CompressionType.GZIP));
PCollection<String> file2 = pipeline
    .apply(TextIO.Read
        .from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_22.gz")
        .withCompressionType(TextIO.CompressionType.GZIP));
PCollectionList<String> allFiles = PCollectionList.of(file1).and(file2);
PCollection<String> inputRead = allFiles.apply(Flatten.<String>pCollections());

inputRead
    .apply(ParDo.of(transformation)
        .named(String.format("%s-CPT-transform", type))
        .withSideInputs(views))
    .apply(Write.to(getOutputTable(type))
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withWriteDisposition(WRITE_APPEND)
        .withSchema(schema)
        .named(String.format("%s-BQ-write", type)));
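As an aside, TextIO.Read.from also accepts a glob pattern, so the two separate reads plus Flatten could in principle collapse into a single source such as "gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_*.gz" (pattern hypothetical). The glob semantics can be sanity-checked locally with the JDK's own glob matcher:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobCheck {
    public static void main(String[] args) {
        // Same wildcard style that a TextIO file pattern would use;
        // here we only verify which filenames the glob covers.
        PathMatcher m = FileSystems.getDefault()
                .getPathMatcher("glob:NetworkActiveViews_232503_20140918_*.gz");
        System.out.println(m.matches(Paths.get("NetworkActiveViews_232503_20140918_21.gz")));
        System.out.println(m.matches(Paths.get("NetworkActiveViews_232503_20140918_22.gz")));
        System.out.println(m.matches(Paths.get("SomeOtherFile_20140918_21.gz")));
    }
}
```

This only sketches the pattern matching; whether a single glob source avoids the failing Flatten path is a separate question.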
Example job ID: 2015-05-12_19_54_06-10158770219525037626
What exactly am I doing wrong?
Rather than the proposed hack (which is really crude), I output a placeholder row in the finishBundle() method. This writes one empty row per bundle, but we can live with that until the fix rolls out. Setting an "id" makes it easier to filter these rows out later.
Besides, this workaround/hack is easier to implement:
@Override
public void finishBundle(Context c) throws Exception {
    TableRow workaroundRow = new TableRow();
    workaroundRow.set("id", "workaround_row");
    c.output(workaroundRow); // Workaround for http://goo.gl/CpBxEf
}
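The marker "id" makes the cleanup trivial later on. A minimal sketch of the filter predicate, in plain Java with a Map standing in for TableRow (helper name and sample ids are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class WorkaroundFilter {
    // True if a row is one of the placeholder rows emitted from
    // finishBundle() and should be dropped in downstream queries.
    static boolean isWorkaroundRow(Map<String, Object> row) {
        return "workaround_row".equals(row.get("id"));
    }

    public static void main(String[] args) {
        Map<String, Object> placeholder = new HashMap<>();
        placeholder.put("id", "workaround_row");
        Map<String, Object> real = new HashMap<>();
        real.put("id", "impression_123");
        System.out.println(isWorkaroundRow(placeholder));
        System.out.println(isWorkaroundRow(real));
    }
}
```

The same condition can be applied as a WHERE clause when querying the BigQuery table.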