Note that a call on the service is in fact a call on its proxy.

1 Overall client request flow

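At this point execution enters Netty itself, which sends the message to the server through its own NioClientSocketChannel. (The message is encoded before it is sent; that is covered later.)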

public void send(Object message, boolean sent) throws RemotingException {
    // reconnect if needed
    if (send_reconnect && !isConnected()) {
        connect();
    }
    // obtain the Channel first; getChannel() is implemented in NettyClient
    Channel channel = getChannel();
    // TODO: whether getChannel() may return null needs improvement
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    // channel is a NettyChannel
    channel.send(message, sent);
}

First, the request parameters are wrapped in an RpcInvocation instance, as follows:

public void send(Object message) throws RemotingException {
    // NettyChannel
    send(message, url.getParameter(Constants.SENT_KEY, false));
}

private volatile List<Router> routers;

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    ...
    List<Invoker<T>> invokers = doList(invocation);
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && localRouters.size() > 0) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(Constants.PATH_KEY);
    // a callback service on the client side
    isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
    if (isStubServiceInvoke) {
        port = channel.getRemoteAddress().getPort();
    }
    // callback
    isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
    if (isCallBackServiceInvoke) {
        path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
        inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
    }
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    // look up the Exporter in the exporterMap cached earlier
    // key: dubbo.common.hello.service.HelloService:20880
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    if (exporter == null)
        throw new RemotingException(/* arguments elided in the source */);
    // get the Invoker and return it
    return exporter.getInvoker();
}

doList(invocation) is executed first to obtain a List<Invoker<T>>. Each router then filters that list in turn, and the filtered List<Invoker<T>> is returned.
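The filtering loop described here can be sketched in isolation. This is only an illustration under simplifying assumptions: SimpleRouter, filter and dropMock are hypothetical names, and plain address strings stand in for Invoker instances. As in AbstractDirectory.list, a router that throws is logged and skipped rather than aborting the call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: run a candidate list through a chain of routers. */
class RouterChainSketch {

    /** Stand-in for Dubbo's Router; name and signature are simplified assumptions. */
    interface SimpleRouter {
        List<String> route(List<String> candidates);
    }

    /** Applies every router in order; a failing router is skipped, not fatal. */
    static List<String> filter(List<String> candidates, List<SimpleRouter> routers) {
        List<String> result = candidates;
        for (SimpleRouter router : routers) {
            try {
                result = router.route(result);
            } catch (RuntimeException e) {
                // Log and keep the list produced so far.
                System.err.println("Failed to execute router: " + e.getMessage());
            }
        }
        return result;
    }

    /** Drops addresses that use the mock protocol, similar in spirit to MockInvokersSelector. */
    static List<String> dropMock(List<String> candidates) {
        List<String> kept = new ArrayList<>();
        for (String candidate : candidates) {
            if (!candidate.startsWith("mock://")) {
                kept.add(candidate);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList(
                "dubbo://10.0.0.1:20880", "mock://10.0.0.2", "dubbo://10.0.0.3:20880");
        List<SimpleRouter> routers = new ArrayList<>();
        routers.add(RouterChainSketch::dropMock);
        routers.add(list -> { throw new IllegalStateException("broken router"); });
        System.out.println(filter(all, routers));
    }
}
```

Because each router receives the output of the previous one, the order of the router list matters.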

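After the consumer-side DubboInvoker sends the request, the subsequent logic either runs asynchronously or blocks for up to the configured timeout. Once the response arrives, execution continues in DubboInvoker.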

public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker; // MockClusterInvoker instance

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

Result result = invoker.invoke(invocation); performs the call and returns the result. It first enters InvokerWrapper, then the invoke method of ListenerInvokerWrapper, and then the invoke method of AbstractInvoker:

RandomLoadBalance.doSelect

Here a new thread is started to execute the run method of ChannelEventRunnable, which then calls the received method of DecodeHandler:

public void send(Object message, boolean sent) throws RemotingException {
    if (send_reconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel(); // NettyChannel
    // TODO: whether getChannel() may return null needs improvement
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}

Note:

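In section 8.2, "Building the client: source code analysis", we saw that the demoService finally obtained is a proxy0 proxy object. We now analyze the second line of the client code, the sayHello call made on that proxy.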

public Result invoke(Invocation inv) throws RpcException {
    if (destroyed) {
        throw new RpcException(/* arguments elided in the source */);
    }
    // cast to RpcInvocation
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> context = RpcContext.getContext().getAttachments();
    if (context != null) {
        invocation.addAttachmentsIfAbsent(context);
    }
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    // an asynchronous call needs an id attached
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    try {
        // this is DubboInvoker
        return doInvoke(invocation);
    } catch (InvocationTargetException e) {
        // handling elided in the source
    }
}


public void received(Channel channel, Object message) throws RemotingException {
    // obtain the thread pool that will run the handler
    ExecutorService cexecutor = getExecutorService();
    try {
        // handler is a DecodeHandler
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        // error handling elided in the source
    }
}

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; // 2 + 1 attempts by default
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // On a retry, list the invokers again so that a changed invoker list is picked up.
        // Note: if the list has changed, the invoked check no longer works, because the invoker instances are different.
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            ...
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le ...);
}

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    // check that invokers is not empty
    checkInvokers(copyinvokers, invocation);
    // retry count
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    // invokers that have already been called
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // On a retry, list the invokers again so that a changed invoker list is picked up.
        // Note: if the list has changed, the invoked check no longer works, because the invoker instances are different.
        if (i > 0) {
            checkWheatherDestoried();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        // select an invoker with the load balancer (load balancing is not explained yet)
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // add it to the list of invokers already called
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // perform the call and return the result
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            // ... (elided in the source)
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(/* arguments elided in the source */);
}

1         DemoService demoService = (DemoService) context.getBean("demoService"); // obtain the remote service proxy
2         String hello = demoService.sayHello("world"); // invoke the remote method

Enter MultiMessageHandler:

2 Source code analysis

Enter the decode method that handles the response, and decode the response:

ReferenceCountExchangeClient.request

Enter the received method of MultiMessageHandler:

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    ...
    Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
    ...
    return invoker;
}

private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.size() == 0)
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    // If there are only two invokers and at least one has already been selected, degrade to round-robin.
    if (invokers.size() == 2 && selected != null && selected.size() > 0) {
        return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
    }
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    // Reselect if selected already contains the invoker (checked first), or if it is unavailable and availablecheck=true.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            ...
        } catch (Throwable t) {
            ...
        }
    }
    return invoker;
}

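Next comes the HeaderExchangeHandler.received() method:

ExchangeClient[] clients = [ReferenceCountExchangeClient instance] // if multiple connections are configured, there are multiple clients here

For an asynchronous request, the consumer gets a Future, and all the remaining logic is asynchronous.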

// the proxy issues the request
proxy0.sayHello(String paramString)
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
  -->new RpcInvocation(method, args)
  -->MockClusterInvoker.invoke(Invocation invocation) // where service degradation happens
    // a ClusterInvoker disguises multiple Invokers as one cluster-aware Invoker
    -->AbstractClusterInvoker.invoke(final Invocation invocation)
      // obtain the Invokers
      -->list(Invocation invocation)
        -->AbstractDirectory.list(Invocation invocation)
          -->RegistryDirectory.doList(Invocation invocation) // get the List<Invoker<T>> whose key is sayHello from Map<String, List<Invoker<T>>> methodInvokerMap
          -->MockInvokersSelector.getNormalInvokers(final List<Invoker<T>> invokers) // filter that List<Invoker<T>> once more (here, for example, drop every Invoker whose protocol is mock; if there are none, return the whole list); this is what a router does
      // obtain the load balancer
      -->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)) // random by default
      -->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation) // attach an invocation ID for asynchronous calls
      -->FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
        // use the load balancer to select one Invoker: a RegistryDirectory$InvokerDelegete instance
        -->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
          -->doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
            -->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation)
              -->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation)
        // run the listener and filter chain
        -->ListenerInvokerWrapper.invoke
          -->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation) // set some RpcContext attributes and the invoker attribute of the invocation
            -->FutureFilter.invoke(Invocation invocation)
              -->MonitorFilter.invoke(Invocation invocation) // the monitor collects data here
                -->AbstractInvoker.invoke(Invocation inv) // reset the invoker and attachment attributes of the invocation
                  -->DubboInvoker.doInvoke(final Invocation invocation)
                    // obtain an ExchangeClient to send the message
                    -->ReferenceCountExchangeClient.request(Object request, int timeout)
                      -->HeaderExchangeClient.request(Object request, int timeout)
                        -->HeaderExchangeChannel.request(Object request, int timeout)
                          -->AbstractClient.send(Object message, boolean sent) // parent class of NettyClient
                            -->getChannel() // NettyChannel instance; its internal channel is a NioClientSocketChannel instance
                            -->NettyChannel.send(Object message, boolean sent)
                              -->NioClientSocketChannel.write(Object message) // this is Netty itself now; message is a Request instance, whose most important part is RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]

// handled by the subclass; AbstractClient then performs the send
public void send(Object message) throws RemotingException {
    send(message, url.getParameter(Constants.SENT_KEY, false));
}

private final Directory<T> directory; // RegistryDirectory
private final Invoker<T> invoker; // FailoverClusterInvoker

/**
 * Service degradation is driven here by the configured mock parameter:
 * 1 if mock is not configured or mock=false, make the remote call;
 * 2 if mock=force:return null, return null directly without a remote call;
 * 3 if mock=fail:return null, make the remote call first and fall back to the mock call if it fails.
 */
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // sayHello.mock->mock->default.mock
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        // no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        // force: direct mock
        result = doMockInvoke(invocation, null);
    } else {
        // fail-mock
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                if (logger.isWarnEnabled()) {
                    logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}

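This step stores the response in the Future of the consumer's request, so that the consumer can obtain the provider's response through DefaultFuture.get(). It is the key step in bridging the asynchronous transport and the synchronous caller, and the request timeout is also controlled by DefaultFuture.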
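The idea of handing a response back to a blocked caller can be sketched with a map of pending futures keyed by request id. This is a simplified stand-in built on CompletableFuture, not Dubbo's DefaultFuture; the class, method and field names (PendingCallsSketch, Call, send, received, get) are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of request/response correlation; not Dubbo's DefaultFuture. */
class PendingCallsSketch {

    /** One in-flight request: its id and the future its caller waits on. */
    static final class Call {
        final long id;
        final CompletableFuture<String> future = new CompletableFuture<>();
        Call(long id) { this.id = id; }
    }

    private static final AtomicLong NEXT_ID = new AtomicLong();
    private static final Map<Long, Call> PENDING = new ConcurrentHashMap<>();

    /** Registers a new in-flight request before it is written to the network. */
    static Call send() {
        Call call = new Call(NEXT_ID.incrementAndGet());
        PENDING.put(call.id, call);
        return call;
    }

    /** Called by the I/O side when a response carrying the given id arrives. */
    static void received(long id, String response) {
        Call call = PENDING.remove(id);
        if (call != null) {
            call.future.complete(response);
        }
    }

    /** Blocks the caller until the response arrives or the timeout expires. */
    static String get(Call call, long timeoutMillis) {
        try {
            return call.future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("request " + call.id + " timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        } finally {
            PENDING.remove(call.id); // never leak an entry, whatever the outcome
        }
    }

    public static void main(String[] args) {
        Call call = send();
        new Thread(() -> received(call.id, "Hello world")).start(); // simulated I/O thread
        System.out.println(get(call, 1000));
    }
}
```

The request id is what lets many concurrent calls share one connection: the I/O thread only needs the id from the response to find the right waiting caller.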

First the load balancer is used to obtain one RegistryDirectory$InvokerDelegete instance, and then the selected RegistryDirectory$InvokerDelegete.invoke is used to send the request.
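A stripped-down version of the select-then-invoke retry loop might look like the following. It is a sketch under assumptions: provider names are plain strings, the hypothetical select method simply picks the first provider not tried yet instead of consulting a load balancer, and the distinction Dubbo makes for business exceptions is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of a failover loop: try another provider when a call fails. */
class FailoverSketch {

    /** Makes up to retries + 1 attempts, preferring providers not tried yet. */
    static String invoke(List<String> providers, int retries, Function<String, String> call) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<String> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String chosen = select(providers, tried);
            tried.add(chosen);
            try {
                return call.apply(chosen);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("failed after trying " + tried, last);
    }

    /** Picks the first provider not yet tried, falling back to the first one. */
    static String select(List<String> providers, List<String> tried) {
        for (String provider : providers) {
            if (!tried.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    public static void main(String[] args) {
        List<String> providers = Arrays.asList("10.0.0.1:20880", "10.0.0.2:20880");
        String result = invoke(providers, 2, p -> {
            if (p.startsWith("10.0.0.1")) {
                throw new IllegalStateException(p + " is down");
            }
            return "Hello world from " + p;
        });
        System.out.println(result);
    }
}
```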

public Result invoke(final Invocation invocation) throws RpcException {
    // check whether this invoker has been destroyed
    checkWheatherDestoried();
    LoadBalance loadbalance;
    // get the full invoker list according to the parameters in the invocation
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        // no load-balancing parameter is configured, so random is used by default;
        // what we get here is RandomLoadBalance
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // for an asynchronous call, attach an invocation id by default
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // implemented by the subclass FailoverClusterInvoker, which performs the call
    return doInvoke(invocation, invokers, loadbalance);
}

private final ExchangeClient[] clients;

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0]; // a single long-lived connection, the default
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // asynchronous?
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // one-way, no return value?
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

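First look at new RpcInvocation. An Invocation is the session domain: it holds the variables of the call, such as the method name and parameter types. Next comes invoker.invoke(). The invoker here is MockClusterInvoker, so we enter MockClusterInvoker.invoke():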

public String sayHello(String paramString) {
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramString;
    Object localObject = null;
    try {
        // look up sayHello(String) and delegate to the InvocationHandler
        localObject = this.handler.invoke(this, DemoService.class.getMethod("sayHello", String.class), arrayOfObject);
    } catch (Throwable e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return (String) localObject;
}

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    // the channel here is HeaderExchangeChannel
    return channel.request(request, timeout);
}

public List<Invoker<T>> doList(Invocation invocation) {
    ...
    List<Invoker<T>> invokers = null;
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // routing can be keyed on the first argument, e.g. sayHello.world
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
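
The lookup order in doList (method name plus first argument, then method name, then the wildcard key, then any entry) can be tried out on its own. In this sketch the class and method names are hypothetical, provider names are plain strings, and the wildcard key "*" is an assumption standing in for Constants.ANY_VALUE.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the key fallback order used when listing invokers for a method. */
class MethodLookupSketch {

    /** Assumed stand-in for Constants.ANY_VALUE. */
    static final String ANY = "*";

    static List<String> lookup(Map<String, List<String>> byMethod, String method, Object firstArg) {
        List<String> found = null;
        // 1. most specific key: methodName.firstArgument (only for String or enum arguments)
        if (firstArg instanceof String || firstArg instanceof Enum) {
            found = byMethod.get(method + "." + firstArg);
        }
        // 2. the plain method name
        if (found == null) {
            found = byMethod.get(method);
        }
        // 3. the wildcard entry
        if (found == null) {
            found = byMethod.get(ANY);
        }
        // 4. last resort: whatever entry comes first
        if (found == null && !byMethod.isEmpty()) {
            found = byMethod.values().iterator().next();
        }
        return found == null ? Collections.<String>emptyList() : found;
    }

    public static void main(String[] args) {
        Map<String, List<String>> byMethod = new HashMap<>();
        byMethod.put("sayHello", Arrays.asList("provider-a", "provider-b"));
        byMethod.put(ANY, Arrays.asList("provider-c"));
        System.out.println(lookup(byMethod, "sayHello", "world"));
        System.out.println(lookup(byMethod, "unknownMethod", null));
    }
}
```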

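Once the request has been parsed and the response message built, the response is sent with channel.send(response). The call first passes through AbstractPeer: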

RegistryDirectory.doList(invocation)

public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    // handler here is MultiMessageHandler
    handler.received(ch, msg);
}

First, look at proxy0.sayHello:

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
    // a message of type Response is handled here
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    handler.received(channel, message);
}

Here is the client request code:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        // the Invocation holds the method name and related data
        Invocation inv = (Invocation) message;
        // obtain the Invoker
        Invoker<?> invoker = getInvoker(channel, inv);
        // for a callback, handle a newer version calling an older one
        if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
            String methodsStr = invoker.getUrl().getParameters().get("methods");
            boolean hasMethod = false;
            if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                hasMethod = inv.getMethodName().equals(methodsStr);
            } else {
                String[] methods = methodsStr.split(",");
                for (String method : methods) {
                    if (inv.getMethodName().equals(method)) {
                        hasMethod = true;
                        break;
                    }
                }
            }
            if (!hasMethod) {
                return null;
            }
        }
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        // perform the call and return the result
        return invoker.invoke(inv);
    }
    throw new RemotingException(/* arguments elided in the source */);
}

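Finally, look at RegistryDirectory$InvokerDelegete.invoke. The method actually lives in its parent class InvokerWrapper:

The message first passes through the received method of AbstractPeer:

Overall flow:

After the provider has received and processed the consumer's request, it returns the result to the consumer. The consumer's entry point for receiving the response is much the same as the provider's: NettyCodecAdapter.messageReceived() decodes the message, and NettyHandler.messageReceived() then handles it: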

-->String methodName=sayHello
-->Class<?>[] parameterTypes=[class java.lang.String]
-->Object[] arguments=[world]
-->Map<String, String> attachments={}

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // handler is a HeartbeatHandler
        handler.received(channel, message);
    }
}

The Map<String, List<Invoker<T>>> methodInvokerMap was already initialized in section 8.2, "Building the client: source code analysis":

DubboInvoker.doInvoke():

It first obtains a List<Invoker<T>>, then obtains a LoadBalance, and finally calls doInvoke to perform the call.

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        // handler is NettyClient
        handler.received(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance):

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(/* arguments elided in the source */);
    }
    // create the request header
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    // the request argument holds:
    //   methodName = "sayHello"
    //   parameterTypes = {Class[0]@2814}
    //   arguments = {Object[0]@2768}
    //   attachments = {HashMap@2822} size = 4
    //   invoker = {DubboInvoker@2658}
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // channel here is NettyClient; send the request
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

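First look at RegistryDirectory.list(Invocation invocation). The method lives in AbstractDirectory, the parent class of RegistryDirectory:

First look at how the request is handled and the response message is built: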

HeaderExchangeChannel.request

The handleResponse method:

Note: service degradation can be done here; this is covered later.
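
The three degradation branches driven by the mock parameter (no mock, force, fail) can be reduced to a few lines. The sketch below is illustrative only: MockDegradeSketch is a hypothetical name, and two Supplier arguments stand in for the remote call and the mock call.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of mock-based service degradation. */
class MockDegradeSketch {

    static String invoke(String mock, Supplier<String> remote, Supplier<String> fallback) {
        String value = mock == null ? "" : mock.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            // no mock configured: plain remote call
            return remote.get();
        } else if (value.startsWith("force")) {
            // force: skip the remote call entirely
            return fallback.get();
        } else {
            // fail: try the remote call first, degrade only when it fails
            try {
                return remote.get();
            } catch (RuntimeException e) {
                return fallback.get();
            }
        }
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> { throw new IllegalStateException("provider down"); };
        Supplier<String> mocked = () -> "mocked result";
        System.out.println(invoke("fail:return null", broken, mocked));
        System.out.println(invoke("force:return null", () -> "real result", mocked));
        System.out.println(invoke("false", () -> "real result", mocked));
    }
}
```

Unlike this sketch, the real MockClusterInvoker rethrows business exceptions instead of degrading on them.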

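Continue into the received method of AllChannelHandler:

After that, FailoverClusterInvoker.invoke is called. The method lives in its parent class AbstractClusterInvoker.

Then comes return currentClient.request(inv, timeout).get(); in DubboInvoker. Execution resumes there, passes back through the Filters, and finally returns to InvokerInvocationHandler.invoke(), which now has the call result. Done!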

protected final Directory<T> directory; // RegistryDirectory

public Result invoke(final Invocation invocation) throws RpcException {
    ...
    LoadBalance loadbalance;

    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // attach an invocation ID for asynchronous calls
    return doInvoke(invocation, invokers, loadbalance);
}

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    return invokers;
}

The ExchangeClient[] clients array was already initialized in section 8.2, "Building the client: source code analysis":

Enter NettyChannel, which sends the response message:

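  • Wrap the request parameters (method name, parameter types, argument values, service name, attachments) in an Invocation
    • path in the attachments: the interface name, which the server uses after receiving the request to select the Exporter instance from exporterMap
    • method name, parameter types, argument values: used by JavassistProxyFactory$AbstractProxyInvoker to execute the corresponding method
  • Use the Directory to get, from Map<String, List<Invoker<T>>> methodInvokerMap, the List<Invoker<T>> whose key is sayHello (the given method name)
  • Use the Router to filter that List<Invoker<T>> once more, yielding a subList
  • Use the LoadBalancer to pick one Invoker from the subList; it is actually an InvokerDelegete instance
  • Use the InvokerDelegete instance to run the listener and filter chain of the real DubboInvoker, and finally reach the real DubboInvoker
  • DubboInvoker uses NettyClient to send the request to the server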
private final Random random = new Random();

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // total count
    ... // weight calculation
    // if all weights are equal or the total weight is 0, pick uniformly at random
    return invokers.get(random.nextInt(length));
}

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        // a message of type Request
        if (message instanceof Request) {
            Request request = (Request) message;
            if (request.isEvent()) { // heartbeat or normal request?
                // handle the heartbeat
                handlerEvent(channel, request);
            } else { // a normal request
                // a reply is required
                if (request.isTwoWay()) {
                    // handle the request and build the response, as analyzed above
                    Response response = handleRequest(exchangeChannel, request);
                    // NettyChannel sends the response
                    channel.send(response);
                } else { // no reply required
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) { // a message of type Response
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
            } else { // telnet
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

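The method returns directly here. At this point the subset of RegistryDirectory$InvokerDelegete instances that can be called has been selected. Next the load balancer is obtained, RandomLoadBalance by default. Finally, execution moves on to FailoverClusterInvoker.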
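
Weighted random selection, the strategy a random load balancer of this kind typically implements, can be sketched as follows. This is not the elided weight calculation from RandomLoadBalance itself; the class name, the int[] weight representation and the method signature are assumptions made for the example.

```java
import java.util.Random;

/** Hypothetical sketch of weighted random selection over provider indexes. */
class WeightedRandomSketch {

    /** Returns an index, chosen with probability proportional to its weight. */
    static int select(int[] weights, Random random) {
        int total = 0;
        boolean sameWeight = true;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            if (i > 0 && weights[i] != weights[i - 1]) {
                sameWeight = false;
            }
        }
        if (total > 0 && !sameWeight) {
            // Pick a point in [0, total) and find the weight segment it falls into.
            int offset = random.nextInt(total);
            for (int i = 0; i < weights.length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return i;
                }
            }
        }
        // Equal weights, or a total weight of 0: pick uniformly.
        return random.nextInt(weights.length);
    }

    public static void main(String[] args) {
        int[] weights = {100, 200, 700};
        int[] hits = new int[weights.length];
        Random random = new Random();
        for (int i = 0; i < 10_000; i++) {
            hits[select(weights, random)]++;
        }
        System.out.println(java.util.Arrays.toString(hits));
    }
}
```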

private final Invoker<T> invoker; // ProtocolFilterWrapper$Invoker

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

Then, on the new thread, the run method of ChannelEventRunnable enters DecodeHandler:

After that, MockClusterInvoker.invoke(Invocation invocation) is used to make the remote call:

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        Object data = req.getData();
        String msg;
        if (data == null) msg = null;
        else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);
        return res;
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        // handle the request data; handler is an ExchangeHandlerAdapter created in DubboProtocol
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        // error handling elided in the source
    }
    return res;
}

The handler here is the InvokerInvocationHandler.
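
How a call on a generated proxy ends up in an InvocationHandler can be shown with the JDK's own java.lang.reflect.Proxy. This is only an analogy: per the surrounding text, Dubbo generates its proxy0 class itself, and the nested DemoService interface and the returned string below are assumptions made for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

/** Hypothetical sketch: route interface calls through an InvocationHandler. */
class ProxySketch {

    /** Stand-in for the demo service interface used in the walkthrough. */
    interface DemoService {
        String sayHello(String name);
    }

    static DemoService createProxy() {
        final Object localTarget = new Object();
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object (toString, hashCode, equals) are answered locally.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(localTarget, args);
            }
            // Everything else would become a remote invocation; here we only describe it.
            return "invoked " + method.getName() + Arrays.toString(args);
        };
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[] {DemoService.class},
                handler);
    }

    public static void main(String[] args) {
        DemoService demoService = createProxy();
        System.out.println(demoService.sayHello("world"));
    }
}
```

The handler receives only the Method and the argument array, which is the information the walkthrough says is wrapped into an RpcInvocation.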

public void send(Object message, boolean sent) throws RemotingException {
    // handled by AbstractChannel first
    super.send(message, sent);
    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.write(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
    }
}

ListenerInvokerWrapper.invoke

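Consumers can also adjust the handling logic through the async, sent, and return settings. async selects an asynchronous or a synchronous request. sent controls whether the caller blocks until the request message has actually been sent out. return controls whether the return value is ignored, that is, one-way communication; it is usually combined with asynchronous calls to reduce the cost of creating and managing Future objects.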
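The three calling modes can be illustrated with a small stand-alone sketch. It does not use Dubbo: `CallModeDemo`, `Mode`, `request`, and `LAST_FUTURE` are hypothetical names invented for this illustration, and the "transport" is simply a `CompletableFuture` that echoes the payload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CallModeDemo {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Hypothetical holder, loosely mirroring RpcContext.getContext().setFuture(...).
    static final ThreadLocal<CompletableFuture<String>> LAST_FUTURE = new ThreadLocal<>();

    // Hypothetical transport: replies "echo:<payload>" from another thread.
    static CompletableFuture<String> request(String payload) {
        return CompletableFuture.supplyAsync(() -> "echo:" + payload);
    }

    static String call(Mode mode, String payload, long timeoutMs) {
        switch (mode) {
            case ONEWAY:
                // Fire and forget: nothing is kept to wait on.
                request(payload);
                LAST_FUTURE.set(null);
                return null;
            case ASYNC:
                // Return immediately; the caller picks up the future later.
                LAST_FUTURE.set(request(payload));
                return null;
            default:
                // Synchronous: block until the reply arrives or the timeout expires.
                LAST_FUTURE.set(null);
                try {
                    return request(payload).get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    throw new IllegalStateException("call failed or timed out", e);
                }
        }
    }

    public static void main(String[] args) {
        System.out.println(call(Mode.SYNC, "sayHello", 1000));
        call(Mode.ASYNC, "sayHello", 1000);
        System.out.println(LAST_FUTURE.get().join());
    }
}
```

Only the synchronous branch pays for blocking, and only the asynchronous branch keeps a future around, which is the trade-off the settings above expose.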

private final ExchangeChannel channel;//HeaderExchangeChannel

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    return channel.request(request, timeout);
}

Once the message has been processed, control returns to the received method of HeaderExchangeHandler:

public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                  URL url, final Invocation invocation) throws RpcException {
    if (invocation.getAttachments() == null) {
        return getNormalInvokers(invokers);
    } else {
        String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
        if (value == null)
            return getNormalInvokers(invokers);
        else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
            return getMockedInvokers(invokers);
        }
    }
    return invokers;
}

private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
    if (!hasMockProviders(invokers)) {
        return invokers;
    } else {
        List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
        for (Invoker<T> invoker : invokers) {
            if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                sInvokers.add(invoker);
            }
        }
        return sInvokers;
    }
}

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // read the mock attribute; we did not configure it, so it defaults to false
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        // invoker here is the FailoverClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        // force: direct mock
        result = doMockInvoke(invocation, null);
    } else {
        // fail-mock
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}

Next a chain of filters runs; they will be covered later. For now we go straight to DubboInvoker.invoke. That method actually lives in the parent class AbstractInvoker, which in turn calls DubboInvoker.doInvoke:

In result = this.invoker.invoke(invocation);, the invoker is a FailoverClusterInvoker, so the call first enters the invoke method of AbstractClusterInvoker:

private final org.jboss.netty.channel.Channel channel;//NioClientSocketChannel

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.write(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

First, look at how getInvoker obtains the Invoker:

Here the two RegistryDirectory$InvokerDelegete instances are looked up by the method name sayHello. Finally the Routers filter that list; there is only one Router here, MockInvokersSelector.

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[RegistryDirectory$InvokerDelegete instance for provider1, RegistryDirectory$InvokerDelegete instance for provider2], *=[RegistryDirectory$InvokerDelegete instance for provider1, RegistryDirectory$InvokerDelegete instance for provider2]}
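The lookup by method name followed by router filtering can be sketched in a few lines. This is a simplified illustration rather than Dubbo's code: `DirectoryLookupDemo` and its `Invoker` class are hypothetical, the fallback to the `*` entry is an assumption suggested by the map above, and the literal `"mock"` stands in for `Constants.MOCK_PROTOCOL`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DirectoryLookupDemo {
    // Hypothetical, simplified invoker: only the protocol of its URL matters here.
    static final class Invoker {
        final String name;
        final String protocol;

        Invoker(String name, String protocol) {
            this.name = name;
            this.protocol = protocol;
        }
    }

    // Step 1: pick the candidates by method name, falling back to the "*" entry (assumed).
    static List<Invoker> list(Map<String, List<Invoker>> methodInvokerMap, String methodName) {
        List<Invoker> invokers = methodInvokerMap.get(methodName);
        if (invokers == null) {
            invokers = methodInvokerMap.get("*");
        }
        return invokers == null ? new ArrayList<>() : invokers;
    }

    // Step 2: same idea as getNormalInvokers: drop invokers whose protocol is "mock".
    static List<Invoker> routeNormal(List<Invoker> invokers) {
        List<Invoker> result = new ArrayList<>();
        for (Invoker invoker : invokers) {
            if (!"mock".equals(invoker.protocol)) {
                result.add(invoker);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Invoker>> map = new HashMap<>();
        map.put("sayHello", Arrays.asList(
                new Invoker("provider1", "dubbo"),
                new Invoker("provider2", "dubbo"),
                new Invoker("mockProvider", "mock")));
        for (Invoker invoker : routeNormal(list(map, "sayHello"))) {
            System.out.println(invoker.name);
        }
    }
}
```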

After decoding, the message reaches the received method of HeaderExchangeHandler:

private ExchangeClient client;//HeaderExchangeClient

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    return client.request(request, timeout);
}

Enter AllChannelHandler:

private final Channel channel;//NettyClient

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

private void decode(Object message) {
    if (message != null && message instanceof Decodeable) {
        try {
            ((Decodeable) message).decode();
        } catch (Throwable e) {
            // ... (elided in the original)
        } // ~ end of catch
    } // ~ end of if
}

The channel above is a NettyClient instance. Its send call actually resolves to AbstractPeer, the parent of NettyClient's parent class AbstractClient, and AbstractPeer then calls AbstractClient.send:

We are using a synchronous call here, so look at currentClient.request(inv, timeout).get();. The client here is a ReferenceCountExchangeClient, which directly calls the request method of HeaderExchangeClient:
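A synchronous call of this shape boils down to registering a future before the request is written and completing it when the matching response arrives. The sketch below shows that idea with plain JDK classes; `PendingRequestsDemo`, `Pending`, `newRequest`, and `received` are hypothetical names, and this is not how DefaultFuture is actually implemented.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PendingRequestsDemo {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    private static final Map<Long, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();

    // A request id together with the future the caller will block on.
    static final class Pending {
        final long id;
        final CompletableFuture<Object> future;

        Pending(long id, CompletableFuture<Object> future) {
            this.id = id;
            this.future = future;
        }
    }

    // Register the future under a fresh id before the request is sent.
    static Pending newRequest() {
        long id = NEXT_ID.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        PENDING.put(id, future);
        return new Pending(id, future);
    }

    // Called when a response arrives: complete the future with the same id, if any.
    static void received(long id, Object result) {
        CompletableFuture<Object> future = PENDING.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    static int pendingCount() {
        return PENDING.size();
    }

    public static void main(String[] args) throws Exception {
        Pending pending = newRequest();
        // Simulate the I/O thread delivering the response.
        new Thread(() -> received(pending.id, "Hello")).start();
        // The calling thread blocks here, like request(inv, timeout).get().
        System.out.println(pending.future.get(1, TimeUnit.SECONDS));
    }
}
```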

NettyChannel.send

public void received(Channel channel, Object message) throws RemotingException {
    // not clear what this branch is for
    if (message instanceof Decodeable) {
        decode(message);
    }
    // decode the data of a request
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
    // decode the result of a response
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    // after decoding, hand over to HeaderExchangeHandler
    handler.received(channel, message);
}

private final Invoker<T> invoker;//ListenerInvokerWrapper

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

public Result invoke(Invocation invocation) throws RpcException {
    try {
        // call doInvoke first, then wrap its return value in a result
        return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
    } catch (InvocationTargetException e) {
        // ... (elided in the original)
    }
}

HeaderExchangeClient.request

public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    // handle a heartbeat request
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(Response.HEARTBEAT_EVENT);
            channel.send(res);
        }
        return;
    }
    // handle a heartbeat response
    if (isHeartbeatResponse(message)) {
        return;
    }
    // handler here is the AllChannelHandler
    handler.received(channel, message);
}

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot correctly handle class names that contain '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
            // this is where the concrete method is called
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

Then enter AbstractProxyInvoker:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    // obtain the channel
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        // handler here is the NettyServer
        handler.received(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

Enter the request method of HeaderExchangeChannel:

Next, the call enters the received method of AbstractPeer:

The doInvoke here belongs to the AbstractProxyInvoker instance created in JavassistProxyFactory:

// proxy: the proxy instance the method was invoked on
// method: the interface method being called
// args: the arguments of that call
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // method name, sayHello
    String methodName = method.getName();
    // parameter types
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    // invoker is the MockClusterInvoker:
    // first create an RpcInvocation, then call invoker.invoke,
    // finally recreate() and return the result
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

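The server has already opened its port and is listening for incoming requests. When a service consumer sends a call request, Netty processes it first and then hands it to Dubbo's codec methods for decoding; the entry point is NettyCodecAdapter.messageReceived():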

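With Java dynamic proxies, every proxy needs an implementation of the InvocationHandler interface, and every proxy instance is associated with one handler. When we call a method through the proxy object, the call is forwarded to the invoke method of the class that implements InvocationHandler.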
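A minimal stand-alone example of this mechanism, using only the JDK, may help. `DynamicProxyDemo`, its nested `HelloService` interface, and `LoggingHandler` are hypothetical names chosen for the illustration; the handler simply delegates to a local object where Dubbo would start a remote call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class DynamicProxyDemo {
    // Hypothetical service interface, standing in for the article's HelloService.
    public interface HelloService {
        String sayHello(String name);
    }

    // Every call made on the proxy is routed to invoke().
    static class LoggingHandler implements InvocationHandler {
        private final HelloService target;

        LoggingHandler(HelloService target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            System.out.println("intercepted " + method.getName());
            // Delegate to a local object; a remote call would be issued here instead.
            return method.invoke(target, args);
        }
    }

    // Build a proxy that implements HelloService and is backed by LoggingHandler.
    static HelloService newProxy(HelloService target) {
        return (HelloService) Proxy.newProxyInstance(
                HelloService.class.getClassLoader(),
                new Class<?>[] {HelloService.class},
                new LoggingHandler(target));
    }

    public static void main(String[] args) {
        HelloService proxy = newProxy(name -> "Hello, " + name);
        System.out.println(proxy.sayHello("dubbo"));
    }
}
```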

AbstractClient performs the send:

Enter the reply method of the ExchangeHandlerAdapter in DubboProtocol:

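channel.send(message, sent); is first handled by the send method of AbstractChannel, which only checks whether the channel has been closed. NettyChannel's send then continues the processing, and this is where the message is sent to the server: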

FailoverClusterInvoker.doInvoke():

Now look at the call invoker.invoke(invocation);, which first enters InvokerWrapper:

Enter the received method of HeartbeatHandler:

static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(Response.HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                            + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    if (isHeartbeatResponse(message)) {
        if (logger.isDebugEnabled()) {
            logger.debug(
                    new StringBuilder(32)
                            .append("Receive heartbeat response in thread ")
                            .append(Thread.currentThread().getName())
                            .toString());
        }
        return;
    }
    // AllChannelHandler
    handler.received(channel, message);
}

public void received(Channel channel, Object message) throws RemotingException {
    // a MultiMessage is unpacked and each contained message is handled in turn
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // handler here is the HeartbeatHandler
        handler.received(channel, message);
    }
}

InvokerInvocationHandler implements the InvocationHandler interface, so when we call helloService.sayHello();, it is the invoke() method that actually runs:

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    ExchangeClient currentClient;
    // a Client connected to the server was saved during initialization, while the service was being referenced
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // async flag
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // one-way flag
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // one-way: send and do not care about the result
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) { // asynchronous: after sending, a Future is kept for the caller
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else { // synchronous call, the mode used in this walkthrough
            RpcContext.getContext().setFuture(null);
            // HeaderExchangeClient
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        // ... (elided in the original)
    }
}

After initialization, the service consumer holds a dynamic proxy backed by InvokerInvocationHandler. The handler wraps a MockClusterInvoker, which in turn holds a RegistryDirectory and a FailoverClusterInvoker.
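The nesting described here is a chain of wrappers around a common invoke method. The following sketch shows the shape under loud simplifications: `InvokerChainDemo`, `Invoker`, `FailoverInvoker`, and `MockInvoker` are hypothetical stand-ins, the directory is reduced to a `Supplier` of candidates, and failover is reduced to trying the candidates in order until one succeeds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class InvokerChainDemo {
    interface Invoker {
        String invoke(String invocation);
    }

    // Simplified failover: try each candidate from the directory until one succeeds.
    static final class FailoverInvoker implements Invoker {
        private final Supplier<List<Invoker>> directory;

        FailoverInvoker(Supplier<List<Invoker>> directory) {
            this.directory = directory;
        }

        @Override
        public String invoke(String invocation) {
            RuntimeException last = null;
            for (Invoker candidate : directory.get()) {
                try {
                    return candidate.invoke(invocation);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and move on to the next candidate
                }
            }
            throw last != null ? last : new IllegalStateException("no invoker available");
        }
    }

    // Simplified mock wrapper: delegates unless mocking is forced.
    static final class MockInvoker implements Invoker {
        private final Invoker delegate;
        private final boolean forceMock;

        MockInvoker(Invoker delegate, boolean forceMock) {
            this.delegate = delegate;
            this.forceMock = forceMock;
        }

        @Override
        public String invoke(String invocation) {
            return forceMock ? "mocked" : delegate.invoke(invocation);
        }
    }

    public static void main(String[] args) {
        Invoker down = invocation -> { throw new RuntimeException("provider1 is down"); };
        Invoker up = invocation -> "provider2 handled " + invocation;
        Invoker chain = new MockInvoker(new FailoverInvoker(() -> Arrays.asList(down, up)), false);
        System.out.println(chain.invoke("sayHello"));
    }
}
```

Each layer only knows the `Invoker` interface, which is why the proxy handler can call the outermost wrapper without caring how many layers sit underneath.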

channel.send(req) first calls the send method of AbstractPeer:

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    boolean success = true;
    int timeout = 0;
    try {
        // hand the message over to netty
        ChannelFuture future = channel.write(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
    }
}

Enter HeartbeatHandler, which handles the message according to its type:

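Once the service provider has finished initializing, it exposes an Exporter. Once the service consumer has finished initializing, it holds a Proxy, so a method call is simply a call on that proxy.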
