如何有效地将我的@KafkaListener绑定到ConcurrentKafkaListenerContainerFactory?

2023-12-03

我遇到了这个对我来说似乎很奇怪的场景:

所以基本上我定义了两个@KafkaListener在一堂课中:

@KafkaListener(id = "listener1", idIsGroup = false, topics = "data1", containerFactory = "kafkaListenerContainerFactory")
    public void receive(){}

@KafkaListener(id = "listener2", idIsGroup = false, topics = "data2", containerFactory = "kafkaListenerContainerFactory2")
    public void receive(){}

Their id, topics, containerFactory是不同的,并且每个都依赖于不同的ConcurrentKafkaListenerContainerFactory正如另一个类中定义的:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory("group1", "earliest"));
    factory.setAutoStartup(false);
    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory("group2", "latest"));
    factory.setAutoStartup(true);
    return factory;
}

@Bean
public ConsumerFactory<String, ConsumerRecord> consumerFactory(String groupId, String offset) {
    Map<String, Object> config = new HashMap<>();
    // dt is current timestamp in millisecond (epoch)
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + dt);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
    // other config omitted
    return new DefaultKafkaConsumerFactory<>(config);
}

所以我期望看到的(以及我想要实现的)是:

  1. 只有listener2会自动启动,因为factory.setAutoStartup(true)
  2. Listener2 将以group.id“组2”和auto.offset.reset“最新的”
  3. 稍后当listener1通过某个事件监听器启动时,它将启动 和group.id“组1”和auto.offset.reset“最早的”

然而,实际上只有第一个是有保证的。 Listener2 可以从 {group2 +latest} 或 {group1+earest} 开始。后来当listener1开始使用数据时,它只会重用listener2的配置(我可以看到包含时间戳的相同组id在我的日志中打印了两次)

我的问题是,为什么listener2的组ID和偏移配置是随机选择的,而autoStartup不是随机选择的?为什么listener1会重用listener2的配置?


这是因为consumerFactory是一个单例@Bean并且第二次调用时将忽略参数。

Add @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)每次去工厂都会得到一颗新豆。

但是,您不需要任何这些,您只需设置groupId注释上的属性并避免所有这些额外的定义。

您还可以控制autoStartup关于注释(自 2.2 起)。

EDIT

回答下面评论中的问题...

groupId = "#{'${group.id}' + T(java.time.Instant).now().toEpochMilli()}"

但是,如果您想要一个唯一的组 ID;这个比较靠谱...

groupId = "#{'${group.id}' + T(java.util.UUID).randomUUID()}"
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何有效地将我的@KafkaListener绑定到ConcurrentKafkaListenerContainerFactory? 的相关文章

