带有断路器的 Kafka Consumer,使用 Resilience4j 重试模式

2023-12-31

我需要一些帮助来了解如何使用 Spring boot、Kafka、Resilence4J 提出解决方案,以实现来自 Kafka Consumer 的微服务调用。假设如果微服务关闭,那么我需要使用断路器模式通知我的 Kafka 消费者停止获取消息/事件,直到微服务启动并运行。


借助 Spring Kafka,您可以使用pause and resume方法取决于 CircuitBreaker 状态转换。我发现最好的方法是将其定义为带有 @Configuration 注释的“supervisor”。还使用了 Resilience4j。

@Configuration
public class CircuitBreakerConsumerConfiguration {

public CircuitBreakerConsumerConfiguration(CircuitBreakerRegistry circuitBreakerRegistry, KafkaManager kafkaManager) {
    circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(event -> {
  
        switch (event.getStateTransition()) {
            case CLOSED_TO_OPEN:
            case CLOSED_TO_FORCED_OPEN:
            case HALF_OPEN_TO_OPEN:
                kafkaManager.pause();
                break;
            case OPEN_TO_HALF_OPEN:
            case HALF_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_CLOSED:
            case FORCED_OPEN_TO_HALF_OPEN:
                kafkaManager.resume();
                break;
            default:
                throw new IllegalStateException("Unknown transition state: " + event.getStateTransition());
        }
    });
   }
}

这是我与带有注释的 KafkaManager 结合使用的@Component.

@Component
public class KafkaManager {
  private final KafkaListenerEndpointRegistry registry;

  public KafkaManager(KafkaListenerEndpointRegistry registry) {
    this.registry = registry;
  }
  public void pause() {   
    registry.getListenerContainers().forEach(MessageListenerContainer::pause);
  }

  public void resume() {
    registry.getListenerContainers().forEach(MessageListenerContainer::resume);
  }
}

此外,我的消费者服务如下所示:

  @KafkaListener(topics = "#{'${topic.name}'}", concurrency = "1", id = "CBListener")
public void receive(final ConsumerRecord<String, ReplayData> replayData, Acknowledgment acknowledgment) throws
        Exception {

    try {
        httpClientServiceCB.receiveHandleCircuitBreaker(replayData);
        acknowledgement.acknowledge();
    } catch (Exception e) {
        acknowledgment.nack(1000);
    }
}

And the @CircuitBreaker注解:

@CircuitBreaker(name = "yourCBName")
public void receiveHandleCircuitBreaker(ConsumerRecord<String, ReplayData> replayData) throws
        Exception {
    try {
        String response = restTemplate.getForObject("http://localhost:8081/item", String.class);
    } catch (Exception e                                                                       ) {
       
        // throwing the exception is needed to trigger the Circuit Breaker state change
        throw new Exception();
    }
}

并且还补充了以下内容application.properties

  resilience4j.circuitbreaker.instances.yourCBName.failure-rate-threshold=80
  resilience4j.circuitbreaker.instances.yourCBName.sliding-window-type=COUNT_BASED
  resilience4j.circuitbreaker.instances.yourCBName.sliding-window-size=5
  resilience4j.circuitbreaker.instances.yourCBName.wait-duration-in-open-state=10000
  resilience4j.circuitbreaker.instances.yourCBName.automatic-transition-from-open-to-half-open-enabled=true
  spring.kafka.consumer.enable.auto.commit = false
  spring.kafka.listener.ack-mode = MANUAL_IMMEDIATE

还可以看看https://resilience4j.readme.io/docs/Circuitbreaker https://resilience4j.readme.io/docs/circuitbreaker

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

带有断路器的 Kafka Consumer,使用 Resilience4j 重试模式 的相关文章

