聊聊rocketmq的KVConfigManager

2023-11-07

本文主要研究一下rocketmq的KVConfigManager

KVConfigManager

org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java

public class KVConfigManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvController namesrvController;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

    public KVConfigManager(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    public void load() {
        String content = null;
        try {
            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        } catch (IOException e) {
            log.warn("Load KV config table exception", e);
        }
        if (content != null) {
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            if (null != kvConfigSerializeWrapper) {
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }

    public void putKVConfig(final String namespace, final String key, final String value) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null == kvTable) {
                    kvTable = new HashMap<String, String>();
                    this.configTable.put(namespace, kvTable);
                    log.info("putKVConfig create new Namespace {}", namespace);
                }

                final String prev = kvTable.put(key, value);
                if (null != prev) {
                    log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                } else {
                    log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putKVConfig InterruptedException", e);
        }

        this.persist();
    }

    public void persist() {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
                kvConfigSerializeWrapper.setConfigTable(this.configTable);

                String content = kvConfigSerializeWrapper.toJson();

                if (null != content) {
                    MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
                }
            } catch (IOException e) {
                log.error("persist kvconfig Exception, "
                    + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("persist InterruptedException", e);
        }

    }

    public void deleteKVConfig(final String namespace, final String key) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    String value = kvTable.remove(key);
                    log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
                        namespace, key, value);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("deleteKVConfig InterruptedException", e);
        }

        this.persist();
    }

    public byte[] getKVListByNamespace(final String namespace) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    KVTable table = new KVTable();
                    table.setTable(kvTable);
                    return table.encode();
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getKVListByNamespace InterruptedException", e);
        }

        return null;
    }

    public String getKVConfig(final String namespace, final String key) {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                HashMap<String, String> kvTable = this.configTable.get(namespace);
                if (null != kvTable) {
                    return kvTable.get(key);
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getKVConfig InterruptedException", e);
        }

        return null;
    }

    public void printAllPeriodically() {
        try {
            this.lock.readLock().lockInterruptibly();
            try {
                log.info("--------------------------------------------------------");

                {
                    log.info("configTable SIZE: {}", this.configTable.size());
                    Iterator<Entry<String, HashMap<String, String>>> it =
                        this.configTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, HashMap<String, String>> next = it.next();
                        Iterator<Entry<String, String>> itSub = next.getValue().entrySet().iterator();
                        while (itSub.hasNext()) {
                            Entry<String, String> nextSub = itSub.next();
                            log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(),
                                nextSub.getValue());
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("printAllPeriodically InterruptedException", e);
        }
    }
}
复制代码
  • 这里使用HashMap,然后通过ReentrantReadWriteLock进行并发控制,map的key是namespace,而value是一个HashMap
  • putKVConfig及deleteKVConfig使用的是写锁
  • persist、getKVListByNamespace、getKVConfig、printAllPeriodically使用的是读锁

MixAll.string2File

org/apache/rocketmq/common/MixAll.java

    public static void string2File(final String str, final String fileName) throws IOException {

        String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);

        String bakFile = fileName + ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }

        File file = new File(fileName);
        file.delete();

        file = new File(tmpFile);
        file.renameTo(new File(fileName));
    }

    public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
        File file = new File(fileName);
        File fileParent = file.getParentFile();
        if (fileParent != null) {
            fileParent.mkdirs();
        }
        FileWriter fileWriter = null;

        try {
            fileWriter = new FileWriter(file);
            fileWriter.write(str);
        } catch (IOException e) {
            throw e;
        } finally {
            if (fileWriter != null) {
                fileWriter.close();
            }
        }
    }
复制代码
  • 将文本内容写到指定路径的文件
  • 这里先写到.tmp文件,然后备份上一个版本的内容,在删除上一个版本的文件,最后将tmp文件重命名为正式的文件名

RemotingSerializable

org/apache/rocketmq/remoting/protocol/RemotingSerializable.java

public abstract class RemotingSerializable {
    private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");

    public static byte[] encode(final Object obj) {
        final String json = toJson(obj, false);
        if (json != null) {
            return json.getBytes(CHARSET_UTF8);
        }
        return null;
    }

    public static String toJson(final Object obj, boolean prettyFormat) {
        return JSON.toJSONString(obj, prettyFormat);
    }

    public static <T> T decode(final byte[] data, Class<T> classOfT) {
        final String json = new String(data, CHARSET_UTF8);
        return fromJson(json, classOfT);
    }

