您现在的位置是:首页 >技术交流 >[Nacos] Nacos Client获取所有服务和定时更新Client端的注册表 (三)网站首页技术交流
[Nacos] Nacos Client获取所有服务和定时更新Client端的注册表 (三)
文章目录
Nacos的服务发现功能: 获取所有服务, 定时更新Client端的注册表
1.Nacos Client获取所有服务
1.1 Client如何获取所有服务
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
NacosDiscoveryClientAutoConfiguration.java
@Configuration
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class })
public class NacosDiscoveryClientAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public NacosDiscoveryProperties nacosProperties() {
return new NacosDiscoveryProperties();
}
@Bean
public DiscoveryClient nacosDiscoveryClient(
NacosDiscoveryProperties discoveryProperties) {
return new NacosDiscoveryClient(discoveryProperties);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosWatch(nacosDiscoveryProperties);
}
}
NacosDiscoveryClient#getServices()
@Override
public List<String> getServices() {
try {
ListView<String> services = discoveryProperties.namingServiceInstance()
.getServicesOfServer(1, Integer.MAX_VALUE);
return services.getData();
} catch (Exception e) {
log.error("get service name from nacos server fail,", e);
return Collections.emptyList();
}
}
这里的discoveryProperties为上面的NacosDiscoveryClientAutoConfiguration自动注入了。
DiscoveryClientHealthIndicator为SpringBoot的actuator自带的监控功能。
DiscoveryClientHealthIndicator#health()调用了这个getServices()方法。
1.2 Client获取服务方法getServices()详解
NacosMaingService#getServicesOfServer()
NamingProxy#getServiceList()
最后还是通过向Server端发送get请求。
result为两个服务名称。
2.Nacos定时更新Client端的注册表
还是自动注册类NacosDiscoreryClientAutoConfiguration.java, 会自动注入NaocsWatch类, 这个类在Nacos源码不存在, 只是自动注入SpringBoot的类。
2.1 Nacos和Eureka定时更新Client端的注册表的区别
Eureka:
Nacos:
2.2 Client定时更新本地服务过程
NacosNamingService#subscribe()
通过hostReactor.getServiceInfo()更新本地服务过程
HostReactor#getServiceInfo()
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
// 构建key,其格式为 groupId@@微服务名称@@clusters名称
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 从当前client的本地注册表中获取当前服务
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) { // 若本地注册表中没有该服务,则创建一个
// 创建一个空的服务(没有任何提供者实例instance的ServiceInfo)
serviceObj = new ServiceInfo(serviceName, clusters);
// serviceInfoMap即为Client端的本地注册表
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
// updatingMap是一个临时缓存,其主要是使用这个缓存map的key,
// 使用map的key不能重复这个特性
// 只要有服务名称出现在这个缓存map中,就表示当前这个服务正在被更新
// 准备要更新serviceName的服务了,就先将其名称写入到这个临时缓存map
updatingMap.put(serviceName, new Object());
// 更新本地注册表中的serviceName的服务
updateServiceNow(serviceName, clusters);
// 更新完毕,将该serviceName服务从临时缓存map中干掉
updatingMap.remove(serviceName);
// 若当前注册表中已经有了这个服务,那么查看一下临时缓存map中
// 是否存在该服务。若存在,则说明这个服务正在被更新,所以本次
// 操作先等待wait一会儿
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 启动一个定时任务,定时更新本地注册表中的当前服务
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters);
// serviceInfoMap即为Client端的本地注册表,
// 其key为 groupId@@微服务名称@@clusters名称
// value为ServiceInfo
return serviceInfoMap.get(key);
}
- 通过Key, Value的模式去serviceInfoMap即为Client端的本地注册表获取服务, Key:
groupId@@微服务名称@@clusters名称
, Value:ServiceInfo
- 先从当前client的本地注册表中获取当前服务, 若本地注册表中没有该服务,则创建一个, 首先创建一个空的服务, 然后填充属性。
- updatingMap: updatingMap是一个临时缓存,其主要是使用这个缓存map的key, 使用map的key不能重复这个特性, 只要有服务名称出现在这个缓存map中,就表示当前这个服务正在被更新, 准备要更新serviceName的服务了,就先将其名称写入到这个临时缓存map。
- updateServiceNow(serviceName, clusters): 更新本地注册表中的serviceName的服务, 更新完毕后从updatingMap删除serviceName服务
- 若当前注册表中已经有了这个服务,那么查看一下临时缓存map中, 是否存在该服务。若存在,则说明这个服务正在被更新, 操作先等待wait一会儿
- 最后, 启动一个定时任务,定时更新本地注册表中的当前服务
2.3 updateServiceNow方法解析
更新本地注册表中的serviceName的服务
private void updateServiceNow(String serviceName, String clusters) {
try {
// 更新本地注册表中的serviceName的服务
updateService(serviceName, clusters);
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
processServiceJson方法去将 将来自于Server的ServiceInfo更新到本地注册表, 这里的ServerInfo为向server提交一个GTE请求, 是JSON字符串的形式。
HostReactor#processServiceJson(): 将来自Server的ServerInfo更新至本地注册表, 有一些情况具体分析。
public ServiceInfo processServiceJson(String json) {
// 将来自于Server的JSON转换为ServiceInfo
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
// 获取注册表中当前服务的ServiceInfo
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
//empty or error push, just ignore
return oldService;
}
boolean changed = false;
// 若当前注册表中存在当前服务,则想办法将来自于server的数据更新到本地注册表
if (oldService != null) {
// 为了安全起见,这种情况几乎是不会出现的
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
+ serviceInfo.getLastRefTime());
}
// 将来自于Server的serviceInfo替换掉注册表中的当前服务
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 遍历本地注册表中当前服务的所有instance实例
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
// 将当前遍历的instance主机的ip:port作为key,instance作为value,
// 写入到一个新的map
oldHostMap.put(host.toInetAddr(), host);
}
// 遍历来自于server的当前服务的所有instance实例
Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
for (Instance host : serviceInfo.getHosts()) {
// 将当前遍历的instance主机的ip:port作为key,instance作为value,
// 写入到一个新的map
newHostMap.put(host.toInetAddr(), host);
}
// 该set集合中存放的是,两个map(oldHostMap与newHostMap)中都有的ip:port,
// 但它们的instance不相同,此时会将来自于server的instance写入到这个set
Set<Instance> modHosts = new HashSet<Instance>();
// 只有newHostMap中存在的instance,即在server端新增的instance
Set<Instance> newHosts = new HashSet<Instance>();
// 只有oldHostMap中存在的instance,即在server端被删除的instance
Set<Instance> remvHosts = new HashSet<Instance>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
newHostMap.entrySet());
// 遍历来自于server的主机
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
// ip:port
String key = entry.getKey();
// 在注册表中存在该ip:port,但这两个instance又不同,则将这个instance写入到modHosts
if (oldHostMap.containsKey(key) && !StringUtils
.equals(host.toString(), oldHostMap.get(key).toString())) {
modHosts.add(host);
continue;
}
// 若注册表中不存在该ip:port,说明这个主机是新增的,则将其写入到newHosts
if (!oldHostMap.containsKey(key)) {
newHosts.add(host);
}
}
// 遍历来自于本地注册表的主机
for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}
// 注册表中存在,但来自于server的serviceInfo中不存在,
// 说明这个instance被干掉了,将其写入到remvHosts
if (!newHostMap.containsKey(key)) {
remvHosts.add(host);
}
}
if (newHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(newHosts));
}
if (remvHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(remvHosts));
}
if (modHosts.size() > 0) {
changed = true;
// 变更心跳信息BeatInfo
updateBeatInfo(modHosts);
NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(modHosts));
}
serviceInfo.setJsonFromServer(json);
// 只要发生了变更,就将这个发生变更的serviceInfo记录到一个缓存队列
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
}
// 若本地注册表中就没有当前服务,则直接将来自于server的serviceInfo写入到注册表
} else {
changed = true;
NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
// 将来自于server的serviceInfo写入到注册表
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 将这个发生变更的serviceInfo记录到一个缓存队列
eventDispatcher.serviceChanged(serviceInfo);
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
+ JacksonUtils.toJson(serviceInfo.getHosts()));
}
return serviceInfo;
}
- 来自于Server的数据是最新的数据, oldService为当前注册表中的服务SericeInfo, 为旧的服务。
- 若当前注册表中存在当前服务, 将来自于server的数据更新到本地注册表
- 将来自于Server的serviceInfo替换掉注册表中的当前服务, 保持至serviceInfoMap
- 遍历本地注册表中当前服务的所有instance实例, 将当前遍历的instance主机的ip:port作为key,instance作为value,写入到一个新的map, oldHostMap
- 遍历来自于server的当前服务的所有instance实例, 将当前遍历的instance主机的ip:port作为key,instance作为value,入到一个新的map, newHostMap
- 创建3个Set集合, modHosts存放的是两个map(oldHostMap与newHostMap)中都有的ip:port, 但它们的instance不相同,此时会将来自于server的instance写入到这个set, newHosts存放的是只有newHostMap中存在的instance,即在server端新增的instance, remvHosts存放的是只有oldHostMap中存在的instance,即在server端被删除的instance
- 遍历来自于server的主机, 如果在注册表中存在该ip:port,但这两个instance又不同,则将这个instance写入到modHosts, 如果若注册表中不存在该ip:port,说明这个主机是新增的,则将其写入到newHosts
- 遍历来自于本地注册表的主机, 如果注册表中存在,但来自于server的serviceInfo中不存在, 说明这个instance被干掉了,将其写入到remvHosts
- 如果modHosts中存在对象, 那么变更心跳信息BeatInfo, updateBeatInfo()
- 只要发生了变更,就将这个发生变更的serviceInfo记录到一个缓存队列,
DiskCache
- 若本地注册表中就没有当前服务,则直接将来自于server的serviceInfo写入到注册表, 将这个发生变更的serviceInfo记录到一个缓存队列,
DiskCache
2.4 定时更新本地注册表中的当前服务
HostReactor#scheduleUpdateIfAbsent()
发起一个定时任务, 定时更新本地注册表中的当前服务
在HostReator#getServiceInfo()最后有一个定时任务的代码
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
// futureMap是一个缓存map,其key为 groupId@@微服务名称@@clusters
// value是一个定时异步操作对象
// 这种结构称之为:双重检测锁,DCL,Double Check Lock
// 该结构是为了避免在并发情况下,多线程重复写入数据
// 该结构的特征:
// 1)有两个不为null的判断
// 2)有共享集合
// 3)有synchronized代码块
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
// 创建一个定时异步操作对象,并启动这个定时任务
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
// 将这个定时异步操作对象写入到缓存map
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
这个是一个DCL结构, 结构是为了避免在并发情况下,多线程重复写入数据。
UpdateTask#run() 启动定时任务
@Override
public void run() {
long delayTime = DEFAULT_DELAY;
try {
// 从本地注册表中获取当前服务
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
// 若本地注册表中不存在该服务,则从server获取到后,更新到本地注册表
if (serviceObj == null) {
// 从server获取当前服务,并更新到本地注册表
updateService(serviceName, clusters);
return;
}
// 处理本地注册表中存在当前服务的情况
// 1)serviceObj.getLastRefTime() 获取到的是当前服务最后被访问的时间,这个时间
// 是来自于本地注册表的,其记录的是所有提供这个服务的instance中最后一个instance
// 被访问的时间
// 2)缓存lastRefTime 记录的是当前instance最后被访问的时间
// 若1)时间 小于 2)时间,说明当前注册表应该更新的
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
// 将来自于注册表的这个最后访问时间更新到当前client的缓存
lastRefTime = serviceObj.getLastRefTime();
if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
// 开启下一次的定时任务
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
-
从本地注册表中获取当前服务, 若本地注册表中不存在该服务, Server获取后, 更新到本地注册表ServiceInfo
这里的updateService()在之前分析过。 -
处理本地注册表中存在当前服务的情况, 判断当前服务最后被访问的时间和当前instance最后被访问的时间的大小
如果当前服务最后被访问的时间小于等于当前instance最后被访问的时间的话, 那么说明当前注册表应该更新的
这里的updateService()在之前分析过。 -
最后开启下一次的定时任务