Feign调用流程解析

0 前言

本文是对上一篇文章的补完,旨在明确以下问题:

  1. feign如何代理本地方法的调用;
  2. http调用参数是如何设置的;
  3. 注册中心的服务数据是如何与feign关联的;
  4. feign的负载均衡的实现。

1 FeignClient对象注入

上一篇文章说过,@FeignClient注解的对象基于FeignClientFactoryBean注册,那么我们分析feign的调用流程就从这个类开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class FeignClientFactoryBean
implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {

@Override
public void afterPropertiesSet() throws Exception {
Assert.hasText(this.contextId, "Context id must be set");
Assert.hasText(this.name, "Name must be set");
}

@Override
public Object getObject() throws Exception {
return getTarget();
}


@Override
public Class<?> getObjectType() {
return this.type;
}

@Override
public boolean isSingleton() {
return true;
}


@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.applicationContext = context;
}


@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FeignClientFactoryBean that = (FeignClientFactoryBean) o;
return Objects.equals(this.applicationContext, that.applicationContext)
&& this.decode404 == that.decode404
&& Objects.equals(this.fallback, that.fallback)
&& Objects.equals(this.fallbackFactory, that.fallbackFactory)
&& Objects.equals(this.name, that.name)
&& Objects.equals(this.path, that.path)
&& Objects.equals(this.type, that.type)
&& Objects.equals(this.url, that.url);
}

@Override
public int hashCode() {
return Objects.hash(this.applicationContext, this.decode404, this.fallback,
this.fallbackFactory, this.name, this.path, this.type, this.url);
}

***部分省略***
}

FeignClientFactoryBean实现了FactoryBean<Object>接口,那么@FeignClient注解的bean注入时依赖于实现的getObject()方法。从上述源码可以看到该方法调用了getTarget()方法,那么我们来看看getTarget()的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<T> T getTarget() {
FeignContext context = this.applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);

if (!StringUtils.hasText(this.url)) {
if (!this.name.startsWith("http")) {
this.url = "http://" + this.name;
}
else {
this.url = this.name;
}
this.url += cleanPath();
return (T) loadBalance(builder, context,
new HardCodedTarget<>(this.type, this.name, this.url));
}
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
String url = this.url + cleanPath();
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not load balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient) client).getDelegate();
}
builder.client(client);
}
Targeter targeter = get(context, Targeter.class);
return (T) targeter.target(this, builder, context,
new HardCodedTarget<>(this.type, this.name, url));
}

1.1 feign(FeignContext context)

首先来看builder的创建,调用了feign(context),来看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected Feign.Builder feign(FeignContext context) {
FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class);
Logger logger = loggerFactory.create(this.type);

// @formatter:off
Feign.Builder builder = get(context, Feign.Builder.class)
// required values
.logger(logger)
.encoder(get(context, Encoder.class))
.decoder(get(context, Decoder.class))
.contract(get(context, Contract.class));
// @formatter:on

configureFeign(context, builder);

return builder;
}

这里涉及两个重要方法get(FeignContext context, Class<T> type)configureFeign(FeignContext context, Feign.Builder builder),接着逐一来看一下。

1.1.1 get(FeignContext context, Class type)

1
2
3
4
5
6
7
8
protected <T> T get(FeignContext context, Class<T> type) {
T instance = context.getInstance(this.contextId, type);
if (instance == null) {
throw new IllegalStateException(
"No bean found of type " + type + " for " + this.contextId);
}
return instance;
}

可以看到调用了context.getInstance(this.contextId, type),该方法继承自FeignContext的父类NamedContextFactory,来看一下

1
2
3
4
5
6
7
8
public <T> T getInstance(String name, Class<T> type) {
AnnotationConfigApplicationContext context = getContext(name);
if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
type).length > 0) {
return context.getBean(type);
}
return null;
}
1
2
3
4
5
6
7
8
9
10
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}

根据上述两个方法可以看出当在contexts中根据contextId找不到对应实例时,将会调用createContext(name)创建一个加入到contexts中,那么我们来看看createContext(name)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
//根据name找出configurations中注册的FeignClient类
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
//根据default前缀找出configurations中注册的EnableFeignClients类
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object>singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
// jdk11 issue
// https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
context.setClassLoader(this.parent.getClassLoader());
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}

该方法从configurations中分别找到指定的FeignClient注解类和EnableFeignClients注解的类,并注册到context中,返回一个AnnotationConfigApplicationContext类型对象。最终通过context.getBean(type)方法,得到已经创建好的bean对象。

1.1.2 configureFeign(FeignContext context, Feign.Builder builder)

