如何在项目中使用kafka?

2023-11-04

1.如何在项目中使用kafka?

1.1)因为kafka的使用依赖于zookeeper(https://mp.weixin.qq.com/s/geR3pDw_Yjhmu8KMsXQosg在kafka v2.8版本后将zookeeper也集成在了服务中在kafka v2.8版本后官网取消了kafka依赖zookeeper集群的机制,采用内置kraft的方式),配置zookeeper的信息,即需要在kafka/config/zookeeper.properties配置zookeeper服务运行期间产生的数据存放位置dataDir,

 1.2)在kafka/config/server.properties 中配置kafka服务运行期间产生的log文件的位置,(注意:Kafka 使用消息日志(Log)来保存数据)

1.3)运行zookeeper和kafka服务,

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties

 1.4)在spring中引入kafka依赖

<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
</dependency>

spring中如何使用kafka?

发送消息:利用KafkaTemplate类->kafkaTemplate.send(topic, message);

消费消息:利用注解@KafkaListener(topics = {**},groupId = "")

1.5)应用:如何在项目中使用kafka实现发布系统通知功能?

用户进行关注,评论,回复等行为都是时刻在发生的.如果当某个用户进行关注/评论/回复(上游服务)后,系统立即对此请求进行处理,即向被关注/评论/回复的用户发送系统通知(下游服务)这一功能,当用户关注,评论,回复行为异常活跃时,就会拖垮服务器或者数据库.

利用kafka消息引擎的异步,解耦,流量削峰的特性,来实现项目中系统通知的功能.

1.5.1)将用户进行关注/评论/回复的行为抽象为事件Event对象

@Component
public class Event {
    String topic;//事件的主题(点赞/关注/回复)
    int userId;//事件的发起者
    int entityType;//被点赞/关注/回复的实体类型(项目中只有三种:用户,帖子,评论)
    int entityId;//被点赞/关注/回复的实体id
    int entityUserId;//被点赞/关注/回复的实体的作者(实体类型为用户时,entityId==entityUserId)
    Map<String, Object> data = new HashMap<>();//事件中其他额外需要装的数据

    /**
    省略get/set方法
    */
}

1.5.2)写生产者Producer类

@Component
public class EventProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void fireEvent(Event event){
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }
}

1.5.3)写消费者Consumer 类

@Component
public class EventConsumer implements CommunityConst {


    @Autowired
    private MessageService messageService;


    @KafkaListener(topics = {KAFKA_TOPIC_COMMENT,KAFKA_TOPIC_LIKE,KAFKA_TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
//spring监听到以上某一个主题下有消息了,就会自动调用此方法,并将消息封装到ConsumerRecord对象中
        if(record ==null || record.value()!=null) {
            logger.error("event is null");
        }
        Event event = JSONObject.parseObject( record.value().toString(), Event.class);
        if(event == null){
            logger.error("event is not null, but it has a wrong form");
        }
        //produce message
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        Map<String, Object> content = new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(event.getData()!=null){
            for(Map.Entry<String, Object> entry: event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));
        //add a system message
       messageService.sendMessage(message);//插入一条消息到数据库消息表中
      
    }

}

1.5.4)什么时刻触发消息的发送

以评论行为为例,


