您现在的位置是:首页 >其他 >Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七网站首页其他

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

是佩奇吗? 2023-06-02 08:00:03
简介Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

历史篇章

🕐Nacos 客户端服务注册源码分析-篇一

🕑Nacos 客户端服务注册源码分析-篇二

🕒Nacos 客户端服务注册源码分析-篇三

🕓Nacos 服务端服务注册源码分析-篇四

🕔Nacos 服务端健康检查-篇五

🕕Nacos 客户端服务发现源码分析-篇六

Nacos 客户服务发现续接

之前,在第六篇的时候我们探究了 Nacos 客户端的服务发现源码的具体实现流程。

image-20211022150934273

最终是调用的 NamingService 的 getAllInstance 方法获取了所有的实例列表,而客户端实例列表是封装在一个 List <Instance> 的集合当中的。

//获取所以的实例信息,这里的实例信息就是客户端的信息
List<Instance> list = namingService.getAllInstances("nacos.test.1");

最终是调用 NamingClientProxyDelegate 类下的 subscribe 方法完成订阅,并返回实体信息的。

if (null == serviceInfo) {
    //如果本地的缓存不存在服务信息,则进行订阅
    //查找到最新的实例信息
    serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}

由于这一部分的内容在之前的第六篇 Nacos 客户端服务发现源码分析-篇六 已经是分析过了的,所以这里我就不再进行赘述这一块的内容了,感兴趣的可以返回调转到指定的篇章进行浏览即可。

可能有些人好奇,哎。标题为什么称作 Nacos 客户端的服务发现与服务订阅机制的纠缠呢?

哈哈,其实他们两者是有联系的,具体是什么联系,就在我们接下来要探究 Nacos 客户端的服务订阅当中有其答案。

既然如此,我们就今天研究一把, Nacos 客户端服务订阅事件机制的具体实现叭。。。


Nacos 客户端服务订阅机制核心流程

首先,先谈谈什么是订阅?生活中那些那些方面体现着类似于订阅这样的概念?只要真正的理解了订阅这一概念,我们才能更好的进行接下来的内容。

订阅其实简单与生活对比来讲,其实就是预定。当然预定的这个动作有发出者,就必须有动作的承受者,举个栗子,外出旅游我们可以会定酒店,那么酒店的服务者就是动作的承受者,订酒店的对象就是动作的发出者,再比如我们的常常提到的订阅一个期刊,如果这个期刊的周期是一年,而该期刊每月都会推送该期的内容,那么订阅期刊的对象就是动作发出者,发布期刊的对象就是动作承受者。

订阅者订阅,承受者在接受到订阅者的指定命令后,周期性的完成指定的任务,这就是订阅。

所以对于注册中心 Nacos 也是同样提供了这样的服务的。。。

大致的流程就是 客户端 通过一个定时的任务每 6 秒从注册中心获取当前的实例列表,当发现实例发生了变化的时候,发布变更事件。对于订阅者而言,完成业务部分的处理(更新实例,更新本地缓存)。

我们可以通过一个流程图,观察其具体的实现。。。 原图点这里

image-20230421231128284

其实从图中已经大致的清楚了,客户端的这个订阅的整体流程。

我们从源码的角度进分析一波。

进入我们的 NacosNamingService 类当中

//在 NacosNamingService 中暴露了许多的重载的 subscribe 方法
//这里 NacosNamingService 类下的 subscribe 方法 和 NamingService 下的 getAllInstances 发现获取实例列表的方法重载的过程都是一样的
@Override
public void subscribe(String serviceName, EventListener listener) throws NacosException {
    //创建一个空的集群对象集合
    subscribe(serviceName, new ArrayList<String>(), listener);
}
@Override
public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
   //设置默认的群组 DEFAULT_GROUP 默认群组
   subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    //如果事件监听器为空 则返回
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    //注册监听器
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    //对于订阅的本质就是服务的发现的一种方式,也就是服务在发现的时候执行订阅方法,同时触发定时任务去服务端拉去数据
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

可以看到的是 NacosNamingService 中提供了大量的 subscribe 的重载方法,这些重载一些默认的参数。

走到 subscribe 方法的尽头,在该方法内可以看到有两个核心的方法 InstanceChangeNotifier 类下的registerListener 注册监听器方法与 NamingClientProxy 类下的 subscribe 订阅方法。我们就探究一下这两个方法具体实现,以及这两个方法的功能作用是什么?

changeNotifier.registerListener 注册监听器

