博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊spring cloud eureka的instanceEnabledOnit属性
阅读量:7226 次
发布时间:2019-06-29

本文共 14131 字,大约阅读时间需要 47 分钟。

  hot3.png

本文主要研究下spring cloud eureka的instanceEnabledOnit属性

EurekaInstanceConfigBean

spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaInstanceConfigBean.java

@ConfigurationProperties("eureka.instance")public class EurekaInstanceConfigBean implements CloudEurekaInstanceConfig, EnvironmentAware {	//......	/**	 * Indicates whether the instance should be enabled for taking traffic as soon as it	 * is registered with eureka. Sometimes the application might need to do some	 * pre-processing before it is ready to take traffic.	 */	private boolean instanceEnabledOnit;	//......}

这个属性用来决定应用服务是否一注册上就可以开始接收请求

配置定义如下 spring-cloud-netflix-eureka-client-2.0.0.RC1.jar!/META-INF/spring-configuration-metadata.json

{      "sourceType": "org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean",      "defaultValue": false,      "name": "eureka.instance.instance-enabled-onit",      "description": "Indicates whether the instance should be enabled for taking traffic as soon as it\n is registered with eureka. Sometimes the application might need to do some\n pre-processing before it is ready to take traffic.",      "type": "java.lang.Boolean"    }

PropertiesInstanceConfig

eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/PropertiesInstanceConfig.java

static final String TRAFFIC_ENABLED_ON_INIT_KEY = "traffic.enabled";    @Override    public boolean isInstanceEnabledOnit() {        return configInstance.getBooleanProperty(namespace + TRAFFIC_ENABLED_ON_INIT_KEY,                super.isInstanceEnabledOnit()).get();    }

这里走的是super.isInstanceEnabledOnit()

变更为STARTING

InstanceInfoFactory

spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/InstanceInfoFactory.java

public class InstanceInfoFactory {	private static final Log log = LogFactory.getLog(InstanceInfoFactory.class);	public InstanceInfo create(EurekaInstanceConfig config) {		LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()				.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())				.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());		// Builder the instance information to be registered with eureka		// server		InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();		String namespace = config.getNamespace();		if (!namespace.endsWith(".")) {			namespace = namespace + ".";		}		builder.setNamespace(namespace).setAppName(config.getAppname())				.setInstanceId(config.getInstanceId())				.setAppGroupName(config.getAppGroupName())				.setDataCenterInfo(config.getDataCenterInfo())				.setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false))				.setPort(config.getNonSecurePort())				.enablePort(InstanceInfo.PortType.UNSECURE,						config.isNonSecurePortEnabled())				.setSecurePort(config.getSecurePort())				.enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())				.setVIPAddress(config.getVirtualHostName())				.setSecureVIPAddress(config.getSecureVirtualHostName())				.setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())				.setStatusPageUrl(config.getStatusPageUrlPath(),						config.getStatusPageUrl())				.setHealthCheckUrls(config.getHealthCheckUrlPath(),						config.getHealthCheckUrl(), config.getSecureHealthCheckUrl())				.setASGName(config.getASGName());		// Start off with the STARTING state to avoid traffic		if (!config.isInstanceEnabledOnit()) {			InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;			if (log.isInfoEnabled()) {				log.info("Setting initial instance status as: " + initialStatus);			}			builder.setStatus(initialStatus);		}		else {			if (log.isInfoEnabled()) {				log.info("Setting initial instance status as: "						+ InstanceInfo.InstanceStatus.UP						+ ". This may be too early for the instance to advertise itself as available. "						+ "You would instead want to control this via a healthcheck handler.");			}		}		// Add any user-specific metadata information		for (Map.Entry
mapEntry : config.getMetadataMap().entrySet()) { String key = mapEntry.getKey(); String value = mapEntry.getValue(); // only add the metadata if the value is present if (value != null && !value.isEmpty()) { builder.add(key, value); } } InstanceInfo instanceInfo = builder.build(); instanceInfo.setLeaseInfo(leaseInfoBuilder.build()); return instanceInfo; }}

可以看到这里判断isInstanceEnabledOnit,如果不是,则InstanceStatus初始状态为InstanceInfo.InstanceStatus.STARTING;而InstanceInfo创建的时候,默认初始化status为InstanceStatus.UP,所以如果为true,这里就log一下

EurekaRegistration

spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaRegistration.java

public EurekaRegistration build() {			Assert.notNull(instanceConfig, "instanceConfig may not be null");			if (this.applicationInfoManager == null) {				InstanceInfo instanceInfo = new InstanceInfoFactory().create(this.instanceConfig);				this.applicationInfoManager = new ApplicationInfoManager(this.instanceConfig, instanceInfo);			}			if (this.eurekaClient == null) {				Assert.notNull(this.clientConfig, "if eurekaClient is null, EurekaClientConfig may not be null");				Assert.notNull(this.publisher, "if eurekaClient is null, ApplicationEventPublisher may not be null");				this.eurekaClient = new CloudEurekaClient(this.applicationInfoManager, this.clientConfig, this.publisher);			}			return new EurekaRegistration(instanceConfig, eurekaClient, applicationInfoManager, healthCheckHandler);		}

EurekaRegistration创建的时候,使用了new InstanceInfoFactory().create(this.instanceConfig)来创建InstanceInfo

DiscoveryClient.initScheduledTasks

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java

private void initScheduledTasks() {        if (clientConfig.shouldFetchRegistry()) {            // registry cache refresh timer            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();            scheduler.schedule(                    new TimedSupervisorTask(                            "cacheRefresh",                            scheduler,                            cacheRefreshExecutor,                            registryFetchIntervalSeconds,                            TimeUnit.SECONDS,                            expBackOffBound,                            new CacheRefreshThread()                    ),                    registryFetchIntervalSeconds, TimeUnit.SECONDS);        }        if (clientConfig.shouldRegisterWithEureka()) {            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);            // Heartbeat timer            scheduler.schedule(                    new TimedSupervisorTask(                            "heartbeat",                            scheduler,                            heartbeatExecutor,                            renewalIntervalInSecs,                            TimeUnit.SECONDS,                            expBackOffBound,                            new HeartbeatThread()                    ),                    renewalIntervalInSecs, TimeUnit.SECONDS);            // InstanceInfo replicator            instanceInfoReplicator = new InstanceInfoReplicator(                    this,                    instanceInfo,                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),                    2); // burstSize            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {                @Override                public String getId() {                    return "statusChangeListener";                }                @Override                public void notify(StatusChangeEvent statusChangeEvent) {                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {                        // log at warn level if DOWN was involved                        logger.warn("Saw local status change event {}", statusChangeEvent);                    } else {                        logger.info("Saw local status change event {}", statusChangeEvent);                    }                    instanceInfoReplicator.onDemandUpdate();                }            };            if (clientConfig.shouldOnDemandUpdateStatusChange()) {                applicationInfoManager.registerStatusChangeListener(statusChangeListener);            }            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());        } else {            logger.info("Not registering with Eureka server per configuration");        }    }

该client构造器初始化initScheduledTasks,这里头调用了instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()),默认是延迟40秒执行

InstanceInfoReplicator

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/InstanceInfoReplicator.java

class InstanceInfoReplicator implements Runnable {		public void start(int initialDelayMs) {        if (started.compareAndSet(false, true)) {            instanceInfo.setIsDirty();  // for initial register            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);            scheduledPeriodicRef.set(next);        }    }    public void run() {        try {            discoveryClient.refreshInstanceInfo();            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();            if (dirtyTimestamp != null) {                discoveryClient.register();                instanceInfo.unsetIsDirty(dirtyTimestamp);            }        } catch (Throwable t) {            logger.warn("There was a problem with the instance info replicator", t);        } finally {            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);            scheduledPeriodicRef.set(next);        }    }    public boolean onDemandUpdate() {        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {            if (!scheduler.isShutdown()) {                scheduler.submit(new Runnable() {                    @Override                    public void run() {                        logger.debug("Executing on-demand update of local InstanceInfo");                            Future latestPeriodic = scheduledPeriodicRef.get();                        if (latestPeriodic != null && !latestPeriodic.isDone()) {                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");                            latestPeriodic.cancel(false);                        }                            InstanceInfoReplicator.this.run();                    }                });                return true;            } else {                logger.warn("Ignoring onDemand update due to stopped scheduler");                return false;            }        } else {            logger.warn("Ignoring onDemand update due to rate limiter");            return false;        }    }}

这里最开始设定为dirty,然后第一次执行的时候,就会触发register,远程调用注册。 DiscoveryClient有个statusChangeListener,它调用的是这里的onDemandUpdate方法,而onDemandUpdate方法在不超过频率限制的时候,执行的是这个run方法。

变更为UP

EurekaAutoServiceRegistration

spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/serviceregistry/EurekaAutoServiceRegistration.java

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {	//......	public void start() {		// only set the port if the nonSecurePort or securePort is 0 and this.port != 0		if (this.port.get() != 0) {			if (this.registration.getNonSecurePort() == 0) {				this.registration.setNonSecurePort(this.port.get());			}			if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {				this.registration.setSecurePort(this.port.get());			}		}		// only initialize if nonSecurePort is greater than 0 and it isn't already running		// because of containerPortInitializer below		if (!this.running.get() && this.registration.getNonSecurePort() > 0) {			this.serviceRegistry.register(this.registration);			this.context.publishEvent(					new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));			this.running.set(true);		}	}}

这里自动注册,serviceRegistry.register(this.registration)

public void register(EurekaRegistration reg) {		maybeInitializeClient(reg);		if (log.isInfoEnabled()) {			log.info("Registering application " + reg.getInstanceConfig().getAppname()					+ " with eureka with status "					+ reg.getInstanceConfig().getInitialStatus());		}		reg.getApplicationInfoManager()				.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());		reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->				reg.getEurekaClient().registerHealthCheck(healthCheckHandler));	}

这里会变更状态为reg.getInstanceConfig().getInitialStatus(),该值默认为UP

spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaInstanceConfigBean.java

/**	 * Initial status to register with rmeote Eureka server.	 */	private InstanceStatus initialStatus = InstanceStatus.UP;

小结

应用启动时,可以根据eureka.instance.instance-enabled-onit配置设定(默认为false),来配置初始注册到eureka server的时候,其status是UP,还是STARTING。默认初始化的时候是STARTING,之后自动注册的时候,变更为UP,表示可以开始接收请求。

  • DiscoveryClient初始化了一堆定时任务,其中包括了InstanceInfoReplicator的run方法,它默认是延时40秒执行,它在start的时候先设置为instanceInfo.setIsDirty(),之后第一次执行就可以触发discoveryClient.register()。
  • 而EurekaAutoServiceRegistration在启动的时候,会调用serviceRegistry.register(this.registration),之后变更状态,发布StatusChangeEvent,DiscoveryClient有个statusChangeListener,它调用的是InstanceInfoReplicator的onDemandUpdate方法,而onDemandUpdate方法在不超过频率限制的时候,执行的是InstanceInfoReplicator的run方法。

可以发现不管是InstanceInfoReplicator的延时任务,还是urekaAutoServiceRegistration变更StatusChangeEvent,最后都是由InstanceInfoReplicator的run方法去触发discoveryClient.register(),去远程eureka server注册。

转载于:https://my.oschina.net/go4it/blog/1807425

你可能感兴趣的文章
2019年最火热的Golang项目
查看>>
可实现RSSD云硬盘120万IOPS的SPDK IO路径优化实践
查看>>
Vue项目部署遇到的坑(你肯定会遇到!)
查看>>
资源分享计划第三期 0511
查看>>
awk 文本处理
查看>>
【JSConf EU 2018】主题总结 (部分主题已有中文文章)
查看>>
JavaScript面向对象名词详解
查看>>
Java设计模式学习 - 责任链模式
查看>>
JVM,DVM,ART
查看>>
webgl滤镜--会呼吸的痛
查看>>
用Go语言实现微信支付SDK
查看>>
oauth2在php实践
查看>>
LeetCode.914 卡牌分组
查看>>
填坑app:compileDebugJavaWithJavac
查看>>
Android 100+行实现本地跳一跳辅助(不需要连接电脑)
查看>>
位状态的使用
查看>>
面试技术题笔记
查看>>
Myth源码解析系列之一-项目简介
查看>>
JS易混淆的方法整理
查看>>
iOS下JS与OC互相调用(八)--Cordova详解+实战
查看>>