Spring Cloud框架学习-Spring Cloud Stream

2023-11-03

1. 基本介绍

Spring Cloud Stream 用一个用来为微服务应用构建消息驱动能力的框架。Spring Cloud Stream 中,提供了一个微服务和消息中间件之间的一个粘合剂,这个粘合剂叫做Binder,Binder 负责与消息中间件进行交互。而我们开发者则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。
在这里插入图片描述

Spring Cloud Stream 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动,为流行的消息中间件产品(Spring Cloud Stream 原生默认支持RabbitMQ,Kafka。阿里在官方基础上提供了RocketMQ的支持)提供了个性化的自动化配置实现,引用了发布-订阅模式消费组分区的三大核心概念。

2. 设计思想

那么Spring Cloud Stream是怎么屏蔽底层差异的呢?它通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种消息中间件的实现

绑定器Binder的说明:
在没有绑定器这个概念的情况下,Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,可以完美地实现应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,实现微服务和具体消息中间件的解耦,使得微服务可以关注更多自己的业务流程。一个集成Spring Cloud Stream 程序的框架示意图,如下图所示:
在这里插入图片描述
在这里插入图片描述
Binder中的INPUT和OUTPUT针对Binder本身而言,INPUT对应于消费者,OUTPUT对应于生产者。 。INPUT接收消息生产者发送的消息,OUTPUT发送消息给到消息消费者消费。

Spring Cloud Stream处理消息的业务流程图如下:
在这里插入图片描述

  • binder: 目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。

  • Source和Sink:可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。Source用于获取数据(要发送到MQ的数据),Sink用于提供数据(要接收MQ发送的数据,提供数据给消息消费者)

  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。用于存放source接收到的数据,或者是存放binder拉取的数据。

  • Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。

3. 常用注解

在这里插入图片描述

@StreamListener注解需要通过配置属性实现JSON字符串到对象的转换,这是因为在Spring Cloud Stream中实现了一套可扩展的消息转换机制。在消息消费逻辑执行之前,消息转换机制会根据消息头信息中声明的消息类型,找到对应的消息转换器并实现对消息的自动转换。配置示例:

spring.cloud.stream.bindings.<通道名>.content-type=application/json

4. 简单入门

创建一个Maven项目,在pom.xml添加三个依赖:Web、RabbitMQ、Spring Cloud Stream。具体要引入的依赖:

   <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>

项目创建成功后,配置文件中添加RabbitMQ的配置信息。

spring.rabbitmq.host=101.43.30.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=
spring.rabbitmq.password=
spring.rabbitmq.virtual-host=/learn

创建一个简单的消息接收器MsgReceiver 类,用于接收MQ消息。

其中@StreamListener注解的作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。

//@EnableBinding 表示绑定 Sink 消息通道
@EnableBinding(Sink.class)
public class MsgReceiver {
    public final static Logger LOGGER = LoggerFactory.getLogger(MsgReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        LOGGER.info(" MsgReceiver Received:" + payload.toString());
    }
}

@EnableBinding注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定。通过@EnableBinding(Sink.class)绑定了Sink接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义。Sink接口的源码:
在这里插入图片描述

除了Sink接口外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,Source接口的源码如下:
在这里插入图片描述
还有提供了结合Sink和Source的Processor接口。
在这里插入图片描述

下面启动项目,然后在RabbitMQ 后台管理页面会创建一个匿名的队列,并且应用通过RabbitMessageChannelBinder将MsgReceiver绑定为该队列的消费者。下面尝试在RabbitMQ 后台管理页面去发送一条消息。

在这里插入图片描述
在这里插入图片描述
点击Publish message按钮,查看后台输出可以看到消息能够正常接收到,并且被成功消费。

5. 自定义消息通道

我们可以通过 @Input注解和@Output注解来定义绑定消息通道的接口。如果使用了@Input注解和@Output注解没有指定具体的value值的话,将默认使用方法名作为消息通道的名称。首先创建一个名为MyChannel的接口,定义通道channel:

public interface MyChannel {
    String HELLO_INPUT = "hello-input";
    String HELLO_OUTPUT = "hello-output";

