如何为包装 TableRow 的类指定/定义编码器

2023-12-05

我定义了一个类来包装com.google.api.services.bigquery.model.TableRow类将其定义为内部成员

public class TableRowWrapper implements Serializable {

    private TableRow tableRow;

    public TableRowWrapper() {
    } 
...
}

我也有一些DoFn处理该输入/输出实例TableRowWrapper类导致PCollection<TableRowWrapper>。我尝试用注释该类@DefaultCoder(SerializableCoder.class) and @DefaultCoder(ArvoCoder.class)但它总是无法编码,因为它找不到成员属性实例的编码器TableRow。 这是使用时的示例ArvoCoder

 java.lang.IllegalArgumentException: Unable to encode element 'com.test.bigquery.api.TableRowWrapper@5129e8a6' with coder 'AvroCoder'.
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:177)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
    at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
    at      com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
    at   com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
Caused by: java.lang.NullPointerException: in com.test.bigquery.api.TableRowWrapper in com.google.api.services.bigquery.model.TableRow in array null of array in field f of com.google.api.services.bigquery.model.TableRow in field tableRow of com.test.bigquery.api.TableRowWrapper
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.encode(AvroCoder.java:227)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:174)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
    at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
    at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NullPointerException
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:67)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    ... 31 more

我如何为这个类定义一个编码器?


正如你所注意到的,自从TableRow is not Serializable,你将无法使用SerializableCoder.

为了对可为 null 的值进行编码,Avro 的自动模式生成需要显式联合模式包括 null via @AvroSchema注释或@Nullable注释——具体来说org.apache.avro.reflect.Nullable not javax.annotation.Nullable。这些不存在于TableRow, so AvroCoder也是不适用的。

也许为您提供编码器的最简单方法TableRowWrapper是直接通过薄包装来做到这一点TableRowJsonCoder:

class TableRowWrapperCoder extends CustomCoder<TableRowWrapper> {

  private static final Coder<TableRow> tableRowCoder = TableRowJsonCoder.of();

  @Override
  public void encode(TableRowWrapper value, OutputStream outStream, Context context)
      throws IOException {
    tableRowCoder.encode(value.getRow(), outStream, context);
  }

  @Override
  public TableRowWrapper decode(InputStream inStream, Context context)
      throws IOException {
    return new TableRowWrapper(tableRowCoder.decode(inStream, context));
  }

  ...
}

您可以通过以下方式为整个管道注册此编码器

pipeline.getCoderRegistry()
    .registerCoder(TableRowWrapper.class, new TableRowWrapperCoder());
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何为包装 TableRow 的类指定/定义编码器 的相关文章

