Nacos Naming Service


2024-01-18 09:00 | Source: compiled from the web

Contents

I. Usage
  1. Service provider
  2. Service consumer
II. Service discovery and registration
  1. Service discovery on the consumer side
    (1) Actively pulling services
      NacosNamingService.getAllInstances()
      HostReactor.getServiceInfo(final String serviceName, final String clusters)
      HostReactor.updateServiceNow(String serviceName, String clusters)
      HostReactor.scheduleUpdateIfAbsent(String serviceName, String clusters)
      UpdateTask.run()
    (2) Receiving pushed services
      PushReceiver constructor
      PushReceiver.run()
      HostReactor.processServiceJSON(String json)
  2. Service registration on the provider side
      NacosNamingService.registerInstance(String serviceName, String groupName, Instance instance) throws NacosException
      BeatReactor.addBeatInfo(String serviceName, BeatInfo beatInfo)
      Heartbeat task: BeatTask
  3. Registry
      ServiceManager.registerInstance(String namespaceId, String serviceName, Instance instance)
      ServiceManager.addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException
      DelegateConsistencyServiceImpl.put(String key, Record value)
      RaftConsistencyServiceImpl.put(String key, Record value)
      RaftCore.signalPublish(String key, Record value)
      RaftCore.onPublish(Datum datum, RaftPeer source)

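A naming service resolves a given name to the address of a resource or service and to information about its provider.

A naming service has to support both service registration and service subscription. Dubbo implements this on top of ZooKeeper by default; Nacos has its own implementation, and its data storage is based mainly on Raft.

More generally, a naming service manages the mapping from the "names" of all objects and entities in a distributed system to their associated metadata, for example ServiceName -> Endpoints Info, Distributed Lock Name -> Lock Owner/Status Info, and DNS Domain Name -> IP List. Service discovery and DNS are the two main use cases of a naming service.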
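The name-to-metadata mapping described above can be illustrated with a minimal in-memory sketch. It is not Nacos code: the class `SimpleNameRegistry` and its methods are hypothetical, and endpoints are represented as plain "ip:port" strings purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical in-memory name registry; an illustration only, not part of Nacos. */
class SimpleNameRegistry {
    // service name -> endpoints registered under that name
    private final Map<String, List<String>> endpointsByName = new ConcurrentHashMap<>();

    /** Registers an endpoint such as "10.26.15.125:8848" under a service name. */
    void register(String name, String endpoint) {
        endpointsByName.computeIfAbsent(name, k -> new CopyOnWriteArrayList<>()).add(endpoint);
    }

    /** Returns a snapshot of the endpoints for a name, or an empty list if the name is unknown. */
    List<String> lookup(String name) {
        List<String> endpoints = endpointsByName.get(name);
        return endpoints == null ? new ArrayList<>() : new ArrayList<>(endpoints);
    }
}
```

A real naming service adds health checks, subscriptions, and replication on top of this basic map, which is what the rest of this article walks through for Nacos.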

I. Usage

1. Service provider

    @NacosInjected // Inject the naming service with the Nacos client's @NacosInjected annotation
    private NamingService namingService;

    @RequestMapping(value = "/set", method = GET)
    @ResponseBody
    public String set(@RequestParam String serviceName) {
        try {
            // IP and port registered for the instance (the author uses the registry's own address here)
            namingService.registerInstance(serviceName, "10.26.15.125", 8848);
            return "OK";
        } catch (NacosException e) {
            e.printStackTrace();
            return "ERROR";
        }
    }

Calling http://localhost:8084/discovery/set?serviceName=example registers a service named example.

2. Service consumer

    @NacosInjected
    private NamingService namingService;

    @RequestMapping(value = "/get", method = GET)
    @ResponseBody
    public List<Instance> get(@RequestParam(defaultValue = "") String serviceName) throws NacosException {
        return namingService.getAllInstances(serviceName);
    }

Calling http://localhost:8084/discovery/get?serviceName=example returns the instances of the service. The response looks like this:

    [
      {
        instanceId: "10.26.15.125#8848#DEFAULT#DEFAULT_GROUP@@example",
        ip: "10.26.15.125",
        port: 8848,
        weight: 1,
        healthy: true,
        enabled: true,
        ephemeral: true,
        clusterName: "DEFAULT",
        serviceName: "DEFAULT_GROUP@@example",
        metadata: {}
      }
    ]

II. Service discovery and registration

1. Service discovery on the consumer side

(1) Actively pulling services

NacosNamingService.getAllInstances()

    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
                                          boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        if (subscribe) { // whether the caller subscribes to the service
            // Get the service info from the local cache or from the registry
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
        } else {
            // Fetch the service directly from the registry.
            // getServiceInfoDirectlyFromServer is simple: it calls serverProxy.queryList (covered below).
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(
                NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }

HostReactor.getServiceInfo(final String serviceName, final String clusters)

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        // When the registry loses contact with the provider, the service is put into failover state.
        // In that case the info is served straight from the failover cache, or an empty service
        // is returned if nothing is cached.
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // Look up the local cache map
        ServiceInfo serviceObj = getSerivceInfo0(serviceName, clusters);
        // Not in the local cache: query the registry and update the local cache
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            // Query the registry and update the local cache
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            if (updateHoldInterval > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(updateHoldInterval);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName
                            + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // Create a scheduled task that refreshes the service after a delay
        scheduleUpdateIfAbsent(serviceName, clusters);
        return serviceInfoMap.get(serviceObj.getKey());
    }

HostReactor.updateServiceNow(String serviceName, String clusters)

    public void updateServiceNow(String serviceName, String clusters) {
        // Read the local cache first
        ServiceInfo oldService = getSerivceInfo0(serviceName, clusters);
        try {
            // Pull from the registry
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                // Wake up callers waiting in getServiceInfo
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

HostReactor.scheduleUpdateIfAbsent(String serviceName, String clusters)

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        // A task already exists for this key: return immediately
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        synchronized (futureMap) {
            // Check again under the lock
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            // No task yet: create one that runs after a delay and keeps rescheduling itself
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

UpdateTask.run()

    @Override
    public void run() {
        try {
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            if (serviceObj == null) {
                updateServiceNow(serviceName, clusters);
                executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                return;
            }
            if (serviceObj.getLastRefTime() ...

[The source text is truncated at this point. The rest of UpdateTask.run(), the PushReceiver constructor, and PushReceiver.run() are missing; the text resumes partway through HostReactor.processServiceJSON(String json).]

(2) Receiving pushed services

HostReactor.processServiceJSON(String json)

        // ... (the start of the method is missing from the source; the two conditions below
        // are inferred from the surviving log message and the matching else branch)
        if (oldService != null) {
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                    + ", new-t: " + serviceInfo.getLastRefTime());
            }
            // Update the shared in-memory cache
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

            Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
                oldHostMap.put(host.toInetAddr(), host);
            }
            Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
                newHostMap.put(host.toInetAddr(), host);
            }

            Set<Instance> modHosts = new HashSet<Instance>();
            Set<Instance> newHosts = new HashSet<Instance>();
            Set<Instance> remvHosts = new HashSet<Instance>();

            List<Map.Entry<String, Instance>> newServiceHosts =
                new ArrayList<Map.Entry<String, Instance>>(newHostMap.entrySet());
            for (Map.Entry<String, Instance> entry : newServiceHosts) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                // Same address but different content: modified instance
                if (oldHostMap.containsKey(key)
                    && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
                    modHosts.add(host);
                    continue;
                }
                // Address not seen before: new instance
                if (!oldHostMap.containsKey(key)) {
                    newHosts.add(host);
                    continue;
                }
            }
            for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
                    continue;
                }
                // Address no longer present: removed instance
                if (!newHostMap.containsKey(key)) {
                    remvHosts.add(host);
                    continue;
                }
            }

            if (newHosts.size() > 0) {
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                    + serviceInfo.getName() + " -> " + JSON.toJSONString(newHosts));
            }
            if (remvHosts.size() > 0) {
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                    + serviceInfo.getName() + " -> " + JSON.toJSONString(remvHosts));
            }
            if (modHosts.size() > 0) {
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                    + serviceInfo.getName() + " -> " + JSON.toJSONString(modHosts));
            }

            serviceInfo.setJsonFromServer(json);
            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                // Dispatch a change event (extension point for business logic)
                eventDispatcher.serviceChanged(serviceInfo);
                DiskCache.write(serviceInfo, cacheDir);
            }
        } else {
            NAMING_LOGGER.info("new ips(" + serviceInfo.ipCount() + ") service: "
                + serviceInfo.getName() + " -> " + JSON.toJSONString(serviceInfo.getHosts()));
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            eventDispatcher.serviceChanged(serviceInfo);
            serviceInfo.setJsonFromServer(json);
            // Write to disk
            DiskCache.write(serviceInfo, cacheDir);
        }
        // Track the number of cached services with a Prometheus Gauge
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: "
            + serviceInfo.getName() + " -> " + JSON.toJSONString(serviceInfo.getHosts()));
        return serviceInfo;
    }

2. Service registration on the provider side

NacosNamingService.registerInstance(String serviceName, String groupName, Instance instance) throws NacosException

    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        // Add a heartbeat entry; heartbeats are sent to the registry from now on
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        // Register the service with the registry
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

BeatReactor.addBeatInfo(String serviceName, BeatInfo beatInfo)

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        // Add to dom2Beat (a map); a scheduled task sends the heartbeats later
        dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
        // Track the number of heartbeat entries with a Prometheus Gauge; the task class is BeatTask
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

Heartbeat task: BeatTask

    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            // Send the heartbeat through the registry proxy
            long result = serverProxy.sendBeat(beatInfo);
            beatInfo.setScheduled(false);
            // A positive result is used as the new heartbeat interval
            if (result > 0) {
                clientBeatInterval = result;
            }
        }
    }

3. Registry

The code below is what the registry runs after it receives a service registration request.

ServiceManager.registerInstance(String namespaceId, String serviceName, Instance instance)

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        if (ServerMode.AP.name().equals(switchDomain.getServerMode())) {
            createEmptyService(namespaceId, serviceName);
        }
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // Check whether the instance is already registered; a duplicate registration throws an exception
        if (service.allIPs().contains(instance)) {
            throw new NacosException(NacosException.INVALID_PARAM, "instance already exist: " + instance);
        }
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

ServiceManager.addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        // Build the full instance list of the service, including the instances being added
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        consistencyService.put(key, instances);
    }

DelegateConsistencyServiceImpl.put(String key, Record value)

    public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);
    }

    // CAP theorem:
    // ephemeral keys (AP mode) go to ephemeralConsistencyService, which keeps data in memory;
    // all other keys (CP mode) go to persistentConsistencyService, which saves data to disk.
    private ConsistencyService mapConsistencyService(String key) {
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
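The delegation step above picks a storage backend purely from the key. A minimal sketch of that idea follows. Everything in it is hypothetical: the class `DelegatingStoreSketch`, the two prefixes, and the key layout are made up for illustration and are not the real `KeyBuilder` format.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of key-based delegation; not the Nacos classes themselves. */
class DelegatingStoreSketch {
    // Assumed prefixes for this sketch only; the real key layout is not shown in this article.
    static final String EPHEMERAL_PREFIX = "iplist.ephemeral.";
    static final String PERSISTENT_PREFIX = "iplist.";

    /** Minimal stand-in for a consistency service: it only records what was put. */
    static class RecordingStore {
        final Map<String, String> data = new HashMap<>();

        void put(String key, String value) {
            data.put(key, value);
        }
    }

    final RecordingStore ephemeralStore = new RecordingStore();  // plays the in-memory (AP) role
    final RecordingStore persistentStore = new RecordingStore(); // plays the on-disk (CP) role

    /** Builds an instance-list key; the ephemeral flag is encoded in the prefix. */
    static String buildKey(String namespaceId, String serviceName, boolean ephemeral) {
        return (ephemeral ? EPHEMERAL_PREFIX : PERSISTENT_PREFIX) + namespaceId + "##" + serviceName;
    }

    /** Routes the write to one of the two stores based only on the key. */
    void put(String key, String value) {
        RecordingStore target = key.startsWith(EPHEMERAL_PREFIX) ? ephemeralStore : persistentStore;
        target.put(key, value);
    }
}
```

Because the ephemeral flag is baked into the key when it is built, the delegating layer needs no extra parameter to decide where a write belongs.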

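Because the production clusters discussed here run in CP mode, all data is saved to disk, so the rest of this section looks at how PersistentConsistencyService stores instances. Its implementation class is RaftConsistencyServiceImpl.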

RaftConsistencyServiceImpl.put(String key, Record value)

    public void put(String key, Record value) throws NacosException {
        try {
            // Trigger the publish signal
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR,
                "Raft put failed, key:" + key + ", value:" + value);
        }
    }

RaftCore.signalPublish(String key, Record value)

    public void signalPublish(String key, Record value) throws Exception {
        // If this node is not the leader, forward the request to the leader
        if (!isLeader()) {
            JSONObject params = new JSONObject();
            params.put("key", key);
            params.put("value", value);
            Map<String, String> parameters = new HashMap<String, String>(1);
            parameters.put("key", key);
            // The request is forwarded over HTTP
            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
            return;
        }
        try {
            OPERATE_LOCK.lock();
            long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            // timestamp counts how many times the datum has been modified; it acts as a version number
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                // Each operation increments the version by 1
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }
            JSONObject json = new JSONObject();
            json.put("datum", datum);
            json.put("source", peers.local());
            // The leader publishes the data locally
            onPublish(datum, peers.local());
            final String content = JSON.toJSONString(json);
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            // After publishing locally, replicate to all non-leader nodes
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                final String url = buildURL(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
                    new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                    datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            latch.countDown();
                            return 0;
                        }

                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
            }
            // Wait until a majority of nodes (the latch size is peers.majorityCount()) have
            // acknowledged the update; throw an exception on timeout
            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // only majority servers return success can we consider this update success
                Loggers.RAFT.info("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
        } finally {
            OPERATE_LOCK.unlock();
        }
    }

RaftCore.onPublish(Datum datum, RaftPeer source)

    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        // Validation
        RaftPeer local = peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }
        if (!peers.isLeader(source.ip)) {
            Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",
                JSON.toJSONString(source), JSON.toJSONString(getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish "
                + "data but wasn't leader");
        }
        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
                JSON.toJSONString(source), JSON.toJSONString(local));
            throw new IllegalStateException("out of date publish, pub-term:" + source.term.get()
                + ", cur-term: " + local.term.get());
        }
        // Reset the leader lease timeout
        local.resetLeaderDue();
        // if data should be persistent, usually this is always true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            // Persist the datum to local disk
            raftStore.write(datum);
        }
        datums.put(datum.key, datum);
        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        // Persist the updated term (increased above by PUBLISH_TERM_INCREASE_COUNT)
        raftStore.updateTerm(local.term.get());
        // Publish a change event. This is an extension point for business logic, not
        // leader-follower replication; if no listener is registered, nothing happens.
        notifier.addTask(datum, ApplyAction.CHANGE);
        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }
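The majority wait in signalPublish can be isolated into a small, self-contained sketch. This is not Nacos code: the class `MajorityPublisher`, the `sendToFollower` callback, and the thread pool are hypothetical stand-ins for the asynchronous HTTP calls, and the majority is assumed to be the usual `n / 2 + 1`.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical sketch of "publish succeeds once a majority acknowledges". */
class MajorityPublisher {

    /** Assumed definition of a majority for a cluster of the given size. */
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /**
     * Sends to every follower in parallel. Returns true once a majority of the cluster
     * (the leader counts as one acknowledgement) has confirmed, false on timeout.
     */
    static boolean publish(List<String> followers, Predicate<String> sendToFollower, long timeoutMillis) {
        int clusterSize = followers.size() + 1; // followers plus the leader itself
        CountDownLatch latch = new CountDownLatch(majorityCount(clusterSize));
        latch.countDown(); // the leader has already applied the change locally

        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, followers.size()));
        try {
            for (String follower : followers) {
                pool.execute(() -> {
                    // Only a successful send counts towards the majority
                    if (sendToFollower.test(follower)) {
                        latch.countDown();
                    }
                });
            }
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Sizing the latch to the majority rather than to the full cluster means a slow or failed minority of followers does not block the write, which matches the "only majority servers return success" comment in the original code.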



