引言 前面的文章中,我们已经详细介绍了服务暴露的相关细节,本文中,我们主要深入介绍服务引用的实现细节,其他 Dubbo 相关文章均收录于 <Dubbo系列文章> 。
引用服务方式 在 Dubbo 中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是基于注册中心进行引用。服务直连的方式仅适合在调试或测试服务的场景下使用,不适合在线上环境使用。因此,接下来我将重点分析通过注册中心引用服务的过程。从注册中心中获取服务配置只是服务引用过程中的一环,除此之外,服务消费者还需要经历 Invoker 创建、代理类创建等步骤。
Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。下面我们按照 Dubbo 默认配置进行分析,整个分析过程从 ReferenceBean 的 getObject 方法开始。当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地 (JVM) 服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务。不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。
引用服务要从 createProxy 开始看起。从字面意思上来看,createProxy 似乎只是用于创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例。具体细节如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 private T createProxy (Map<String, String> map) { URL tmpUrl = new URL("temp" , "localhost" , 0 , map); final boolean isJvmRefer; if (isInjvm() == null ) { if (url != null && url.length() > 0 ) { isJvmRefer = false ; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { isJvmRefer = true ; } else { isJvmRefer = false ; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0 , interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); } else { if (url != null && url.length() > 0 ) { String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0 ) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0 ) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { List<URL> us = loadRegistries(false ); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null ) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference..." ); } } if (urls.size() == 1 ) { invoker = refprotocol.refer(interfaceClass, urls.get(0 )); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null ; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; } } if (registryURL != null ) { URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null ) { c = consumer.isCheck(); } if (c == null ) { c = true ; } if (c && !invoker.isAvailable()) { throw new IllegalStateException("No provider available for the service..." ); } return (T) proxyFactory.getProxy(invoker); }
上面代码很多,不过逻辑比较清晰。首先根据配置检查是否为本地调用,若是,则调用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例。若不是,则读取直连配置项,或注册中心 url,并将读取到的 url 存储到 urls 中。然后根据 urls 元素数量进行后续操作。若 urls 元素数量为1,则直接通过 Protocol 自适应拓展类构建 Invoker 实例接口。若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类。
Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。
1 2 3 4 5 6 7 public <T> Invoker<T> refer (Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
上面方法看起来比较简单,不过这里有一个调用需要我们注意一下,即 getClients。这个方法用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private ExchangeClient[] getClients(URL url) { boolean service_share_connect = false ; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0 ); if (connections == 0 ) { service_share_connect = true ; connections = 1 ; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0 ; i < clients.length; i++) { if (service_share_connect) { clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }
这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这两个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private ExchangeClient getSharedClient (URL url) { String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); if (client != null ) { if (!client.isClosed()) { client.incrementAndGetCount(); return client; } else { referenceClientMap.remove(key); } } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { if (referenceClientMap.containsKey(key)) { return referenceClientMap.get(key); } ExchangeClient exchangeClient = initClient(url); client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); locks.remove(key); return client; } }
上面方法先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例。ReferenceCountExchangeClient 内部实现比较简单,就不分析了。下面我们再来看一下 initClient 方法的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private ExchangeClient initClient (URL url) { String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: ..." ); } ExchangeClient client; try { if (url.getParameter(Constants.LAZY_CONNECT_KEY, false )) { client = new LazyConnectExchangeClient(url, requestHandler); } else { client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service..." ); } return client; }
initClient 方法首先获取用户配置的客户端类型,默认为 netty。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。这里的 LazyConnectExchangeClient 代码并不是很复杂,该类会在 request 方法被调用时通过 Exchangers 的 connect 方法创建 ExchangeClient 客户端,该类的代码本节就不分析了。下面我们分析一下 Exchangers 的 connect 方法。
1 2 3 4 5 6 7 8 9 10 11 public static ExchangeClient connect (URL url, ExchangeHandler handler) throws RemotingException { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange" ); return getExchanger(url).connect(url, handler); }
如上,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单,大家自己看一下吧。接下来分析 HeaderExchangeClient 的实现。
1 2 3 4 5 6 7 8 public ExchangeClient connect (URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true ); }
这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static Client connect (URL url, ChannelHandler... handlers) throws RemotingException { if (url == null ) { throw new IllegalArgumentException("url == null" ); } ChannelHandler handler; if (handlers == null || handlers.length == 0 ) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1 ) { handler = handlers[0 ]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().connect(url, handler); }
如上,getTransporter 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。
到这里就不继续跟下去了,在往下就是通过 Netty 提供的 API 构建 Netty 客户端了,到这里,关于 DubboProtocol 的 refer 方法就分析完了。接下来,继续分析 RegistryProtocol 的 refer 方法逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0 ) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*" .equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); }
上面代码首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。这里的重点是 doRefer 方法,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private <T> Invoker<T> doRefer (Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0 , type.getName(), parameters); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true )) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false ))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
如上,doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy,接下来进行分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public <T> T getProxy (Invoker<T> invoker) throws RpcException { return getProxy(invoker, false ); } public <T> T getProxy (Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null ; String config = invoker.getUrl().getParameter("interfaces" ); if (config != null && config.length() > 0 ) { String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0 ) { interfaces = new Class<?>[types.length + 2 ]; interfaces[0 ] = invoker.getInterface(); interfaces[1 ] = EchoService.class; for (int i = 0 ; i < types.length; i++) { interfaces[i + 1 ] = ReflectUtils.forName(types[i]); } } } if (interfaces == null ) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; interfaces = new Class<?>[len + 1 ]; System.arraycopy(temp, 0 , interfaces, 0 , len); interfaces[len] = GenericService.class; } return getProxy(invoker, interfaces); } public abstract <T> T getProxy (Invoker<T> invoker, Class<?>[] types) ;
如上,上面大段代码都是用来获取 interfaces 数组的,我们继续往下看。getProxy(Invoker, Class<?>[]) 这个方法是一个抽象方法,下面我们到 JavassistProxyFactory 类中看一下该方法的实现代码。
1 2 3 4 public <T> T getProxy (Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
上面代码并不多,首先是通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现自 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。该类逻辑比较简单,这里就不分析了。下面我们重点关注一下 Proxy 的 getProxy 方法,如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 public static Proxy getProxy (Class<?>... ics) { return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } public static Proxy getProxy (ClassLoader cl, Class<?>... ics) { if (ics.length > 65535 ) throw new IllegalArgumentException("interface limit exceeded" ); StringBuilder sb = new StringBuilder(); for (int i = 0 ; i < ics.length; i++) { String itf = ics[i].getName(); if (!ics[i].isInterface()) throw new RuntimeException(itf + " is not a interface." ); Class<?> tmp = null ; try { tmp = Class.forName(itf, false , cl); } catch (ClassNotFoundException e) { } if (tmp != ics[i]) throw new IllegalArgumentException(ics[i] + " is not visible from class loader" ); sb.append(itf).append(';' ); } String key = sb.toString(); Map<String, Object> cache; synchronized (ProxyCacheMap) { cache = ProxyCacheMap.get(cl); if (cache == null ) { cache = new HashMap<String, Object>(); ProxyCacheMap.put(cl, cache); } } Proxy proxy = null ; synchronized (cache) { do { Object value = cache.get(key); if (value instanceof Reference<?>) { proxy = (Proxy) ((Reference<?>) value).get(); if (proxy != null ) { return proxy; } } if (value == PendingGenerationMarker) { try { cache.wait(); } catch (InterruptedException e) { } } else { cache.put(key, PendingGenerationMarker); break ; } } while (true ); } long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null ; ClassGenerator ccp = null , ccm = null ; try { ccp = ClassGenerator.newInstance(cl); Set<String> worked = new HashSet<String>(); List<Method> methods = new ArrayList<Method>(); for (int i = 0 ; i < ics.length; i++) { if (!Modifier.isPublic(ics[i].getModifiers())) { String npkg = ics[i].getPackage().getName(); if (pkg == null ) { pkg = npkg; } else { if (!pkg.equals(npkg)) throw new IllegalArgumentException("non-public interfaces from different packages" ); } } ccp.addInterface(ics[i]); for (Method method : ics[i].getMethods()) { String desc = ReflectUtils.getDesc(method); if (worked.contains(desc)) continue ; worked.add(desc); int ix = methods.size(); Class<?> rt = method.getReturnType(); Class<?>[] pts = method.getParameterTypes(); StringBuilder code = new StringBuilder("Object[] args = new Object[" ).append(pts.length).append("];" ); for (int j = 0 ; j < pts.length; j++) code.append(" args[" ).append(j).append("] = ($w)$" ).append(j + 1 ).append(";" ); code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);" ); if (!Void.TYPE.equals(rt)) code.append(" return " ).append(asArgument(rt, "ret" )).append(";" ); methods.add(method); ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } if (pkg == null ) pkg = PACKAGE_NAME; String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); ccp.addField("public static java.lang.reflect.Method[] methods;" ); ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0 ], "handler=$1;" ); ccp.addDefaultConstructor(); Class<?> clazz = ccp.toClass(); clazz.getField("methods" ).set(null , methods.toArray(new Method[0 ])); String fcn = Proxy.class.getName() + id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); ccm.addDefaultConstructor(); ccm.setSuperClass(Proxy.class); ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + " ($1 ); }"); // 生成 Proxy 实现类 Class<?> pc = ccm.toClass(); // 通过反射创建 Proxy 实例 proxy = (Proxy) pc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { if (ccp != null) // 释放资源 ccp.release(); if (ccm != null) ccm.release(); synchronized (cache) { if (proxy == null) cache.remove(key); else // 写缓存 cache.put(key, new WeakReference<Proxy>(proxy)); // 唤醒其他等待线程 cache.notifyAll(); } } return proxy; }
上面代码比较复杂,我们写了大量的注释。大家在阅读这段代码时,要搞清楚 ccp 和 ccm 的用途,不然会被搞晕。ccp 用于为服务接口生成代理类,比如我们有一个 DemoService 接口,这个接口代理类就是由 ccp 生成的。ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。下面以 org.apache.dubbo.demo.DemoService 这个接口为例,来看一下该接口代理类代码大致是怎样的(忽略 EchoService 接口)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package org.apache.dubbo.common.bytecode;public class proxy0 implements org .apache .dubbo .demo .DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0 () { } public proxy0 (java.lang.reflect.InvocationHandler arg0) { handler = $1 ; } public java.lang.String sayHello (java.lang.String arg0) { Object[] args = new Object[1 ]; args[0 ] = ($w) $1 ; Object ret = handler.invoke(this , methods[0 ], args); return (java.lang.String) ret; } }
参考内容 [1]《深入理解Apache Dubbo与实战》 [2] dubbo 官方文档