Dubbo 3.x源码(9)—Dubbo启动元数据中心源码

2023-11-01

基于Dubbo 3.1,详细介绍了Dubbo启动元数据中心源码。

Dubbo配置的加载与覆盖的一系列源码文章:

  1. Dubbo 3.x源码(7)—Dubbo配置的加载入口源码
  2. Dubbo 3.x源码(8)—Dubbo配置中心的加载与优先级源码
  3. Dubbo 3.x源码(9)—Dubbo启动元数据中心源码
  4. Dubbo 3.x源码(10)—Dubbo初始化模块配置源码

此前我们学习了Dubbo应用程序发布器DefaultApplicationDeployer#initialize方法部分源码,该方法完成启动配置中心按照优先级加载配置等工作,在最后会启动Dubbo元数据中心。

1 DefaultApplicationDeployer#startMetadataCenter启动元数据中心

关于Dubbo中的元数据中心,可以看看官方的介绍:https://dubbo.apache.org/zh/docs3-v2/java-sdk/reference-manual/metadata-center/
元数据中心为 Dubbo 中的两类元数据提供了存取能力:

  1. 地址发现元数据
    1. ‘接口-应用’ 映射关系
    2. 接口配置数据
  2. 服务运维元数据
    1. 接口定义描述数据
    2. 消费者订阅关系数据

Dubbo2采用接口级别的服务发现和服务注册,注册中心存储中接口-实例的关系,而Dubbo3采用应用级别的服务发现和服务注册,注册中心则是存储着应用-实例的关系。
**
但是Dubbo Consumer 中只声明了要消费的接口列表,如果采用Dubbo3部署,那么Consumer 需要能够将接口转换为 Provider 应用名才能进行精准服务订阅,因此,Dubbo提供了元数据中心,并在其中存储着接口-应用的对应关系。**
**
startMetadataCenter方法的大概步骤为:**

  1. 调用useRegistryAsMetadataCenterIfNecessary方法,出于兼容性的考虑,当没有显式指定元数据中心且registryConfig的useAsConfigCenter为null或true时,使用registry作为默认配置中心。
  2. 获取获取元数据中心MetadataReport实例的存储库实例MetadataReportInstance,通过其init方法初始化全部元数据配置中心。

metadataType:应用级服务发现 metadata 传递方式,是以 Provider 视角而言的,Consumer 侧配置无效,默认值local。可选值有:

  1. remote - Provider 把 metadata 放到远端注册中心,Consumer 从注册中心获取;
  2. local - Provider 把 metadata 放在本地,Consumer 从 Provider 处直接获取;
/**
 * DefaultApplicationDeployer的方法
 * <p>
 * 启动元数据中心
 */
private void startMetadataCenter() {
    /*
     * 1 出于兼容性的考虑,当没有显式指定元数据中心且registryConfig的useAsConfigCenter为null或true时,使用registry作为默认配置中心
     */
    useRegistryAsMetadataCenterIfNecessary();

    ApplicationConfig applicationConfig = getApplication();
    //元数据类型,local 或 remote,选择远程时,需要进一步指定元数据中心。
    String metadataType = applicationConfig.getMetadataType();
    // FIXME, multiple metadata config support.
    //获取元数据配置
    Collection<MetadataReportConfig> metadataReportConfigs = configManager.getMetadataConfigs();
    //如果没有元数据配置,并且元数据类型为remote,那么抛出异常,否则直接返回
    if (CollectionUtils.isEmpty(metadataReportConfigs)) {
        if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
            throw new IllegalStateException("No MetadataConfig found, Metadata Center address is required when 'metadata=remote' is enabled.");
        }
        return;
    }
    //获取元数据中心MetadataReport实例的存储库实例,该实例在MetadataScopeModelInitializer#initializeApplicationModel方法中注册
    //MetadataReport实例是在deploy.start()开始时初始化的,需要与元数据服务器交互的组件使用它。
    //如果需要声明多个MetadataReport和注册中心,建议将每个MetadataReport和注册中心组在一起,给它们相同的id,例如:
    //<dubbo:registry id=demo1 address="registry://"/>
    //<dubbo:metadata id=demo1 address="metadata://"/>
    //<dubbo:registry id=demo2 address="registry://"/>
    //<dubbo:metadata id=demo2 address="metadata://"/>
    MetadataReportInstance metadataReportInstance = applicationModel.getBeanFactory().getBean(MetadataReportInstance.class);
    //校验元数据配置
    List<MetadataReportConfig> validMetadataReportConfigs = new ArrayList<>(metadataReportConfigs.size());
    for (MetadataReportConfig metadataReportConfig : metadataReportConfigs) {
        ConfigValidationUtils.validateMetadataConfig(metadataReportConfig);
        validMetadataReportConfigs.add(metadataReportConfig);
    }
    /*
     * 2 初始化全部元数据配置中心
     */
    metadataReportInstance.init(validMetadataReportConfigs);
    //初始化失败则抛出异常
    if (!metadataReportInstance.inited()) {
        throw new IllegalStateException(String.format("%s MetadataConfigs found, but none of them is valid.", metadataReportConfigs.size()));
    }
}

2 useRegistryAsMetadataCenterIfNecessary元数据中心兼容

出于兼容性的考虑,当没有显式指定元数据中心且registryConfig的useAsMetadataCenter为null或true时,使用registry作为默认配置中心。该方法和useRegistryAsConfigCenterIfNecessary方法差不多。大概逻辑为:

  1. 如果配置管理器中有元数据中心的配置,直接返回。
  2. 调用getDefaultRegistries方法,获取默认注册中心配置,如果registryConfig的isDefault为true或null,则表示默认注册中心。遍历默认的注册中心:
  3. 调用isUsedRegistryAsMetadataCenter方法,判断该注册中心可以作为元数据中心,如果useAsMetadataCenter属性为true或者该注册中心协议有对于该中心的扩展类型的实现类,那么就可以作为配置中心。
  4. 调用registryAsMetadataCenter方法,注册中心转元数据中心,注册中心的配置属性作为元数据中心的配置属性。
  5. 将元数据中心的配置实例添加到configManager的configsCache缓存。
