(二十七)admin-boot项目之集成websocket实时推送消息

2023-11-12

(二十七)集成websocket实时推送消息

基础项目地址:

https://gitee.com/springzb/admin-boot

此次整合推送消息有两种方案:

1. 采用 rabbitmq 详细插件 rabbitmq_web_stomp

2.采用 websocket的方案(mica-mqtt-core底层采用 t-io 框架, mica-mqtt-core提供了 mqtt 和 websocket 两种通信协议)

方案一 适用于连接少,消息多的情况

方案二 适用于连接多,消息多的情况

rabbitmq方案具体实施查看《(二十五)集成rabbitmq消息推送》

引入依赖:

<!--整合 rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!--mica-mqtt依赖-->
<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-core</artifactId>
    <version>1.2.10</version>
</dependency>

统一配置:

package cn.mesmile.admin.modules.message;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @author zb
 * @Description
 */
@Data
@ConfigurationProperties(prefix = "admin.message")
public class MessageProperties {

    /**
     * 是否启用消息发送
     */
    private Boolean enabled = Boolean.FALSE;

    /**
     * 发送消息底层调用逻辑
     */
    private MessageServiceTypeEnum serviceType = MessageServiceTypeEnum.RABBIT_MQ;

    /**
     * mqtt配置 服务端ip
     */
    private String mqttServerIp = "0.0.0.0";

    /**
     * mqtt服务端 端口号
     */
    private Integer mqttServerPort = 1883;

    /**
     * webSocket端口号
     */
    private Integer mqttServerWebsocketPort = 5883;

}
package cn.mesmile.admin.modules.message;

/**
 * @author zb
 * @Description 发送消息调用的底层服务
 */
public enum  MessageServiceTypeEnum {

    /**
     * rabbitMq
     */
    RABBIT_MQ,
    /**
     * rocketMq
     */
    ROCKET_MQ,
    /**
     * mqtt
     */
    MQTT
    
}

一、rabbitmq推送方案

package cn.mesmile.admin.modules.message.operational;

import cn.mesmile.admin.common.rabbit.constant.RabbitConstant;
import cn.mesmile.admin.modules.message.vo.MqMessageVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Set;

/**
 * rabbitmq方式实现接口
 * 生产者-直连交换机模式
 * 此模式下路由键匹配,同一账号多处在线都可接收到消息,未登录账号无法接收推送,且再登录账号不会有推送,路由消息丢失,适用于web端
 *
 * @author zb
 * @Description
 */
@Slf4j
@Component
@ConditionalOnProperty(
  name = "admin.message.service-type",
  havingValue = "rabbit_mq"
)
public class RabbitMqSendServiceImpl implements ISendService{

  /**
     rabbitmq-plugins enable rabbitmq_web_stomp rabbitmq_web_stomp_examples
     直连交换机
  String MESSAGE_EXCHANGE_DIRECT = "message.direct.exchange";
   普通通知消息
   String MESSAGE_QUEUE_SINGLE_ROUTE_KEY = "message.single_";
   群发通知消息
  String MESSAGE_QUEUE_GROUP_ROUTE_KEY = "message.group_";
   */

    @Resource
    public RabbitTemplate rabbitTemplate;

  @Override
  public void sendSingleMessage(String receiveUserId, MqMessageVO mqMessageVO) {
    try {
      rabbitTemplate.convertAndSend(RabbitConstant.MESSAGE_EXCHANGE_DIRECT,
          RabbitConstant.MESSAGE_QUEUE_SINGLE_ROUTE_KEY + receiveUserId, JSONObject.toJSONString(mqMessageVO));
    }catch (Exception e){
      log.error("发送消息失败:",e);
    }
  }

  @Override
  public void sendGroupMessage(Set<String> userSet, MqMessageVO mqMessageVO) {
    try {
      userSet.forEach(
          userId-> rabbitTemplate.convertAndSend(RabbitConstant.MESSAGE_EXCHANGE_DIRECT,
          RabbitConstant.MESSAGE_QUEUE_SINGLE_ROUTE_KEY +userId, JSONObject.toJSONString(mqMessageVO))
      );
    }catch (Exception e){
      log.error("发送消息失败:",e);
    }
  }
}