    public static <T> T fromJson(String json, Class<T> classOfT) {
        return JSON.parseObject(json, classOfT);
    }

    public byte[] encode() {
        final String json = this.toJson();
        if (json != null) {
            return json.getBytes(CHARSET_UTF8);
        }
        return null;
    }

    public String toJson() {
        return toJson(false);
    }

    public String toJson(final boolean prettyFormat) {
        return toJson(this, prettyFormat);
    }
}
复制代码
  • 这里toJson使用的是fastjson的方法

小结

  • rocketmq的KVConfigManager采用的是HashMap来存配置项,key为namespace,value为HashMap,存储的值采用的是String
  • 采用ReentrantReadWriteLock进行并发控制,支持序列JSON到磁盘,也支持从磁盘文件加载到内存

doc

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

聊聊rocketmq的KVConfigManager 的相关文章

  • Grails 项目 - Servlet 调用 - ClassNotFoundException:javax.servlet.AsyncContext

    我在用 IntelliJ IDEA 终极版 12 4 grails 2 2 0 BuildConfig groovy 文件中的 grails servlet version 2 5 并实现了简单的 servlet post 请求 使用 RE
  • Glassfish 4 - JDBC 领域

    Glassfish 4 中的密码加密算法和摘要算法有什么区别 因为Password加密算法不能为空 所以我使用了MD5 Encoding使用了Hex 摘要算法为空 因此默认为 SHA 256 但是 如果我使用 JAAS 制作一个简单的登录应
  • 如何将日期字符串解析为Date? [复制]

    这个问题在这里已经有答案了 如何将下面的日期字符串解析为Date object String target Thu Sep 28 20 29 30 JST 2000 DateFormat df new SimpleDateFormat E
  • NIO 直接缓冲区何时以及如何被释放?

    我有一个 C 库 需要一个临时缓冲区作为暂存空间 我正在考虑将直接字节缓冲区的地址传递给它 在最终释放缓冲区之前 是否允许虚拟机重新定位缓冲区 JNI 框架消失后 本机库将保留该指针 我的理解是 JNI 本地对象引用无法缓存 因为 VM 可
  • Eclipse 无法识别 persistence.xml 的内容

    我在 eclipse 中收到以下错误 persistence xml 文件没有可识别的内容 我的 persistence xml 文件在我的应用程序中工作得很好 但 eclipse 一直给我这个错误 我在移动文件并使用 m2eclipse
  • Java 客户端到服务器未知来源

    我有一个简单的乒乓球游戏 需要通过网络工作 服务器将创建一个带有球和 2 个球棒位置的游戏 当客户端连接到服务器时 服务器将创建一个名为 PongPlayerThread 的新类 它将处理客户端到服务器的输入和输出流 我的服务器工作100
  • Eclipse RCP - 将视图与编辑器区域堆叠?

    在开发 Eclipse RCP 应用程序时 是否可以将视图与编辑器区域堆叠在一起 像这样 我有多个列表 表格 我想创建一种预览组合 当通过单击鼠标选择列表上的项目时 我希望我的预览合成显示该项目的数据 如果用户双击某个项目 我想在预览合成后
  • 我可以使用 Selenium Webdriver 测试元素的顺序吗?

    有一个表单 其中有 3 个字段 具有 3 个不同的 ID fieldset div div fieldset
  • ThreadPoolExecutor 和队列

    我以为使用线程池执行器 http docs oracle com javase 6 docs api java util concurrent ThreadPoolExecutor html我们可以提交Runnables 要在以下位置执行B
  • 为什么 JSON.stringify 对于似乎具有属性的对象返回空对象符号“{}”?

    下面的例子表明JSON stringify 返回字符串 对于 SpeechSynthesisVoice 对象 var voiceObject window speechSynthesis getVoices 0 JSON stringify
  • 在 java 8 下使用泛型出现类型错误,但在 java 7 下则不然

    我有一段代码可以在 java 7 下编译良好 但不能在 java 8 下编译 这是一个独立的重现示例 我已经采用了显示此问题的真实代码并删除了所有实现 import java util Iterator class ASTNode
  • Java 声音可视化器

    我正在尝试制作一个java声音可视化工具 但我完全不知道如何在实时处理音频后立即从提取的音频中获取字节 我可以将程序与 wav 文件同步 但这不是我想要做的 我想用程序生成声音 然后播放它 而不将其保存在任何地方 谢谢您的帮助 本文可以帮助
  • SQlite 获取最近的位置(带有纬度和经度)

    我的 SQLite 数据库中存储有纬度和经度的数据 我想获取距我输入的参数最近的位置 例如我当前的位置 纬度 经度等 我知道这在 MySQL 中是可能的 并且我已经做了相当多的研究 SQLite 需要一个自定义外部函数来实现半正弦公式 计算
  • 带等待/通知的同步块与不带等待/通知的同步块之间的区别?

    如果我只是使用synchronized 不是wait notify方法 它仍然是线程安全的吗 有什么不同 Using synchronized使方法 块一次只能由一个线程访问 所以 是的 它是线程安全的 这两个概念是结合在一起的 而不是相互
  • 如何在 Flask 中获取 POSTed JSON?

    我正在尝试使用 Flask 构建一个简单的 API 现在我想在其中读取一些 POSTed JSON 我使用 Postman Chrome 扩展进行 POST 我 POST 的 JSON 很简单 text lalala 我尝试使用以下方法读取
  • 对于双核手机,availableProcessors() 返回 1

    我最近购买了一部 Moto Atrix 2 手机 当我尝试查看手机中的处理器规格时 Runtime getRuntime availableProcessors 返回 1 proc cpuinfo 也仅包含有关处理器 0 的信息 出于好奇
  • 为什么 JSON 结果可以是布尔值而不是对象或数组?

    From JSON 网站 http json org JSON 建立在两种结构之上 名称 值对的集合 在各种语言中 这被实现为对象 记录 结构 字典 哈希表 键控列表或关联数组 值的有序列表 在大多数语言中 这被实现为数组 向量 列表或序列
  • Volley 在第一次调用方法时返回 null

    我正在尝试使用 volley 从服务器检索数据 但是当我第一次调用此方法时 我收到服务器的响应 但该方法返回 null 如果我第二次调用它 我会得到最后的响应 public String retrieveDataFromServer Str
  • 将 SQL 数据中的一行映射到 Java 对象

    我有一个 Java 类 其实例字段 以及匹配的 setter 方法 与 SQL 数据库表的列名相匹配 我想优雅地从表中获取一行 到 ResultSet 中 并将其映射到此类的实例 例如 我有一个 Student 类 其中包含实例字段 FNA
  • removeall 和removeif 的用例

    我找到了这个 fun main val list MutableList