回到feign(FeignContext context)方法中接着来看configureFeign(FeignContext context, Feign.Builder builder)的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected void configureFeign(FeignContext context, Feign.Builder builder) {
FeignClientProperties properties = this.applicationContext
.getBean(FeignClientProperties.class);
if (properties != null) {
if (properties.isDefaultToProperties()) {
configureUsingConfiguration(context, builder);
configureUsingProperties(
properties.getConfig().get(properties.getDefaultConfig()),
builder);
configureUsingProperties(properties.getConfig().get(this.contextId),
builder);
}
else {
configureUsingProperties(
properties.getConfig().get(properties.getDefaultConfig()),
builder);
configureUsingProperties(properties.getConfig().get(this.contextId),
builder);
configureUsingConfiguration(context, builder);
}
}
else {
configureUsingConfiguration(context, builder);
}
}

这里主要关注configureUsingConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected void configureUsingConfiguration(FeignContext context,
Feign.Builder builder) {
Logger.Level level = getOptional(context, Logger.Level.class);
if (level != null) {
builder.logLevel(level);
}
Retryer retryer = getOptional(context, Retryer.class);
if (retryer != null) {
builder.retryer(retryer);
}
ErrorDecoder errorDecoder = getOptional(context, ErrorDecoder.class);
if (errorDecoder != null) {
builder.errorDecoder(errorDecoder);
}
Request.Options options = getOptional(context, Request.Options.class);
if (options != null) {
builder.options(options);
}
Map<String, RequestInterceptor> requestInterceptors = context
.getInstances(this.contextId, RequestInterceptor.class);
if (requestInterceptors != null) {
builder.requestInterceptors(requestInterceptors.values());
}

if (this.decode404) {
builder.decode404();
}
}

该方法从context中为builder赋值logLevelretryererrorDecoder等属性,这里需要说明的是retryer的默认实现如下所示,即默认关闭重试机制。

1
2
3
4
5
@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
return Retryer.NEVER_RETRY;
}

1.2 loadBalance(Feign.Builder builder, FeignContext context, HardCodedTarget target)

1
2
3
4
5
6
7
8
9
10
11
if (!StringUtils.hasText(this.url)) {
if (!this.name.startsWith("http")) {
this.url = "http://" + this.name;
}
else {
this.url = this.name;
}
this.url += cleanPath();
return (T) loadBalance(builder, context,
new HardCodedTarget<>(this.type, this.name, this.url));
}

回到getTarget()方法,分析完feign(context)创建builder的流程后,来看上述代码段,该情况对应的是@FeignClient注解没有设置url值,也就是没有设置请求的绝对路径的情况。这种情况下得到的url就是类似http://name这样的值,这就需要调用loadBalance方法来处理,来看一下该方法:

1
2
3
4
5
6
7
8
9
10
11
12
protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
HardCodedTarget<T> target) {
Client client = getOptional(context, Client.class);
if (client != null) {
builder.client(client);
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, target);
}

throw new IllegalStateException(
"No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?");
}

可以看到这里得到了一个Client类型的实例,由DefaultFeignLoadBalancedConfiguration类注入,具体源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
class DefaultFeignLoadBalancedConfiguration {

@Bean
@ConditionalOnMissingBean
public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
SpringClientFactory clientFactory) {
return new LoadBalancerFeignClient(new Client.Default(null, null), cachingFactory,
clientFactory);
}

}

接着还得到了Targeter类型的实例,该对象上一篇文章中有分析过,根据feign.hystrix.HystrixFeign类存在与否会初始化TargeterHystrixTargeter或者DefaultTargeter类型,这里为了简化分析暂不考虑Hystrix相关内容,就以DefaultTargeter做为实现进行分析,来看看该类的target(this, builder, context, target)方法做了什么。

1
2
3
4
5
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
FeignContext context, Target.HardCodedTarget<T> target) {
return feign.target(target);
}

调用了Feign.Buildertarget方法:

1
2
3
public <T> T target(Target<T> target) {
return build().newInstance(target);
}

那么先来看一下build方法

1
2
3
4
5
6
7
8
9
public Feign build() {
SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
logLevel, decode404, closeAfterDecode, propagationPolicy);
ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}

这里主要都是些初始化的工作,创建并返回一个ReflectiveFeign类型的对象,其中Feign.Builder对传入的contractoptionsencoder等参数均做了初始化,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final List<RequestInterceptor> requestInterceptors =
new ArrayList<RequestInterceptor>();
private Logger.Level logLevel = Logger.Level.NONE;
private Contract contract = new Contract.Default();
private Client client = new Client.Default(null, null);
private Retryer retryer = new Retryer.Default();
private Logger logger = new NoOpLogger();
private Encoder encoder = new Encoder.Default();
private Decoder decoder = new Decoder.Default();
private QueryMapEncoder queryMapEncoder = new QueryMapEncoder.Default();
private ErrorDecoder errorDecoder = new ErrorDecoder.Default();
private Options options = new Options();
private InvocationHandlerFactory invocationHandlerFactory =
new InvocationHandlerFactory.Default();
private boolean decode404;
private boolean closeAfterDecode = true;
private ExceptionPropagationPolicy propagationPolicy = NONE;