/**
 * DefaultApplicationDeployer的方法
 * <p>
 * 出于兼容性的考虑,当没有显式指定元数据中心且registryConfig的useAsMetadataCenter为null或true时,使用registry作为默认配置中心
 */
private void useRegistryAsMetadataCenterIfNecessary() {
    //如果配置管理器中有元数据中心的配置,直接返回
    Collection<MetadataReportConfig> metadataConfigs = configManager.getMetadataConfigs();

    if (CollectionUtils.isNotEmpty(metadataConfigs)) {
        return;
    }
    //获取默认注册中心配置,如果registryConfig的isDefault为null或true,则表示默认注册中心
    List<RegistryConfig> defaultRegistries = configManager.getDefaultRegistries();
    if (defaultRegistries.size() > 0) {
        defaultRegistries
            .stream()
            //如果该注册中心可以作为元数据中心,useAsMetadataCenter为null或true
            .filter(this::isUsedRegistryAsMetadataCenter)
            //注册中心的配置属性作为元数据中心的配置属性
            .map(this::registryAsMetadataCenter)
            .forEach(metadataReportConfig -> {
                if (metadataReportConfig.getId() == null) {
                    Collection<MetadataReportConfig> metadataReportConfigs = configManager.getMetadataConfigs();
                    if (CollectionUtils.isNotEmpty(metadataReportConfigs)) {
                        for (MetadataReportConfig existedConfig : metadataReportConfigs) {
                            if (existedConfig.getId() == null && existedConfig.getAddress().equals(metadataReportConfig.getAddress())) {
                                return;
                            }
                        }
                    }
                    //添加配置到configManager的configsCache缓存
                    configManager.addMetadataReport(metadataReportConfig);
                } else {
                    //根据此id或者name获取已存在的元数据配置,如果存在则直接返回,否则添加到缓存中
                    Optional<MetadataReportConfig> configOptional = configManager.getConfig(MetadataReportConfig.class, metadataReportConfig.getId());
                    if (configOptional.isPresent()) {
                        return;
                    }
                    //添加配置到configManager的configsCache缓存
                    configManager.addMetadataReport(metadataReportConfig);
                }
                logger.info("use registry as metadata-center: " + metadataReportConfig);
            });
    }
}

3 init初始化元数据中心

在获取到全部元数据中心配置之后,调用metadataReportInstance#init方法对他们进行初始化。其内部先基于Dubbo SPI机制获取MetadataReportFactory的适配器类的实现,即MetadataReportFactory$Adaptive,然后依次调用内部的init方法初始化全部的元数据中心。
内部的init方法的大概步骤为:

  1. 首先组装组装元数据中心远程url地址,包括各种参数,最终地址类似于:zookeeper://127.0.0.1:2181?application=demo-provider&client=&file.cache=null&port=2181&protocol=zookeeper&registry-type=service&registry.type=service&timeout=20001。
  2. 随后调用MetadataReportFactoryKaTeX parse error: Expected 'EOF', got '#' at position 9: Adaptive#̲getMetadataRepo…Adaptive可以看作是MetadataReportFactory的适配器类,其内部实际上是调用各种协议对应的真正的MetadataReportFactory实现的getMetadataReport方法来完成对于不用协议的解析。例如例如zookeeper协议,对应着ZookeeperMetadataReportFactory。
/**
 * MetadataReportInstance的方法
 * <p>
 * 初始化全部的元数据配置列表
 *
 * @param metadataReportConfigs 元数据配置列表
 */
public void init(List<MetadataReportConfig> metadataReportConfigs) {
    //CAS保证只初始化一次
    if (!init.compareAndSet(false, true)) {
        return;
    }
    //元数据类型,local 或 remote,选择远程时,需要进一步指定元数据中心。
    this.metadataType = applicationModel.getApplicationConfigManager().getApplicationOrElseThrow().getMetadataType();
    //如果为null,则默认local
    if (metadataType == null) {
        this.metadataType = DEFAULT_METADATA_STORAGE_TYPE;
    }
    //基于Dubbo SPI机制获取MetadataReportFactory的适配器类的实现,即MetadataReportFactory$Adaptive
    MetadataReportFactory metadataReportFactory = applicationModel.getExtensionLoader(MetadataReportFactory.class).getAdaptiveExtension();
    /*
     * 依次初始化全部的元数据中心
     */
    for (MetadataReportConfig metadataReportConfig : metadataReportConfigs) {
        init(metadataReportConfig, metadataReportFactory);
    }
}
    /**
     * MetadataReportInstance的方法
     * <p>
     * 初始化指定的元数据中心
     *
     * @param config                元数据中心配置
     * @param metadataReportFactory 元数据中心工厂
     */
    private void init(MetadataReportConfig config, MetadataReportFactory metadataReportFactory) {
        //组装元数据中心远程url地址
        URL url = config.toUrl();
        //metadata协议
        if (METADATA_REPORT_KEY.equals(url.getProtocol())) {
            String protocol = url.getParameter(METADATA_REPORT_KEY, DEFAULT_DIRECTORY);
            url = URLBuilder.from(url)
                .setProtocol(protocol)
                .setScopeModel(config.getScopeModel())
                .removeParameter(METADATA_REPORT_KEY)
                .build();
        }
        //如果存在application.name属性,那么设置到url的application属性
        url = url.addParameterIfAbsent(APPLICATION_KEY, applicationModel.getCurrentConfig().getName());
        //是否支持元数据本地缓存,默认为true,file.cache属性,但Dubbo3.1版本似乎没设置默认值,导致无法本地缓存
        url = url.addParameterIfAbsent(REGISTRY_LOCAL_FILE_CACHE_ENABLED, String.valueOf(applicationModel.getCurrentConfig().getEnableFileCache()));
        //id
        String relatedRegistryId = isEmpty(config.getRegistry()) ? (isEmpty(config.getId()) ? DEFAULT_KEY : config.getId()) : config.getRegistry();
//        RegistryConfig registryConfig = applicationModel.getConfigManager().getRegistry(relatedRegistryId)
//                .orElseThrow(() -> new IllegalStateException("Registry id " + relatedRegistryId + " does not exist."));
        //从元数据工厂中根据元数据配置,或者元数据中心实例MetadataReport
        //MetadataReportFactory$Adaptive内部实际上调用不同的协议对应的真正的元数据工厂类的getMetadataReport方法
        //例如zookeeper协议,对应着ZookeeperMetadataReportFactory
        MetadataReport metadataReport = metadataReportFactory.getMetadataReport(url);
        if (metadataReport != null) {
            //存入metadataReports缓存
            metadataReports.put(relatedRegistryId, metadataReport);
        }
    }

