• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • Eureka Client源碼詳細分析

    標簽: Spring Cloud  Eureka Client源碼  Eureka Client

    一,Eureka Client概述

    1,Eureka Client工作流程圖

    在這里插入圖片描述

    2,@EnableDiscoveryClient注解

    @SpringBootApplication
    @EnableDiscoveryClient
    public class EurekaClientApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(EurekaClientApplication.class, args);
        }
    
    }
    

    注意:Eureka Client在1版本是必須要加@EnableDiscoveryClient注解的,2版本@EnableDiscoveryClient注解加不加都沒有影響。

    3,EurekaClientConfig和EurekaInstanceConfig

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @ConditionalOnDiscoveryEnabled
    @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
    		CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
    @AutoConfigureAfter(name = {
    		"org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
    		"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
    		"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
    		"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
    public class EurekaClientAutoConfiguration {
        
        //省略部分代碼
        
    	@Bean
    	@ConditionalOnMissingBean(value = EurekaClientConfig.class,
    			search = SearchStrategy.CURRENT)
    	public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
    		return new EurekaClientConfigBean();
    	}
        
        @Bean
    	@ConditionalOnMissingBean(value = EurekaInstanceConfig.class,
    			search = SearchStrategy.CURRENT)
    	public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
    			ManagementMetadataProvider managementMetadataProvider) {
    		String hostname = getProperty("eureka.instance.hostname");
    		boolean preferIpAddress = Boolean
    				.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
    		String ipAddress = getProperty("eureka.instance.ip-address");
    		boolean isSecurePortEnabled = Boolean
    				.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
    
    		String serverContextPath = env.getProperty("server.servlet.context-path", "/");
    		int serverPort = Integer.parseInt(
    				env.getProperty("server.port", env.getProperty("port", "8080")));
    
    		Integer managementPort = env.getProperty("management.server.port", Integer.class);
    		String managementContextPath = env
    				.getProperty("management.server.servlet.context-path");
    		Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port",
    				Integer.class);
    		EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
    
    		instance.setNonSecurePort(serverPort);
    		instance.setInstanceId(getDefaultInstanceId(env));
    		instance.setPreferIpAddress(preferIpAddress);
    		instance.setSecurePortEnabled(isSecurePortEnabled);
    		if (StringUtils.hasText(ipAddress)) {
    			instance.setIpAddress(ipAddress);
    		}
    
    		if (isSecurePortEnabled) {
    			instance.setSecurePort(serverPort);
    		}
    
    		if (StringUtils.hasText(hostname)) {
    			instance.setHostname(hostname);
    		}
    		String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
    		String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");
    
    		if (StringUtils.hasText(statusPageUrlPath)) {
    			instance.setStatusPageUrlPath(statusPageUrlPath);
    		}
    		if (StringUtils.hasText(healthCheckUrlPath)) {
    			instance.setHealthCheckUrlPath(healthCheckUrlPath);
    		}
    
    		ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,
    				serverContextPath, managementContextPath, managementPort);
    
    		if (metadata != null) {
    			instance.setStatusPageUrl(metadata.getStatusPageUrl());
    			instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
    			if (instance.isSecurePortEnabled()) {
    				instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
    			}
    			Map<String, String> metadataMap = instance.getMetadataMap();
    			metadataMap.computeIfAbsent("management.port",
    					k -> String.valueOf(metadata.getManagementPort()));
    		}
    		else {
    			// without the metadata the status and health check URLs will not be set
    			// and the status page and health check url paths will not include the
    			// context path so set them here
    			if (StringUtils.hasText(managementContextPath)) {
    				instance.setHealthCheckUrlPath(
    						managementContextPath + instance.getHealthCheckUrlPath());
    				instance.setStatusPageUrlPath(
    						managementContextPath + instance.getStatusPageUrlPath());
    			}
    		}
    
    		setupJmxPort(instance, jmxPort);
    		return instance;
    	}
        
        //省略部分代碼
    }
    

    EurekaClientConfig

    EurekaClientConfig是個接口,其實現類是EurekaClientConfigBean,此類封裝了以"eureka.client"開頭的配置信息如:application.yaml

    eureka:
      client:
        # Eureka服務器的地址,類型為HashMap,缺省的Key為 defaultZone;缺省的Value為 http://localhost:8761/eureka
        # 如果服務注冊中心為高可用集群時,多個注冊中心地址以逗號分隔。
        service-url:
          defaultZone: http://euk-server1:7001/eureka/
        # 是否向注冊中心注冊自己,缺省:true
        register-with-eureka: true
        # 是否從Eureka獲取注冊信息,缺省:true
        fetch-registry: true
        # 客戶端拉取服務注冊信息間隔,單位:秒,缺省:30
        registry-fetch-interval-seconds: 30
        # 是否啟用客戶端健康檢查
        healthcheck:
          enabled: true
        # 連接Eureka服務器的超時時間,單位:秒,缺省:5
        eureka-server-connect-timeout-seconds: 5
        # 從Eureka服務器讀取信息的超時時間,單位:秒,缺省:8
        eureka-server-read-timeout-seconds: 8
        # 獲取實例時是否只保留狀態為 UP 的實例,缺省:true
        filter-only-up-instances: true
        # Eureka服務端連接空閑時的關閉時間,單位:秒,缺省:30
        eureka-connection-idle-timeout-seconds: 30
        # 從Eureka客戶端到所有Eureka服務端的連接總數,缺省:200
        eureka-server-total-connections: 200
        # 從Eureka客戶端到每個Eureka服務主機的連接總數,缺省:50
        eureka-server-total-connections-per-host: 50
    

    EurekaInstanceConfig

    EurekaInstanceConfig是個接口,其實現類是EurekaInstanceConfigBean,此類封裝了以"eureka.instance"開頭的配置信息,主要用于構建InstanceInfo。如:application.yaml

    eureka:
      instance:
        # 應用實例主機名
        hostname: euk-client1
        # 服務名,默認取 spring.application.name 配置值,如果沒有則為 unknown
        appname: EurekaClient
        # 實例ID
        instance-id: EurekaClientInstance1
        # 服務失效時間,失效的服務將被剔除。單位:秒,默認:90
        lease-expiration-duration-in-seconds: 90
        # 服務續約(心跳)頻率,單位:秒,缺省30
        lease-renewal-interval-in-seconds: 30
    

    4,InstanceInfo

    InstanceInfo封裝了將被發送到Eureka Server進行注冊的服務實例元數據。它在Eureka Server列表中代表一個服務實例,其他服務可以通過instanceInfo了解到該服務的實例相關信息,包括地址等,從而發起請求。

    public class InstanceInfo {
        
        //省略部分代碼
        
        public InstanceInfo(
                @JsonProperty("instanceId") String instanceId,
                @JsonProperty("app") String appName,
                @JsonProperty("appGroupName") String appGroupName,
                @JsonProperty("ipAddr") String ipAddr,
                @JsonProperty("sid") String sid,
                @JsonProperty("port") PortWrapper port,
                @JsonProperty("securePort") PortWrapper securePort,
                @JsonProperty("homePageUrl") String homePageUrl,
                @JsonProperty("statusPageUrl") String statusPageUrl,
                @JsonProperty("healthCheckUrl") String healthCheckUrl,
                @JsonProperty("secureHealthCheckUrl") String secureHealthCheckUrl,
                @JsonProperty("vipAddress") String vipAddress,
                @JsonProperty("secureVipAddress") String secureVipAddress,
                @JsonProperty("countryId") int countryId,
                @JsonProperty("dataCenterInfo") DataCenterInfo dataCenterInfo,
                @JsonProperty("hostName") String hostName,
                @JsonProperty("status") InstanceStatus status,
                @JsonProperty("overriddenstatus") InstanceStatus overriddenStatus,
                @JsonProperty("overriddenStatus") InstanceStatus overriddenStatusAlt,
                @JsonProperty("leaseInfo") LeaseInfo leaseInfo,
                @JsonProperty("isCoordinatingDiscoveryServer") Boolean isCoordinatingDiscoveryServer,
                @JsonProperty("metadata") HashMap<String, String> metadata,
                @JsonProperty("lastUpdatedTimestamp") Long lastUpdatedTimestamp,
                @JsonProperty("lastDirtyTimestamp") Long lastDirtyTimestamp,
                @JsonProperty("actionType") ActionType actionType,
                @JsonProperty("asgName") String asgName) {
            this.instanceId = instanceId;
            this.sid = sid;
            this.appName = StringCache.intern(appName);
            this.appGroupName = StringCache.intern(appGroupName);
            this.ipAddr = ipAddr;
            this.port = port == null ? 0 : port.getPort();
            this.isUnsecurePortEnabled = port != null && port.isEnabled();
            this.securePort = securePort == null ? 0 : securePort.getPort();
            this.isSecurePortEnabled = securePort != null && securePort.isEnabled();
            this.homePageUrl = homePageUrl;
            this.statusPageUrl = statusPageUrl;
            this.healthCheckUrl = healthCheckUrl;
            this.secureHealthCheckUrl = secureHealthCheckUrl;
            this.vipAddress = StringCache.intern(vipAddress);
            this.secureVipAddress = StringCache.intern(secureVipAddress);
            this.countryId = countryId;
            this.dataCenterInfo = dataCenterInfo;
            this.hostName = hostName;
            this.status = status;
            this.overriddenStatus = overriddenStatus == null ? overriddenStatusAlt : overriddenStatus;
            this.leaseInfo = leaseInfo;
            this.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer;
            this.lastUpdatedTimestamp = lastUpdatedTimestamp;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
            this.actionType = actionType;
            this.asgName = StringCache.intern(asgName);
    
            // ---------------------------------------------------------------
            // for compatibility
    
            if (metadata == null) {
                this.metadata = Collections.emptyMap();
            } else if (metadata.size() == 1) {
                this.metadata = removeMetadataMapLegacyValues(metadata);
            } else {
                this.metadata = metadata;
            }
    
            if (sid == null) {
                this.sid = SID_DEFAULT;
            }
        }
        
        //省略部分代碼
    }
    

    ApplicationInfoManager中有一個方法:

    public class ApplicationInfoManager {
        
        //省略部分代碼
        
    	public void initComponent(EurekaInstanceConfig config) {
            try {
                this.config = config;
                this.instanceInfo = new EurekaConfigBasedInstanceInfoProvider(config).get();
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize ApplicationInfoManager", e);
            }
        }
        
        //省略部分代碼
    }
    

    通過EurekaInstanceConfig構造instanceInfo

    5,服務發現

    DiscoveryClient是在spring cloud中定義的,用于服務發現的頂級接口:

    在這里插入圖片描述

    package org.springframework.cloud.client.discovery;
    public interface DiscoveryClient extends Ordered {
    
        // 獲取實現類的描述。
    	String description();
    
    	// 通過服務id查詢服務實例信息列表。
    	List<ServiceInstance> getInstances(String serviceId);
    
    	// 獲取所有服務實例id
    	List<String> getServices();
    
    }
    

    spring-cloud-commons-2.2.1.realease包下,org.springframework.cloud.client.discovery.DiscoveryClient接口。定義用來服務發現的客戶端接口,是客戶端進行服務發現的核心接口,在common中可以看到其地位。在Netflix Eureka和Consul中都有具體的實現類。

    我們來看Spring Cloud中Eureka的實現:

    package org.springframework.cloud.netflix.eureka;
    public class EurekaDiscoveryClient implements DiscoveryClient {
    
    	/**
    	 * Client description {@link String}.
    	 */
    	public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
    
    	private final EurekaClient eurekaClient;
    
    	private final EurekaClientConfig clientConfig;
    
    	@Deprecated
    	public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
    		this(eurekaClient, eurekaClient.getEurekaClientConfig());
    	}
    
    	public EurekaDiscoveryClient(EurekaClient eurekaClient,
    			EurekaClientConfig clientConfig) {
    		this.clientConfig = clientConfig;
    		this.eurekaClient = eurekaClient;
    	}
    
    	@Override
    	public String description() {
    		return DESCRIPTION;
    	}
    
    	@Override
    	public List<ServiceInstance> getInstances(String serviceId) {
    		List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
    				false);
    		List<ServiceInstance> instances = new ArrayList<>();
    		for (InstanceInfo info : infos) {
    			instances.add(new EurekaServiceInstance(info));
    		}
    		return instances;
    	}
    
    	@Override
    	public List<String> getServices() {
    		Applications applications = this.eurekaClient.getApplications();
    		if (applications == null) {
    			return Collections.emptyList();
    		}
    		List<Application> registered = applications.getRegisteredApplications();
    		List<String> names = new ArrayList<>();
    		for (Application app : registered) {
    			if (app.getInstances().isEmpty()) {
    				continue;
    			}
    			names.add(app.getName().toLowerCase());
    
    		}
    		return names;
    	}
    
    	@Override
    	public int getOrder() {
    		return clientConfig instanceof Ordered ? ((Ordered) clientConfig).getOrder()
    				: DiscoveryClient.DEFAULT_ORDER;
    	}
    
    	/**
    	 * An Eureka-specific {@link ServiceInstance} implementation. Extends
    	 * {@link org.springframework.cloud.netflix.eureka.EurekaServiceInstance} for
    	 * backwards compatibility.
    	 *
    	 * @deprecated In favor of
    	 * {@link org.springframework.cloud.netflix.eureka.EurekaServiceInstance}.
    	 */
    	@Deprecated
    	public static class EurekaServiceInstance
    			extends org.springframework.cloud.netflix.eureka.EurekaServiceInstance {
    
    		public EurekaServiceInstance(InstanceInfo instance) {
    			super(instance);
    		}
    
    	}
    
    }
    

    EurekaDiscoveryClient類實現了DiscoveryClient接口,類的邏輯是通過組合了com.netflix.discovery.EurekaClient來實現。也就是說spring cloud中通過組合了netflixeureka-client包來實現服務發現的。

    下面我們來走進netflix的eureka-client包下看EurekaClient接口以及它的實現類DiscoveryClient

    package com.netflix.discovery;
    public class DiscoveryClient implements EurekaClient {
        
        // 省略部分代碼
        
        // 服務注冊
        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }
    
        // 服務續約
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
        
        // 服務下線
        @PreDestroy
        @Override
        public synchronized void shutdown() {
            if (isShutdown.compareAndSet(false, true)) {
                logger.info("Shutting down DiscoveryClient ...");
    
                if (statusChangeListener != null && applicationInfoManager != null) {
                    applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
                }
    
                cancelScheduledTasks();
    
                // If APPINFO was registered
                if (applicationInfoManager != null
                        && clientConfig.shouldRegisterWithEureka()
                        && clientConfig.shouldUnregisterOnShutdown()) {
                    applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                    unregister();
                }
    
                if (eurekaTransport != null) {
                    eurekaTransport.shutdown();
                }
    
                heartbeatStalenessMonitor.shutdown();
                registryStalenessMonitor.shutdown();
    
                Monitors.unregisterObject(this);
    
                logger.info("Completed shut down of DiscoveryClient");
            }
        }
        
        // 省略部分代碼
    }
    

    EurekaClient有一個注解@ImplementedBy(DiscoveryClient.class),此類的默認實現類:com.netflix.discovery.DiscoveryClient。提供了:

    • 服務注冊到server方法register()。
    • 續約boolean renew()。
    • 下線public synchronized void shutdown()。
    • 查詢服務列表 等功能。

    此類DiscoveryClient正是上面spring cloud包下EurekaDiscoveryClient類成員變量EurekaClient接口的具體實現。

    EurekaClient繼承了LookupService

    LookupService作用:發現活躍的服務實例。

    package com.netflix.discovery.shared;
    public interface LookupService<T> {
    
        // 根據服務實例注冊的appName來獲取封裝有相同appName的服務實例信息容器
        Application getApplication(String appName);
    
        // 獲取所有的服務實例信息
        Applications getApplications();
    
        //根據實例id,獲取服務實例信息
        List<InstanceInfo> getInstancesById(String id);
    }
    
    • 上面提到一個Application,它持有服務實例信息列表。它是同一個服務的集群信息。比如EurekaClient1的所有服務信息,這些服務都在EurekaClient1服務名下面。
    • Applications是注冊表中,所有服務實例信息的集合。
    • InstanceInfo代表一個服務實例的信息。

    二,Eureka Client啟動

    我們回到啟動類EurekaClientAutoConfiguration

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @ConditionalOnDiscoveryEnabled
    @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
    		CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
    @AutoConfigureAfter(name = {
    		"org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
    		"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
    		"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
    		"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
    public class EurekaClientAutoConfiguration {
    	@Configuration(proxyBeanMethods = false)
    	@ConditionalOnMissingRefreshScope
    	protected static class EurekaClientConfiguration {
    
    		@Autowired
    		private ApplicationContext context;
    
    		@Autowired
    		private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    
    		@Bean(destroyMethod = "shutdown")
    		@ConditionalOnMissingBean(value = EurekaClient.class,
    				search = SearchStrategy.CURRENT)
    		public EurekaClient eurekaClient(ApplicationInfoManager manager,
    				EurekaClientConfig config) {
    			return new CloudEurekaClient(manager, config, this.optionalArgs,
    					this.context);
    		}
            
            // 省略部分代碼
        }
        
        //省略部分代碼
    }
    

    向spring容器注入了CloudEurekaClient,我們看其構造函數:

    public class CloudEurekaClient extends DiscoveryClient {
        public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
    			EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
    			ApplicationEventPublisher publisher) {
            // 調用父類構造函數
    		super(applicationInfoManager, config, args);
    		this.applicationInfoManager = applicationInfoManager;
    		this.publisher = publisher;
    		this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
    				"eurekaTransport");
    		ReflectionUtils.makeAccessible(this.eurekaTransportField);
    	}
    }
    

    看其父類DiscoveryClient構造函數:

    public class DiscoveryClient implements EurekaClient {
    
    	// 省略部分代碼
    
    	@Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }
            
            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();
    
            clientConfig = config;
            staticClientConfig = clientConfig;
            transportConfig = config.getTransportConfig();
            instanceInfo = myInfo;
            if (myInfo != null) {
                appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }
    
            this.backupRegistryProvider = backupRegistryProvider;
            this.endpointRandomizer = endpointRandomizer;
            this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
            localRegionApps.set(new Applications());
    
            fetchRegistryGeneration = new AtomicLong(0);
    
            remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
            remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
            // shouldFetchRegistry,點其實現類EurekaClientConfigBean,找到它其實對應于:
            // eureka.client.fetch-register,true:表示client從server拉取注冊表信息。
            if (config.shouldFetchRegistry()) {
                this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            if (config.shouldRegisterWithEureka()) {
                this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    
            // 如果以下兩個都為false,則直接返回,構造方法執行結束,既不服務注冊,也不服務發現。
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                logger.info("Client configured to neither register nor query for data.");
                scheduler = null;
                heartbeatExecutor = null;
                cacheRefreshExecutor = null;
                eurekaTransport = null;
                instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
                initTimestampMs = System.currentTimeMillis();
                initRegistrySize = this.getApplications().size();
                registrySize = initRegistrySize;
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                        initTimestampMs, initRegistrySize);
    
                return;  // no need to setup up an network tasks and we are done
            }
    
            try {
                /**
                 * 兩個定時任務
                 */
                // 定義了一個基于線程池的定時器線程池,大小為2。
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    			// 用于發送心跳
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                // 用于刷新緩存
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                /**
                 * client和server交互的Jersey客戶端
                 */
                
                // 它是eureka Client和eureka server進行http交互jersey客戶端。
                // 點開EurekaTransport,看到許多httpclient相關的屬性。
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);
    
                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    		
            /**
             * 拉取注冊信息
             */
            // eureka.client.fetch-register,true:表示client從server拉取注冊表信息。
            if (clientConfig.shouldFetchRegistry()) {
                try {
                    // 從eureka server拉取注冊表中的信息。
                    boolean primaryFetchRegistryResult = fetchRegistry(false);
                    if (!primaryFetchRegistryResult) {
                        logger.info("Initial registry fetch from primary servers failed");
                    }
                    boolean backupFetchRegistryResult = true;
                    // fetchRegistryFromBackup():將注冊表緩存到本地,可以就近獲取其他服務信息,減少于server的交互。
                    if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                        backupFetchRegistryResult = false;
                        logger.info("Initial registry fetch from backup servers failed");
                    }
                    if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                        throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                    }
                } catch (Throwable th) {
                    logger.error("Fetch registry error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }
    
            /**
             * 服務注冊
             */
            if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {
                    // 注冊失敗拋異常。
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            /**
             * 啟動定時任務
             */
            // 此方法中,啟動3個定時任務。方法內有statusChangeListener,
            // 按需注冊是一個事件StatusChangeEvent,狀態改變,則向server注冊。
            initScheduledTasks();
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            initRegistrySize = this.getApplications().size();
            registrySize = initRegistrySize;
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, initRegistrySize);
        }
        
        // 省略部分代碼
    }
    

    此函數中依次執行了 從eureka server中拉取注冊表,服務注冊,初始化發送心跳,緩存刷新(定時拉取注冊表信息),按需注冊定時任務等,貫穿了Eureka Client啟動階段的各項工作。

    總結DiscoveryClient構造關鍵過程:

    1. 初始化一堆信息。
    2. 拉取注冊表信息。
    3. 向server注冊自己。
    4. 初始化3個任務。

    下面我們來詳細看一下這幾個過程:

    1,拉取注冊表信息

    上面的fetchRegistry(false),點進去:

    public class DiscoveryClient implements EurekaClient {
        
        // 省略部分代碼
        
        //拉取服務信息
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                Applications applications = getApplications();
                // 如果增量式拉取被禁止或第一次拉取注冊表,則進行全量拉取
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    // 全量拉去
                    getAndStoreFullRegistry();
                } else {
                    // 否則進行增量拉取注冊表信息
                    getAndUpdateDelta(applications);
                }
               
            } catch (Throwable e) {
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // 在更新實例遠程狀態之前通知有關緩存刷新的信息
            onCacheRefreshed();
            // 根據緩存中保存的刷新數據更新遠程狀態
            updateInstanceRemoteStatus();
            // 拉取成功,返回true
            return true;
        }
        
        // 全量拉取
        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            logger.info("Getting all instance registry info from the eureka server");
    
            Applications apps = null;
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
            logger.info("The response status is {}", httpResponse.getStatusCode());
    
            if (apps == null) {
                logger.error("The application is null for some reason. Not storing this information");
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                localRegionApps.set(this.filterAndShuffle(apps));
                logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
            } else {
                logger.warn("Not updating applications as another thread is updating it already");
            }
        }
    }
    

    1,全量拉取

    	// 全量拉取
        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            logger.info("Getting all instance registry info from the eureka server");
    
            Applications apps = null;
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
            logger.info("The response status is {}", httpResponse.getStatusCode());
    
            if (apps == null) {
                logger.error("The application is null for some reason. Not storing this information");
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                localRegionApps.set(this.filterAndShuffle(apps));
                logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
            } else {
                logger.warn("Not updating applications as another thread is updating it already");
            }
        }
    

    有一方法:eurekaTransport.queryClient.getApplications。通過debug發現 實現類是AbstractJerseyEurekaHttpClient,點開,debug出 webResource地址為:http://root:root@eureka-7900:7900/eureka/apps/,此端點用于獲取server中所有的注冊表信息。

    getAndStoreFullRegistry()可能被多個線程同時調用,導致新拉取的注冊表被舊的覆蓋(如果新拉取的動作設置apps阻塞的情況下)。

    此時用了AutomicLong來進行版本管理,如果更新時版本不一致,不保存apps。

    通過這個判斷fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1),如果版本一致,并設置新版本(+1),接著執行localRegionApps.set(this.filterAndShuffle(apps));過濾并洗牌apps。

    點開this.filterAndShuffle(apps)實現,繼續點apps.shuffleAndIndexInstances,繼續點shuffleInstances,繼續點application.shuffleAndStoreInstances,繼續點_shuffleAndStoreInstances,發現if (filterUpInstances && InstanceStatus.UP != instanceInfo.getStatus())。只保留狀態為UP的服務。

    2,增量拉取

    private void getAndUpdateDelta(Applications applications) throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            Applications delta = null;
            EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                delta = httpResponse.getEntity();
            }
    
            if (delta == null) {
                logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                        + "Hence got the full registry.");
                getAndStoreFullRegistry();
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
                String reconcileHashCode = "";
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
                        updateDelta(delta);
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } else {
                    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
                }
                // There is a diff in number of instances for some reason
                if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                    reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
                }
            } else {
                logger.warn("Not updating application delta as another thread is updating it already");
                logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
            }
        }
    
    • 通過getDelta方法,看到實際拉取的地址是:apps/delta,如果獲取到的delta為空,則全量拉取。
    • 通常來講是3分鐘之內注冊表的信息變化(在server端判斷),獲取到delta后,會更新本地注冊表。
    • 增量式拉取是為了維護client和server端 注冊表的一致性,防止本地數據過久,而失效,采用增量式拉取的方式,減少了client和server的通信量。
    • client有一個注冊表緩存刷新定時器,專門負責維護兩者之間的信息同步,但是當增量出現意外時,定時器將執行,全量拉取以更新本地緩存信息。

    更新本地注冊表方法updateDelta,有一個細節。

     if (ActionType.ADDED.equals(instance.getActionType()))public enum ActionType {
              ADDED, // Added in the discovery server
              MODIFIED, // Changed in the discovery server
              DELETED
              // Deleted from the discovery server
          }
    

    在InstanceInfo instance中有一個instance.getActionType()ADDEDMODIFIED狀態的將更新本地注冊表applications.addApplicationDELETED將從本地剔除掉existingApp.removeInstance(instance)

    2,服務注冊

    好了拉取完eureka server中的注冊表了,接著進行服務注冊。回到DiscoveryClient構造函數。

    拉取fetchRegistry完后進行register注冊。由于構造函數開始時已經將服務實例元數據封裝好了instanceInfo,所以此處之間向server發送instanceInfo:

    public class DiscoveryClient implements EurekaClient {
        
        // 省略部分代碼
        
        //服務注冊
        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            // 204狀態碼,則注冊成功。
            return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
        }
        
        // 省略部分代碼
    }
    

    3,初始化3個定時任務

    initScheduledTasks()方法初始化3個定時任務。

    1. client會定時向server發送心跳,維持自己服務租約的有效性,用心跳定時任務實現;
    2. 而server中會有不同的服務實例注冊進來,一進一出,就需要數據的同步。所以client需要定時從server拉取注冊表信息,用緩存定時任務實現;
    3. client如果有變化,也會及時更新server中自己的信息,用按需注冊定時任務實現。

    沒錯就是這三個定時任務,進 initScheduledTasks()方法中:

    public class DiscoveryClient implements EurekaClient {
        
        // 省略部分代碼
        
        //初始化心跳定時任務,緩存定時任務,按需注冊定時任務
        private void initScheduledTasks() {
            //從server拉取注冊表信息。
            if (clientConfig.shouldFetchRegistry()) {
                // 拉取的時間間隔,通過eureka.client.registry-fetch-interval-seconds進行設置。
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                // 緩存刷新定時任務
                cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                );
                scheduler.schedule(
                        cacheRefreshTask,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (clientConfig.shouldRegisterWithEureka()) {
                // 心跳定時器,默認30秒。
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // 心跳定時任務
                heartbeatTask = new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                );
                scheduler.schedule(
                        heartbeatTask,
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // 按需注冊定時任務實現
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
        // 省略部分代碼
    }
    

    1,緩存刷新定時任務

    public class DiscoveryClient implements EurekaClient {
        
        // 省略部分代碼
        
        class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }
        
        void refreshRegistry() {
            
            // 省略部分代碼
            
            boolean success = fetchRegistry(remoteRegionsModified);
                
            // 省略部分代碼
        }
        
        /**
         *	最后回到了我們上面講過的拉取注冊表信息的方法
         */
        //拉取服務信息
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                Applications applications = getApplications();
                // 如果增量式拉取被禁止或第一次拉取注冊表,則進行全量拉取
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    // 全量拉去
                    getAndStoreFullRegistry();
                } else {
                    // 否則進行增量拉取注冊表信息
                    getAndUpdateDelta(applications);
                }
               
            } catch (Throwable e) {
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // 在更新實例遠程狀態之前通知有關緩存刷新的信息
            onCacheRefreshed();
            // 根據緩存中保存的刷新數據更新遠程狀態
            updateInstanceRemoteStatus();
            // 拉取成功,返回true
            return true;
        }
        
        // 省略部分代碼
    }
    

    2,心跳定時任務

    public class DiscoveryClient implements EurekaClient {
    	
    	// 省略部分代碼
    	
    	private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
        
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                // 看到如果遇到404,server沒有此實例,則重新發起注冊。如果續約成功返回 200.
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    	//省略部分代碼
    }
    

    3,按需注冊定時任務

    instanceinfostatus發生變化時,需要向server同步,去更新自己在server中的實例信息。保證server注冊表中服務實例信息的有效和可用。

    instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    

    此定時任務有2個部分:

    1,定時刷新服務實例信息和檢查應用狀態的變化,在服務實例信息發生改變的情況下向server重新發起注冊。InstanceInfoReplicator點進去。看到一個方法 :
    class InstanceInfoReplicator implements Runnable {
        
        // 省略部分代碼
        
        public void run() {
            try {
                // 刷新instanceinfo,如果有變化,在下次心跳時,同步向server。
                discoveryClient.refreshInstanceInfo();
    			// 如果實例信息有變,返回數據更新時間
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                // 延時執行下一個檢查任務。用于再次調用run方法,繼續檢查服務實例信息和狀態的變化
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
        
        // 省略部分代碼
    }
    
    2,注冊狀態改變監聽器,在應用狀態發生變化時,刷新服務實例信息,在服務實例信息發生改變時向server注冊。
    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        如果狀態發生改變,調用onDemandUpdate(),點onDemandUpdate進去,看到InstanceInfoReplicator.this.run();     
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    

    總結:兩部分,一部分自己去檢查,一部分等待狀態監聽事件。

    三,服務下線

    服務下線:在應用關閉時,client會向server注銷自己,在Discoveryclient銷毀前,會執行下面清理方法。

    public class DiscoveryClient implements EurekaClient {
        
        // 省略部分代碼
        
        @PreDestroy
        @Override
        public synchronized void shutdown() {
            if (isShutdown.compareAndSet(false, true)) {
                logger.info("Shutting down DiscoveryClient ...");
    
                if (statusChangeListener != null && applicationInfoManager != null) {
        
     // 注銷監聽器
     applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
                }
    			// 取消定時任務
                cancelScheduledTasks();
    
                // If APPINFO was registered
                if (applicationInfoManager != null
                        && clientConfig.shouldRegisterWithEureka()
                        && clientConfig.shouldUnregisterOnShutdown()) {
                    applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                    // 服務下線
                    unregister();
                }
    			// 關閉jersy客戶端
                if (eurekaTransport != null) {
                    eurekaTransport.shutdown();
                }
    
                heartbeatStalenessMonitor.shutdown();
                registryStalenessMonitor.shutdown();
    
                Monitors.unregisterObject(this);
    
                logger.info("Completed shut down of DiscoveryClient");
            }
        }
        
        // 省略部分代碼
    }
    
    版權聲明:本文為u013277209原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/u013277209/article/details/109679422

    智能推薦

    SpringCloud Eureka源碼分析

    Eureka核心功能點 服務注冊(register):Eureka Client會通過發送REST請求的方式向Eureka Server注冊自己的服務,提供自身的元數 據,比如ip地址、端口、運行狀況指標的url、主頁地址等信息。Eureka Server接收到注冊請求后,就會把這些元數 據信息存儲在一個雙層的Map中。   服務續約(renew):在服務注冊后,Eureka Clien...

    SpringCloud Eureka源碼分析

    老規矩,先找XXXAutoConfiguration。 EurekaClientAutoConfiguration 這個里面主要看這幾個bean: EurekaAutoServiceRegistration EurekaClient ApplicationInfoManager EurekaRegistration EurekaClient : 這里初始化的是CloudEurekaClient,看...

    LiveData詳細分析

    目錄介紹 01.LiveData是什么東西 02.使用LiveData的優勢 03.使用LiveData的步驟 04.簡單使用LiveData 05.observe()和observerForever() 06.LiveData原理介紹 07.observe訂閱源碼分析 08.setValue發送源碼分析 09.observeForever源碼 10.LiveData源碼總結 00.使用LiveD...

    Lifecycle詳細分析

    Lifecycle源碼分析 目錄介紹 01.Lifecycle的作用是什么 02.Lifecycle的簡單使用 03.Lifecycle的使用場景 04.如何實現生命周期感知 05.注解方法如何被調用 06.addObserver調用分析 07.知識點梳理和總結一下 00.使用AAC實現bus事件總線 利用LiveData實現事件總線,替代EventBus。充分利用了生命周期感知功能,可以在act...

    ConcurrentHashmap 詳細分析

    詳盡的分析 JDK8 后的ConcurrentHashmap,思路分析輔以源碼走讀,徹底讀懂 ConcurrentHashmap。 簡介 放入數據 容器元素總數更新 容器擴容 協助擴容 遍歷 簡介 在從 JDK8 開始,為了提高并發度,ConcurrentHashMap的源碼進行了很大的調整。在 JDK7 中,采用的是分段鎖的思路。簡單的說,就是ConcurrentHashMap是由多個HashM...

    猜你喜歡

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    精品国产乱码久久久久久蜜桃不卡