采用rabbitmq 前端需要安装插件 stomp 插件,对接rabbitmq 消息

安装使用stomp插件请参考:https://blog.csdn.net/suprezheng/article/details/126613531 【插件】

二、websocket方案(mica-mqtt-core)

mica-mqtt-core

https://gitee.com/596392912/mica-mqtt

package cn.mesmile.admin.modules.message;

import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;

/**
 * @author zb
 * @Description
 */
@Slf4j
@Configuration(
        proxyBeanMethods = false
)
@EnableConfigurationProperties({MessageProperties.class})
@ConditionalOnProperty(
        value = {"admin.message.enabled"},
        havingValue = "true"
)
public class MessageConfiguration {

    /**
     *  客户端工具下载: https://mqttx.app/
     *  mqtt://127.0.0.1:1883
     *  ws://127.0.0.1:5883
     *
     *  TODO 切换为 springboot 方式
     * https://gitee.com/596392912/mica-mqtt/blob/master/starter/mica-mqtt-server-spring-boot-starter/README.md
     */
    @ConditionalOnProperty(
            value = {"admin.message.service-type"},
            havingValue = "mqtt"
    )
    @Bean
    public MqttServer get(MessageProperties messageProperties) {
        // 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
        MqttServer mqttServer = MqttServer.create()
                // 默认:0.0.0.0
                .ip(messageProperties.getMqttServerIp())
                // 默认:1883
                .port(messageProperties.getMqttServerPort())
                .webPort(messageProperties.getMqttServerWebsocketPort())
                // 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次
                .readBufferSize(512)
                // 最大包体长度,如果包体过大需要设置此参数,默认为: 8092
                .maxBytesInMessage(1024 * 10)
                // 自定义认证
                .authHandler((context, uniqueId, clientId, userName, password) -> true)
                // 消息监听
                .messageListener((context, clientId, message) -> {
                    log.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.getPayload()));
                })
                .connectStatusListener(new IMqttConnectStatusListener() {
                    @Override
                    public void online(ChannelContext channelContext, String s) {
                        log.info("--------客户端【上线】----------{}, 客户端id:{}",channelContext, s);
                    }

                    @Override
                    public void offline(ChannelContext channelContext, String s) {
                        log.info("--------客户端【下线】----------{}, 客户端id:{}",channelContext, s);
                    }
                })
                .httpEnable(true)
                .debug() // 开启 debug 信息日志
                .start();
        return mqttServer;
    }


}

package cn.mesmile.admin.modules.message.operational;

import cn.mesmile.admin.modules.message.vo.MqMessageVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;

/**
 * @author zb
 * @Description
 */
@Slf4j
@ConditionalOnProperty(
    value = {"admin.message.service-type"},
    havingValue = "mqtt"
)
@Component
public class MqttSendServiceImpl implements  ISendService{

  @Resource
  private MqttServer mqttServer;

  @Override
  public void sendSingleMessage(String receiveUserId, MqMessageVO mqMessageVO) {
    boolean b = mqttServer.publishAll("/userId_" + receiveUserId,
        ByteBuffer.wrap(JSONObject.toJSONString(mqMessageVO).getBytes(StandardCharsets.UTF_8)));
  }

  @Override
  public void sendGroupMessage(Set<String> userSet, MqMessageVO mqMessageVO) {
    userSet.forEach(
        userId-> this.sendSingleMessage(userId, mqMessageVO)
    );
  }

}

三、mica-mqtt方案测试

  @Resource
    private ISendService sendService;

  /**
   *  客户端工具:https://mqttx.app/
   */
  @GetMapping("/message")
  public R message(){
      MqMessageVO mqMessageVO = new MqMessageVO();
      mqMessageVO.setReceiveUser("123");
      mqMessageVO.setContent("设置内容11111111111");
      sendService.sendSingleMessage("123",mqMessageVO);
      return R.data("test");
  }

客户端工具下载: https://mqttx.app/

mqtt://127.0.0.1:1883
ws://127.0.0.1:5883

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QymU7gGV-1663506368083)(image/image_FMKWQ55TCS.png)]

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

(二十七)admin-boot项目之集成websocket实时推送消息 的相关文章

随机推荐