如何配置 spring-kafka 忽略格式错误的消息?

2024-01-12

我们的 Kafka 主题之一存在问题,该主题被DefaultKafkaConsumerFactory & ConcurrentMessageListenerContainer组合描述here http://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/_reference.html#message-listener-container with a JsonDeserializer工厂使用。不幸的是,有人有点热情并在该主题上发布了一些无效消息。看来 spring-kafka 默默地无法处理第一条消息。是否可以让 spring-kafka 记录错误并继续?查看记录的错误消息,似乎 Apache kafka-clients 库应该处理这样的情况:在迭代一批消息时,其中一个或多个消息可能无法解析?

以下代码是说明此问题的示例测试用例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;

/**
 * @author jfreedman
 */
public class TestSpringKafka {
    private static final String TOPIC1 = "spring.kafka.1.t";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, TOPIC1);

    @Test
    public void submitMessageThenGarbageThenAnotherMessage() throws Exception {
        final BlockingQueue<ConsumerRecord<String, JsonObject>> records = createListener(TOPIC1);
        final KafkaTemplate<String, JsonObject> objectTemplate = createPublisher("json", new JsonSerializer<JsonObject>());

        sendAndVerifyMessage(records, objectTemplate, "foo", new JsonObject("foo"), 0L);

        // push some garbage text to Kafka which cannot be marshalled, this should not interrupt processing
        final KafkaTemplate<String, String> garbageTemplate = createPublisher("garbage", new StringSerializer());
        final SendResult<String, String> garbageResult = garbageTemplate.send(TOPIC1, "bar","bar").get(5, TimeUnit.SECONDS);
        assertEquals(1L, garbageResult.getRecordMetadata().offset());

        sendAndVerifyMessage(records, objectTemplate, "baz", new JsonObject("baz"), 2L);
    }

    private <T> KafkaTemplate<String, T> createPublisher(final String label, final Serializer<T> serializer) {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "TestPublisher-" + label);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.RETRIES_CONFIG, 2);
        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer.getClass());
        final DefaultKafkaProducerFactory<String, T> pf = new DefaultKafkaProducerFactory<>(producerProps);
        pf.setValueSerializer(serializer);
        return new KafkaTemplate<>(pf);
    }

    private BlockingQueue<ConsumerRecord<String, JsonObject>> createListener(final String topic) throws Exception {
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TestConsumer");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        final DefaultKafkaConsumerFactory<String, JsonObject> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        cf.setValueDeserializer(new JsonDeserializer<>(JsonObject.class));
        final KafkaMessageListenerContainer<String, JsonObject> container = new KafkaMessageListenerContainer<>(cf, new ContainerProperties(topic));
        final BlockingQueue<ConsumerRecord<String, JsonObject>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, JsonObject>) records::add);
        container.setBeanName("TestListener");
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
        return records;
    }

    private void sendAndVerifyMessage(final BlockingQueue<ConsumerRecord<String, JsonObject>> records,
                                      final KafkaTemplate<String, JsonObject> template,
                                      final String key, final JsonObject value,
                                      final long expectedOffset) throws InterruptedException, ExecutionException, TimeoutException {
        final ListenableFuture<SendResult<String, JsonObject>> future = template.send(TOPIC1, key, value);
        final ConsumerRecord<String, JsonObject> record = records.poll(5, TimeUnit.SECONDS);
        assertThat(record, hasKey(key));
        assertThat(record, hasValue(value));
        assertEquals(expectedOffset, future.get(5, TimeUnit.SECONDS).getRecordMetadata().offset());
    }

    public static final class JsonObject {
        private String value;

        public JsonObject() {}

        JsonObject(final String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }

        public void setValue(final String value) {
            this.value = value;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) { return true; }
            if (o == null || getClass() != o.getClass()) { return false; }
            final JsonObject that = (JsonObject) o;
            return Objects.equals(value, that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "JsonObject{" +
                    "value='" + value + '\'' +
                    '}';
        }
    }
}