3.1 getMetadataReport获取元数据

MetadataReportFactory的getMetadataReport方法实际上由抽象父类AbstractMetadataReportFactory实现。大概步骤为:

  1. 添加path组装最终url,例如:zookeeper://127.0.0.1:2181/org.apache.dubbo.metadata.report.MetadataReport?application=demo-provider&client=&file.cache=null&port=2181&protocol=zookeeper&registry-type=service&registry.type=service&timeout=20001。
  2. 获取缓存key,元数据将会被缓存到本地AbstractMetadataReportFactory的serviceStoreMap中,下次获取时直接返回即可。缓存key例如:zookeeper://127.0.0.1:2181/org.apache.dubbo.metadata.report.MetadataReport。
  3. 加锁再次从缓存获取,没有则调用createMetadataReport方法创建MetadataReport。这是一个双重校验锁。
    1. createMetadataReport方法由各个元数据中心工厂实现类自己实现,例如zookeeper作为元数据中心时,他的元数据中心工厂为ZookeeperMetadataReportFactory,createMetadataReport方法将会创建并返回一个ZookeeperMetadataReport。
    2. 在ZookeeperMetadataReport构造器中,将会连接zookeeper服务端并持有一个zkClient,还会加载本地缓存文件,设置定时同步元数据等等。
  4. 将获取到的结果加入serviceStoreMap缓存。
/**
 * AbstractMetadataReportFactory的方法
 *
 * 获取元数据
 * @param url url
 */
@Override
public MetadataReport getMetadataReport(URL url) {
    //组装最终url,zookeeper://127.0.0.1:2181/org.apache.dubbo.metadata.report.MetadataReport?application=demo-provider&client=&file.cache=null&port=2181&protocol=zookeeper&registry-type=service&registry.type=service&timeout=20001
    url = url.setPath(MetadataReport.class.getName())
        .removeParameters(EXPORT_KEY, REFER_KEY);
    //缓存key,zookeeper://127.0.0.1:2181/org.apache.dubbo.metadata.report.MetadataReport
    String key = url.toServiceString();
    //尝试重缓存直接获取
    MetadataReport metadataReport = serviceStoreMap.get(key);
    if (metadataReport != null) {
        return metadataReport;
    }

    // Lock the metadata access process to ensure a single instance of the metadata instance
    //加锁再次获取
    lock.lock();
    try {
        metadataReport = serviceStoreMap.get(key);
        if (metadataReport != null) {
            return metadataReport;
        }
        boolean check = url.getParameter(CHECK_KEY, true) && url.getPort() != 0;
        try {
            //创建元数据
            metadataReport = createMetadataReport(url);
        } catch (Exception e) {
            if (!check) {
                logger.warn("The metadata reporter failed to initialize", e);
            } else {
                throw e;
            }
        }

        if (check && metadataReport == null) {
            throw new IllegalStateException("Can not create metadata Report " + url);
        }
        //加入缓存
        if (metadataReport != null) {
            serviceStoreMap.put(key, metadataReport);
        }
        return metadataReport;
    } finally {
        // Release the lock
        lock.unlock();
    }
}

/**
 * ZookeeperMetadataReportFactory的方法
 * <p>
 * 创建元数据实例
 *
 * @param url 地址
 * @return zookeeper的元数据实例
 */
@Override
public MetadataReport createMetadataReport(URL url) {
    //创建并返回ZookeeperMetadataReport实例
    return new ZookeeperMetadataReport(url, zookeeperTransporter);
}

3.2 ZookeeperMetadataReport基于Zookeeper的元数据

使用zookeeper作为元数据中心时,将会创建并返回ZookeeperMetadataReport。大概逻辑为:

  1. 首先调用父类AbstractMetadataReport的构造器,主要用于加载本地文件缓存、定期发布元数据配置等相关逻辑。
  2. 随后校验url并且连接zookeeper获取zkClienk。
/**
 * ZookeeperMetadataReport的构造器
 *
 * @param url                  地址
 * @param zookeeperTransporter 客户端管理链接池
 */