/**
 * register listener.
 *
 * @param groupName   group name
 * @param serviceName serviceName
 * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
 * @param listener    custom listener
 */
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}
/**
 * deregister listener.
 *
 * @param groupName   group name
 * @param serviceName serviceName
 * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
 * @param listener    custom listener
 */
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        return;
    }
    eventListeners.remove(listener);
    if (CollectionUtils.isEmpty(eventListeners)) {
        listenerMap.remove(key);
    }
}

可以看到在 InstancesChangeNotifier 类下有两个关于监听器的方法,注册监听与取消监听。

注册监听其实就是在监听集合对象 ConcurrentHashSet<EventListener> 中添加一个监听事件,而对于取消监听是通过 key 将需要移除的监听事件从集合当中移除。

那么关于这个监听事件添加都监听集合当中后,这个监听事件是如何触发又如何调用执行的呢?这个。。。哈哈,留一个坑,其实这一块我自己还没有研究的特别清楚。。。

接下来我们看看,另一个重要的方法 clientProxy.subscribe() 服务订阅

clientProxy.subscribe 服务订阅

其实玩到这里呢,也就与我们的标题 Nacos 客户的服务发现与客户端服务订阅机制的纠缠,就关联了起来,为什么这么说呢?那让我们看看 clientProxy.subscribe 方法内部的具体实现咯。。。

//其实走到这里就可以看到,该方法与之前的服务发现调用的是同一个方法,这里其实在做的是服务列表的查询
//查询与订阅都调用了同样的而方法
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    //开启定时任务调度 UpdateTask
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    //获取缓存中的 ServiceInfo
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        //如果缓存中没有数据,则进行订阅逻辑处理,基于 gRPC 协议
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    //serviceInfo 本地缓存处理
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

哈哈 ,看到这一块的代码是不是有一种似曾相识的感觉呢?

对咯,没错在第六篇 Nacos 客户端服务发现源码分析 当中的发现获取实例列表的时候在 NacosNamingService 中的 getAllInstances 方法多次重载之后调用的 clientProxy.sunscribe 调用的是同一个方法。

所以其实到这里是可以得到一个结论的,就是 在 Nacos 客户端的查询与订阅服务都是调用了同样的方法的

这就解释了为什么标题 Nacos 客户端的服务发现与服务订阅机制是冥冥之中有种联系在一起的呢。

我们还记得流程图中有一个关于 UpdateTask 定时任务调度吗?

让我们接下来看看,这个里面到底在做什么呢???

定时任务执行内容

//开启定时任务调度 UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
/**
 * Schedule update if absent.
 *
 * @param serviceName service name
 * @param groupName   group name
 * @param clusters    clusters
 */
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clu
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    //双重检测锁
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        //构建一个定时处理的任务,最终这里的 future 就是构建的定时任务,该任务用于在 run 中执行
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}
public UpdateTask(String serviceName, String groupName, String clusters) {
    this.serviceName = serviceName;
    this.groupName = groupName;
    this.clusters = clusters;
    this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
}
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    //执行延时函数,延时时间为 1000L * MICRO_SCALE = 1S
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到在第二片代码中有这样一个方法,addTask () 对的,没错就是将通过 serviceName、groupName、cliusters 构建一个 UbdateTask 的更新任务对象,然后将其对象构建成一个未来执行的定时任务,添加到执行的集合当中,最终是由 ServiceInfoUpdateService 中的 run 方法去执行。

定时任务 run() 方法的执行

@Override
public void run() {
    long delayTime = DEFAULT_DELAY;
    
    try {
        //判断更改通知对象 serviceName 是否订阅
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKe
            NAMING_LOGGER
                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
            return;
        }
        //获取缓存中的信息
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        //缓存为空
        if (serviceObj == null) {
            //生成一个服务实例对象
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)
            //处理更新或添加到本地的缓存当中
            serviceInfoHolder.processServiceInfo(serviceObj);
            //更新最后一次的时间
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }
        //过期服务,如果说,服务的更新时间是小于等于缓存刷新的时间的
        //那就说明本地的缓存不是最新的,而当前的服务实例信息也不是客户端最新的,
        //这个时候就需要从 注册中心 中重新的进行一次查询,获取最的服务实例信息并更新本地缓存
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)
            //更新处理本地的缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        //刷新更新的当前时间
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        //下次的更新缓存时间设置为缓存中的默认基数 (cacheMillis = 1000) * 6
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 重置失败数量为 0
        // 可能会出现一些异常,比如调用 queryInstancesOfService 方法的时候
        // 没有 ServiceInfo 连接不到则会出现异常
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
    } finally {
        // 下次调度刷新时间,下次执行的时间与failCount 失败的次数有关,failCount=0,则下次调度时间为6秒,最长为1分钟
        // 当无异常的情况下 failCount 始终都是 0 则默认的时间一直都 6 s
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

未完待续。。。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。