    @Output(HELLO_OUTPUT)
    MessageChannel output();

    @Input(HELLO_INPUT)
    SubscribableChannel input();
}
  1. 注意,两个消息通道的名字是不一样的
  2. 从 F 版开始,默认使用通道的名称作为实例命令,所以这里的通道名字不可以相同(早期版本可
    以相同),这样的话,为了能够正常收发消息,需要我们在 application.properties 中做一些额外
    配置。
#消息绑定
spring.cloud.stream.bindings.hello-input.destination=hello-topic
spring.cloud.stream.bindings.hello-output.destination=hello-topic

接下来,自定义一个消息接收器,用来接收自己的消息通道里的消息:

@EnableBinding(MyChannel.class)
public class MsgReceiver2 {
    public final static Logger LOGGER = LoggerFactory.getLogger(MsgReceiver2.class);

    @StreamListener(MyChannel.HELLO_INPUT)
    public void receive(Object payload) {
        LOGGER.info("MsgReceiver receive2:" + payload);
    }
}

创建一个接口用于测试发送消息

@RestController
public class HelloController {

    @Autowired
    MyChannel myChannel;

    @GetMapping("/hello")
    public void hello(){
        myChannel.output().send(MessageBuilder.withPayload("hello spring cloud stream!").build());
    }
}

6. 消息分组-处理消息重复消费

在微服务架构下,通常集群部署微服务以实现服务的高可用和负载均衡。但是默认情况下使用Spring Cloud Stream,消费者程序默认分配一个匿名且独立的单成员消费者组,如果消费者是一个集群,因为消费者属于不同的组不存在竞争关系,一条消息会被多次消费,所有部署的应用都会进行消费。举例如图所示:
在这里插入图片描述

我们可以通过消息分组解决这个问题。每个消费者绑定都可以使用spring.cloud.stream.bindings.<bindingName>.group属性指定组名。在Spring Boot 项目中添加如下配置:

#消息分组
spring.cloud.stream.bindings.hello-input.group=g1
spring.cloud.stream.bindings.hello-output.group=g1
#指定通道对应的的主题(对应RabbitMQ的交换机)
spring.cloud.stream.bindings.hello-input.destination=hello-exchange
spring.cloud.stream.bindings.hello-output.destination=hello-exchange

通过以上配置那么在同一个组中的成员只会有一个成员真正收到消息并进行处理。

7. 消息分区

7.1 概念

Spring Cloud Stream 支持在给定应用程序的多个实例之间对数据进行分区。通过消息分区可以实现相同特征的消息总是被同一个消费实例处理。即一个或多个生产者应用程序实例将消息数据发送到多个消费者应用程序实例,确保有共同特征标识的消息数据由同一消费者实例接收和处理

在这里插入图片描述
消息分区的应用场景比如在一些监控服务上,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身聚合这些数据,那么消息生产者可以为消息增加一个固定的特征ID来进行分区,使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。

Spring Cloud Stream为分区提供了通用的抽象实现,用来在消息中间价的上层实现消息分区处理,为不具备分区功能的消息中间件增加了分区功能拓展。

7.2 使用示例

添加配置示例:

#开启消息分区(消费者上配置)
spring.cloud.stream.bindings.hello-input.consumer.partitioned=true
#指定消费者实例个数(消费者上配置)
spring.cloud.stream.instance-count=2
# 当前实例的索引号(消费者上配置),从0开始,最大值为instance-count的值-1
spring.cloud.stream.instance-index=0
# 指定分区键的表达式规则,我们可以根据实际的输出消息规则配置SpEL来生成合适的分区键(生产者上配置)
spring.cloud.stream.bindings.hello-output.producer.partition-key-expression=1
# 指定消息分区的数量(生产者上配置)
spring.cloud.stream.bindings.hello-output.producer.partition-count=2

接下来使用Maven打包项目为jar包

在这里插入图片描述

在控制台启动两个实例,注意启动时,spring.cloud.stream.instance-index 要动态修改。
在这里插入图片描述