我有一个解决方案,但我不知道它是否是最好的,我扩展了JsonDeserializer如下,结果是null价值被 spring-kafka 消耗,并且需要必要的下游更改来处理这种情况。

class SafeJsonDeserializer[A >: Null](targetType: Class[A], objectMapper: ObjectMapper) extends JsonDeserializer[A](targetType, objectMapper) with Logging {
  override def deserialize(topic: String, data: Array[Byte]): A = try {
    super.deserialize(topic, data)
  } catch {
    case e: Exception =>
      logger.error("Failed to deserialize data [%s] from topic [%s]".format(new String(data), topic), e)
      null
  }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何配置 spring-kafka 忽略格式错误的消息? 的相关文章

  • 在 Unix 上,我应该对 errno 使用 system_category 还是 generic_category ?

    C 0x 有两个预定义的error category对象 generic category and system category 据我目前所知 system category 应用于操作系统返回的错误 并且generic category
  • 处理 ANTLR 4 中的错误

    遵循后接受的答案 https stackoverflow com a 18137301 2279200的指示处理 ANTLR4 中的错误 https stackoverflow com q 18132078 2279200问题 我遇到了以下
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • 更新面板异常处理

    当我在正在构建的 ASP NET Web 应用程序中实现的 UpdatePanel 中发生异常时 它们会导致页面上出现 JavaScript 错误 并在警报中提供一些高级错误输出 这对于开发来说还可以 但是一旦系统投入生产 由于多种原因 这
  • Kafka Java Consumer 已关闭

    我刚刚开始使用卡夫卡 我面临着消费者的一个小问题 我用Java写了一个消费者 我收到此异常 IllegalStateException 此消费者已关闭 我在以下行中遇到异常 ConsumerRecords
  • 如何告诉 Java SAX 解析器忽略无效字符引用?

    当尝试使用字符引用解析不正确的 XML 时 例如 x1 Java 的 SAX 解析器因致命错误而惨死 例如 org xml sax SAXParseException Character reference x1 is an invalid
  • Kafka 中的内部和外部通信

    流动 本地 gt 代理 gt Kafka advertised listeners PLAINTEXT proxyhostname 8080 for external communication listeners PLAINTEXT 90
  • 创建 Kafka 主题导致没有领导者

    我正在使用 Kafka v0 9 0 1 Scala v2 11 和com 101tec zkclientv0 7 我正在尝试使用AdminUtils创建一个kafka主题 我的代码如下 String zkServers node1 218
  • Kotlin 协程 - 优雅地处理挂起函数中的错误

    尝试使用从异步方法调用的挂起函数来实现对错误的优雅处理 如何捕获挂起方法引发的错误 suspend fun findById id Long User throw Exception my exception intentionally t
  • 如何检测java中的消费者是否无法使用kafka代理?

    我有一个简单的 Java Kafka 消费者 如果 Kafka 代理不可用 我试图捕获异常 我需要它来中断线程 我有这样的代码 KafkaConsumer
  • VB6中VarType返回的vbError是什么?

    我一直在网上搜索这个 但一切都在谈论ErrObject类 或常量vbError由返回VarType功能 我想知道那个类型实际上是什么is 例如一个integer是这样的4 a string是这样的 hello world etc 对于一点背
  • 如何删除 Apache Kafka 中的主题? [复制]

    这个问题在这里已经有答案了 我需要删除 Kafka 0 8 2 2 3 中的一个主题 我使用以下命令删除主题 bin kafka topics sh zookeeper localhost 2181 delete topic DummyTo
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • Javascript:如何捕获使用 window.location.href = url 导航到的页面上的错误

    我正在使用 REST 服务生成一个 CSV 文件 我想提示用户下载该文件 该服务的示例如下 https localhost 8444 websvc exportCSV viewId 93282392 为了提示用户下载文件 我使用以下代码 w
  • 如何使用PySpark结构流+Kafka

