序
本文主要研究下spring cloud eureka的instanceEnabledOnit属性
EurekaInstanceConfigBean
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaInstanceConfigBean.java
@ConfigurationProperties("eureka.instance")public class EurekaInstanceConfigBean implements CloudEurekaInstanceConfig, EnvironmentAware { //...... /** * Indicates whether the instance should be enabled for taking traffic as soon as it * is registered with eureka. Sometimes the application might need to do some * pre-processing before it is ready to take traffic. */ private boolean instanceEnabledOnit; //......}
这个属性用来决定应用服务是否一注册上就可以开始接收请求
配置定义如下 spring-cloud-netflix-eureka-client-2.0.0.RC1.jar!/META-INF/spring-configuration-metadata.json
{ "sourceType": "org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean", "defaultValue": false, "name": "eureka.instance.instance-enabled-onit", "description": "Indicates whether the instance should be enabled for taking traffic as soon as it\n is registered with eureka. Sometimes the application might need to do some\n pre-processing before it is ready to take traffic.", "type": "java.lang.Boolean" }
PropertiesInstanceConfig
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/PropertiesInstanceConfig.java
static final String TRAFFIC_ENABLED_ON_INIT_KEY = "traffic.enabled"; @Override public boolean isInstanceEnabledOnit() { return configInstance.getBooleanProperty(namespace + TRAFFIC_ENABLED_ON_INIT_KEY, super.isInstanceEnabledOnit()).get(); }
这里走的是super.isInstanceEnabledOnit()
变更为STARTING
InstanceInfoFactory
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/InstanceInfoFactory.java
public class InstanceInfoFactory { private static final Log log = LogFactory.getLog(InstanceInfoFactory.class); public InstanceInfo create(EurekaInstanceConfig config) { LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder() .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds()) .setDurationInSecs(config.getLeaseExpirationDurationInSeconds()); // Builder the instance information to be registered with eureka // server InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(); String namespace = config.getNamespace(); if (!namespace.endsWith(".")) { namespace = namespace + "."; } builder.setNamespace(namespace).setAppName(config.getAppname()) .setInstanceId(config.getInstanceId()) .setAppGroupName(config.getAppGroupName()) .setDataCenterInfo(config.getDataCenterInfo()) .setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false)) .setPort(config.getNonSecurePort()) .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled()) .setSecurePort(config.getSecurePort()) .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled()) .setVIPAddress(config.getVirtualHostName()) .setSecureVIPAddress(config.getSecureVirtualHostName()) .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl()) .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl()) .setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(), config.getSecureHealthCheckUrl()) .setASGName(config.getASGName()); // Start off with the STARTING state to avoid traffic if (!config.isInstanceEnabledOnit()) { InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING; if (log.isInfoEnabled()) { log.info("Setting initial instance status as: " + initialStatus); } builder.setStatus(initialStatus); } else { if (log.isInfoEnabled()) { log.info("Setting initial instance status as: " + InstanceInfo.InstanceStatus.UP + ". This may be too early for the instance to advertise itself as available. " + "You would instead want to control this via a healthcheck handler."); } } // Add any user-specific metadata information for (Map.EntrymapEntry : config.getMetadataMap().entrySet()) { String key = mapEntry.getKey(); String value = mapEntry.getValue(); // only add the metadata if the value is present if (value != null && !value.isEmpty()) { builder.add(key, value); } } InstanceInfo instanceInfo = builder.build(); instanceInfo.setLeaseInfo(leaseInfoBuilder.build()); return instanceInfo; }}
可以看到这里判断isInstanceEnabledOnit,如果不是,则InstanceStatus初始状态为InstanceInfo.InstanceStatus.STARTING;而InstanceInfo创建的时候,默认初始化status为InstanceStatus.UP,所以如果为true,这里就log一下
EurekaRegistration
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaRegistration.java
public EurekaRegistration build() { Assert.notNull(instanceConfig, "instanceConfig may not be null"); if (this.applicationInfoManager == null) { InstanceInfo instanceInfo = new InstanceInfoFactory().create(this.instanceConfig); this.applicationInfoManager = new ApplicationInfoManager(this.instanceConfig, instanceInfo); } if (this.eurekaClient == null) { Assert.notNull(this.clientConfig, "if eurekaClient is null, EurekaClientConfig may not be null"); Assert.notNull(this.publisher, "if eurekaClient is null, ApplicationEventPublisher may not be null"); this.eurekaClient = new CloudEurekaClient(this.applicationInfoManager, this.clientConfig, this.publisher); } return new EurekaRegistration(instanceConfig, eurekaClient, applicationInfoManager, healthCheckHandler); }
EurekaRegistration创建的时候,使用了new InstanceInfoFactory().create(this.instanceConfig)来创建InstanceInfo
DiscoveryClient.initScheduledTasks
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
该client构造器初始化initScheduledTasks,这里头调用了instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()),默认是延迟40秒执行
InstanceInfoReplicator
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/InstanceInfoReplicator.java
class InstanceInfoReplicator implements Runnable { public void start(int initialDelayMs) { if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } public boolean onDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { if (!scheduler.isShutdown()) { scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to stopped scheduler"); return false; } } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } }}
这里最开始设定为dirty,然后第一次执行的时候,就会触发register,远程调用注册。 DiscoveryClient有个statusChangeListener,它调用的是这里的onDemandUpdate方法,而onDemandUpdate方法在不超过频率限制的时候,执行的是这个run方法。
变更为UP
EurekaAutoServiceRegistration
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaAutoServiceRegistration.java
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered { //...... public void start() { // only set the port if the nonSecurePort or securePort is 0 and this.port != 0 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get() && this.registration.getNonSecurePort() > 0) { this.serviceRegistry.register(this.registration); this.context.publishEvent( new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig())); this.running.set(true); } }}
这里自动注册,serviceRegistry.register(this.registration)
public void register(EurekaRegistration reg) { maybeInitializeClient(reg); if (log.isInfoEnabled()) { log.info("Registering application " + reg.getInstanceConfig().getAppname() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus()); } reg.getApplicationInfoManager() .setInstanceStatus(reg.getInstanceConfig().getInitialStatus()); reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler)); }
这里会变更状态为reg.getInstanceConfig().getInitialStatus(),该值默认为UP
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaInstanceConfigBean.java
/** * Initial status to register with rmeote Eureka server. */ private InstanceStatus initialStatus = InstanceStatus.UP;
小结
应用启动时,可以根据eureka.instance.instance-enabled-onit配置设定(默认为false
),来配置初始注册到eureka server的时候,其status是UP,还是STARTING。默认初始化的时候是STARTING,之后自动注册的时候,变更为UP,表示可以开始接收请求。
- DiscoveryClient初始化了一堆定时任务,其中包括了InstanceInfoReplicator的run方法,它默认是延时40秒执行,它在start的时候先设置为instanceInfo.setIsDirty(),之后第一次执行就可以触发discoveryClient.register()。
- 而EurekaAutoServiceRegistration在启动的时候,会调用serviceRegistry.register(this.registration),之后变更状态,发布StatusChangeEvent,DiscoveryClient有个statusChangeListener,它调用的是InstanceInfoReplicator的onDemandUpdate方法,而onDemandUpdate方法在不超过频率限制的时候,执行的是InstanceInfoReplicator的run方法。
可以发现不管是InstanceInfoReplicator的延时任务,还是urekaAutoServiceRegistration变更StatusChangeEvent,最后都是由InstanceInfoReplicator的run方法去触发discoveryClient.register(),去远程eureka server注册。