@Controller
@RequestMapping(path = "/comment")
public class CommentController implements CommunityConst {
    @Autowired
    private HostHolder hostHolder;
    @Autowired
    private CommentService commentService;
    @Autowired
    private EventProducer eventProducer;
    @Autowired
    private DiscussPostService discussPostService;
    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping(path = "/add/{PostId}",method = RequestMethod.POST)
    public String addComments(@PathVariable("PostId") int postId, Comment comment){
        //1.
        comment.setUserId(hostHolder.getUser().getId());
        comment.setCreateTime(new Date());
        comment.setStatus(0);
        //2.
        commentService.addComment(comment);
        //3.send system comment message
        Event event = new Event()
                .setTopic(KAFKA_TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId",postId);
        if(comment.getEntityType() == ENTITY_TYPE_POST){
            //the author of post
            event.setEntityUserId(discussPostService.findDiscussPostById(comment.getEntityId()).getUserId());
        }else if(comment.getEntityType() == ENTITY_TYPE_COMMENT){
            //the author of comment
            event.setEntityUserId(commentService.findCommentById(comment.getEntityId()).getUserId());
        }
        eventProducer.fireEvent(event);

  
}

其他两类业务代码类似.

1.5.4)展示某个用户的通知列表

@RequestMapping(path = "/notice/list",method = RequestMethod.GET)
    public String getNoticeList(Model model){
        User user = hostHolder.getUser();
        //comment
        Map<String,Object> map = new HashMap<>();
        Message leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_COMMENT, user.getId());
        if(leastNotice!=null){
            map.put("leastNotice",leastNotice);
            HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
            map.put("entityType",content.get("entityType"));
            map.put("entityId",content.get("entityId"));
            map.put("user",userService.findUserById((Integer) content.get("userId")));
            map.put("postId",content.get("postId"));

            int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_COMMENT, user.getId());
            int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_COMMENT, user.getId());
            map.put("noticeCount",noticeCount);
            map.put("unReadNoticeCount",unReadNoticeCount);
            model.addAttribute("comment",map);
        }


        //like
        map = new HashMap<>();
        leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_LIKE, user.getId());
        if(leastNotice!=null){
            map.put("leastNotice",leastNotice);
            HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
            map.put("entityType",content.get("entityType"));
            map.put("entityId",content.get("entityId"));
            map.put("user",userService.findUserById((Integer) content.get("userId")));
            map.put("postId",content.get("postId"));

            int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_LIKE, user.getId());
            int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_LIKE, user.getId());
            map.put("noticeCount",noticeCount);
            map.put("unReadNoticeCount",unReadNoticeCount);
            model.addAttribute("like",map);
        }


        //follow
        map = new HashMap<>();
        leastNotice = messageService.findLeastNotice(KAFKA_TOPIC_FOLLOW, user.getId());
        if(leastNotice!=null){
            map.put("leastNotice",leastNotice);
            HashMap content = JSONObject.parseObject(leastNotice.getContent(), HashMap.class);
            map.put("entityType",content.get("entityType"));
            map.put("entityId",content.get("entityId"));
            map.put("user",userService.findUserById((Integer) content.get("userId")));
            int noticeCount = messageService.findNoticeCount(KAFKA_TOPIC_FOLLOW, user.getId());
            int unReadNoticeCount = messageService.findUnReadNoticeCount(KAFKA_TOPIC_FOLLOW, user.getId());
            map.put("noticeCount",noticeCount);
            map.put("unReadNoticeCount",unReadNoticeCount);
            model.addAttribute("follow",map);
        }


        //
        model.addAttribute("unreadConversationCount",messageService.findUnreadLetterCount(user.getId(), null));
        model.addAttribute("unreadNoticeCount",messageService.findUnReadNoticeCount(null,user.getId()));

        return "/site/notice";
    }

1.5.5)某类通知的详情

.....

总结:到此就实现了在spring项目中用kafka作为消息引擎系统来实现系统通知的功能.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在项目中使用kafka? 的相关文章