随机推荐

  • 高效测地线最近邻

    从纬度 经度数据 以弧度为单位 开始 我尝试有效地找到最近的 n 个邻居 最好是测地线 WGS 84 距离 现在我正在使用sklearn 球树使用半正矢距离 KD Tres 仅采用 minkowskian 距离 这很好而且快速 3 4 秒即
  • 将大文件写入磁盘内存不足异常

    我正在尝试写入然后读取一个大型随机文件来计算磁盘速度 我尝试了多种算法 但在尝试写入 1GB 文件时不断出现输出或内存异常 这是我尝试过的一些 Method 1 byte data new byte 8192 Random rng new
  • jQuery UI 日期选择器可以禁用周六和周日(以及节假日)吗?

    我使用日期选择器来选择约会日期 我已经将日期范围设置为仅下个月 效果很好 我想从可用选项中排除周六和周日 这可以做到吗 如果是这样 怎么办 有的是beforeShowDay选项 它需要为每个日期调用一个函数 如果允许该日期则返回 true
  • 具有机器人框架的多个远程库

    根据 机器人框架 手册中的示例 为了从远程计算机提供关键字 必须实例化RobotRemoteServer带有实现关键字的类的实例 RobotRemoteServer ExampleRemoteLibrary sys argv 1 如果我有多
  • SQL INDEX 不用于 WHERE ABS(x-y) < k 条件,但用于 y - k < x < y + k 条件

    我有一个查询涉及时差小于 2 小时的几行 0 08333 天 SELECT mt1 mt2 FROM mytable mt1 mytable mt2 WHERE ABS JULIANDAY mt1 date JULIANDAY mt2 da
  • 如何测试未知的 Delphi RTTI TValue 是否反映任何类型的通用 TList<>(或至少 TEnumerable<>)的对象?

    在德尔福 如果我有一个TValue反映未知对象的实例 如何测试该对象是否是任何类型泛型的实例TEnumerable lt gt 或者甚至更好 也which它是特定通用枚举类型的实例 例如TList lt gt 注意 我已经知道如何轻松检查其
  • 在 scala 中重定向 stdin 和 stdout

    如何将 STDIN 和 STDOUT 重定向到文件 在 C 语言中 可以像这样完成 freopen file in r stdin 我正在寻找与 Scala 相当的东西 您可以使用 Java 系统 api 来完成此操作 Java 和 Sca
  • Java:实例双数组元素值修改问题

    我是 Java 新手 我有一个可以为其创建实例的类 在类中我定义了两个实例变量 double array1 double array2 数组的长度相等 在类中 我有一个首先填充的 method1array1然后是另一种方法2 我想在其中设置
  • 使用 KSOAP2 序列化要发送的整数数组

    我在尝试将整数数组发送到 NET Web 服务时遇到问题 该服务需要参数之一包含数组 至少这是我从 Web 服务上的 API 描述中了解到的
  • 颠倒的文字

    您将如何设计一个程序来接收一串小写字母并颠倒产生该字符串 所以如果我输入home i get o 倒挂 我尝试在书中寻找入门内容 但一无所获 试试这个 有点暴力的方法 但对于大写 小写和数字字符非常有效 所有其他字符都按原样显示 defin
  • SQL 将 2 个表连接到 1 个表

    我的任务是连接 3 个表 任务 单位和建筑物 任务表有一列表示单位 一列表示建筑物 任何单一任务仅分配给一座建筑物或一个单元 而不是两者 因此 每条记录中的一列始终为空 任务表中有6100条记录 当我使用这个连接时 select from
  • 如何使用dll?

    我知道如果我有一个 a 或 so 文件以及该库的头文件 例如 SystemC 我应该 1 包含头文件 2 链接适当的库 但我无法仅处理 dll 文件 因为我也可以链接它 但没有要包含和使用命令的侦听器文件 有人可以解释一下存在什么样的 dl
  • 结束 NSTableView 上的编辑(基于视图)

    当用户正在编辑表格视图上的文本字段并决定单击窗口的关闭按钮时 他 她所做的更改不会保存 如何强制表格视图 基于视图 结束编辑 而不是中止编辑 最简单的方法是为您的窗口分配一个委托并响应NSWindow s 窗口应该关闭 委托方法 在其中调用
  • 单击 UIBarButton 显示 toast 并双击返回操作需要执行

    我在导航栏中有一个 UIBarButton 当单击后退按钮 第一次点击 时 我需要显示 toast 如警告 双击时我需要快速退出页面 以下用于显示 toast 的代码 其工作正常 let toastLabel UILabel frame C
  • 以“MMMyyyy”为键对地图进行排序

    我有一张地图 其键采用 MMMyyyy 格式 我需要根据月份进行排序 输入 unsorted Dec2010 1 Apr2010 1 Feb2010 0 Nov2010 2 Mar2010 0 Jun2010 2 Sep2010 1 May
  • 在 Azure DevOps 中跨构建管道共享变量

    我的 azure devops 项目中有 2 个构建管道 一个用于构建源代码 另一个用于构建 进行设置 我希望将编译代码的第一个管道生成的内部版本号传递到创建安装文件的下一个管道 因为我希望安装文件采用相同的版本 所以我添加了一个变量组 其
  • 在android中创建给定形状的图像视图

    我需要在 Android 中创建这个特定形状的图像视图 如果可以以这种形状裁剪图像 那么也很好 请帮我解决一下 这将是完整的套装
  • Select 语句中的 PHP 变量

    我已经编写了这个正在运行的 PHP 脚本 现在我想将行名称更改为变量 不确定行是否正确 我的意思是来自select name 这容易多了不是吗 sql insert INSERT INTO customers name address em
  • 如何在 C++ 中杀死进程,只知道其名称的一部分

    前段时间我需要编写 C 代码来终止某些进程 在我的主程序中 我使用 system 运行大型 CAE 系统包 并在输入上使用不同的文件名字符串 CAE 软件创建许多进程 其中包含进程名称字符串filename 一些CAE过程worktime
  • 如何有效地将我的@KafkaListener绑定到ConcurrentKafkaListenerContainerFactory?

    我遇到了这个对我来说似乎很奇怪的场景 所以基本上我定义了两个 KafkaListener在一堂课中 KafkaListener id listener1 idIsGroup false topics data1 containerFacto