    我尝试将 Spark 结构流与 kafka 一起使用 并且在使用 Spark 提交时遇到问题 消费者仍然从生产中接收数据 但 Spark 结构出错 请帮我找到我的代码的问题 这是我在 test py 中的代码 from kafka impo
  • Spring @ControllerAdvice 与 ErrorController

    在我的 REST 服务应用程序中 我计划创建一个 ControllerAdvice捕获控制器抛出的异常并返回的类ResponseEntity根据错误类型的对象 但我已经有一个 RestController类实现ErrorController
  • 使用 Elmah 进行异常处理

    我用 Elmah 记录异常 想知道我使用的技术是否是好的设计 现在 我捕获并重新抛出各种类和方法中发生的异常 并将它们记录到程序的主 try catch 块中的 Elmah 主程序 try Some code that fires off
  • 是否有用于事件驱动的 Kafka 消费者的 Python API?

    我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序 因此 我希望有一个 Kafka 消费者 当相关主题的流中存在新消息时 该消费者会被触发 并通过将消息推回到 Kafka 流来进行响应 我一直在寻找类似 Spring
  • 获取:导入 Spark 模块时出错:没有名为“pyspark.streaming.kafka”的模块

    我需要将从 pyspark 脚本创建的日志推送到 kafka 我正在做 POC 所以在 Windows 机器上使用 Kafka 二进制文件 我的版本是 kafka 2 4 0 spark 3 0 和 python 3 8 1 我正在使用 p
  • 当异常抛出到路由之外时,如何在 Slim 框架中传递错误页面?

    我正在尝试了解处理 Slim 框架应用程序中抛出的异常和最终页面交付的操作顺序 基本上 如果我在类中抛出异常 我希望 Slim 提供漂亮的 Twig 500 页面 但当在路由之外抛出异常时 我什至无法让 Slim 提供自己的正常错误页面 给

随机推荐

  • Java机器人鼠标移动:设置速度?

    Java Robot 类允许人们移动鼠标 就像移动实际的物理鼠标一样 然而 如何以一种人性化 而非即时 的方式将鼠标从 Point1 移动到 Point2 又名 如何设置移动速度 如果Robot类不可能达到这样的速度 那么如果鼠标只能瞬时移
  • 如何解决 FATAL:超出非超级用户的连接限制

    我写了一个用于批量插入的java代码 我使用复制命令为不同的表导入和创建不同的连接对象 但在执行时 程序抛出以下错误 FATAL connection limit exceeded for non superusers 您已超出 Postg
  • 如何防止弹出基本身份验证表单

    我有一个 Java 应用程序 JSF 它使用 javascript 连接到需要基本身份验证的网站 我想要配合的事情与我在弹出表单中输入用户名和密码时发生的事情完全相同 我已经尝试了许多关于该主题的不同方法 但没有一个有效 奇怪的是 ajax
  • 以编程方式单击 jetpack compose 中的文本字段

    有没有一种方法可以以编程方式单击文本字段 以便当我的搜索屏幕弹出时 它会自动单击文本字段并弹出键盘 或者 有没有办法知道文本字段的触摸事件 With 1 0 x您可以将焦点放在该组件上 就像是 var text by remember mu
  • 使列等高 - 通过嵌套

    我的设计使用两个外部列 并在标题部分的外部列之一和下面的另外两列中 如下所示 header out1 out2 footer
  • MockMVC 对异步服务执行后期测试

    我需要测试调用异步服务的控制器 控制器代码 RequestMapping value path method RequestMethod POST produces MediaType APPLICATION JSON VALUE Resp
  • 如何在 Spark 中以小块形式迭代大型 Cassandra 表

    在我的测试环境中 我有 1 个 Cassandra 节点和 3 个 Spark 节点 我想迭代大约有 200k 行的明显大表 每行大约占用 20 50KB CREATE TABLE foo uid timeuuid events blob
  • 如何将 MIDAS.DLL 嵌入客户端可执行文件中

    据博士说 鲍勃 这是可以做到的 有人可以提供分步示例或教程吗 你没有嵌入MIDAS DLL 你添加MidasLib到你的项目的USES条款 这会将基本功能嵌入到您的程序中 而无需依赖单独的 DLL
  • 服务器控制行为异常

    我有一个我编写的服务器控件 通常工作正常 但是 当我添加突出显示的行时 它添加的不是一个而是两个 br 元素 这不是我所追求的 mounting new DropDownLabel mounting ID mountTypeList mou
  • 是否可以在 Adob​​e Flex 中执行#define?

    我正在寻找一种方法来执行类似于 adobe flex 中的 c c define 的操作 我希望项目构建可以采用许多不同的路径 具体取决于是否定义了某些内容 Flex 中存在这样的东西吗 我知道有一些方法可以设置全局变量 但这并不真正适合我
  • 反序列化Bson文件

    我有一个用 mongodump 工具生成的 Bson 文件 我想在 C 代码中反序列化 为此 我似乎可以使用 mongodb C 驱动程序或 Json net 库 我尝试了它们 但我无法让它们工作 使用 Json net 库 input 是
  • MySQL Sum() 多列

    我有一张学生记分卡表 这是桌子 subject mark1 mark2 mark3 markn stud1 99 87 92 46 stud2 studn 现在 我需要对每个学生的总分进行求和 我通过使用得到它sum mark1 mark2
  • MySQL 数据透视表列数据作为行

    我正在努力寻找解决这个 MySQL 问题的方法 我似乎不知道该怎么做 我有以下表格 Question table id question 1 Is it this 2 Or this 3 Or that Results Table id u
  • android 使用外部 java 库时出现 java lang verifyerror

    我在我的 android 项目中使用外部库 在调用导入该库的类时出现 javalang verify 错误 它是 java jxl 库 请提供帮助我在库项目中创建了一个名为 lib 的目录 然后在库中引用它 这个库与原始的 Android
  • .NET 中的全局变量(或替代方案)最佳实践

    在 VB NET WinForms 应用程序中存储全局变量的最佳实践是什么 例如 当用户登录应用程序时 您可能希望存储一个可以在整个应用程序中访问的 CurrentUser 对象 您可以将其存储为模块中的对象 或者创建一个包含所有所需全局变
  • 如何使用 Javascript 滚动到底部时附加更多行

    It s in 共享点2010年 但我认为它可能会正常运行 1 默认情况下 每页的项目限制为 30 因此 我已经完成了在页面加载时显示 30 行的列表 然后我将鼠标向下滚动到底部 它隐藏了最后 10 行 Summary 页面加载 30 行
  • ToList 方法不适用于 TrackableCollection

    我们正在 EF 4 0 之上与可跟踪实体合作 为了删除实体及其所有依赖实体 我正在编写一个通用的DeleteDependentEntities 以便从EntityManager 中的Delete 方法调用 我们不 或者不想 依赖于在数据库中
  • 从数据库字段中删除特殊字符

    我有一个包含数千条记录的数据库 我需要删除其中一个字段以确保它只包含某些字符 字母数字 空格和单引号 我可以使用什么 SQL 从整个数据库的该字段中删除任何其他字符 例如斜杠等 update mytable set FieldName RE
  • Angular 7 - 我是否创建了太多订阅?

    我想知道我的代码是否会造成内存泄漏 Context 我有一个应该显示 应用程序 对象的组件类 它具有过滤和分页功能 我创建了一个方法加载应用程序数据 其中我订阅到 向 Web 服务发出请求后返回的 Observable 该方法在初始化时被调
  • 如何配置 spring-kafka 忽略格式错误的消息?

    我们的 Kafka 主题之一存在问题 该主题被DefaultKafkaConsumerFactory ConcurrentMessageListenerContainer组合描述here http docs spring io spring