事务性 Kafka 生产者

2024-05-08

我正在尝试让我的卡夫卡生产者具有事务性。 我正在发送 10 条消息。如果发生任何错误,则不应向 kafka 发送任何消息,即不发送或全部消息。

我正在使用 Spring Boot KafkaTemplate。

@Configuration
@EnableKafka
public class KakfaConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        // appProps.getJksLocation());
        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        // appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

}

我正在发送 10 条消息,如下所示,如文档中所述。应发送 9 条消息,并且 I 消息大小超过 1MB,由于以下原因被 Kafka 代理拒绝RecordTooLargeException

https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager

@Component
@EnableTransactionManagement
class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(" message sucess : " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Message Failed ");
                latch.countDown();
            }
        };

        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            future.addCallback(callback);
        });

        if (latch.await(12, TimeUnit.MINUTES)) {
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }

但是当我看到主题 t_hello_world 9 消息时就在那里。我的期望是看到 0 条消息,因为我的生产者是事务性的。 我怎样才能实现它?

我收到以下日志

2020-04-30 18:04:36.036 ERROR 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:923) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:297) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1013) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:296) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:713) ~[kafka-clients-2.4.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java



Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

2020-04-30 18:04:36.037  WARN 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
2020-04-30 18:04:36.038  INFO 18688 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-prod-990, transactionalId=prod-990] Closing the Kafka producer with timeoutMillis = 5000 **ms.
2020-04-30 18:04:36.038  INFO 18688 --- [oducer-prod-990] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-990, transactionalId=prod-990] Aborting incomplete transaction due to shutdown**

未提交的记录写入日志;当事务提交或回滚时,会在日志中写入一条额外的记录,其中包含事务的状态。

默认情况下,消费者可以看到所有记录,包括未提交的记录(但看不到特殊的提交/中止记录)。

对于控制台消费者,需要将隔离级别设置为read_committed。查看帮助:

--isolation-level <String>           Set to read_committed in order to      
                                       filter out transactional messages    
                                       which are not committed. Set to      
                                       read_uncommitted to read all          
                                       messages. (default: read_uncommitted)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

事务性 Kafka 生产者 的相关文章

  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • 如何在 Python 中以编程方式检查 Kafka Broker 是否已启动并运行

    我正在尝试使用来自 Kafka 主题的消息 我正在使用包装器confluent kafka消费者 我需要在开始使用消息之前检查连接是否已建立 我读到消费者很懒 所以我需要执行一些操作才能建立连接 但我想检查连接建立而不执行consume o
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • kafka消费者群体正在重新平衡

    我正在使用 Kafka 9 和新的 java 消费者 我正在循环内进行轮询 当代码尝试执行 Consumer commitSycn 时 由于组重新平衡 我收到 commitfailedexcption 请注意 我将 session time
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这
  • Kafka Java Consumer 已关闭

    我刚刚开始使用卡夫卡 我面临着消费者的一个小问题 我用Java写了一个消费者 我收到此异常 IllegalStateException 此消费者已关闭 我在以下行中遇到异常 ConsumerRecords
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入
  • 如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

    我想使用 Python PySpark 从 Kafka 源到 MariaDB 进行 Spark 结构化流处理 Spark 2 4 x 我想使用流式 Spark 数据帧 而不是静态数据帧或 Pandas 数据帧 看来必须要用foreach o
  • Zookeeper + Kafka - 无法创建数据目录

    我在单节点中使用zookeeper 3 4 8并尝试使用kafka 当我运行这个命令时 zookeeper server start sh usr local kafka 2 9 2 0 8 2 2 config zookeeper pro
  • Spring Kafka Acknowledgement.acknowledge 线程安全吗?

    我正在实现一个基于卡夫卡的应用程序 我想在其中手动确认传入消息 架构迫使我在单独的线程中完成它 问题是 在与消费者不同的线程中执行 Acknowledgement acknowledge 是否可能且安全 是的 只要你使用MANUAL并不是M
  • Kafka中如何同时实现分布式处理和高可用?

    我有一个由 n 个分区组成的主题 为了进行分布式处理 我创建了两个在不同机器上运行的进程 他们使用相同的 groupd id 订阅主题并分配 n 2 个线程 每个线程处理单个流 每个进程 n 2 个分区 这样我就可以实现负载分配 但现在如果
  • 删除主题级别配置

    为了删除主题中的所有数据 我将其retention ms配置设置为1000 bin kafka topics sh zookeeper KAFKAZKHOSTS alter topic
  • 从kafka获取特定时间段的结果

    这是我的代码 它使用kafka python now datetime now month ago now relativedelta month 1 topic some topic name consumer KafkaConsumer
  • 如何删除 Apache Kafka 中的主题? [复制]

    这个问题在这里已经有答案了 我需要删除 Kafka 0 8 2 2 3 中的一个主题 我使用以下命令删除主题 bin kafka topics sh zookeeper localhost 2181 delete topic DummyTo
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • 将 Kafka 输入流动态连接到多个输出流

    Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能 KStream branch允许基于真 假谓词进行分支 但这并不是我想要的 我希望每个传入的日志都确定它将在运行时流式传输到的主题 例如日志 date 20
  • 如何使用PySpark结构流+Kafka

    我尝试将 Spark 结构流与 kafka 一起使用 并且在使用 Spark 提交时遇到问题 消费者仍然从生产中接收数据 但 Spark 结构出错 请帮我找到我的代码的问题 这是我在 test py 中的代码 from kafka impo
  • 卡夫卡监听器中的钩子

    kafka 监听消息之前 之后是否有任何类型的钩子可用 使用案例 必须设置MDC关联id才能进行日志溯源 我在寻找什么 之前 之后回调方法 以便可以在进入时设置 MDC 关联 ID 并最终在退出时清除 MDC 编辑后的场景 我将关联 id