随机推荐

  • 如何访问CodeIgniter中的静态资源?

    我正在将 CodeIgniter 用于项目 文件夹结构 htdocs NewProject application 我创建了一个名为 Home 的控制器 applications controllers home php 我创建了一个视图
  • 线图中 x 轴标签居中

    我正在构建一个包含两条线的线图 一条用于高温 另一条用于低温 x 轴基于日期时间格式的一整年的天数 2014 01 01 等 然而 我在几个月内 一月 二月 三月等 更改了标签 而不是数据 问题是第一个标签 Jan 位于原点中 我想要的是将
  • Xcode 5 在创建新项目时崩溃

    我无法在 Xcode 5 中创建新项目 这适用于任何类型的项目 并且 IDE 在我点击时崩溃 没有任何错误消息Create填写项目信息后 点击新建项目按钮 我什至不知道从哪里开始寻找解决这个问题的方法 我想我上周创建了一个新项目 直到今天才
  • 如何在创建时拦截所有 Hibernate 会话(Spring / Grails 环境)

    有没有办法在创建新的 Hibernate 会话时拦截它们 我需要访问每个 Session 实例以启用带有参数的 Hibernate 过滤器 我得到的唯一解决方案涉及包装 SessionFactory 但这涉及到很多半令人讨厌的黑客行为 并且
  • 在 THREE.js 中,如何使纹理与分辨率无关并渲染而不会模糊?

    我试图将纹理应用于 THREE js 中的网格 但为了获得可接受的清晰度水平 我被迫使用比所需大得多的 PNG 尺寸高达数百像素平方 例如 如果我要使用一些简单的东西 例如八乘八的方格图案 那么最小的分辨率可能看起来像一堆点 用于纹理的代码
  • 段落标签未关闭?

    我将我的 html 代码简化为这样 div class index div p p h2 Who are we h2 wersfgse div 但是 当我运行它并打开页面源代码时 它显示 范围内没有 p 元素 但看到了 p 结束标记 它运行
  • 通过反射在 C# 中转换 System.__ComObject

    我正在尝试投射System ComObject使用反射到接口类型 我尝试过使用Convert ChangeType Object Type 但是 c 然后抛出这个错误 System InvalidCastException 对象 必须实现
  • VS2008一起调试ASP和ASP.net

    有没有什么方法可以从 Visual Studio 调试经典 ASP 和 ASP net 应用程序 以便我可以在 ASP 代码行和 ASP net 代码行上设置断点 这是针对具有较旧的经典 ASP 组件和较新的 ASP net 模块的遗留系统
  • 在jquery中使用多个id

    我制作了一个小 jquery 脚本来检查输入框值是否大于 5 但是我有 2 个带有 id 的标签 并且只有其中一个有效 div div
  • 根据类型打字稿创建对象

    免得说我有这种类型 type foo go string start string 如何动态创建一个将返回的函数 go start 在 Type Script 上 我们有什么方法可以仅基于类型动态生成空对象吗 或者这是不可能的 因为我们不能
  • 将单个 HTTP 标头的多个值添加到请求或响应的标准

    如果我想添加值列表作为 HTTP 标头 是否有标准方法可以做到这一点 我在 RFC 822 中找不到任何内容 我可以轻松理解 例如 是 逗号分隔值标准或分号分隔值 有标准吗 Example Key value1 value2 value3
  • C 中箭头运算符 (->) 的用法

    我正在读一本叫做 21 天自学 C 的书 我已经学会了 Java 和 C 所以我的学习速度要快得多 我正在阅读关于指针和 gt 箭 operator没有解释就出现了 我认为它用于调用成员和函数 就像相当于 点 运算符 但用于指针而不是成员
  • ASP.NET 中 APP_Data 文件夹的安全性

    我的 Microsoft Access DB 文件位于 APP DATA 文件夹中 我的服务器是 Windows 2003 我想知道保护此文件的最佳方法是什么 那么哪一种更安全呢 wwwroot App data 数据库 mdb or db
  • crti.o 文件丢失

    我正在使用 GNU 工具链构建一个项目 一切正常 直到我开始链接它 链接器抱怨它丢失 找不到crti o 这不是我的目标文件之一 它似乎与 libc 有关 但我不明白为什么它需要这个crti o 它不会使用库文件吗 例如libc a 我正在
  • 获取 GitHub 存储库的本地副本、跟踪更改并将更新推送回远程

    我在 GitHub 上有一个存储库 我想用对从中提取它的文件夹所做的更改来更新它 Git 新用户从使用的那一刻起就应该执行哪些步骤 不用行话或简写术语 cd directory 包括如何跟踪对任何文件所做的本地更改 以及如何将这些更改和更新
  • 一个框架。滚轮滚动放大

    我已经浏览了官方文档 但无法找到有关放大 缩小可能性的信息panorama图像 A 框架是否支持它 或者可能有一个解决方法可以阅读有关实现某些three js在它的上面 这可能是 2018 年更清洁的方式 我将 Aframe 相机的变焦限制
  • Laravel - 在标头中发送 api_token

    我正在为 Laravel 构建一个 API 我想在标头中发送 api token 而不是表单帖子 这是已经内置的东西还是我必须弄清楚如何创建自己的身份验证驱动程序 经过我自己的努力 我终于成功了 您需要首先遵循这个小教程 了解如何在 Lar
  • 从 Swift 访问 Azure 表存储

    我想从 Swift 访问 azure 表存储 制作标题的指令是here但是 我尝试构建请求但无法使其工作 let urlString https
  • 使用 PyGame 显示 PyMunk - Python

    我正在尝试学习 PyMunk 我使用了网站上的基本示例 import pymunk space pymunk Space space gravity 0 1000 body pymunk Body 1 1666 body position
  • 如何为包装 TableRow 的类指定/定义编码器

    我定义了一个类来包装com google api services bigquery model TableRow类将其定义为内部成员 public class TableRowWrapper implements Serializable