java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8080 --
spring.cloud.stream.instance-index=0
java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081 --
spring.cloud.stream.instance-index=1

调用接口/hello测试,可以看到多次发送同一个消息,消息只被一个消费者处理。

8. 延时消息

RabbitMQ实现发送延时消息需要安装插件rabbitmq_delayed_message_exchange,下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

8.1 安装插件

以Docker方式安装为例:
下载插件

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

将文件拷贝到Docker容器中

docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 900822f303cd:/opt/rabbitmq/plugins

进入RabbitMQ容器

docker exec -it 900822f303cd /bin/sh

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看插件是否启动成功

rabbitmq-plugins list

重新启动RabbitMQ容器

8.2 具体实现

1.配置文件配置
在配置文件中配置开启通道的消息延迟功能

##开启消息延迟功能
spring.cloud.stream.rabbit.bindings.hello-input.consumer.delayed-exchange=true
spring.cloud.stream.rabbit.bindings.hello-output.producer.delayed-exchange=true

修改一下消息输入输出通道的destination定义:

spring.cloud.stream.bindings.hello-input.destination=delay_msg
spring.cloud.stream.bindings.hello-output.destination=delay_msg

2.创建接口测试

@RestController
public class HelloController {

    public final static Logger LOGGER = LoggerFactory.getLogger(HelloController.class);


    @Autowired
    MyChannel myChannel;

    @GetMapping("/hello")
    public void hello(){
        myChannel.output().send(MessageBuilder.withPayload("hello spring cloud stream!").build());
    }


    @GetMapping("/delay-hello")
    public void delayHello(){
        LOGGER.info("send msg:" + new Date());
        myChannel.output().send(MessageBuilder.withPayload("hello spring cloud stream!").setHeader("x-delay", 5000).build());
    }
}

测试可以发现该消息延迟5秒才消费。

在这里插入图片描述

参考:
1.《Spring Cloud微服务实战》

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Cloud框架学习-Spring Cloud Stream 的相关文章