随机推荐

  • 如何在android中直接从.zip文件读取文件而不解压它

    过去几个月我一直在研究 android 现在我的问题是读取放在 sdcard 上的 zip 文件 我已经成功完成了将 zip 文件下载到 SD 卡上的编码 我已将 img zip 文件下载到 SD 卡上 此 img zip 包含 5 个图像
  • C++ 求矩阵最小和最大元素之间的元素和

    因此 我的程序按其应有的方式工作 但前提是最小和最大元素位于对角 所以我的问题是如何将二维数组从一个特定元素迭代到另一个元素 也许摆脱一些嵌套循环 我应该将这个数组转换为一维吗 这是代码的正确工作方式 就在这时 出现了问题 元素 0 1 和
  • 当子列表视图在颤动中到达末尾时,有什么方法可以滚动父列表视图吗?

    假设我有一个可滚动页面 在该页面内我有另一个可滚动列表视图 垂直 所以我希望当子列表视图到达末尾时 可滚动页面开始移动到其末尾 此外 当子列表视图到达顶部时 可滚动页面开始移动到顶部 怎样才能做到这一点 这是我的代码 Widget Fres
  • P2P网络游戏/应用程序:类似“战网”匹配服务器的不错选择

    我正在制作一个网络游戏 1v1 游戏中是 p2p 不需要游戏服务器 然而 为了让玩家能够 找到彼此 而不需要在另一种媒介中协调并输入IP地址 类似于网络游戏的现代时代 我需要有一个协调 匹配服务器 我无法使用常规网络托管 因为 客户端将使用
  • 构建 jar 后无法运行 exe

    我制作了一个简单的实用应用程序 其中我有一个要运行的exe文件 我通过使用它来运行 Runtime getRuntime exec this getClass getResource filename exe getPath 当我从 ide
  • Java 中的 sscanf 等效项[重复]

    这个问题在这里已经有答案了 可能的重复 用于使用已知模式解析字符串中的值的 sscanf 的 Java 等效项是什么 https stackoverflow com questions 8430022 what is the java eq
  • 在 Java 构建过程中更改常量的最佳方法

    我继承了一个在 Tomcat 下运行的 Java 应用程序 servlet 由于历史原因 根据应用程序的部署位置 本质上是品牌问题 代码具有不同的 外观和感觉 选项 有几个常量控制这个品牌过程 它们具有不同的功能 不应压缩为单个常量 即 B
  • VT_DATE 类型的微秒支持

    VT DATE 变体类型是否支持微秒分辨率 请告诉我如何在VB中显示相同的内容 http msdn microsoft com en us library ms221646 aspx http msdn microsoft com en u
  • 回调和部分回发有什么区别?

    有区别吗 或者这些术语是同义词吗 抱歉 如果之前有人问过这个问题 我只能找到a之间的区别full回发和回调 我已经知道完整回发有何不同 在使用 ASP Net 2 0 时 如果这很重要的话 顺便问一下 这重要吗 或者这些术语对于任何基于 W
  • 用于将字符串与预定义字符混合/混淆的简单算法

    我有一个字符串如下 它的长度是10 它代表基数 36 因此包含数字和大写字母 字符串的来源是数据库生成的序列 即从 1 及以上 正在转换为基数 36 我的问题是转换为base 36转换的结果也是连续 顺序的 例如 ID 1402 gt 00
  • 获取已安装的 Windows 应用商店应用程序列表

    有多种方法可以获取控制面板中 添加 删除程序 中已安装应用程序的列表 但我也想从 Windows 应用商店获取已安装应用程序的列表 到目前为止我还没有得到任何东西 有什么方法可以获取从 Windows 应用商店安装的应用程序列表吗 您可以在
  • SQL Proc 从 varchar 到 int 的“转换失败”。为什么要转换?

    我的问题是 为什么它从 varchar 转换为 int 我不确定它想做什么 CREATE PROCEDURE myTestProcedure TransId VARCHAR 15 AS BEGIN DECLARE Result VARCHA
  • MyBatis Spring Boot 自定义类型处理程序

    我需要 Spring Boot 和 MyBatis 集成方面的帮助 我对自定义 BaseTypeHandler 有疑问 我创建了一个映射器 MappedTypes LocalDateTime class public class Local
  • 什么是“更便宜”的性能明智的 $broadcast 或 $watch

    我的应用程序中有一种情况 每次用户的角色发生变化时 我都需要重新加载菜单 一个用户可以在多个公司中拥有角色 我想知道解决这个问题的最佳方法是什么 目前我正在做以下事情 app controller menuLoadingCtrl funct
  • 如何在辅助显示器上全屏显示图像?

    如何使用 PyQt5 PySide 或任何其他 Python 库在辅助 显示器上以全屏模式显示所需的图像 过去 我使用帧缓冲区图像查看器 Fbi https manpages ubuntu com manpages bionic man1
  • 安卓多点触控?

    作为一名开发人员 我倾向于先编程 然后再研究 我试图实现一个可以处理多个用户输入的屏幕 基本上映射的不仅仅是一根手指 我尝试了两件事 我有一个实现 OnTouchListener 的 Activity 类 这里我有两个单独的子视图 它们将
  • ios6 UIImageView - 加载-568h 图像

    我看过一些关于 UIImage 自动加载的帖子文件名 568 png新的 iOS6 中的图像 但我似乎无法在 UIImageView 类中重新创建它 我正在使用故事板 不是我的应用程序 只需要做一些检查 并且我有一个简单的布局 仅缩放图像视
  • 我可以通过编程方式获取连接到手机的 wifi 的 MAC 地址吗?

    我的手机已连接到 wifi 我想获取我的 wifi 的 MAC 地址 BSSID 和 mac 地址是同一回事 您可以通过此函数获取 mac 地址 只需导入 SystemConfiguration CaptiveNetwork func ge
  • 易于使用的Python加密库/包装器?

    我想在Python中用密码加密任意长度的字符串 我会比较喜欢not处理填充 密钥生成和 IV 因为说实话 我对密码学还不太了解 而且我想避免搞砸 我还更喜欢使用众所周知的密码作为 AES 我理想的库 我们称之为 MagicCrypt 会像这
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab