• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Eureka Server源碼詳細分析

    標簽: Spring Cloud  Eureka Server源碼  Eureka源碼

    一,開啟Eureka Server

    我們要開啟Eureka Server首先需要在啟動類上面加@EnableEurekaServer注解

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(EurekaServerMarkerConfiguration.class)
    public @interface EnableEurekaServer {
    
    }
    

    可以看到@EnableEurekaServer注解目的是導入EurekaServerMarkerConfiguration類,繼續看此類:

    @Configuration(proxyBeanMethods = false)
    public class EurekaServerMarkerConfiguration {
    
    	@Bean
    	public Marker eurekaServerMarkerBean() {
    		return new Marker();
    	}
    
    	class Marker {
    
    	}
    
    }
    

    EurekaServerMarkerConfiguration類只是向spring容器中注入一個標記類Marker,這里先記著,下面馬上就要用到,繼續來看eureka server包下面的META-INF/spring.factories文件:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
    

    啟動時會自動加載:EurekaServerAutoConfiguration來向spring容器中添加eureka-server相關功能的bean。

    @Configuration(proxyBeanMethods = false)
    @Import(EurekaServerInitializerConfiguration.class)
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class,
    		InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    
    	// 省略部分代碼
    
        // 加載EurekaController,SpringCloud 提供了一些額外的接口,用來獲取eurekaServer的信息
    	@Bean
    	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
    			matchIfMissing = true)
    	public EurekaController eurekaController() {
    		return new EurekaController(this.applicationInfoManager);
    	}
    
        // 接收客戶端的注冊等請求就是通過InstanceRegistry來處理的,是真正處理業務的類
    	@Bean
    	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
    			ServerCodecs serverCodecs) {
    		this.eurekaClient.getApplications(); // force initialization
    		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
    				serverCodecs, this.eurekaClient,
    				this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
    				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    	}
    
        // 配置服務節點信息,這里的作用主要是為了配置Eureka的peer節點,也就是說當有收到有節點注冊上來的時候,需要通知給哪些節點
    	@Bean
    	@ConditionalOnMissingBean
    	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
    			ServerCodecs serverCodecs,
    			ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
    		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
    				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
    				replicationClientAdditionalFilters);
    	}
    
        // EurekaServer的上下文
    	@Bean
    	@ConditionalOnMissingBean
    	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
    			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
    				registry, peerEurekaNodes, this.applicationInfoManager);
    	}
    
        // 初始化Eureka-server,會同步其他注冊中心的數據到當前注冊中心
    	@Bean
    	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
    			EurekaServerContext serverContext) {
    		return new EurekaServerBootstrap(this.applicationInfoManager,
    				this.eurekaClientConfig, this.eurekaServerConfig, registry,
    				serverContext);
    	}
    
    	// eureka-server使用了Jersey實現 對外的 restFull接口
    	@Bean
    	public FilterRegistrationBean<?> jerseyFilterRegistration(
    			javax.ws.rs.core.Application eurekaJerseyApp) {
    		FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
    		bean.setFilter(new ServletContainer(eurekaJerseyApp));
    		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
    		bean.setUrlPatterns(
    				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    
    		return bean;
    	}
    
    	// 添加一些過濾器,類似于過濾請求地址,Path類似于@RequestMapping,Provider類似于@Controller
    	@Bean
    	public javax.ws.rs.core.Application jerseyApplication(Environment environment,
    			ResourceLoader resourceLoader) {
    
    		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
    				false, environment);
    
    		// Filter to include only classes that have a particular annotation.
    		//
    		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
    		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    
    		//省略部分代碼
    	}
    
    	//省略部分代碼
    

    通過@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)可以看到EurekaServerAutoConfiguration被注冊為Spring Bean的前提是在Spring容器中存在EurekaServerMarkerConfiguration.Marker.class的對象,而這個對象正是我們上面通過@EnableEurekaServer注解導入的。

    二,啟動Eureka Server

    EurekaServerAutoConfiguration類上面有注解@Import(EurekaServerInitializerConfiguration.class)導入了EurekaServerInitializerConfiguration類:

    @Configuration(proxyBeanMethods = false)
    public class EurekaServerInitializerConfiguration
    		implements ServletContextAware, SmartLifecycle, Ordered {
    
    	// 此處省略部分代碼
        
    	@Override
    	public void start() {
    		new Thread(() -> {
    			try {
    				// 初始化EurekaServer,同時啟動Eureka Server ,后面著重講這里
    				eurekaServerBootstrap.contextInitialized(
    						EurekaServerInitializerConfiguration.this.servletContext);
    				log.info("Started Eureka Server");
    				// 告訴client,可以來注冊了
    				publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    // 設置啟動的狀態為true
    				EurekaServerInitializerConfiguration.this.running = true;
    				publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
    			}
    			catch (Exception ex) {
    				// Help!
    				log.error("Could not initialize Eureka servlet context", ex);
    			}
    		}).start();
    	}
        
        //此處省略部分代碼
    }
    

    此類實現了SmartLifecycle接口,也就是說在spring容器啟動完成之后會回調到start()方法,開啟一個線程來完成啟動Eureka Server。接下來走進eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);方法:

    public class EurekaServerBootstrap {
    
    	// 省略部分代碼
    
    	public void contextInitialized(ServletContext context) {
    		try {
                // 初始化Eureka的環境變量
    			initEurekaEnvironment();
                // 初始化Eureka的上下文
    			initEurekaServerContext();
    
    			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    		}
    		catch (Throwable e) {
    			log.error("Cannot bootstrap eureka server :", e);
    			throw new RuntimeException("Cannot bootstrap eureka server :", e);
    		}
    	}
    
    	protected void initEurekaServerContext() throws Exception {
    		// For backward compatibility
    		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
    				XStream.PRIORITY_VERY_HIGH);
    		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
    				XStream.PRIORITY_VERY_HIGH);
    
    		if (isAws(this.applicationInfoManager.getInfo())) {
    			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
    					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
    			this.awsBinder.start();
    		}
    
    		EurekaServerContextHolder.initialize(this.serverContext);
    
    		log.info("Initialized server context");
    
    		// 從相鄰的eureka 節點復制注冊表,集群同步
    		int registryCount = this.registry.syncUp();
            // 默認每30秒發送心跳,1分鐘就是2次
            // 修改eureka狀態為up 
            // 同時,這里面會開啟一個定時任務,用于清理 60秒沒有心跳的客戶端,自動剔除。
    		this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
    		// Register all monitoring statistics.
    		EurekaMonitors.registerAllStats();
    	}
        
        // 省略部分代碼
    }
    

    上面兩個核心步驟:集群同步和服務剔除,后面我們會詳細分析。

    三,服務實例注冊表

    Eureka Server是圍繞注冊表管理的。有兩個InstanceRegistry

    在這里插入圖片描述

    • com.netflix.eureka.registry.InstanceRegistry是euraka server中注冊表管理的核心接口。職責是在內存中管理注冊到Eureka Server中的服務實例信息。實現類有PeerAwareInstanceRegistryImpl
    • org.springframework.cloud.netflix.eureka.server.InstanceRegistryPeerAwareInstanceRegistryImpl進行了繼承和擴展,使其適配Spring cloud的使用環境,主要的實現由PeerAwareInstanceRegistryImpl提供。
    • com.netflix.eureka.registry.InstanceRegistry extends LeaseManager<InstanceInfo>, LookupService<String> ,LeaseManager<InstanceInfo>是對注冊到server中的服務實例租約進行管理。LookupService是提供服務實例的檢索查詢功能。
    • LeaseManager<InstanceInfo>接口的作用是對注冊到Eureka Server中的服務實例租約進行管理,方法有:服務注冊,下線,續約,剔除。此接口管理的類目前是InstanceInfo。InstanceInfo代表服務實例信息。
    • PeerAwareInstanceRegistryImpl 增加了對peer節點的同步復制操作。使得eureka server集群中注冊表信息保持一致。

    四,接受服務注冊

    Eureka Client在發起服務注冊時會將自身的服務實例元數據封裝在InstanceInfo中,然后將InstanceInfo發送到Eureka Server。Eureka Server在接收到Eureka Client發送的InstanceInfo后將會嘗試將其放到本地注冊表中以供其他Eureka Client進行服務發現。

    EurekaServerAutoConfiguration中定義了 public FilterRegistrationBean jerseyFilterRegistration ,表名了 表明eureka-server使用了Jersey實現 對外的 restFull接口。注冊一個 Jerseyfilter ,配置好相應的Filterurl映射。

    com.netflix.eureka.resources包下,是Eureka Server對于Eureka client的REST請求的定義。看ApplicationResource類(這是一類請求,應用類的請求):

    @Produces({"application/xml", "application/json"})
    public class ApplicationResource {
        
        // 省略部分代碼
        
        private final PeerAwareInstanceRegistry registry;
        
        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
            // validate that the instanceinfo contains all the necessary required fields
            if (isBlank(info.getId())) {
                return Response.status(400).entity("Missing instanceId").build();
            } else if (isBlank(info.getHostName())) {
                return Response.status(400).entity("Missing hostname").build();
            } else if (isBlank(info.getIPAddr())) {
                return Response.status(400).entity("Missing ip address").build();
            } else if (isBlank(info.getAppName())) {
                return Response.status(400).entity("Missing appName").build();
            } else if (!appName.equals(info.getAppName())) {
                return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
            } else if (info.getDataCenterInfo() == null) {
                return Response.status(400).entity("Missing dataCenterInfo").build();
            } else if (info.getDataCenterInfo().getName() == null) {
                return Response.status(400).entity("Missing dataCenterInfo Name").build();
            }
    
            // handle cases where clients may be registering with bad DataCenterInfo with missing data
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
                if (isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    } else if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                        String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }
    
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }
        
        // 省略部分代碼
    }
    

    addInstance()方法用于接受服務注冊,進入PeerAwareInstanceRegistry的register方法:

    @Singleton
    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
        
    	// 省略部分代碼
        
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            super.register(info, leaseDuration, isReplication);
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
        
        // 省略部分代碼
    }
    

    調用到父類AbstractInstanceRegistryregister方法,跟進去:

    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        
        // 省略部分代碼
        
        private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
                = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
        
        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                read.lock();
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
                REGISTER.increment(isReplication);
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                
                // 省略部分代碼
                
            } finally {
                read.unlock();
            }
        }
    }
    

    register中,服務實例的InstanceInfo保存在Lease中,LeaseAbstractInstanceRegistry中統一通過ConcurrentHashMap保存在內存中。在服務注冊過程中,會先獲取一個讀鎖,防止其他線程對registry注冊表進行數據操作,避免數據的不一致。然后從resgitry查詢對應的InstanceInfo租約是否已經存在注冊表中,根據appName劃分服務集群,使用InstanceId唯一標記服務實例。如果租約存在,比較兩個租約中的InstanceInfo的最后更新時間lastDirtyTimestamp,保留時間戳大的服務實例信息InstanceInfo。如果租約不存在,意味這是一次全新的服務注冊,將會進行自我保護的統計,創建新的租約保存InstanceInfo。接著將租約放到resgitry注冊表中。
    之后將進行一系列緩存操作并根據覆蓋狀態規則設置服務實例的狀態,緩存操作包括將InstanceInfo加入用于統計Eureka Client增量式獲取注冊表信息的recentlyChangedQueue和失效responseCache中對應的緩存。最后設置服務實例租約的上線時間用于計算租約的有效時間,釋放讀鎖并完成服務注冊。

    五,接受心跳 續租,renew

    在Eureka Client完成服務注冊之后,它需要定時向Eureka Server發送心跳請求(默認30秒一次),維持自己在Eureka Server中租約的有效性。

    看另一類請求com.netflix.eureka.resources.InstanceResource

    @Produces({"application/xml", "application/json"})
    public class InstanceResource {
        
        // 省略部分代碼
        
    	private final PeerAwareInstanceRegistry registry;
         @PUT
        public Response renewLease(
                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
                @QueryParam("overriddenstatus") String overriddenStatus,
                @QueryParam("status") String status,
                @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
            boolean isFromReplicaNode = "true".equals(isReplication);
            boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    
            // Not found in the registry, immediately ask for a register
            if (!isSuccess) {
                logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
            // Check if we need to sync based on dirty time stamp, the client
            // instance might have changed some value
            Response response;
            if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
                response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
                // Store the overridden status since the validation found out the node that replicates wins
                if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                        && (overriddenStatus != null)
                        && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                        && isFromReplicaNode) {
                    registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
                }
            } else {
                response = Response.ok().build();
            }
            logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
            return response;
        }
        
        // 省略部分代碼
    }
    

    public Response renewLease()方法。看到一行boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);點擊renew的實現:

    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        
        //省略部分代碼
        
        private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
                = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
        
        public boolean renew(String appName, String id, boolean isReplication) {
            RENEW.increment(isReplication);
            // 根據appName獲取服務集群的租約集合
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToRenew = null;
            if (gMap != null) {
                leaseToRenew = gMap.get(id);
            }
            if (leaseToRenew == null) {
                RENEW_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
                return false;
            } else {
                InstanceInfo instanceInfo = leaseToRenew.getHolder();
                if (instanceInfo != null) {
                    // 查看服務實例狀態
                    InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                            instanceInfo, leaseToRenew, isReplication);
                    if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                        logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                                + "; re-register required", instanceInfo.getId());
                        RENEW_NOT_FOUND.increment(isReplication);
                        return false;
                    }
                    if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                        logger.info(
                                "The instance status {} is different from overridden instance status {} for instance {}. "
                                        + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                        instanceInfo.getOverriddenStatus().name(),
                                        instanceInfo.getId());
                        instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
    
                    }
                }
                // 統計每分鐘續租次數
                renewsLastMin.increment();
                // 更新租約更新租約
                leaseToRenew.renew();
                return true;
            }
        }
        
        // 省略部分代碼
    }
    

    此方法中不關注InstanceInfo,僅關注于租約本身以及租約的服務實例狀態。如果根據服務實例的appNameinstanceInfoId查詢出服務實例的租約,并且根據#getOverriddenInstanceStatus方法得到的instanceStatus不為InstanceStatus.UNKNOWN,那么更新租約中的有效時間,即更新租約Lease中的lastUpdateTimestamp,達到續約的目的;如果租約不存在,那么返回續租失敗的結果。

    六,服務剔除

    如果Eureka Client在注冊后,既沒有續約,也沒有下線(服務崩潰或者網絡異常等原因),那么服務的狀態就處于不可知的狀態,不能保證能夠從該服務實例中獲取到回饋,所以需要服務剔除此方法定時清理這些不穩定的服務。

    我們上面分析Eureka Server啟動的時候,EurekaServerInitializerConfiguration類的start()方法—>EurekaServerBootstrap類的initEurekaServerContext()方法—>PeerAwareInstanceRegistryImpl類的openForTraffic()方法—>AbstractInstanceRegistry類的postInit()方法,可以看到最后又回到了AbstractInstanceRegistry類里面。

    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    	protected void postInit() {
            renewsLastMin.start();
            if (evictionTaskRef.get() != null) {
                evictionTaskRef.get().cancel();
            }
            // 剔除是定時任務,默認60秒執行一次。延時60秒,間隔60秒
            evictionTaskRef.set(new EvictionTask());
            evictionTimer.schedule(evictionTaskRef.get(),
                    serverConfig.getEvictionIntervalTimerInMs(),
                    serverConfig.getEvictionIntervalTimerInMs());
        }
        
        // 定時任務
        class EvictionTask extends TimerTask {
    
            private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
    
            @Override
            public void run() {
                try {
                    long compensationTimeMs = getCompensationTimeMs();
                    logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                    evict(compensationTimeMs);
                } catch (Throwable e) {
                    logger.error("Could not run the evict task", e);
                }
            }
        }
        
        // 剔除服務
        public void evict(long additionalLeaseMs) {
            logger.debug("Running the evict task");
    		// 判斷是否開啟自我保護,如果開啟自我保護,不剔除。
            if (!isLeaseExpirationEnabled()) {
                logger.debug("DS: lease expiration is currently disabled.");
                return;
            }
    
           	// 緊接著一個大的for循環,便利注冊表register,依次判斷租約是否過期。一次性獲取所有的過期租約。
            List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
            for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
                Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
                if (leaseMap != null) {
                    for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                        Lease<InstanceInfo> lease = leaseEntry.getValue();
                        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                            expiredLeases.add(lease);
                        }
                    }
                }
            }
    
            // 獲取注冊表租約總數
            int registrySize = (int) getLocalRegistrySize();
            // 計算注冊表租約的閾值 (總數乘以 續租百分比),得出要續租的數量
            int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
            // 總數減去要續租的數量,就是理論要剔除的數量
            int evictionLimit = registrySize - registrySizeThreshold;
    		//求 上面理論剔除數量,和過期租約總數的最小值。就是最終要提出的數量。
            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    
                Random random = new Random(System.currentTimeMillis());
                for (int i = 0; i < toEvict; i++) {
                    // Pick a random item (Knuth shuffle algorithm)
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = expiredLeases.get(i);
    
                    String appName = lease.getHolder().getAppName();
                    String id = lease.getHolder().getId();
                    EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    //執行剔除
                    internalCancel(appName, id, false);
                }
            }
        }
            
    }
    
    1. 自我保護期間不清除。
    2. 分批次清除。
    3. 服務是逐個隨機剔除,剔除均勻分布在所有應用中,防止在同一時間內同一服務集群中的服務全部過期被剔除,造成在大量剔除服務時,并在進行自我保護時,促使程序崩潰。
    4. 剔除服務是個定時任務,用EvictionTask執行,默認60秒執行一次,延時60秒執行。定時剔除過期服務。
    5. 服務剔除將會遍歷registry注冊表,找出其中所有的過期租約,然后根據配置文件中續租百分比閥值和當前注冊表的租約總數量計算出最大允許的剔除租約的數量(當前注冊表中租約總數量減去當前注冊表租約閥值),分批次剔除過期的服務實例租約。對過期的服務實例租約調用AbstractInstanceRegistry#internalCancel服務下線的方法將其從注冊表中清除掉。

    自我保護機制:

    • 自我保護機制主要在Eureka ClientEureka Server之間存在網絡分區的情況下發揮保護作用,在服務器端和客戶端都有對應實現。假設在某種特定的情況下(如網絡故障),Eureka ClientEureka Server無法進行通信,此時Eureka Client無法向Eureka Server發起注冊和續約請求,Eureka Server中就可能因注冊表中的服務實例租約出現大量過期而面臨被剔除的危險,然而此時的Eureka Client可能是處于健康狀態的(可接受服務訪問),如果直接將注冊表中大量過期的服務實例租約剔除顯然是不合理的。
    • 針對這種情況,Eureka設計了“自我保護機制”。在Eureka Server處,如果出現大量的服務實例過期被剔除的現象,那么該Server節點將進入自我保護模式,保護注冊表中的信息不再被剔除,在通信穩定后再退出該模式;在Eureka Client處,如果向Eureka Server注冊失敗,將快速超時并嘗試與其他的Eureka Server進行通信。“自我保護機制”的設計大大提高了Eureka的可用性。

    七,服務下線

    Eureka Client在應用銷毀時,會向Eureka Server發送服務下線請求,清除注冊表中關于本應用的租約,避免無效的服務調用。在服務剔除的過程中,也是通過服務下線的邏輯完成對單個服務實例過期租約的清除工作。

    InstanceResource類中

    @Produces({"application/xml", "application/json"})
    public class InstanceResource {
    	@DELETE
        public Response cancelLease(
                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            try {
                boolean isSuccess = registry.cancel(app.getName(), id,
                    "true".equals(isReplication));
    
                if (isSuccess) {
                    logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                    return Response.ok().build();
                } else {
                    logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                    return Response.status(Status.NOT_FOUND).build();
                }
            } catch (Throwable e) {
                logger.error("Error (cancel): {} - {}", app.getName(), id, e);
                return Response.serverError().build();
            }
    
        }
    }
    

    boolean isSuccess = registry.cancel(app.getName(), id,"true".equals(isReplication));跟進去,又又進入到了AbstractInstanceRegistry類中:

    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        
        //省略部分代碼
        
    	@Override
        public boolean cancel(String appName, String id, boolean isReplication) {
            return internalCancel(appName, id, isReplication);
        }
        
        protected boolean internalCancel(String appName, String id, boolean isReplication) {
            try {
                // 先獲取讀鎖,防止被其他線程修改
                read.lock();
                CANCEL.increment(isReplication);
                // 根據appName獲取服務實力集群。
                Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
                Lease<InstanceInfo> leaseToCancel = null;
                // 在內存中取消實例 id的服務
                if (gMap != null) {
                    leaseToCancel = gMap.remove(id);
                }
                // 添加到最近下線服務的統計隊列
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
                InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
                if (instanceStatus != null) {
                    logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
                }
                // 判斷leaseToCancel是否為空,租約不存在,返回false
                if (leaseToCancel == null) {
                    CANCEL_NOT_FOUND.increment(isReplication);
                    logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                    return false;
                } else {
                    // 如果存在
                    // 設置租約下線時間
                    leaseToCancel.cancel();
                    // 獲取持有租約的服務信息
                    InstanceInfo instanceInfo = leaseToCancel.getHolder();
                    String vip = null;
                    String svip = null;
                    if (instanceInfo != null) {
                        //標記服務實例為instanceInfo.setActionType(ActionType.DELETED);
                        instanceInfo.setActionType(ActionType.DELETED);
                        // 添加到租約變更記錄隊列,用于eureka client的增量拉取注冊表信息。
                        recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                        instanceInfo.setLastUpdatedTimestamp();
                        vip = instanceInfo.getVIPAddress();
                        svip = instanceInfo.getSecureVipAddress();
                    }
                    invalidateCache(appName, vip, svip);
                    logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                }
            } finally {
                read.unlock();
            }
    
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to cancel it, reduce the number of clients to send renews.
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                    updateRenewsPerMinThreshold();
                }
            }
    
            return true;
        }
        
        // 省略部分代碼
    }
    

    首先通過registry根據服務名和服務實例id查詢關于服務實例的租約Lease是否存在,統計最近請求下線的服務實例用于Eureka Server主頁展示。如果租約不存在,返回下線失敗;如果租約存在,從registry注冊表中移除,設置租約的下線時間,同時在最近租約變更記錄隊列中添加新的下線記錄,以用于Eureka Client的增量式獲取注冊表信息。

    八,集群同步

    如果Eureka Server是通過集群的方式進行部署,那么為了維護整個集群中Eureka Server注冊表數據的一致性,勢必需要一個機制同步Eureka Server集群中的注冊表數據。

    Eureka Server集群同步包含兩個部分:

    • 一部分是Eureka Server在啟動過程中從它的peer節點中拉取注冊表信息,并將這些服務實例的信息注冊到本地注冊表中;
    • 另一部分是Eureka Server每次對本地注冊表進行操作時,同時會將操作同步到它的peer節點中,達到集群注冊表數據統一的目的。

    1,啟動拉取別的peer

    上面我們說到,在Eureka Server啟動類中:EurekaServerInitializerConfigurationstart()方法中—>eurekaServerBootstrap.contextInitialized()方法—>initEurekaServerContext()方法

    public class EurekaServerBootstrap {
    
    	// 省略部分代碼
    
    	public void contextInitialized(ServletContext context) {
    		try {
                // 初始化Eureka的環境變量
    			initEurekaEnvironment();
                // 初始化Eureka的上下文
    			initEurekaServerContext();
    
    			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    		}
    		catch (Throwable e) {
    			log.error("Cannot bootstrap eureka server :", e);
    			throw new RuntimeException("Cannot bootstrap eureka server :", e);
    		}
    	}
    
    	protected void initEurekaServerContext() throws Exception {
    		// For backward compatibility
    		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
    				XStream.PRIORITY_VERY_HIGH);
    		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
    				XStream.PRIORITY_VERY_HIGH);
    
    		if (isAws(this.applicationInfoManager.getInfo())) {
    			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
    					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
    			this.awsBinder.start();
    		}
    
    		EurekaServerContextHolder.initialize(this.serverContext);
    
    		log.info("Initialized server context");
    
    		// 從相鄰的eureka 節點復制注冊表,集群同步
    		int registryCount = this.registry.syncUp();
            // 默認每30秒發送心跳,1分鐘就是2次
            // 修改eureka狀態為up 
            // 同時,這里面會開啟一個定時任務,用于清理 60秒沒有心跳的客戶端,自動剔除。
    		this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
    		// Register all monitoring statistics.
    		EurekaMonitors.registerAllStats();
    	}
        
        // 省略部分代碼
    }
    

    int registryCount = this.registry.syncUp();集群同步,然后再跟進去:

    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry{
        @Override
        public int syncUp() {
            // Copy entire entry from neighboring DS node
            int count = 0;
    		// 意思是,如果是i第一次進來,為0,不夠等待的代碼,直接執行下面的拉取服務實例。
            for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
                if (i > 0) {
                    try {
                        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted during registry transfer..");
                        break;
                    }
                }
                // 將自己作為一個eureka client,拉取注冊表。
                Applications apps = eurekaClient.getApplications();
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        try {
                            if (isRegisterable(instance)) {
                                // 注冊到自身的注冊表中。
                                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                                count++;
                            }
                        } catch (Throwable t) {
                            logger.error("During DS init copy", t);
                        }
                    }
                }
            }
            return count;
        }
    }
    

    Eureka Server也是一個Eureka Client,在啟動的時候也會進行DiscoveryClient的初始化,會從其對應的Eureka Server中拉取全量的注冊表信息。在Eureka Server集群部署的情況下,Eureka Server從它的peer節點中拉取到注冊表信息后,將遍歷這個Applications,將所有的服務實例通過AbstractRegistry#register方法注冊到自身注冊表中。

    int registryCount = this.registry.syncUp();// 集群同步
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);// 定時服務剔除
    
    	@Override
        public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
            // 初始化期望client發送過來的服務數量,即上面獲取到的服務數量
            this.expectedNumberOfClientsSendingRenews = count;
            //計算自我保護的統計參數
            updateRenewsPerMinThreshold();
            logger.info("Got {} instances from neighboring DS node", count);
            logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
            this.startupTime = System.currentTimeMillis();
            //   如果count=0,沒有拉取到注冊表信息,將此值設為true,表示其他peer來取空的實例信息,意味著,將不允許client從此server獲取注冊表信息。如果count>0,將此值設置為false,允許client來獲取注冊表。
            if (count > 0) {
                this.peerInstancesTransferEmptyOnStartup = false;
            }
            DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
            boolean isAws = Name.Amazon == selfName;
            if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
                logger.info("Priming AWS connections for all replicas..");
                primeAwsReplicas(applicationInfoManager);
            }
            logger.info("Changing status to UP");
            // 服務置為上線
            applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
            // 開啟剔除的定時任務
            super.postInit();
        }
    
        protected void updateRenewsPerMinThreshold() {
            this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                    * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                    * serverConfig.getRenewalPercentThreshold());
        }
    

    當執行完上面的syncUp邏輯后,在下面的openForTraffic,開啟此server接受別的client注冊,拉取注冊表等操作。而在它首次拉取其他peer節點時,是不允許client的通信請求的。

    Server的狀態不為UP時,將拒絕所有的請求。在Client請求獲取注冊表信息時,Server會判斷此時是否允許獲取注冊表中的信息。上述做法是為了避免Eureka Server#syncUp方法中沒有獲取到任何服務實例信息時(Eureka Server集群部署的情況下),Eureka Server注冊表中的信息影響到Eureka Client緩存的注冊表中的信息。因為是全量同步,如果server什么也沒同步過來,會導致client清空注冊表。導致服務調用出問題。

    2,Server之間注冊表信息的同步復制

    為了保證Eureka Server集群運行時注冊表信息的一致性,每個Eureka Server在對本地注冊表進行管理操作時,會將相應的操作同步到所有peer節點中。

    在外部調用server的restful方法時,在com.netflix.eureka.resources包下的ApplicationResource資源中,查看每個服務的操作。比如服務注冊public Response addInstance()方法,在PeerAwareInstanceRegistryImpl類中,看其他操作,cancelrenew等中都有replicateToPeers()方法:

    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
        
        // 省略部分代碼
        
        // 下線
       @Override
        public boolean cancel(final String appName, final String id,
                              final boolean isReplication) {
            if (super.cancel(appName, id, isReplication)) {
                replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
    
                return true;
            }
            return false;
        }
        // 注冊
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            super.register(info, leaseDuration, isReplication);
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
        // 續約
        public boolean renew(final String appName, final String id, final boolean isReplication) {
            if (super.renew(appName, id, isReplication)) {
                replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
                return true;
            }
            return false;
        }
        
        // 省略部分代碼
    }
    

    都有replicateToPeers()方法,它將遍歷Eureka Server中peer節點,向每個peer節點發送同步請求。

    //它將遍歷Eureka Server中peer節點,向每個peer節點發送同步請求
    private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */, boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                if (isReplication) {
                    numberOfReplicationsLastMin.increment();
                }
                // If it is a replication already, do not replicate again as this will create a poison replication
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }
    
                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // If the url represents this host, do not replicate to yourself.
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }
    
     
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                     String id, InstanceInfo info, InstanceStatus newStatus,
                                                     PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
            } finally {
                CurrentRequestVersion.remove();
            }
        }
    

    replicateInstanceActionsToPeers方法中,類PeerEurekaNode的實例node的各種方法,cancelregister,等,用了batchingDispatcher.process(),作用是將同一時間段內,相同服務實例的相同操作將使用相同的任務編號,在進行同步復制的時候,將根據任務編號合并操作,減少同步操作的數量和網絡消耗,但是同時也造成了同步復制的延時性,不滿足CAP中的C(強一致性)。

    所以Eureka,只滿足AP。

    通過Eureka Server在啟動過程中初始化本地注冊表信息和Eureka Server集群間的同步復制操作,最終達到了集群中Eureka Server注冊表信息一致的目的。

    九,獲取注冊表中服務實例信息

    Eureka Server中獲取注冊表的服務實例信息主要通過兩個方法實現:

    • AbstractInstanceRegistry#getApplicationsFromMultipleRegions從多地區獲取全量注冊表數據。
    • AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions從多地區獲取增量式注冊表數據。
    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        
        // 省略部分源碼
        
    	private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
                = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
        
        // 從多地區獲取全量注冊表數據
        public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    
            boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    
            logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
                    includeRemoteRegion, remoteRegions);
    
            if (includeRemoteRegion) {
                GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
            } else {
                GET_ALL_CACHE_MISS.increment();
            }
            Applications apps = new Applications();
            apps.setVersion(1L);
            for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
                Application app = null;
    
                if (entry.getValue() != null) {
                    for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                        Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                        if (app == null) {
                            app = new Application(lease.getHolder().getAppName());
                        }
                        app.addInstance(decorateInstanceInfo(lease));
                    }
                }
                if (app != null) {
                    apps.addApplication(app);
                }
            }
            if (includeRemoteRegion) {
                for (String remoteRegion : remoteRegions) {
                    RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                    if (null != remoteRegistry) {
                        Applications remoteApps = remoteRegistry.getApplications();
                        for (Application application : remoteApps.getRegisteredApplications()) {
                            if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                logger.info("Application {}  fetched from the remote region {}",
                                        application.getName(), remoteRegion);
    
                                Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                                if (appInstanceTillNow == null) {
                                    appInstanceTillNow = new Application(application.getName());
                                    apps.addApplication(appInstanceTillNow);
                                }
                                for (InstanceInfo instanceInfo : application.getInstances()) {
                                    appInstanceTillNow.addInstance(instanceInfo);
                                }
                            } else {
                                logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                                + "whitelist and this app is not in the whitelist.",
                                        application.getName(), remoteRegion);
                            }
                        }
                    } else {
                        logger.warn("No remote registry available for the remote region {}", remoteRegion);
                    }
                }
            }
            apps.setAppsHashCode(apps.getReconcileHashCode());
            return apps;
        }
        
        
        // 從多地區獲取增量式注冊表數據
        public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
            if (null == remoteRegions) {
                remoteRegions = allKnownRemoteRegions; // null means all remote regions.
            }
    
            boolean includeRemoteRegion = remoteRegions.length != 0;
    
            if (includeRemoteRegion) {
                GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
            } else {
                GET_ALL_CACHE_MISS_DELTA.increment();
            }
    
            Applications apps = new Applications();
            apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
            Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
            try {
                write.lock();
                Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
                logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
                while (iter.hasNext()) {
                    Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                    InstanceInfo instanceInfo = lease.getHolder();
                    logger.debug("The instance id {} is found with status {} and actiontype {}",
                            instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
                    Application app = applicationInstancesMap.get(instanceInfo.getAppName());
                    if (app == null) {
                        app = new Application(instanceInfo.getAppName());
                        applicationInstancesMap.put(instanceInfo.getAppName(), app);
                        apps.addApplication(app);
                    }
                    app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
                }
    
                if (includeRemoteRegion) {
                    for (String remoteRegion : remoteRegions) {
                        RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                        if (null != remoteRegistry) {
                            Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                            if (null != remoteAppsDelta) {
                                for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                                    if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                        Application appInstanceTillNow =
                                                apps.getRegisteredApplications(application.getName());
                                        if (appInstanceTillNow == null) {
                                            appInstanceTillNow = new Application(application.getName());
                                            apps.addApplication(appInstanceTillNow);
                                        }
                                        for (InstanceInfo instanceInfo : application.getInstances()) {
                                            appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
    
                Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
                apps.setAppsHashCode(allApps.getReconcileHashCode());
                return apps;
            } finally {
                write.unlock();
            }
        }
        
        // 省略部分源碼
    }
    
    1. 全量:上面講到從節點復制注冊信息的時候,用方法public int syncUp() ,一行Applications apps = eurekaClient.getApplications();點進去實現類,有一行getApplicationsFromAllRemoteRegions(); 下面getApplicationsFromMultipleRegions,作用從多個地區中獲取全量注冊表信息,并封裝成Applications返回,它首先會將本地注冊表registry中的所有服務實例信息提取出來封裝到Applications中,再根據是否需要拉取Region的注冊信息,將遠程拉取過來的Application放到上面的Applications中。最后得到一個全量的Applications

    2. 增量:在前面提到接受服務注冊,接受心跳等方法中,都有recentlyChangedQueue.add(new RecentlyChangedItem(lease));作用是將新變動的服務放到最近變化的服務實例信息隊列中,用于記錄增量是注冊表信息。getApplicationDeltasFromMultipleRegions,實現了從遠處eureka server中獲取增量式注冊表信息的能力。在EurekaServer對外restful中,在com.netflix.eureka.resources下,
      @GET
      public Response getApplication(@PathParam(“version”) String version,
      @HeaderParam(“Accept”) final String acceptHeader,
      @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {

      其中有一句:String payLoad = responseCache.get(cacheKey);responseCache初始化的時候,它的構造方法ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {中,Value value = generatePayload(key);點進去有一句:registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));從遠程獲取delta增量注冊信息。但是這個只是向client提供,不向server提供,因為server可以通過每次變更自動同步到peer。

      獲取增量式注冊表信息將會從recentlyChangedQueue中獲取最近變化的服務實例信息。recentlyChangedQueue中統計了近3分鐘內進行注冊、修改和剔除的服務實例信息,在服務注冊AbstractInstanceRegistry#registry、接受心跳請求AbstractInstanceRegistry#renew和服務下線AbstractInstanceRegistry#internalCancel等方法中均可見到recentlyChangedQueue對這些服務實例進行登記,用于記錄增量式注冊表信息。#getApplicationsFromMultipleRegions方法同樣提供了從遠程Region的Eureka Server獲取增量式注冊表信息的能力。

    版權聲明:本文為u013277209原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/u013277209/article/details/109625336

    智能推薦

    SpringCloud Eureka 源碼分析

    目錄 目錄 SpringCloud-Eureka 整合項目 Eureka架構圖 關鍵概念 Region區域 Zone可用區 租約lease Eureka Server Application Service Application Client SpringCloud啟動Eureka 的過程 EnableEurekaServer注解 SpringCloud與jersey Rest框架 jersey...

    SpringCloud Eureka源碼分析

    Eureka核心功能點 服務注冊(register):Eureka Client會通過發送REST請求的方式向Eureka Server注冊自己的服務,提供自身的元數 據,比如ip地址、端口、運行狀況指標的url、主頁地址等信息。Eureka Server接收到注冊請求后,就會把這些元數 據信息存儲在一個雙層的Map中。   服務續約(renew):在服務注冊后,Eureka Clien...

    SpringCloud Eureka源碼分析

    老規矩,先找XXXAutoConfiguration。 EurekaClientAutoConfiguration 這個里面主要看這幾個bean: EurekaAutoServiceRegistration EurekaClient ApplicationInfoManager EurekaRegistration EurekaClient : 這里初始化的是CloudEurekaClient,看...

    Eureka源碼分析之eureka-client

        很多人對Eureka進行了源碼分析,但是主要著重于Eureka server端的源碼分析,本篇博文著重分析eureka-client的分析,先上圖看類結構(首先聲明以下內容為本人淺顯見解,如有不妥請指正批評) eureka客戶端核心jar包為以上截圖,其中核心的類包為截圖選中的com.netflix.discovery包。 com.netflix.discovery包主...

    LiveData詳細分析

    目錄介紹 01.LiveData是什么東西 02.使用LiveData的優勢 03.使用LiveData的步驟 04.簡單使用LiveData 05.observe()和observerForever() 06.LiveData原理介紹 07.observe訂閱源碼分析 08.setValue發送源碼分析 09.observeForever源碼 10.LiveData源碼總結 00.使用LiveD...

    猜你喜歡

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    精品国产乱码久久久久久蜜桃不卡