2 代理对象创建

接着来看ReflectiveFeign对象的newInstance方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public <T> T newInstance(Target<T> target) {
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if (Util.isDefault(method)) {
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
InvocationHandler handler = factory.create(target, methodToHandler);
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
new Class<?>[] {target.type()}, handler);

for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}

方法的前半部分主要是通过for循环将nameToHandler中的方法(即@FeignClient注解类下的方法)一一注册到methodToHandler中。

后半部分,熟悉JDK动态代理的话一下就会发现,这里是在通过动态代理方式创建代理对象proxy,至于代理对象的具体实现就要来看下factory.create(target, methodToHandler)了,这里的factory是之前ReflectiveFeign初始化时传入的InvocationHandlerFactory对象,其具体定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface InvocationHandlerFactory {
InvocationHandler create(Target var1, Map<Method, InvocationHandlerFactory.MethodHandler> var2);

public static final class Default implements InvocationHandlerFactory {
public Default() {
}

public InvocationHandler create(Target target, Map<Method, InvocationHandlerFactory.MethodHandler> dispatch) {
return new FeignInvocationHandler(target, dispatch);
}
}

public interface MethodHandler {
Object invoke(Object[] var1) throws Throwable;
}
}

这里可以看到其create方法的具体实现,跟进来看一下这个FeignInvocationHandler类。

3 代理方法调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static class FeignInvocationHandler implements InvocationHandler {

private final Target target;
private final Map<Method, MethodHandler> dispatch;

FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
this.target = checkNotNull(target, "target");
this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}

return dispatch.get(method).invoke(args);
}

***部分省略***
}

该类继承了InvocationHandler接口,根据jdk的动态代理的实现,该类就是代理的实现类,当调用代理接口的方法,即@FeignClient注解类的方法时,最终都会由该FeignInvocationHandler对象的invoke方法来处理。

3.1 apply(Target key)

可以看到这里的实际业务处理调用的是dispatch.get(method).invoke(args)。这里的dispatch即是之前提到过的methodToHandler对象,其通过targetToHandlersByName.apply(target)创建,来看一下该方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  public Map<String, MethodHandler> apply(Target key) {
List<MethodMetadata> metadata = contract.parseAndValidatateMetadata(key.type());
Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>();
for (MethodMetadata md : metadata) {
BuildTemplateByResolvingArgs buildTemplate;
if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null) {
buildTemplate = new BuildFormEncodedTemplateFromArgs(md, encoder, queryMapEncoder);
} else if (md.bodyIndex() != null) {
buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder, queryMapEncoder);
} else {
buildTemplate = new BuildTemplateByResolvingArgs(md, queryMapEncoder);
}
result.put(md.configKey(),
factory.create(key, md, buildTemplate, options, decoder, errorDecoder));
}
return result;
}
}

这里返回的是一个map对象,其key为类名加方法名的组合,用于唯一标识一个方法,value通过factory.create(key, md, buildTemplate, options, decoder, errorDecoder)生成,其中根据方法的参数和方法体情况不同会创建不同的buildTemplate。这里的factory是之前通过SynchronousMethodHandler.Factory synchronousMethodHandlerFactory = new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,logLevel, decode404, closeAfterDecode, propagationPolicy);创建的,我们来看看其create方法

1
2
3
4
5
6
7
8
9
10
11
  public MethodHandler create(Target<?> target,
MethodMetadata md,
RequestTemplate.Factory buildTemplateFromArgs,
Options options,
Decoder decoder,
ErrorDecoder errorDecoder) {
return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger,
logLevel, md, buildTemplateFromArgs, options, decoder,
errorDecoder, decode404, closeAfterDecode, propagationPolicy);
}
}

最后返回的value值即是一个SynchronousMethodHandler对象,那么dispatch.get(method).invoke(args)调用的就是该对象的invoke方法,来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);
} catch (RetryableException e) {
try {
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
3.2 executeAndDecode(RequestTemplate template)

该方法实质是创建了一个template对象,然后调用executeAndDecode(template)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  Object executeAndDecode(RequestTemplate template) throws Throwable {
Request request = targetRequest(template);

if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}

Response response;
long start = System.nanoTime();
try {
response = client.execute(request, options);
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}

***部分省略***
}

这里的核心是client.execute(request, options)的调用,因为clientLoadBalancerFeignClient的实例,那么需要看一下它的execute方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);

