Eureka Cache Structure and Service Awareness Optimization

Contents

  • How Eureka-Client fetches registry information
  • How Eureka-Server manages registry information
  • Service awareness optimization

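The palest ink really is better than the best memory: even the simplest things get forgotten if you don't write them down!

This article first analyzes the cache architecture in Eureka and then, on that basis, looks at how to make services become aware of one another more quickly.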

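How Eureka-Client fetches registry information

Eureka-Client fetches registry information in two ways: a full fetch and a delta (incremental) fetch.

When Eureka-Client starts, it first performs one full fetch and caches the registry information locally. The code is as follows: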

    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager,
                    EurekaClientConfig config,
                    AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        // ... (rest of the constructor omitted in this excerpt)
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            // the initial fetch failed, so fall back to the backup registry
            fetchRegistryFromBackup();
        }
        // ...
    }

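With the following setting in the project configuration

eureka.client.fetch-registry=true

the fetchRegistry method is called and the registry information is fetched from eureka-server. At startup the local cache is still empty, so this first call ends up being a full fetch.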

When Eureka-Client starts, it also initializes a scheduled task that refreshes the cache:

    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    }
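The expBackOffBound argument passed to TimedSupervisorTask caps how far the refresh interval may grow when runs time out. The following is only a minimal sketch of that idea, assuming the delay doubles after each timeout, is capped at the base interval multiplied by the bound, and resets after a successful run. The class RefreshBackoffSketch and its method names are hypothetical and are not part of Eureka.

```java
// Hypothetical sketch of a capped exponential backoff for a periodic refresh task.
// It is NOT the Eureka implementation; names and behaviour are assumptions.
final class RefreshBackoffSketch {
    private final long baseDelayMs;   // normal refresh interval
    private final long maxDelayMs;    // baseDelayMs * expBackOffBound
    private long currentDelayMs;

    RefreshBackoffSketch(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    /** After a run that timed out: double the delay, but never exceed the cap. */
    long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }

    /** After a successful run: return to the base interval. */
    long onSuccess() {
        currentDelayMs = baseDelayMs;
        return currentDelayMs;
    }

    long currentDelayMs() {
        return currentDelayMs;
    }

    public static void main(String[] args) {
        // 30 s base interval; the bound of 10 is only an illustrative value
        RefreshBackoffSketch backoff = new RefreshBackoffSketch(30_000L, 10);
        for (int i = 0; i < 5; i++) {
            System.out.println("delay after timeout: " + backoff.onTimeout() + " ms");
        }
        System.out.println("delay after success: " + backoff.onSuccess() + " ms");
    }
}
```

Under these assumptions, a client whose server is slow to answer refreshes its cache less often, which is one more reason a newly registered service can take longer than one interval to become visible.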

The CacheRefreshThread task runs every registryFetchIntervalSeconds seconds (default 30). In the end, CacheRefreshThread also just calls the fetchRegistry method:

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
        try {
            Applications applications = getApplications();
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) // Client application does not have latest library supporting delta
            {
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier
                    + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();
        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();
        // registry was fetched successfully, so return true
        return true;
    }

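fetchRegistry first decides whether to do a full fetch or a delta fetch, then requests the registry information from the server and, on success, updates the local registry information. After that it triggers the CacheRefreshed event.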
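The full-versus-delta decision can be isolated as a small pure function, which makes the conditions easier to see. This is a sketch that mirrors the conditions in the fetchRegistry excerpt. The class FetchModeSketch, the Mode enum and the parameter names are hypothetical, and a null registeredAppCount stands in for "no local Applications object yet".

```java
// Hypothetical sketch: which fetch mode a client would choose, given the
// conditions visible in the fetchRegistry excerpt. Not Eureka code.
final class FetchModeSketch {
    enum Mode { FULL, DELTA }

    static Mode choose(boolean deltaDisabled,
                       String singleVipAddress,
                       boolean forceFull,
                       Integer registeredAppCount, // null means no local Applications yet
                       long version) {
        if (deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || registeredAppCount == null
                || registeredAppCount == 0
                || version == -1) {
            return Mode.FULL;
        }
        return Mode.DELTA;
    }

    public static void main(String[] args) {
        // First call after startup: the local cache is empty
        System.out.println(choose(false, null, false, 0, 1L));
        // Later periodic refresh with a populated cache
        System.out.println(choose(false, null, false, 12, 1L));
    }
}
```

With an empty local cache the function picks a full fetch even though forceFull is false, which matches the startup call fetchRegistry(false) described earlier.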

How Eureka-Server manages registry information

When a client request reaches the server, the service information is returned through ResponseCache:

    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {
        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }
        // check whether access is allowed
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        // response data format
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
        // response cache key
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );
        Response response;
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            // return the registry information for cacheKey
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }
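The lower-casing and sorting of regions in getContainers exists so that the same set of regions always maps to the same cache entry, whatever order or casing the client used. A minimal sketch of that normalization follows. The class RegionKeySketch and its method names are hypothetical, and the joined string only stands in for the regions part of the real Key.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical sketch: normalizing a "regions" query parameter so that
// equivalent requests share one cache key. Not Eureka code.
final class RegionKeySketch {

    /** Lower-cases, splits and sorts the regions; returns null when none were requested. */
    static String[] normalize(String regionsStr) {
        if (regionsStr == null || regionsStr.isEmpty()) {
            return null;
        }
        String[] regions = regionsStr.toLowerCase(Locale.ROOT).split(",");
        Arrays.sort(regions);
        return regions;
    }

    /** Stand-in for the regions part of a cache key. */
    static String keyPart(String regionsStr) {
        String[] regions = normalize(regionsStr);
        return regions == null ? "" : String.join(",", regions);
    }

    public static void main(String[] args) {
        System.out.println(keyPart("us-west-2,US-EAST-1"));
        System.out.println(keyPart("us-east-1,us-west-2"));
    }
}
```

Both calls in main produce the same key part, so the two requests would hit the same cached payload instead of generating it twice.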

The key part is the get method of responseCache:

    String get(final Key key, boolean useReadOnlyCache) {
        Value payload = getValue(key, useReadOnlyCache);
        if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
            return null;
        } else {
            return payload.getPayload();
        }
    }

    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    private final LoadingCache<Key, Value> readWriteCacheMap;

    // initialized in the ResponseCacheImpl constructor
    this.readWriteCacheMap = CacheBuilder.newBuilder()
            .initialCapacity(1000)
            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
            .removalListener(new RemovalListener<Key, Value>() {
                @Override
                public void onRemoval(RemovalNotification<Key, Value> notification) {
                    Key removedKey = notification.getKey();
                    if (removedKey.hasRegions()) {
                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                    }
                }
            })
            .build(new CacheLoader<Key, Value>() {
                @Override
                public Value load(Key key) throws Exception {
                    if (key.hasRegions()) {
                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                        regionSpecificKeys.put(cloneWithNoRegions, key);
                    }
                    Value value = generatePayload(key);
                    return value;
                }
            });

    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload
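The excerpt above stops before the body of getValue, and that body is not reconstructed here. To illustrate why a read-only map placed in front of a read-write cache affects how fast clients notice changes, the following is a generic sketch of a two-level read-through cache. Everything in it (the class TwoLevelCacheSketch, the methods invalidate and syncReadOnly, the use of plain maps instead of a Guava LoadingCache) is an assumption made for illustration and is not the Eureka implementation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical sketch of a two-level cache: a read-only map in front of a
// read-write map that loads missing values on demand. Not Eureka code.
final class TwoLevelCacheSketch<K, V> {
    private final ConcurrentMap<K, V> readOnly = new ConcurrentHashMap<>();
    private final ConcurrentMap<K, V> readWrite = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // builds the value from the underlying registry

    TwoLevelCacheSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key, boolean useReadOnlyCache) {
        if (!useReadOnlyCache) {
            return readWrite.computeIfAbsent(key, loader);
        }
        V cached = readOnly.get(key);
        if (cached == null) {
            // miss in the first level: load through the second level and remember it
            cached = readWrite.computeIfAbsent(key, loader);
            if (cached != null) {
                readOnly.put(key, cached);
            }
        }
        return cached;
    }

    /** Called when the registry changes: only the read-write level is invalidated. */
    void invalidate(K key) {
        readWrite.remove(key);
    }

    /** Periodic job: copy the current read-write values into the read-only level. */
    void syncReadOnly() {
        for (K key : readOnly.keySet()) {
            V fresh = readWrite.computeIfAbsent(key, loader);
            if (fresh != null) {
                readOnly.put(key, fresh);
            }
        }
    }

    public static void main(String[] args) {
        int[] registryVersion = {1};
        TwoLevelCacheSketch<String, String> cache =
                new TwoLevelCacheSketch<>(k -> k + "@v" + registryVersion[0]);
        System.out.println(cache.get("ALL_APPS", true));
        registryVersion[0] = 2;          // a service registers
        cache.invalidate("ALL_APPS");
        System.out.println(cache.get("ALL_APPS", true));  // still the old payload
        cache.syncReadOnly();
        System.out.println(cache.get("ALL_APPS", true));  // now the new payload
    }
}
```

In this sketch a reader that goes through the read-only level keeps seeing the old payload until syncReadOnly runs. In a design like this, that delay adds to the client's own refresh interval, so it is a natural place to look when tuning how quickly services become aware of each other.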

Source: https://www.guoxiongfei.cn/cntech/24501.html