随机推荐

  • .net Iframe 'X-Frame-Options' to 'SAMEORIGIN' 解决办法

    Refused to display http xxx cn in a frame because it set X Frame Options to sameorigin 证明此页面不能被Iframe 解决方法 1 在被Iframe的we
  • Open3D (C++) 点云投影到圆柱

    目录 一 算法原理 1 圆柱方程 2 投影原理 二 代码实现 三 结果展示 1 原始点云 2 投影结果 四 参考链接 一 算法原理 1 圆柱方程 圆柱方程可以表示为 x x
  • CSS的相对定位,绝对定位,固定定位,及元素重叠(层级问题)

    CSS的相对定位 绝对定位 固定定位 及元素重叠 层级问题 1 项目问题及思考 1 1 问题一 关于定位问题 最近在做vue项目时 遇到这样一个需求 需要在Echarts的柱状图左侧的title的text文字内容加一个鼠标悬浮的文字提示 刚
  • Django中的Q查询

    class Poll models Model slug models SlugField unique for month pub date question models CharField maxlength 255 pub date
  • 小程序线上请求不到后台数据,开发工具上编译正常

    开发工具上真机调试请求 status failed 已配置合法域名 ssl证书未到期 https访问正常 使用此网站检测到缺少中间证书 解决方法 1 使用上面网站的证书分析 复制证书内容生成并下载中间证书文件 2 打开下载的中间证书复制到原
  • 宝塔面板搭建WordPress网站完整教程

    概述 如果还有不了解宝塔面板怎么使用的小伙伴 可以看下我总结的系列教程 保证从新手变老鸟 宝塔面板精选教程汇总 宝塔面板教程 1 基于云服务器搭建宝塔面板教程最全详解 宝塔面板教程 2 宝塔面板添加WordPress站点详细图文教程 宝塔面
  • 深度剖析Vmware 裸机映射,看完秒懂!

    不使用裸机映射时 虚拟机上的虚拟磁盘需要制作文件系统 在书记写入的过程中数据需要先使用虚拟磁盘上的文件系统进行写入 写入完成后 虚拟磁盘文件 vmkd仍然需要使用ESXI主机存储的文件系统 VMFS 在写入真实存储中 这样就导致需要经过两种
  • Save Money for Your Company 最小矩形覆盖(非计算几何)/找出N条直线相交点的边缘点/ find the dominating points of N lines

    Save Money for Your Company Description You are the director of a mining enterprise aiming to purchase an mining land fo
  • 【人工智能】人工智能与人类智能的关系

    1 基本概念界定 1 1人工智能 人工智能是在20世纪中期以后产生的学科 人工智能就是用机器模拟人类的智能活动 从而用机器代替人类行使某些方面的职能 人工智能是通过探索人的感觉和思维的规律来模拟人的智能活动 电子计算机是人工智能的媒介和基础
  • 我是双非一本软件工程毕业,想做技术文档工程师

    收到一位来自知乎网友 姜莱的咨询 征得本人同意 在此做一个公开答复吧 姜莱你好 非常感谢你的信任 在择业的关键路口 愿意听我这个陌生人聊聊想法 作为一只 误入歧途 在技术传播行业摸爬滚打了10 年的TC狗 简单分享一下个人经验 仅供参考 希
  • 1740 找到二叉树中的距离

    题目描述 给定一棵二叉树的根节点 root 以及两个整数 p 和 q 返回该二叉树中值为 p 的结点与值为 q 的结点间的 距离 两个结点间的 距离 就是从一个结点到另一个结点的路径上边的数目 示例 1 输入 root 3 5 1 6 2
  • 飞桨学习笔记之图像分割套件PaddleSeg

    1 概述 2 基本原理 2 1 DeepLabv3 2 2 U Net 2 3 PSPNet 2 4 ICNet 2 5 HRNet 2 6 Fast SCNN 2 7 模型选择 2 8 模型评估 3 具体实现过程 3 1 准备数据集 3
  • 使用指纹的锁屏解锁流程

    startuml gt BiometricUnlockController onBiometricAuthenticated BiometricUnlockController gt BiometricUnlockController st
  • STM32在线升级OTA,看这一篇就够啦~

    本文是博主在学习OTA时 up主阿正推荐学习的文章 原作者leafguo 写的非常简洁明了 在获得授权后整理发布 可以在文末点击阅读原文跳转到原文章 简介 本文主要讲解在线升级 OTA 的基础知识 主要是针对IAP OTA从原理分析 分区划
  • DVWA靶机,通过XSS盗取cookie登录

    文章目录 一 发现XSS漏洞 1 1 登录DVWA 找到XSS DOM 模块 测试XSS是否存在 1 2 在选项的传参后面加入一串js代码 也就是可以XSS弹出的代码 payload如下 二 盗取cookie 2 1 在XSS平台上搭建一个
  • Linux 学习视频完整

    链接 https pan baidu com s 1O6zsaYo7kl28QTpOnr9wCA 提取码 e7u4
  • docker部署harbor

    一 harbor下载 官方下载地址 Releases goharbor harbor GitHub 二 harbor安装 1 解压安装包 tar xzvf harbor XXXX tgz 2 配置修改 复制harbor yml tmpl文件
  • quagga源码学习--BGP协议中的routemap

    路由策略的基础知识 定义 路由策略 Routing Policy 作用于路由 主要实现了路由过滤和路由属性设置等功能 它通过改变路由属性 包括可达性 来改变网络流量所经过的路径 目的 路由器在发布 接收和引入路由信息时 根据实际组网需要实施
  • uniapp 多选框的全选功能实现

    uniapp内置的checkbox其实以及checkbox group本来挺好的 但是有两个问题 无法依赖其事件实现全选 样式固定 难以修改 他们无法实现全选的原因是 我动态修改checkbox的checked字段时 界面上的状态能够实时变
  • Spring Cloud框架学习-Spring Cloud Stream

    文章目录 1 基本介绍 2 设计思想 3 常用注解 4 简单入门 5 自定义消息通道 6 消息分组 处理消息重复消费 7 消息分区 7 1 概念 7 2 使用示例 8 延时消息 8 1 安装插件 8 2 具体实现 1 基本介绍 Spring