public ZookeeperMetadataReport(URL url, ZookeeperTransporter zookeeperTransporter) {
    /*
     * 父类构造器
     * 加载本地缓存、定期发布元数据配置等相关逻辑
     */
    super(url);
    //host不能是0.0.0.0,并且url上anyhost参数不能为true
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    //组,默认dubbo
    String group = url.getGroup(DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    //连接zookeeper创建zkClient
    zkClient = zookeeperTransporter.connect(url);
}

3.2.1 AbstractMetadataReport元数据配置

AbstractMetadataReport的构造器主要用于完成加载本地文件缓存、创建定期发布元数据的任务等相关公共的逻辑。

  1. 是否开启元数据本地缓存由file.cache属性控制,默认缓存文件路径为:/{user.home}/.dubbo/dubbo-metadata-{dubbo.application.name}-{ip}-{post}.cache。将会加载缓存文件中的配置到properties属性中。
  2. 更新元数据是通过一个单线程的调度任务线程池执行的,默认异步更新,该构造器中会创建定时任务,首次执行任务的时间为凌晨2点到6点之间的随机时间,后续每次任务执行间隔24h,即每天发布一次元数据。
public AbstractMetadataReport(URL reportServerURL) {
    //将reportURL属性赋值为reportServerURL
    setUrl(reportServerURL);
    //判断是否支持本地缓存,默认为true支持
    boolean localCacheEnabled = reportServerURL.getParameter(REGISTRY_LOCAL_FILE_CACHE_ENABLED, true);
    // Start file save timer 启动文件保存定时器
    //获取默认本地元数据缓存文件路径 /{user.home}/.dubbo/dubbo-metadata-{dubbo.application.name}-{ip}-{post}.cache
    String defaultFilename = System.getProperty(USER_HOME) + DUBBO_METADATA +
        reportServerURL.getApplication() + "-" +
        replace(reportServerURL.getAddress(), ":", "-") + CACHE;
    //最终文件路径,如果有指定路径(file属性)则使用指定的路径,否则使用默认路径
    String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename);
    File file = null;
    //如果支持本地文件缓存并且文件名不为空
    if (localCacheEnabled && ConfigUtils.isNotEmpty(filename)) {
        //基于此路径创建File
        file = new File(filename);
        //如果文件不存在,并且父目录不存在
        if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
            //创建父目录,失败则抛出异常
            if (!file.getParentFile().mkdirs()) {
                throw new IllegalArgumentException("Invalid service store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
            }
        }
        // if this file exists, firstly delete it.
        //如果还没初始化完毕,并且缓存文件存在,那么先删除
        if (!initialized.getAndSet(true) && file.exists()) {
            file.delete();
        }
    }
    //保存缓存文件的引用
    this.file = file;
    //加载缓存文件内容到properties集合中
    loadProperties();
    //sync-report属性,是否发布更新元数据,默认为false异步
    syncReport = reportServerURL.getParameter(SYNC_REPORT_KEY, false);
    //元数据重试对象,retry-times 重试次数,默认100,retry-period 重试时间间隔,默认3000毫秒
    metadataReportRetry = new MetadataReportRetry(reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES),
        reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD));
    // cycle report the data switch
    //cycle-report 是否每天更新完整元数据,默认true
    if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
        //构建单线程的调度任务线程池
        reportTimerScheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMetadataReportTimer", true));
        //创建定时任务,首次执行任务的时间为凌晨2点到6点之间的随机时间,后续每次任务执行间隔24h,即一天
        reportTimerScheduler.scheduleAtFixedRate(this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
    }
    //report-metadata 是否上报地址发现中的接口配置报元数据
    //dubbo.application.metadata-type=remote 该配置不起作用即一定会上报,dubbo.application.metadata-type=local 时是否上报由该配置值决定
    this.reportMetadata = reportServerURL.getParameter(REPORT_METADATA_KEY, false);
    //report-definition 是否上报服务运维用元数据   
    this.reportDefinition = reportServerURL.getParameter(REPORT_DEFINITION_KEY, true);
}

.3.3 publishAll定时发布元数据

定时发布元数据的方法是AbstractMetadataReport#publishAll方法,该方法主要是同步allMetadataReports里面的全部元数据。

内部调用doHandleMetadataCollection方法,如果是provider端的元数据,调用storeProviderMetadata同步,如果是consumer端的元数据,调用storeConsumerMetadata同步。

void publishAll() {
logger.info("start to publish all metadata.");
    this.doHandleMetadataCollection(allMetadataReports);
}

/**
 * AbstractMetadataReport的方法
 * <p>
 * 同步元数据
 *
 * @param metadataMap 元数据map
 * @return
 */
private boolean doHandleMetadataCollection(Map<MetadataIdentifier, Object> metadataMap) {
    //如果本地内存没有元数据,那么无需同步,返回true
    if (metadataMap.isEmpty()) {
        return true;
    }
    Iterator<Map.Entry<MetadataIdentifier, Object>> iterable = metadataMap.entrySet().iterator();
    while (iterable.hasNext()) {
        Map.Entry<MetadataIdentifier, Object> item = iterable.next();
        //如果是provider端的元数据,调用storeProviderMetadata同步
        if (PROVIDER_SIDE.equals(item.getKey().getSide())) {
            this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue());
        }
        //如果是consumer端的元数据,调用storeConsumerMetadata同步
        else if (CONSUMER_SIDE.equals(item.getKey().getSide())) {
            this.storeConsumerMetadata(item.getKey(), (Map) item.getValue());
        }

    }
    return false;
}

3.3.1 storeProviderMetadata同步ProviderMetadata

provider端的元数据,调用storeProviderMetadata同步。该方法的大概步骤为:

  1. 将元数据存入内存的allMetadataReports中,从失败的缓存failedReports中移除。我们的入口方法doHandleMetadataCollection中的参数就是allMetadataReports,这里为何再存储进去一次呢?实际上,这个方法在MetadataUtils#publishServiceDefinition方法中也被调用到了,而那里就是allMetadataReports中的数据的来源,即导出服务和引用的时候会同步元数据。
  2. 将serviceDefinition序列化为json字符串。
  3. 调用doStoreProviderMetadata方法将数据同步到元数据中心。
  4. 调用saveProperties方法将数据同步到本地缓存文件。
  5. 如果抛出了异常,将元数据存入失败的元数据缓存,随后重试一次。如果再次失败,抛出异常。
/**
 * AbstractMetadataReport的方法
 * <p>
 * 同步Provider端的Metadata
 *
 * @param providerMetadataIdentifier 元数据标识符,用于存储方法描述符
 * @param serviceDefinition          服务定义,包括接口的规范名称、类文件的位置、类型的定义等属性
 */