随机推荐

  • 上下文相关的标记化是否需要词汇语法中的多个目标符号?

    根据ECMAScript 规范 https tc39 es ecma262 sec ecmascript language lexical grammar 词法输入的识别有几种情况 元素对句法语法上下文敏感 即 消耗输入元素 这需要多个目标
  • 如何解析包含 javascript 代码的 html

    如何解析大量使用 javascript 的 html 文档 我知道python中有一些库可以解析静态xml html文件 我基本上正在寻找一个程序或库 甚至是firefox插件 来读取html javascript 执行javascript
  • 添加文本框值并使用 javascript 显示它

    我正在尝试使用 javascript 添加几个文本框的输入值并在下面显示总数 如何添加并保留计算后显示的总和 我不是 JavaScript 专家 下面是一个向您展示如何执行此操作的示例
  • Angular js 对本地化的支持 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我尝试在 AngularJS 中查找支持多种语言的文档 但没有成功 支持本地化吗 看看角度翻译 htt
  • 如何在python中使用scrapy获取直接父节点?

    我是新来的scrapy 我想从网络上抓取一些数据 我得到了如下所示的html文档 dom style1 div class user info p class user name something in p tag p text data
  • 实体框架:多对多关系中的重复记录

    我有以下实体框架代码第一代码 创建表并插入数据 但是 Club 表中有重复的记录 我的操作是 使用俱乐部创建应用程序创建俱乐部 使用人员应用程序创建人员 如何避免重复录入 static void Main string args Datab
  • Linq Select 语句 - 不在的地方

    我正在尝试编写相当于以下内容的 LINQ 语句 select e EmployeeID EmployeeName e FirstName e LastName from Employees e where e EmployeeID not
  • 对 `search_as_you_type` ngram 子字段感到困惑

    我正在尝试将 键入时搜索 功能添加到 Elasticsearch 中名为email address 我的理解从文档 https www elastic co guide en elasticsearch reference 7 7 sear
  • 从本地到 Heroku 服务器的 SCP 文件

    我想将 config yml 文件从本地 django 应用程序目录复制到我的 heroku 服务器 但我不知道如何获取 电子邮件受保护 cdn cgi l email protectionHeroku 的格式 我尝试过运行 heroku
  • Android Room 按别名排序

    我想根据我创建的自定义别名来订购数据集 我尝试过 但它会导致语法错误 我究竟做错了什么 Code Query SELECT a b as ratio FROM dataset where my status myStatus order b
  • WKWebview注入cookie头导致重定向循环

    我试图将我单独获取的会话cookie注入到WKWebview请求中 结果证明这是相当痛苦的 我设法使用注入会话cookie这个解决方案 https stackoverflow com questions 26573137 can i set
  • PCM -> AAC(编码器) -> PCM(解码器)实时且正确优化

    我正在尝试实施 AudioRecord MIC gt PCM gt AAC Encoder AAC gt PCM Decode gt AudioTrack SPEAKER with MediaCodec在 Android 4 1 API16
  • 如何在MySQL中进行批量插入

    我有 1 多条记录需要输入到表中 在查询中执行此操作的最佳方法是什么 我应该创建一个循环并每次迭代插入一条记录吗 或者 还有更好的方法 来自MySQL手册 http dev mysql com doc refman 5 7 en inser
  • Azure 管理 REST API - “身份验证失败。‘授权’标头以无效格式提供。”

    我拼命尝试将 2 个经典存储帐户从旧的 MSDN 订阅移动到 MPN 订阅 但我一直遇到困难 因为仅通过 REST API 支持这些帐户的移动 我已按照此处的说明启用了 API https azure microsoft com en us
  • Eclipse 是否有排列类文件的功能?

    Eclipse 有很多功能 我想知道这个功能是否存在 或者是否存在任何捷径 我想将我的类数据排列到该流程中的变量 构造函数 方法中 从上到下 进一步细化我想按访问级别 pub private protected 和类型 void 或返回的方
  • 使用 GSON 获取 JSON 键名

    我有一个 JSON 数组 其中包含如下对象 bjones fname Betty lname Jones password ababab level manager 我的 User 类有一个用户名 需要使用 JSON 对象的密钥 我如何获取
  • 添加不属于模型一部分的自定义表单字段 (Django)

    我在管理网站上注册了一个模型 它的字段之一是长字符串表达式 我想将自定义表单字段添加到管理员中此模型的添加 更新页面 根据这些字段的值 我将构建长字符串表达式并将其保存在相关的模型字段中 我怎样才能做到这一点 我正在从符号构建数学或字符串表
  • 在elasticbeanstalk中设置NODE_ENV变量

    我创建了一个名为 elasticbeanstalk environment config其中包含以下内容 option settings option name NODE ENV value development 我还将 process
  • 具有多个可选参数的 Spring Data MongoDB AND/OR 查询

    我正在尝试执行具有两个以上可选参数的查询 但没有得到任何结果 对于2个参数我遵循了这个问题的答案spring data mongo 可选查询参数 https stackoverflow com questions 11613464 spri
  • 带有断路器的 Kafka Consumer,使用 Resilience4j 重试模式

    我需要一些帮助来了解如何使用 Spring boot Kafka Resilence4J 提出解决方案 以实现来自 Kafka Consumer 的微服务调用 假设如果微服务关闭 那么我需要使用断路器模式通知我的 Kafka 消费者停止获取