随机推荐

  • win7计算机窗口左边被改了,win7电脑开始菜单变成经典模式?三种方法教你改回来...

    今天小编一开电脑 觉得怪怪的 怎么感觉电脑桌面不太一样了 仔细一看 原来是系统开始菜单变成经典模式了 很不习惯 那怎么恢复呢 今天小编就以win7系统为例 教大家怎么改回来 方法一 1 在桌面空白处单击鼠标右键 选择个性化 2 鼠标左键单击
  • 【app逆向】Frida-rpc 的常用python脚本

    1 1 Frida rpc常用脚本 在执行frida rpc时 会涉及到先关参数类型的处理和转换 例如 python程序调用时 传入参数 frida的JavaScript脚本如何获取参数 JavaScript的参数如何转换到 Java中所需
  • Endnote参考文献分享与导入

    Endnote参考文献分享与导入 转移 文章发给老师如何将参考文献一同发送 软件版本为X8 一 分享 首先找到Endnote library的位置 将两个文件 一个是后缀为enl的数据库文件和另一个是 data文件夹 一起复制到一个文件夹
  • opencv 表格识别之表格透视矫正(二)

    上一篇文章中给出了一种对表格进行矫正的方法 但是只能用于只有一个表格的情况 对于有多个表格的情况的矫正的方法 将在这篇文章中给出 单个表格矫正 链接 一 函数的介绍 1 Homography 函数返回映射关系H 3 3的矩阵 CV EXPO
  • 微信公众h5页面如何在web端调试

    由于微信公众页面在手机上不好调试 所以可以选择使用微信开发者工具 登录微信公众号 开发者工具 绑定微信账号 下载工具 安装 微信扫码 工具内输入公众号网页地址
  • 等保2.0建设

    现在的等保2 0建设都是要完整
  • Springboot整合mybatis(注解-基础篇)最通俗易懂的文章

    注解开发mybatis 开发流程 1 引入MyBatis依赖 maven下的springboot添加如下坐标即可 POM xml
  • 微信小程序——云函数部署问题

    摸索了一个下午才明白云函数是什么 效率真的有点低 不过好在还是弄出来了 云开发 即无需搭建服务器 将云端当做服务端 只需进行前端开发 小程序 云开发提供了三个基础能力 数据库 存储和云函数 数据库 json数据库 就理解为往里面存的是jso
  • ApplicationContext 与 BeanFactory 区别(MS)

    1 从继承关系上来说 BeanFactory 是 的父类 BeanFactory 只是提供了基础操作Bean的方法 ApplicationContext除了拥有父类的基础操作之外 还提供了自己独有的功能 2 从性能方面上来说 Applica
  • 关于PHP发送邮箱验证码功能介绍

    关于PHP发送邮箱验证码功能介绍 PHP语言发送邮箱验证码 可以使用PHPMailer这个现成的类文件 完美集成实现邮箱发送验证码 前期准备 a PHPMailer下载地址 在git上获取最新版即可 https github com PHP
  • vue 前端内存问题 解决方案

    前端内存问题 JavaScript heap out of memory 解决 1 全局安装increase memory limit npm install g increase memory limit 2 进入工程目录 执行 incr
  • 动态规划经典题目:最大连续子序列和、最大不连续子序列和

    1 最大连续子序列和 记数组为nums 思路 记录dp i 为i位置结尾的最大连续子序列和 则有dp i dp i 1 gt 0 dp i 1 nums i nums i 然后求dp数组的max即为最终结果 1 最大不连续子序列和 记数组为
  • java数组程序_Java数组

    7 1数组 l 数组是多个相同类型数据的组合 实现对这些数据的统一管理 l 数组中的元素可以是任何数据类型 包括基本数据类型和引用数据类型 l 数组属引用类型 数组型数据是对象 object 数组中的每个元素相当于该对象的成员变量 7 2一
  • element-ui遮罩层

    通用下载方法内 通用下载方法 export function download url params filename method downloadLoadingInstance Loading service text 正在下载数据 请
  • 7-WebApis-6

    Web APIs 6 目标 能够利用正则表达式完成小兔鲜注册页面的表单验证 具备常见的表单验证能力 正则表达式 综合案例 阶段案例 正则表达式 正则表达式 Regular Expression是一种字符串匹配的模式 规则 使用场景 例如验证
  • 启动nacos报错:Exception in thread main java.lang.UnsupportedClassVersionError

    最近在做微服务项目的时候用到nacos 使用的版本分别是最新版nacos server 2 0 2和nacos server 1 4 2 但是在启动的时候报错了先看一下报错信息吧 报错信息 G nacos server 1 4 2 naco
  • 单相并网逆变器学习记录-------------SOGI-PLL锁相环

    目录 一 锁相环的简介 二 dq坐标系 三 SOGI生成 坐标系 四 SOGI PLL 五 SOGI PLL的仿真 一 锁相环的简介 进入21世纪 随着资源 环境问题的日益加剧 以太阳能 风能和热电联产等为代表的可再生 清洁能源纷纷通过逆变
  • JetBrains Account connection error: java.security.SignatureException: Signat

    用学生账户注册登录idea时 网上看到了很多解决方式 大部分都是修改hosts文件 即删除里面的 0 0 0 0 account jetbrains com 0 0 0 0 www jetbrains com 那么问题来了 我的 hosts
  • 被勒索病毒加密的文件如何破解?

    想要硬刚勒索病毒 脱密加密的文件 是很难的 之前 我已经介绍了数字签名 勒索病毒使用了公钥加密另一个常用应用 数字信封 技术 想要恢复勒索病毒加密的文件 可以破解黑客的公钥 或者破解黑客加密文件的临时对称密钥 而这2种算法 黑客都选用了目前
  • 如何在项目中使用kafka?

    1 如何在项目中使用kafka 1 1 因为kafka的使用依赖于zookeeper https mp weixin qq com s geR3pDw Yjhmu8KMsXQosg在kafka v2 8版本后将zookeeper也集成在了服