@Override
public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
    //同步还是异步
    if (syncReport) {
        //同步调用
        storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition);
    } else {
        //异步调用,其实就是在reportCacheExecutor线程池中异步执行
        reportCacheExecutor.execute(() -> storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition));
    }
}

/**
 * AbstractMetadataReport的方法
 * <p>
 * 同步Provider端的Metadata
 *
 * @param providerMetadataIdentifier 元数据标识符,用于存储方法描述符
 * @param serviceDefinition          服务定义,包括接口的规范名称、类文件的位置、类型的定义等属性
 */
private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
    try {
        //输出日志
        if (logger.isInfoEnabled()) {
            logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier + "; definition: " + serviceDefinition);
        }
        //存入内存的allMetadataReports中
        allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
        //从失败的缓存中移除
        failedReports.remove(providerMetadataIdentifier);
        //转换为json字符窜
        String data = JsonUtils.getJson().toJson(serviceDefinition);
        /*
         * 将数据同步到元数据中心
         */
        doStoreProviderMetadata(providerMetadataIdentifier, data);
        /*
         * 将数据同步到本地缓存文件
         */
        saveProperties(providerMetadataIdentifier, data, true, !syncReport);
    } catch (Exception e) {
        // retry again. If failed again, throw exception.
        //抛出异常
        //存入失败的元数据缓存
        failedReports.put(providerMetadataIdentifier, serviceDefinition);
        //重试一次。如果再次失败,抛出异常。
        metadataReportRetry.startRetryTask();
        logger.error("Failed to put provider metadata " + providerMetadataIdentifier + " in  " + serviceDefinition + ", cause: " + e.getMessage(), e);
    }
}

3.3.1.1 doStoreProviderMetadata同步Provider元数据

该方法将Provider元数据同步到远程元数据中心,以zookeeper为例子。

/**
 * ZookeeperMetadataReport的方法
 * <p>
 * 将Provider元数据同步到远程元数据中心
 *
 * @param providerMetadataIdentifier 元数据标识符,用于存储方法描述符
 * @param serviceDefinitions         服务定义json,包括接口的规范名称、类文件的位置、类型的定义等属性
 */
@Override
protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) {
    storeMetadata(providerMetadataIdentifier, serviceDefinitions);
}

ZookeeperMetadataReport#storeMetadata方法存储元数据,不区分producer或者是consumer。实际上就是创建一个指定路径的永久节点,然后将元数据json字符串存储进去即可。节点路径/dubbo/{pathTag}/{servicePath}/{version}/{group}/{side},例如 /dubbo/metadata/org.apache.dubbo.demo.GreetingService/1.0.0/greeting/provider/demo-provider。

/**
 * ZookeeperMetadataReport的方法
 * <p>
 * 将元数据同步到远程元数据中心
 *
 * @param metadataIdentifier 元数据标识符,用于存储方法描述符
 * @param v                  元数据
 */
private void storeMetadata(MetadataIdentifier metadataIdentifier, String v) {
    //创建一个永久节点
    zkClient.create(getNodePath(metadataIdentifier), v, false);
}

/**
 * 获取节点路径
 *
 * @param metadataIdentifier 元数据标识符,用于存储方法描述符
 * @return 节点路径
 */
String getNodePath(BaseMetadataIdentifier metadataIdentifier) {
    //节点路径   /dubbo/{pathTag}/{servicePath}/{version}/{group}/{side}
    //例如 /dubbo/metadata/org.apache.dubbo.demo.GreetingService/1.0.0/greeting/provider/demo-provider
    return toRootDir() + metadataIdentifier.getUniqueKey(KeyTypeEnum.PATH);
}

zookeeper中的producer元数据样式如下:
image.png

3.3.1.2 saveProperties同步到本地文件

该方法将内存中的元数据同步到本地缓存文件。

/**
 * AbstractMetadataReport的方法
 * <p>
 * 将元数据同步到本地缓存文件
 *
 * @param metadataIdentifier 元数据标识符,用于存储方法描述符
 * @param value              元数据json值
 * @param add                是新增元数据还是删除,true - 新增, false - 删除
 * @param sync               同步还是异步,true - 同步,false - 异步
 */
private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) {
    //缓存文件不存在则直接返回
    if (file == null) {
        return;
    }

    try {
        //新增元数据
        if (add) {
            properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value);
        }
        //否则就删除元数据
        else {
            properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
        }
        //数据变更版本自增
        long version = lastCacheChanged.incrementAndGet();
        //同步存储
        if (sync) {
            new SaveProperties(version).run();
        }
        //异步存储,采用线程池
        else {
            reportCacheExecutor.execute(new SaveProperties(version));
        }

    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

具体的同步操作本抽象成为了一个线程任务SaveProperties。

private class SaveProperties implements Runnable {
private long version;

    private SaveProperties(long version) {
        this.version = version;
    }

    @Override
    public void run() {
        //执行本地同步
        doSaveProperties(version);
    }
}

/**
 * AbstractMetadataReport
 * 、执行元数据同步到本地缓存文件
 *
 * @param version 最新数据变更版本
 */
private void doSaveProperties(long version) {
    //如果版本过小,那么直接返回
    if (version < lastCacheChanged.get()) {
        return;
    }
    //缓存文件不存在则直接返回
    if (file == null) {
        return;
    }
    // Save
    try {
        //新建文件锁,文件路径为缓存文件的绝对路径
        ///{user.home}/.dubbo/dubbo-metadata-{dubbo.application.name}-{ip}-{post}.cache.lock
        File lockfile = new File(file.getAbsolutePath() + ".lock");
        //如果不存在则创建文件
        if (!lockfile.exists()) {
            lockfile.createNewFile();
        }
        try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
             //打开锁文件通道
             FileChannel channel = raf.getChannel()) {
            //获取文件独占锁
            FileLock lock = channel.tryLock();
            //加锁失败
            if (lock == null) {
                throw new IOException("Can not lock the metadataReport cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties");
            }
            // Save
            try {
                //缓存文件不存在就创建
                if (!file.exists()) {
                    file.createNewFile();
                }

                Properties tmpProperties;
                //异步
                if (!syncReport) {
                    // When syncReport = false, properties.setProperty and properties.store are called from the same
                    // thread(reportCacheExecutor), so deep copy is not required
                    tmpProperties = properties;
                }
                //同步
                else {
                    // Using store method and setProperty method of the this.properties will cause lock contention
                    // under multi-threading, so deep copy a new container
                    //使用一个新的集合,避免锁争用
                    tmpProperties = new Properties();
                    Set<Map.Entry<Object, Object>> entries = properties.entrySet();
                    for (Map.Entry<Object, Object> entry : entries) {
                        tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue());
                    }
                }
                //将元数据写入到本地文件中
                try (FileOutputStream outputFile = new FileOutputStream(file)) {
                    tmpProperties.store(outputFile, "Dubbo metadataReport Cache");
                }
            } finally {
                //释放锁
                lock.release();
            }
        }
    } catch (Throwable e) {
        if (version < lastCacheChanged.get()) {
            return;
        } else {
            //失败则稍后重试
            reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save service store file, cause: " + e.getMessage(), e);
    }
}

