尝试序列化 avro 记录时,B 无法转换为 java.nio.ByteBuffer

2023-12-24

我编写了一个小型 Java 程序,该程序应该监视目录中的新文件并将它们以 binay Avro 格式发送到 Kafka 主题。 我是 Avro 新手,我使用 Avro 文档和在线示例编写了这篇文章。 监控部分运行良好,但程序在运行时到达 Avro 序列化时失败。我得到这个错误堆栈:

Exception in thread "main" java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer
    at org.apache.avro.generic.GenericDatumWriter.writeBytes(GenericDatumWriter.java:260)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:116)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)
    at producers.AvroBinaryProducer.buildAvroData(AvroBinaryProducer.java:90)
    at producers.AvroBinaryProducer.start(AvroBinaryProducer.java:120)
    at producers.AvroBinaryProducer.main(AvroBinaryProducer.java:140)
C:\Users\guys\AppData\Local\NetBeans\Cache\8.1\executor-snippets\run.xml:53: Java returned: 1
BUILD FAILED (total time: 7 seconds)

此行失败:writer.write(datum,encoder);

看起来它需要一个 ByteBuffer,而文档和示例说我应该传递 GenericRecord。我究竟做错了什么 ?

这是我的代码(还有另一个名为 Config 的实用程序类,它从文件中读取配置参数,但我没有将其包含在此处):

package producers;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchService;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import static java.nio.file.StandardWatchEventKinds.*;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;


/**
 *
 * @author guys
 */
public class AvroBinaryProducer {
    String mySchema;
    Schema avroSchema;
    Config myConf;  
    Producer<String, byte[]> producer;
    String topic, bootstrapServers, watchDir; 
    Path path;
    ByteArrayOutputStream out;
    BinaryEncoder encoder;


    public AvroBinaryProducer(String configPath) throws IOException
    {
        // Read initial configuration
        myConf=new Config(configPath);

        // first setting the kafka producer stuff
        Properties props = new Properties();   
        props.put("bootstrap.servers",myConf.get("bootstrap.servers"));        
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
        topic=myConf.get("topic"); 
        watchDir=myConf.get("watchdir");
        path=FileSystems.getDefault().getPath(watchDir);

        // Now define the Avro schema
        mySchema="{\n" +
        " \"type\": \"record\",\n" +
        " \"name\": \"photo\",\n" +
        " \"fields\": [\n" +
        "     {\"name\": \"name\", \"type\": \"string\"},\n" +
        "     {\"name\": \"data\",  \"type\": \"bytes\"}\n" +
        " ]\n" +
        "}";

        Schema.Parser parser = new Schema.Parser();
        avroSchema=parser.parse(mySchema);   

        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder( out, null );


    }

    private byte[] buildAvroData(String name, byte[] data) throws IOException
    {       
        out.reset();                       
        GenericRecord datum=new GenericData.Record(avroSchema);        
        datum.put("name", name);
        datum.put("data",data);
        DatumWriter<GenericRecord> writer=new GenericDatumWriter<>(avroSchema);        
        writer.write(datum,encoder);
    encoder.flush();
        return out.toByteArray();        
    }

    private void start() throws IOException, InterruptedException
    {
        String fileName;
        byte[] fileData;       

        WatchService watcher = FileSystems.getDefault().newWatchService();
        WatchKey key=path.register(watcher, ENTRY_CREATE);

        while (true)
        {
            key = watcher.take();
            // The code gets beyond this point only when a filesystem event occurs

            for (WatchEvent<?> event: key.pollEvents()) 
            {
                WatchEvent.Kind<?> kind = event.kind();
                if (kind==ENTRY_CREATE)
                {
                    WatchEvent<Path> ev = (WatchEvent<Path>)event;
                    Path filename = ev.context();
                    fileName=filename.toString();
                    System.out.println("New file "+fileName+" found !");
                    // We need this little delay to make sure the file is closed before we read it
                    Thread.sleep(500);
                    fileData=Files.readAllBytes(FileSystems.getDefault().getPath(watchDir+File.separator+fileName));
                    publishMessage(buildAvroData(fileName,fileData));
                }
            }
            key.reset();
        }
    }

