I defined a class that wraps com.google.api.services.bigquery.model.TableRow, holding it as a member field:
public class TableRowWrapper implements Serializable {
    private TableRow tableRow;

    public TableRowWrapper() {
    }
    ...
}
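I also have some DoFns that take TableRowWrapper instances as input or produce them as output, which results in a PCollection<TableRowWrapper>. I tried annotating the class with @DefaultCoder(SerializableCoder.class) and with @DefaultCoder(AvroCoder.class), but encoding always fails because no coder can be found for the TableRow member field.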
Here is an example of the failure when using AvroCoder:
java.lang.IllegalArgumentException: Unable to encode element 'com.test.bigquery.api.TableRowWrapper@5129e8a6' with coder 'AvroCoder'.
at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:177)
at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
Caused by: java.lang.NullPointerException: in com.test.bigquery.api.TableRowWrapper in com.google.api.services.bigquery.model.TableRow in array null of array in field f of com.google.api.services.bigquery.model.TableRow in field tableRow of com.test.bigquery.api.TableRowWrapper
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.encode(AvroCoder.java:227)
at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:174)
at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:67)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
... 31 more
How can I define a coder for this class?
As you noticed, since TableRow is not Serializable, you won't be able to use SerializableCoder.
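To see why, here is a JDK-only sketch (no Dataflow classes involved) of what Java serialization, which SerializableCoder relies on, does with a Serializable wrapper around a non-Serializable field. NonSerializableRow is a hypothetical stand-in for TableRow. Marking the field transient makes serialization succeed, but the row is then simply not written, so that is not a real fix.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // Hypothetical stand-in for TableRow: deliberately does NOT implement Serializable.
    static class NonSerializableRow {
        String value = "x";
    }

    // Same shape as TableRowWrapper: Serializable itself, but holds a non-Serializable field.
    static class Wrapper implements Serializable {
        private final NonSerializableRow row = new NonSerializableRow();
    }

    // Serialization succeeds here, but the transient row is silently not written.
    static class TransientWrapper implements Serializable {
        private transient NonSerializableRow row = new NonSerializableRow();
    }

    // Returns true if Java serialization of obj succeeds, false on NotSerializableException.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new Wrapper()));          // the field blocks serialization
        System.out.println(canSerialize(new TransientWrapper())); // succeeds, but drops the row
    }
}
```

Running it prints false and then true.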
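To encode nullable values, Avro's automatic (reflection-based) schema generation requires an explicit union schema that includes null, declared either with an @AvroSchema annotation or with a @Nullable annotation (specifically org.apache.avro.reflect.Nullable, not javax.annotation.Nullable). Neither is present on TableRow, so AvroCoder is not applicable either.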
Probably the simplest way to provide a coder for TableRowWrapper is to write a thin wrapper directly around TableRowJsonCoder:
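import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class TableRowWrapperCoder extends CustomCoder<TableRowWrapper> {
    // Delegate all byte-level work to the SDK's JSON-based TableRow coder.
    private static final Coder<TableRow> tableRowCoder = TableRowJsonCoder.of();

    @Override
    public void encode(TableRowWrapper value, OutputStream outStream, Context context)
            throws IOException {
        // The wrapped row is the only thing encoded, so the context is passed straight through.
        tableRowCoder.encode(value.getRow(), outStream, context);
    }

    @Override
    public TableRowWrapper decode(InputStream inStream, Context context)
            throws IOException {
        return new TableRowWrapper(tableRowCoder.decode(inStream, context));
    }

    ...
}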
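The delegation idea can be tried without the Dataflow SDK. In this sketch, SimpleCoder, RowCoder and RowWrapper are hypothetical stand-ins for Coder, TableRowJsonCoder and TableRowWrapper. RowCoder uses a simple length-prefixed binary format instead of JSON and, unlike TableRowJsonCoder, does not support null keys or values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

public class DelegatingCoderSketch {
    // Hypothetical minimal coder interface; NOT the Dataflow SDK's Coder.
    interface SimpleCoder<T> {
        void encode(T value, OutputStream out) throws IOException;
        T decode(InputStream in) throws IOException;
    }

    // Stand-in for TableRowJsonCoder: writes the entry count, then UTF key/value pairs.
    static final class RowCoder implements SimpleCoder<Map<String, String>> {
        @Override
        public void encode(Map<String, String> row, OutputStream out) throws IOException {
            DataOutputStream data = new DataOutputStream(out);
            data.writeInt(row.size());
            for (Map.Entry<String, String> e : row.entrySet()) {
                data.writeUTF(e.getKey());
                data.writeUTF(e.getValue());
            }
            data.flush(); // flush but do not close: the caller owns the stream
        }

        @Override
        public Map<String, String> decode(InputStream in) throws IOException {
            DataInputStream data = new DataInputStream(in);
            int n = data.readInt();
            Map<String, String> row = new LinkedHashMap<>();
            for (int i = 0; i < n; i++) {
                String key = data.readUTF();
                row.put(key, data.readUTF());
            }
            return row;
        }
    }

    // Stand-in for TableRowWrapper, with the constructor and getter the wrapper coder needs.
    static final class RowWrapper {
        private final Map<String, String> row;
        RowWrapper(Map<String, String> row) { this.row = row; }
        Map<String, String> getRow() { return row; }
    }

    // The thin wrapper coder: it writes no bytes of its own, only the inner coder's.
    static final class RowWrapperCoder implements SimpleCoder<RowWrapper> {
        private final SimpleCoder<Map<String, String>> rowCoder = new RowCoder();

        @Override
        public void encode(RowWrapper value, OutputStream out) throws IOException {
            rowCoder.encode(value.getRow(), out);
        }

        @Override
        public RowWrapper decode(InputStream in) throws IOException {
            return new RowWrapper(rowCoder.decode(in));
        }
    }

    // Encodes a wrapper to bytes and decodes it back.
    static RowWrapper roundTrip(RowWrapper w) throws IOException {
        SimpleCoder<RowWrapper> coder = new RowWrapperCoder();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        coder.encode(w, bytes);
        return coder.decode(new ByteArrayInputStream(bytes.toByteArray()));
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("name", "alice");
        row.put("city", "paris");
        System.out.println(roundTrip(new RowWrapper(row)).getRow());
    }
}
```

Because the wrapper coder contributes nothing to the byte format, its encoding is exactly the inner coder's encoding, and any format change only has to happen in one place.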
You can register this coder for the whole pipeline with:
pipeline.getCoderRegistry()
.registerCoder(TableRowWrapper.class, new TableRowWrapperCoder());
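Registering by class means every PCollection<TableRowWrapper> whose coder is inferred picks up this coder, without a setCoder call on each one. The toy registry below models only that class-to-coder lookup; all of its names are hypothetical, and it does not reproduce the resolution rules (annotations, fallbacks) of Dataflow's CoderRegistry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CoderRegistrySketch {
    // Hypothetical coder marker standing in for a real coder object.
    interface SimpleCoder<T> {
        String name();
    }

    // Toy registry keyed by exact class (no annotation or fallback handling).
    private final Map<Class<?>, SimpleCoder<?>> coders = new HashMap<>();

    <T> void registerCoder(Class<T> clazz, SimpleCoder<T> coder) {
        coders.put(clazz, coder);
    }

    // The cast is safe because registerCoder only ever pairs Class<T> with SimpleCoder<T>.
    @SuppressWarnings("unchecked")
    <T> Optional<SimpleCoder<T>> getCoder(Class<T> clazz) {
        return Optional.ofNullable((SimpleCoder<T>) coders.get(clazz));
    }

    // Hypothetical element type standing in for TableRowWrapper.
    static final class RowWrapper {}

    public static void main(String[] args) {
        CoderRegistrySketch registry = new CoderRegistrySketch();
        registry.registerCoder(RowWrapper.class, () -> "RowWrapperCoder");
        // Every later lookup for RowWrapper, from any transform, gets the same coder.
        System.out.println(registry.getCoder(RowWrapper.class).map(SimpleCoder::name).orElse("none"));
        // Unregistered classes find nothing in this toy model.
        System.out.println(registry.getCoder(String.class).map(SimpleCoder::name).orElse("none"));
    }
}
```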