IClientConfig requestConfig = getClientConfig(options, clientName);
return lbClient(clientName)
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}

***部分省略***
}

分别来看一下其中最核心的getClientConfigexecuteWithLoadBalancer方法

3.2.1 getClientConfig
1
2
3
4
5
6
7
8
9
10
IClientConfig getClientConfig(Request.Options options, String clientName) {
IClientConfig requestConfig;
if (options == DEFAULT_OPTIONS) {
requestConfig = this.clientFactory.getClientConfig(clientName);
}
else {
requestConfig = new FeignOptionsClientConfig(options);
}
return requestConfig;
}

这里的clientFactorySpringClientFactory的实例,其getClientConfig方法为

1
2
3
public IClientConfig getClientConfig(String name) {
return getInstance(name, IClientConfig.class);
}
1
2
3
4
5
6
7
8
9
@Override
public <C> C getInstance(String name, Class<C> type) {
C instance = super.getInstance(name, type);
if (instance != null) {
return instance;
}
IClientConfig config = getInstance(name, IClientConfig.class);
return instantiateWithConfig(getContext(name), type, config);
}

可以看到其实际调用的是super.getInstance(name, type),也就是父类NamedContextFactorygetInstance方法,这个方法之前已经分析过了,所以很自然的知道getClientConfig最后返回的是IClientConfig类型的bean对象,该bean的初始化在RibbonClientConfiguration类中,源码如下:

1
2
3
4
5
6
7
8
9
10
@Bean
@ConditionalOnMissingBean
public IClientConfig ribbonClientConfig() {
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.loadProperties(this.name);
config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
return config;
}

这里的DefaultClientConfigImpl类中已经定义好了大部分http请求所需参数,如MaxTotalHttpConnectionsPoolKeepAliveTimeMaxAutoRetries等,而从代码也可以看出ConnectTimeoutReadTimeoutGZipPayload三个参数是单独设置的。

3.2.2 executeWithLoadBalancer

首先lbClient(clientName)返回的是一个FeignLoadBalancer的实例,那么我们重点关注其executeWithLoadBalancer(ribbonRequest, requestConfig)的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();

***部分省略***
}

可以看到,这里通过AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)方法即通过http请求方式获取到请求返回值,由于请求不是我关注的重点暂不深入分析,我关注的是server的获取,那么需要继续分析submit方法(这里开始大量使用了rxjava进行异步编程,暂不做讲解):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
   public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();

***部分省略***

// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
context.setServer(server);

***部分省略***
}

方法较长,取了其中为server赋值的一段,可以看到当server为空时,会调用selectServer()进行获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}

这里的serverloadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey)创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
   public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}

ILoadBalancer lb = getLoadBalancer();
if (host == null) {

if (lb != null){
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;

***部分省略***
}

该方法操作的核心是lb对象,该对象的allServerList属性包含了从注册中心获取的对应服务的所有地址列表,该实例在RibbonClientConfiguration中初始化:

1
2
3
4
5
6
7
8
9
10
11
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}

顺带一提这里的serverList是注册中心的地址列表,并且其类型根据注册中心不同而有所不同,例如我使用的是Nacos,该类就是NacosServerList的实例。其他注册中心相关内容暂不做深入,所以继续来看lb.chooseServer(loadBalancerKey)

1
2
3
4
5
6
7
8
   @Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
***部分省略***
}

ZoneAwareLoadBalancer支持多区域的负载均衡,但这里我们默认以最常见的单个区域情况进行分析,可以看到调用了父类BaseLoadBalancerchooseServer方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}

这里的rule对象是实现请求负载均衡的核心,其默认的实现为(如需要其他的负载均衡实现需要自行进行IRule的注入):

1
2
3
4
5
6
7
8
9
10
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
if (this.propertiesFactory.isSet(IRule.class, name)) {
return this.propertiesFactory.get(IRule.class, config, name);
}
ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
rule.initWithNiwsConfig(config);
return rule;
}

ZoneAvoidanceRule类继承了ClientConfigEnabledRoundRobinRule,因此其本质是实现了轮询规则,其choose方法实现为:

1
2
3
4
5
6
7
8
9
10
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}

主要实现在chooseRoundRobinAfterFiltering方法中:

1
2
3
4
5
6
7
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

到这里就很明确了,该方法通过incrementAndGet方式进行计数以轮询方式从servers中获取可访问的Server对象。

4.总结

通过上述流程的分析基本了解了开篇提出的4个问题,对Feign有了一个大概的了解,也知道了如何对Feign进行一些个性化的配置,但我们知道Feign包含了Ribbon和Hystrix的功能,目前的分析中并未涉及Hystrix,这块也是后面有时间的时候要补上的内容。