    private void publishMessage(byte[] bytes) 
    {        
        ProducerRecord <String, byte[]> data =new ProducerRecord<>(topic, bytes);
        producer.send(data);

    }

    public static void main (String args[])
    {
        AvroBinaryProducer abp;
        try {
            abp=new AvroBinaryProducer(args[0]);
            try {
                abp.start();
            } catch (InterruptedException ex) {
                Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        } catch (IOException ex) {
            Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

谢谢 !


我就是这样解决的。如果它需要 ByteBuffer,那么就给它 ByteBuffer。 我将函数更改为:

private byte[] buildAvroData(String name, byte[] data) throws IOException
{       
    out.reset(); 
    GenericRecord datum=new GenericData.Record(avroSchema);        
    datum.put("name", name);
    datum.put("data",ByteBuffer.wrap(data));
    DatumWriter<GenericRecord> writer=new GenericDatumWriter<>(avroSchema);        
    writer.write(datum,encoder);
encoder.flush();
    return out.toByteArray(); 

我只是用 ByteBuffer 包装了数据,这很有效。 您必须记住从消费者端的 ByteBuffer 中提取字节数组。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

尝试序列化 avro 记录时,B 无法转换为 java.nio.ByteBuffer 的相关文章

  • Grails 3.x bootRun 失败

    我正在尝试在 grails 3 1 11 中运行一个项目 但出现错误 失败 构建失败并出现异常 什么地方出了错 任务 bootRun 执行失败 进程 命令 C Program Files Java jdk1 8 0 111 bin java
  • 如何为最终用户方便地启动Java GUI程序

    用户想要从以下位置启动 Java GUI 应用程序Windows 以及一些额外的 JVM 参数 例如 javaw Djava util logging config file logging properties jar MyGUI jar
  • 制作一个交互式Windows服务

    我希望我的 Java 应用程序成为交互式 Windows 服务 用户登录时具有 GUI 的 Windows 服务 我搜索了这个 我发现这样做的方法是有两个程序 第一个是服务 第二个是 GUI 程序并使它们进行通信 服务将从 GUI 程序获取
  • JAXb、Hibernate 和 beans

    目前我正在开发一个使用 Spring Web 服务 hibernate 和 JAXb 的项目 1 我已经使用IDE hibernate代码生成 生成了hibernate bean 2 另外 我已经使用maven编译器生成了jaxb bean
  • Android:捕获的图像未显示在图库中(媒体扫描仪意图不起作用)

    我遇到以下问题 我正在开发一个应用程序 用户可以在其中拍照 附加到帖子中 并将图片保存到外部存储中 我希望这张照片也显示在图片库中 并且我正在使用媒体扫描仪意图 但它似乎不起作用 我在编写代码时遵循官方的Android开发人员指南 所以我不
  • INSERT..RETURNING 在 JOOQ 中不起作用

    我有一个 MariaDB 数据库 我正在尝试在表中插入一行users 它有一个生成的id我想在插入后得到它 我见过this http www jooq org doc 3 8 manual sql building sql statemen
  • Mockito when().thenReturn 不必要地调用该方法

    我正在研究继承的代码 我编写了一个应该捕获 NullPointerException 的测试 因为它试图从 null 对象调用方法 Test expected NullPointerException class public void c
  • Spring @RequestMapping 带有可选参数

    我的控制器在请求映射中存在可选参数的问题 请查看下面的控制器 GetMapping produces MediaType APPLICATION JSON VALUE public ResponseEntity
  • 禁止的软件包名称:java

    我尝试从数据库名称为 jaane 用户名 Hello 和密码 hello 获取数据 错误 java lang SecurityException Prohibited package name java at java lang Class
  • Java按日期升序对列表对象进行排序[重复]

    这个问题在这里已经有答案了 我想按一个参数对对象列表进行排序 其日期格式为 YYYY MM DD HH mm 按升序排列 我找不到正确的解决方案 在 python 中使用 lambda 很容易对其进行排序 但在 Java 中我遇到了问题 f
  • 在两个活动之间传输数据[重复]

    这个问题在这里已经有答案了 我正在尝试在两个不同的活动之间发送和接收数据 我在这个网站上看到了一些其他问题 但没有任何问题涉及保留头等舱的状态 例如 如果我想从 A 类发送一个整数 X 到 B 类 然后对整数 X 进行一些操作 然后将其发送
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • 加密 JBoss 配置中的敏感信息

    JBoss 中的标准数据源配置要求数据库用户的用户名和密码位于 xxx ds xml 文件中 如果我将数据源定义为 c3p0 mbean 我会遇到同样的问题 是否有标准方法来加密用户和密码 保存密钥的好地方是什么 这当然也与 tomcat
  • Java执行器服务线程池[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 如果我使用 Executor 框架在
  • Android 中麦克风的后台访问

    是否可以通过 Android 手机上的后台应用程序 服务 持续监控麦克风 我想做的一些想法 不断聆听背景中的声音信号 收到 有趣的 音频信号后 执行一些网络操作 如果前台应用程序需要的话 后台应用程序必须能够智能地放弃对麦克风的访问 除非可
  • 声明的包“”与预期的包不匹配

    我可以编译并运行我的代码 但 VSCode 中始终显示错误 早些时候有一个弹出窗口 我不记得是什么了 我点击了 全局应用 从那以后一直是这样 Output is there but so is the error The declared
  • 获取 JVM 上所有引导类的列表?

    有一种方法叫做findBootstrapClass对于一个类加载器 如果它是引导的 则返回一个类 有没有办法找到类已经加载了 您可以尝试首先通过例如获取引导类加载器呼叫 ClassLoader bootstrapLoader ClassLo
  • 编译器抱怨“缺少返回语句”,即使不可能达到缺少返回语句的条件

    在下面的方法中 编译器抱怨缺少退货声明即使该方法只有一条路径 并且它包含一个return陈述 抑制错误需要另一个return陈述 public int foo if true return 5 鉴于Java编译器可以识别无限循环 https
  • 有没有办法为Java的字符集名称添加别名

    我收到一个异常 埋藏在第 3 方库中 消息如下 java io UnsupportedEncodingException BIG 5 我认为发生这种情况是因为 Java 没有定义这个名称java nio charset Charset Ch
  • 如何实现仅当可用内存较低时才将数据交换到磁盘的写缓存

    我想将应用程序生成的数据缓存在内存中 但如果内存变得稀缺 我想将数据交换到磁盘 理想情况下 我希望虚拟机通知它需要内存并将我的数据写入磁盘并以这种方式释放一些内存 但我没有看到任何方法以通知我的方式将自己挂接到虚拟机中before an O

随机推荐

  • closeAllDocumentsWithDelegate 的正确参数

    我正在编写一个应用程序 需要在一个窗口中包含多个文档 正如所询问的那样here https stackoverflow com questions 1116886 multiple documents in a single window
  • eBay API - 检查Finding API调用计数?

    感谢该页面 https go developer ebay com api call limits https go developer ebay com api call limits我们知道 eBay 允许每个 eBay 开发者帐户每天
  • 如何编写涉及间接继承的Checkstyle自定义检查?

    我们需要编写一个 checkstyle 自定义检查来验证直接或间接继承自某个类 A 的类的特定条件 是否可以使用 checkstyle API 来识别间接继承 例如 假设我们有 C类 扩展 gt B类 B类 扩展 gt A类 在这种情况下
  • 指向结构成员的指针[关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 help reopen questions 我正在尝试
  • 手机是否仅使用 GPS 来获取其位置?

    我正在进行测试 我们正在比较 Android 手机和 GPS 设备的 GPS 位置 我们希望将其集成到我们的硬件中 但为了使测试准确 手机只需要使用 GPS 而不是手机信号塔和 WiFi 这是代码 我在其中设置手机使用哪种服务 Locati
  • 仅当调试器运行时防伪令牌异常

    我有一个 mvc2 应用程序 它通过 iframe 提供内容 iframed 页面只是一个表单 它有一个请求验证令牌 在开发人员使用 Visual Studio 2005 调试器之前 一切都可以跨域正常运行 一旦他们这样做 我就会收到以下错
  • 如何在本地安装 Haskell Stack?

    我正在学校服务器上工作 我需要安装 Haskell 的堆栈 在里面README https github com commercialhaskell stack blob master README md文件并在website https
  • iOS 应用程序开发建议。应用程序进入后台

    我正在享受 swift 的乐趣 并且正在尝试制作一个简单的游戏 我得到了一些在游戏过程中发生变化的变量 如果出现以下情况 保存这些变量的最佳实践是什么applicationDidEnterBackground对于所有其他功能appDeleg
  • 将矩阵并排放置以创建另一个矩阵

    我有一个由 12 个矩阵组成的数组 由以下代码给出 ma array sample 0 127 3 4 6 replace TRUE c 3 4 12 让他们被命名为A B C L 我想创建一个矩阵 其中上面的矩阵以 4 行 3 列的模式排
  • 应用程序被最近任务杀死后如何重新启动服务

    我创建了一项服务来定期获取设备的当前位置 我希望该服务在后台运行 即使该应用程序已从最近打开的应用程序中清除 目前 该服务仅在后台运行 直到应用程序出现在最近打开的应用程序中 但当应用程序被刷掉 或以其他方式终止 时 该服务会立即停止 我已
  • Numpy sum keepdims 错误

    Python 在矩阵上调用 numpy sum 函数时会抛出错误 probs exp scores np sum exp scores axis 1 keepdims True 错误 probs exp scores np sum exp
  • 奇怪的javascript变量重新赋值问题

    我有一个变量叫做data 它将数组带入函数 然后我决定继续仅使用数组的第一个元素 data 0 如果我执行以下任一操作 会有什么不同吗 重用 替换数组变量名data通过为其分配第一个元素 这将是 data data 0 并继续使用data
  • 如何禁用从移动浏览器上传文件的相机选项?

    我正在使用文件类型输入 它应该接受 pdf doc 和 docx 文件格式 所以我添加了以下输入标签
  • ViewCompat.setOnApplyWindowInsetsListener() 更改系统导航栏的背景

    应用程序通常有这样的导航栏 但是当我添加ViewCompat setOnApplyWindowInsetsListener ViewCompat setOnApplyWindowInsetsListener window decorView
  • 用户访问通讯录时崩溃报告

    在我的应用程序中 Crashlytics 用于收集用户的崩溃报告 这是来自用户的一份崩溃报告 这可能取决于用户的联系信息 我无法重现崩溃 因为我不知道他 她的联系人中有什么 有人对这种情况有想法吗 com apple root defaul
  • MongoDB:如何将嵌套数组分组到一个文档中?

    我有以下收藏 id 23423 dsfsdf 32423 name Proj1 services id sdfs 24423 sdf name P1 Service1 products id sdfs 24jhh sdf name P1 S
  • dart中的完整路径和相对路径有什么区别

    我开发了一个 flutter 应用程序 在 model 包中定义了多个模型 然后我声明一个类Example例如 在 模型 中 模型 示例 dart class Example override String toString return
  • C# 如何在控制台应用程序中制作水平条形图

    我需要制作一个水平条形图来表示直方图字典中数字的出现情况 我尝试过使用 Console BackgroundColor 但是 这显然只会使线条背景色变成蓝色 static void Main string args string Speac
  • 为什么 '\97' ascii 值等于 55

    就像C code include
  • 尝试序列化 avro 记录时,B 无法转换为 java.nio.ByteBuffer

    我编写了一个小型 Java 程序 该程序应该监视目录中的新文件并将它们以 binay Avro 格式发送到 Kafka 主题 我是 Avro 新手 我使用 Avro 文档和在线示例编写了这篇文章 监控部分运行良好 但程序在运行时到达 Avr