3.3.2 storeConsumerMetadata同步ConsumerMetadata

comsumer端的元数据,调用storeConsumerMetadata同步。该方法的大概步骤为:

  1. 将元数据存入内存的allMetadataReports中,从失败的缓存failedReports中移除。我们的入口方法doHandleMetadataCollection中的参数就是allMetadataReports,这里为何再存储进去一次呢?实际上,这个方法在MetadataUtils#publishServiceDefinition方法中也被调用到了,而那里就是allMetadataReports中的数据的来源,即导出服务和引用的时候会同步元数据。
  2. 将serviceParameterMap序列化为json字符串。
  3. 调用doStoreConsumerMetadata方法将数据同步到元数据中心。
  4. 调用saveProperties方法将数据同步到本地缓存文件。
  5. 如果抛出了异常,将元数据存入失败的元数据缓存,随后重试一次。如果再次失败,抛出异常。
/**
 * AbstractMetadataReport的方法
 * <p>
 * 同步consumer端的Metadata
 *
 * @param consumerMetadataIdentifier 元数据标识符,用于存储方法描述符
 * @param serviceParameterMap        服务参数map
 */
@Override
public void storeConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap) {
    //同步还是异步
    if (syncReport) {
        //同步调用
        storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap);
    } else {
        //异步调用,其实就是在reportCacheExecutor线程池中异步执行
        reportCacheExecutor.execute(() -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap));
    }
}

/**
 * AbstractMetadataReport的方法
 * <p>
 * 同步consumer端的Metadata
 *
 * @param consumerMetadataIdentifier 元数据标识符,用于存储方法描述符
 * @param serviceParameterMap        服务参数map
 */
protected void storeConsumerMetadataTask(MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap) {
    try {
        //输出日志
        if (logger.isInfoEnabled()) {
            logger.info("store consumer metadata. Identifier : " + consumerMetadataIdentifier + "; definition: " + serviceParameterMap);
        }
        //存入内存的allMetadataReports中
        allMetadataReports.put(consumerMetadataIdentifier, serviceParameterMap);
        //从失败的缓存中移除
        failedReports.remove(consumerMetadataIdentifier);
        //转换为json字符窜
        String data = JsonUtils.getJson().toJson(serviceParameterMap);
        /*
         * 将数据同步到元数据中心
         */
        doStoreConsumerMetadata(consumerMetadataIdentifier, data);
        /*
         * 将数据同步到本地缓存文件
         */
        saveProperties(consumerMetadataIdentifier, data, true, !syncReport);
    } catch (Exception e) {
        // retry again. If failed again, throw exception.
        failedReports.put(consumerMetadataIdentifier, serviceParameterMap);
        metadataReportRetry.startRetryTask();
        logger.error("Failed to put consumer metadata " + consumerMetadataIdentifier + ";  " + serviceParameterMap + ", cause: " + e.getMessage(), e);
    }
}

3.3.2.1 doStoreConsumerMetadata同步Consumer元数据

该方法将Consumer元数据同步到远程元数据中心,以zookeeper为例子。

/**
 * ZookeeperMetadataReport的方法
 * <p>
 * 将Provider元数据同步到远程元数据中心
 *
 * @param consumerMetadataIdentifier 元数据标识符,用于存储方法描述符
 * @param value                      服务参数json
 */
@Override
protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) {
    storeMetadata(consumerMetadataIdentifier, value);
}