随机推荐

  • 通过浏览器检测硬件 —— 筑梦之路

    在线硬件检测工具 测试网址1 主要检测显卡显示效果 volumeshader bm 测试网址2 可以检测cpu GPU 屏幕等精大师在线显卡测试 首页 网页版GPU性能测试工具
  • h5跳转到 苹果 ios app store 应用商店 的APP详情页面

    在开发 h5跳转到 ios系统 app store的时候遇到两个问题 原理 判断是安卓还是苹果 如果为苹果显示苹果的标签 点击a标签 执行跳转唤起APP openAPP 加一个定时器 三秒 可根据需求调整 之后 如果没有唤起成功 跳转到Ap
  • Java项目:网上图书馆管理系统(java+jsp+servlert+mysql+ajax)

    源码获取 博客首页 资源 里下载 一 项目简述 功能 区分为管理员用户和普通用户 普通用户 用户登录 个 人信息修改 图书查询 用户借阅 用户归还 管理员用 户 图书馆里 归还管理 借阅信息查询 图书维护 分 类管理 读者管理等等功能 二
  • 服务器控制口协议,服务器管理ipmi接口协议的扩展方法 Extension Methods server management interface protocol ipmi...

    摘要 The invention provides an extension method for managing an IPMI Intelligent Platform Management Interface interface p
  • eclipse中配置Tomcat

    将Tomcat服务器整合到Eclipse工具中 可以通过Eclipse启动 关闭tomcat服务器 更重要的是 可以非常方便的将在Eclipse中创建的Web项目发布到Tomcat服务器中运行 文章目录 在这里插入图片描述 方式一 在win
  • ubuntu20.04安装和卸载gtsam

    安装boost Boost gt 1 43 Ubuntu sudo apt get install libboost all dev 安装cmake CMake gt 3 0 Ubuntu sudo apt get install cmak
  • 2023第十四届蓝桥杯c++ b组省赛真题

    1 冶炼金属 题目描述 小蓝有一个神奇的炉子用于将普通金属 O 冶炼成为一种特殊金属 X 这个炉子有一个称作转换率的属性 V V 是一个正整数 这意味着消耗 V 个普通金 属 O 恰好可以冶炼出一个特殊金属 X 当普通金属 O 的数目不足
  • 十进制与任意进制互转

    n进制转十进制 static int transfer char chars int n int result 0 int index chars length 1 while index gt 0 result chars index 0
  • Linux centos redhat 装NVIDIA显卡驱动

    Linux装显卡驱动 第一步 下载驱动 对应相同型号 下载即可 英伟达驱动下载地址https www nvidia cn Download index aspx lang cn 以 K620为例 如下图 2 下载完成后需要上传至服务器 以U
  • 企业引入人脸识别考勤 想要代打卡?没门!

    近年来 伴随着生物识别技术的进步 越来越多的生物识别技术应用进入市场 冲击各大行业 传统的考勤模式同样面临着来自新兴技术的挑战 以人脸识别技术为首 掀起一场考勤领域的变革 刷脸 考勤 想要代签不容易 传统的 最为人所熟知的考勤模式莫过于磁卡
  • CH11-多媒体应用开发

    目标 掌握MediaPlayer类与SoundPool类的使用 能够实现播放音频文件的功能 掌握VideoView类的使用 能够实现播放视频文件的功能 掌握MediaPlayer类与SurfaceView类的使用 能够实现播放视频文件的功能
  • 一次性解决tensorflow-gpu:library:cusolver64_10.dll/ cudart64_101.dll/cublas64_10.dll not found等

    错误 Could not load dynamic library cudart64 101 dll Could not load dynamic library cublas64 10 dll Could not load dynamic
  • 高速入门知识02:降低串扰和维持信号完整性的布线方法

    文章目录 前言 一 单端走线布线 1 1 带有短截线的菊花链布线 1 2 没有短截线的菊花链布线 1 3 星型布线 1 4 蛇型布线 二 差分走线布线 前言 串扰是并行走线间不需要的信号耦合 微带线和带状线正确的布线和叠层布局能够降低串扰
  • 剑指 Offer 55 - II. 平衡二叉树-- 心得和思路

    Definition for a binary tree node public class TreeNode int val TreeNode left TreeNode right TreeNode int x val x class
  • 0基础学习大数据之大数据技术发展趋势如何

    大数据如今飞速发展 已经逐步开始影响到我们生活的各个角落 同时涌现出了大量新的技术 它们成为大数据采集 存储 处理和呈现的有力武器 那么大数据技术的发展趋势会是怎样的呢 2014年以后 整体大数据的技术栈已经趋于稳定 由于云计算 人工智能等
  • ufs 固态硬盘_UFS究竟是什么?对于手机提升大不大,一文带你了解

    相信很多小伙伴在2020年挑选5G新手机的时候都会看到 UFS 3 0 那这个名称与USB很类似的配置究竟是什么呢 后面的数字是越大越好 还是越小越好 现在就来科普科普 首先UFS闪存的全称Universal Flash Storage 这
  • 朴素贝叶斯(基于sklearn的实现)

    由于自己太懒 不想看太长的代码 所以我就调用的sklearn库的朴素贝叶斯类 数据集选择的是这个博客中自己构造的数据集 该博客自己实现了朴素贝叶斯类 想自己编写一个朴素贝叶斯函数的读者可以参考这个博客 下面是用sklearn库实现朴素贝叶斯
  • verilog_串口实现

    verilog 串口实现 概述 先了解串口的基础知识 串口是怎样传数据的 什么是波特率 波特率怎么计算 说明 通过Verilog编写串口 通过逻辑分析仪与串口模块的对接来进一步了解串口的应用 文章目录 1 什么是波特率 波特率怎么计算 1
  • 2020秋招总结(持续更新)

    开立医疗提前批面试 1 项目讲解 没做准备 讲的不流畅 思路混乱 2 进程拥有的资源 进程地址空间 代码区 数据区 堆栈 内核部分 3 线程的理解 内核级线程 用户级线程 进程与线程的区别 4 线程间通信方式 与进程间通信方式混淆了 线程是
  • 聊聊rocketmq的KVConfigManager

    序 本文主要研究一下rocketmq的KVConfigManager KVConfigManager org apache rocketmq namesrv kvconfig KVConfigManager java public clas