zookeeper中的consumer的元数据格式如下:
image.png

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Dubbo 3.x源码(9)—Dubbo启动元数据中心源码 的相关文章

  • 面试官问:SpringBoot中@Async默认线程池导致OOM如何解决?

    前言 1 最近项目上在测试人员压测过程中发现了OOM问题 项目使用springboot搭建项目工程 通过查看日志中包含信息 unable to create new native thread 内存溢出的三种类型 1 第一种OutOfMem
  • 第7篇 rabbitmq 创建SocketFrameHandler

    本节主要是熟悉socketFrameHandlerFactory类 真正涉及到socket流处理器 展示如下类图 我们本文关心是SocketFrameHandlerFactory 和SocketFrameHandler 由类图可以知道Soc
  • RPC实践(四)Dubbo实践

    Dubbo是一款重要的RPC框架 它是Alibaba开源的分布式服务框架 它主要特点 提供了注册中心来进行服务的管理 支持zookeeper redis等方式来实现注册中心 Dubbo按照分层的方式来架构 使用这种方式可以使各个层之间解耦合
  • 谷歌chrome浏览器的源码分析(一)

    随着网络技术的发展 越来越多应用都已经离不开网络 特别像人类大脑一样的知识库的搜索引擎 更加是离不开功能强大的云计算 不过 即便云计算非常强大 但它还不能直接地把结果呈现给用户 这样就需要一个客户端来呈现出来 这个客户端就是浏览器 现在越来
  • jdk8源码之Queue-ArrayQueue

    关于队列这个数据结构 大家应该都是比较熟悉 列队是一种先进先出 FIFO 的数据结构 删除操作只能在表的头部 插入操作只能在表的尾部 Queue一般是作为一个缓冲队列使用的 简单举例 生产端的生产速度偶尔会大于消费端的消费速度 但又不想等待
  • Android 字体国际化适配方法以及源码解析

    起源 由于我们公司的app 支持多国语言 所以就导致了 同样的文案 但是长度不同 就会出现适配的问题 因为 中文 是 字表义 外文是 音表义 今天就用8 0新特新来解决这个问题 适配前是这样的 在固定的宽高就会出现适配的问题 在之前博客中也
  • 20个Android游戏源码,…

    原文地址 分享20个Android游戏源码 希望大家喜欢哈 作者 我算哪根葱 分享20个 Android 游戏源码 希望大家喜欢哈 http www apkbus com android 21834 1 1 html Android 疯狂足
  • spring boot配置类注册深入解析

    前言 spring ApplicationContext的刷新总体来看有两个过程 第一个是注册BeanDefinition 提供整个IOC容器初始化的材料 第二个是根据BeanDefinition加载bean 从spring boot开始
  • Dubbo源码解析:服务暴露与发现

    dubbo源码解析 服务暴露与发现 概述 dubbo是一个简单易用的RPC框架 通过简单的提供者 消费者配置就能完成无感的网络调用 那么在dubbo中是如何将提供者的服务暴露出去 消费者又是如何获取到提供者相关信息的呢 这就是本章我们要讨论
  • 深入探索 Dubbo 的 AOT 技术及其技术演进历程

    引言 随着云原生和微服务架构的兴起 高性能和低延迟成为了开发者们的关注重点 在 Java 生态系统中 Spring 和 Dubbo 是两个备受青睐的框架 它们为开发者提供了强大的功能和灵活性 为了进一步提升性能 Dubbo 团队引入了 AO
  • 【零知ESP8266教程】快速入门5-使用按键来控制你的灯

    上节课 我们已经学习了如何制作一个简易交通灯 那么如何去控制一个LED的亮或者灭呢 此次试验采用按键来控制我们的LED 实现LED的简单控制 一 工具原料 电脑 windows系统 ESP8266开发板 micro usb线 LED灯一个
  • 基于SpringBoot-Dubbo的微服务快速开发框架

    简介 基于Dubbo的分布式 微服务基础框架 为前端提供脚手架开发服务 结合前一篇 Web AP快速开发基础框架 可快速上手基于Dubbo的分布式服务开发 项目代码 https github com backkoms web service
  • SpringCloud与Dubbo的比较

    目录 Dubbo 一 dubbo简介 二 dubbo组织架构图 三 dubbo的优势 SpringCloud 一 SpringCloud简介 二 SpringCloud组织架构 三 SpringCloud特点 四 Dubbo与SpringC
  • dubbo整合nacos没有注册成功

    这里大家整合的时候一定要注意dubbo3 0的版本 nacos整合的话要是2 0以上的版本 不然就会出现服务注册不上的情况 下面是nacos的下载地址 推荐大家使用这一个 还是比较主流的 Nacos 快速开始
  • Java PECS(Producer Extends Consumer Super)原则

    在看 Alibaba 开发手册时遇到 PECS 原则 刚开始阅读时感觉比较绕 也搜索了一些博文参考 个人觉得 Stackoverflow 的这篇文章比较实用 What is PECS Producer Extends Consumer Su
  • QGis二次开发 -- 源码编译终极篇

    由于是开源软件 QGis版本迭代比较快 在保持long term release版本的基础上 每个月都会有一个monthly release的新版本发布 源码工程变化快速 给想要上手编译开发的新人朋友带来了一些困惑 我之前分别写过QGis1
  • dubbo分布式服务

    架构 节点角色说明 Provider 暴露服务的服务提供方 Consumer 调用远程服务的服务消费方 Registry 服务注册与发现的注册中心 Monitor 统计服务的调用次调和调用时间的监控中心 Container 服务运行容器 调
  • Dubbo 注册中心挂了,consumer 还能不能调用 provider?

    在 Dubbo 中 如果注册中心 如 Zookeeper Nacos 等 出现故障 消费者 consumer 仍然可以调用提供者 provider 的服务 但需要满足以下条件 消费者和提供者之间的通信配置正确 消费者需要知道提供者的地址和端
  • Zookeeper 和 Dubbo 的关系?

    Zookeeper的作用 zookeeper用来注册服务和进行负载均衡 哪一个服务由哪一个机器来提供必需让调用者知道 简单来说就是ip地址和服务名称的对应关系 当然也可以通过硬编码的方式把这种对应关系在调用方业务代码中实现 但是如果提供服务
  • Dubbo 注册中心挂了,consumer 还能不能调用 provider?

    在 Dubbo 中 如果注册中心 如 Zookeeper Nacos 等 出现故障 消费者 consumer 仍然可以调用提供者 provider 的服务 但需要满足以下条件 消费者和提供者之间的通信配置正确 消费者需要知道提供者的地址和端

随机推荐

  • conda 切换为国内源

    添加清华源 conda config add channels https mirrors tuna tsinghua edu cn anaconda pkgs free conda config add channels https mi
  • 延长OLED透明屏的使用寿命:关键因素与有效方法分享

    OLED透明屏作为一项创新的显示技术 具备透明度和高清晰度的特点 在各个领域得到了广泛应用 然而 为了确保OLED透明屏的持久性和稳定性 延长其使用寿命是至关重要的 根据最新的研究和数据报告 在这篇文章中 尼伽将深入探讨延长OLED透明屏使
  • 微信小程序实现音乐播放器(2)

    文章目录 前情提要 BackgroundAudioManager API wx setNavigationBarTitle 搭建静态资源服务器 小程序项目 app json app wxss pages music music json p
  • three.js常用几何体介绍以及自定义几何体

    一 自定义三角形几何体 核心代码 添加物体 创建几何体 for let i 0 i lt 50 i 每一个三角形 需要3个顶点 每个顶点需要3个值 const geometry new THREE BufferGeometry const
  • 【教程

    文章目录 1 张量 Tensor 2 梯度 PyTorch 1 5 0 autograd 库是使用 PyTorch 构建神经网络的核心 首先让我们简要地浏览一下 之后我们将会训练第一个神经网络 autograd 库提供了 Tensors 上
  • 多线程问题分析thread

    多线程 进程概述 A 进程 进程指正在运行的程序 确切的来说 当一个程序进入内存运行 即变成一个进程 进程是处于运行过程中的程序 并且具有一定独立功能 简而言之 当前正在运动的程序 一个应用程序在内存中的执行区域 B 线程 线程是进程中的一
  • NFTScan 与 Bulletime 在 NFT 底层数据方面达成战略合作

    近日 Web3 基础设施 NFTScan 浏览器与 Bulletime 达成战略合作伙伴关系 双方将在 NFT 底层源数据方面展开深度合作 Bulletime 是一个专业的 NFT 项目链上和链下数据分析聚合平台 为 NFT 用户提供一站式
  • window 分布式文件服务器,Windows活动目录笔记24

    Windows活动目录笔记24 分布式文件系统DFS创建与使用 DFS复制使用复杂的进程保持多个服务器上的数据同步 DFS复制是一个多机复制引擎 一台服务器上的任何修改都将复制到其他所有的成员服务器上 测试环境 域控制器dc1 jinxin
  • 关于xshell的参数配置问题

    关于xshell其实如果想connection通的话 必须需要设置的两个地方如下图 一个是登陆目的主机的IP地址 以及登陆使用的账号和密码另一个就是需要设置代理服务器 类型和端口号是对应的如http1 1对应的是9001 具体对应端口号自己
  • ChatGPT专业应用:采访大纲自动生成

    正文共 429字 阅读大约需要 2 分钟 品牌公关人员 记者群体必备技巧 您将在2分钟后获得以下超能力 1 专业性采访大纲速成 2 多样性采访提问 Beezy评级 B级 经过简单的寻找 大部分人能立刻掌握 主要节省时间 推荐人 麻辣酱 编辑
  • 基于 Matlab 的混沌算法求解单目标优化问题

    基于 Matlab 的混沌算法求解单目标优化问题 随着科学技术的发展和应用场景的不断拓展 优化问题已经成为了一个十分重要的研究领域 在实际问题中 经常需要找到一个最佳解或最优解 从而使得系统能够更加高效地运行 针对这种问题 混沌算法已经被广
  • Flutter输入框实现银行卡输入 每隔四位插入空格进行分割

    先来看下效果图吧 实现思路 1 利用输入框的TextInputFormatter的withFunction方法来处理用户输入的内容 2 同时需要监听文本改变将光标移动至末尾 输入框 CupertinoTextField controller
  • SpringBoot项目 报错 ERROR 13804 --- [ restartedMain] o.a.tomcat.jdbc.pool.ConnectionPool : Unable

    SpringBoot项目 报错 ERROR 13804 restartedMain o a tomcat jdbc pool ConnectionPool Unable 数据库链接出错 通常发生在导入新项目时 MySQL 数据库版本不匹配
  • 浏览器出现光标

    这里想要把光标关掉 解决办法 按键盘的F7 解决办法2 ok 结束
  • 电子书djvu格式简介zz

    DjVu是由美国AT T实验室于1996年开发成功的一项新的图片压缩技术 DjVu的主要技术是将图像分为背景层 纸的纹理和图片 和前景层 文本和线条 通过将文字和背景分离开来 DjVu可以用高分辨率来还原文字 使锐利边缘得以保留 并最大限度
  • 很强,我终于找到绘制E-R图的正确姿势!

    前言 不知道大家是不是和我一样 为了追求速度 开发时一般都是直接建表就干 哪管什么E R图 直到xxx项目找上你 某某客户要E R图 提供一下吧 这时候就很烦 从头绘制E R图成本真的很高 今天我就遇到了这个糟心事 那有什么办法快速从我们的
  • 在使用Assimp库时编译器报错:C2589 “(”:“::”右边的非法标记 AssimpLoadStl

    OpenGL系列文章目录 文章目录 OpenGL系列文章目录 前言 一 错误原因 二 解决 三 运行结果 源码下载 前言 在使用Assimp库时编译器报错 严重性 代码 说明 项目 文件 行 禁止显示状态 错误 C2589 右边的非法标记
  • 大气散射模型的推导过程

    大气中粒子的散射作用是产生雾霾的主要原因 无论是用人的肉眼观察 还是从拍摄获取的图像中观察 雾天的景象总是存在对比度和视野降低的问题 1925年 Keim Nemnich 1 等人提出雾天图像能见度较低是大气中的悬浮粒子对光的吸收和散射造成
  • 吉林大学超星MOOC学习通高级语言程序设计 C++ 实验02 分支与循环程序设计(2021级)(3)

    9 三位Armstrong数 题目编号 Exp02 Basic08 GJBook3 04 12 题目名称 三位Armstrong数 题目描述 编写程序 打印所有3位的Armstrong数 Armstrong数是指其值等于它本身每位数字立方和
  • Dubbo 3.x源码(9)—Dubbo启动元数据中心源码

    基于Dubbo 3 1 详细介绍了Dubbo启动元数据中心源码 Dubbo配置的加载与覆盖的一系列源码文章 Dubbo 3 x源码 7 Dubbo配置的加载入口源码 Dubbo 3 x源码 8 Dubbo配置中心的加载与优先级源码 Dubb