引言 前面的文章中,我们已经详细介绍了服务暴露的相关细节,本文中,我们主要深入介绍服务引用的实现细节,其他 Dubbo 相关文章均收录于 <Dubbo系列文章> 。
引用服务方式 在 Dubbo 中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是基于注册中心进行引用。服务直连的方式仅适合在调试或测试服务的场景下使用,不适合在线上环境使用。因此,接下来我将重点分析通过注册中心引用服务的过程。从注册中心中获取服务配置只是服务引用过程中的一环,除此之外,服务消费者还需要经历 Invoker 创建、代理类创建等步骤。
Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。下面我们按照 Dubbo 默认配置进行分析,整个分析过程从 ReferenceBean 的 getObject 方法开始。当我们的服务被注入到其他类中时,Spring 会第一时间调用 getObject 方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地 (JVM) 服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务。不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。
引用服务要从 createProxy 开始看起。从字面意思上来看,createProxy 似乎只是用于创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例。具体细节如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 private T createProxy (Map<String, String> map) { URL tmpUrl = new URL("temp" , "localhost" , 0 , map); final boolean isJvmRefer; if (isInjvm() == null ) { if (url != null && url.length() > 0 ) { isJvmRefer = false ; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { isJvmRefer = true ; } else { isJvmRefer = false ; } } else { isJvmRefer = isInjvm().booleanValue(); } if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0 , interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); } else { if (url != null && url.length() > 0 ) { String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0 ) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0 ) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { List<URL> us = loadRegistries(false ); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null ) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference..." ); } } if (urls.size() == 1 ) { invoker = refprotocol.refer(interfaceClass, urls.get(0 )); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null ; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; } } if (registryURL != null ) { URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null ) { c = consumer.isCheck(); } if (c == null ) { c = true ; } if (c && !invoker.isAvailable()) { throw new IllegalStateException("No provider available for the service..." ); } return (T) proxyFactory.getProxy(invoker); }
上面代码很多,不过逻辑比较清晰。首先根据配置检查是否为本地调用,若是,则调用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例。若不是,则读取直连配置项,或注册中心 url,并将读取到的 url 存储到 urls 中。然后根据 urls 元素数量进行后续操作。若 urls 元素数量为1,则直接通过 Protocol 自适应拓展类构建 Invoker 实例接口。若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类。
Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。
1 2 3 4 5 6 7 public <T> Invoker<T> refer (Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
上面方法看起来比较简单,不过这里有一个调用需要我们注意一下,即 getClients。这个方法用于获取客户端实例,实例类型为 ExchangeClient。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private ExchangeClient[] getClients(URL url) { boolean service_share_connect = false ; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0 ); if (connections == 0 ) { service_share_connect = true ; connections = 1 ; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0 ; i < clients.length; i++) { if (service_share_connect) { clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }
这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这两个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private ExchangeClient getSharedClient (URL url) { String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); if (client != null ) { if (!client.isClosed()) { client.incrementAndGetCount(); return client; } else { referenceClientMap.remove(key); } } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { if (referenceClientMap.containsKey(key)) { return referenceClientMap.get(key); } ExchangeClient exchangeClient = initClient(url); client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); locks.remove(key); return client; } }
上面方法先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例。ReferenceCountExchangeClient 内部实现比较简单,就不分析了。下面我们再来看一下 initClient 方法的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private ExchangeClient initClient (URL url) { String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: ..." ); } ExchangeClient client; try { if (url.getParameter(Constants.LAZY_CONNECT_KEY, false )) { client = new LazyConnectExchangeClient(url, requestHandler); } else { client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service..." ); } return client; }
initClient 方法首先获取用户配置的客户端类型,默认为 netty。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。这里的 LazyConnectExchangeClient 代码并不是很复杂,该类会在 request 方法被调用时通过 Exchangers 的 connect 方法创建 ExchangeClient 客户端,该类的代码本节就不分析了。下面我们分析一下 Exchangers 的 connect 方法。
1 2 3 4 5 6 7 8 9 10 11 public static ExchangeClient connect (URL url, ExchangeHandler handler) throws RemotingException { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange" ); return getExchanger(url).connect(url, handler); }
如上,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单,大家自己看一下吧。接下来分析 HeaderExchangeClient 的实现。
1 2 3 4 5 6 7 8 public ExchangeClient connect (URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true ); }
这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static Client connect (URL url, ChannelHandler... handlers) throws RemotingException { if (url == null ) { throw new IllegalArgumentException("url == null" ); } ChannelHandler handler; if (handlers == null || handlers.length == 0 ) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1 ) { handler = handlers[0 ]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().connect(url, handler); }
如上,getTransporter 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。
到这里就不继续跟下去了,在往下就是通过 Netty 提供的 API 构建 Netty 客户端了,到这里,关于 DubboProtocol 的 refer 方法就分析完了。接下来,继续分析 RegistryProtocol 的 refer 方法逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public <T> Invoker<T> refer (Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0 ) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*" .equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); }
上面代码首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。这里的重点是 doRefer 方法,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private <T> Invoker<T> doRefer (Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0 , type.getName(), parameters); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true )) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false ))); } directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
如上,doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy,接下来进行分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public <T> T getProxy (Invoker<T> invoker) throws RpcException { return getProxy(invoker, false ); } public <T> T getProxy (Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null ; String config = invoker.getUrl().getParameter("interfaces" ); if (config != null && config.length() > 0 ) { String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0 ) { interfaces = new Class<?>[types.length + 2 ]; interfaces[0 ] = invoker.getInterface(); interfaces[1 ] = EchoService.class; for (int i = 0 ; i < types.length; i++) { interfaces[i + 1 ] = ReflectUtils.forName(types[i]); } } } if (interfaces == null ) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; interfaces = new Class<?>[len + 1 ]; System.arraycopy(temp, 0 , interfaces, 0 , len); interfaces[len] = GenericService.class; } return getProxy(invoker, interfaces); } public abstract <T> T getProxy (Invoker<T> invoker, Class<?>[] types) ;
如上,上面大段代码都是用来获取 interfaces 数组的,我们继续往下看。getProxy(Invoker, Class<?>[]) 这个方法是一个抽象方法,下面我们到 JavassistProxyFactory 类中看一下该方法的实现代码。
1 2 3 4 public <T> T getProxy (Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
上面代码并不多,首先是通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现自 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。该类逻辑比较简单,这里就不分析了。下面我们重点关注一下 Proxy 的 getProxy 方法,如下。
public static Proxy getProxy (Class<?>... ics) { return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } public static Proxy getProxy (ClassLoader cl, Class<?>... ics) { if (ics.length > 65535 ) throw new IllegalArgumentException("interface limit exceeded" ); StringBuilder sb = new StringBuilder(); for (int i = 0 ; i < ics.length; i++) { String itf = ics[i].getName(); if (!ics[i].isInterface()) throw new RuntimeException(itf + " is not a interface." ); Class<?> tmp = null ; try { tmp = Class.forName(itf, false , cl); } catch (ClassNotFoundException e) { } if (tmp != ics[i]) throw new IllegalArgumentException(ics[i] + " is not visible from class loader" ); sb.append(itf).append(';' ); } String key = sb.toString(); Map<String, Object> cache; synchronized (ProxyCacheMap) { cache = ProxyCacheMap.get(cl); if (cache == null ) { cache = new HashMap<String, Object>(); ProxyCacheMap.put(cl, cache); } } Proxy proxy = null ; synchronized (cache) { do { Object value = cache.get(key); if (value instanceof Reference<?>) { proxy = (Proxy) ((Reference<?>) value).get(); if (proxy != null ) { return proxy; } } if (value == PendingGenerationMarker) { try { cache.wait(); } catch (InterruptedException e) { } } else { cache.put(key, PendingGenerationMarker); break ; } } while (true ); } long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null ; ClassGenerator ccp = null , ccm = null ; try { ccp = ClassGenerator.newInstance(cl); Set<String> worked = new HashSet<String>(); List<Method> methods = new ArrayList<Method>(); for (int i = 0 ; i < ics.length; i++) { if (!Modifier.isPublic(ics[i].getModifiers())) { String npkg = ics[i].getPackage().getName(); if (pkg == null ) { pkg = npkg; } else { if (!pkg.equals(npkg)) throw new IllegalArgumentException("non-public interfaces from different packages" ); } } ccp.addInterface(ics[i]); for (Method method : ics[i].getMethods()) { String desc = ReflectUtils.getDesc(method); if (worked.contains(desc)) continue ; worked.add(desc); int ix = methods.size(); Class<?> rt = method.getReturnType(); Class<?>[] pts = method.getParameterTypes(); StringBuilder code = new StringBuilder("Object[] args = new Object[" ).append(pts.length).append("];" ); for (int j = 0 ; j < pts.length; j++) code.append(" args[" ).append(j).append("] = ($w)$" ).append(j + 1 ).append(";" ); code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);" ); if (!Void.TYPE.equals(rt)) code.append(" return " ).append(asArgument(rt, "ret" )).append(";" ); methods.add(method); ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); } } if (pkg == null ) pkg = PACKAGE_NAME; String pcn = pkg + ".proxy" + id; ccp.setClassName(pcn); ccp.addField("public static java.lang.reflect.Method[] methods;" ); ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0 ], "handler=$1;" ); ccp.addDefaultConstructor(); Class<?> clazz = ccp.toClass(); clazz.getField("methods" ).set(null , methods.toArray(new Method[0 ])); String fcn = Proxy.class.getName() + id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); ccm.addDefaultConstructor(); ccm.setSuperClass(Proxy.class); ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + " ($1 ); }"); // 生成 Proxy 实现类 Class<?> pc = ccm.toClass(); // 通过反射创建 Proxy 实例 proxy = (Proxy) pc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } finally { if (ccp != null) // 释放资源 ccp.release(); if (ccm != null) ccm.release(); synchronized (cache) { if (proxy == null) cache.remove(key); else // 写缓存 cache.put(key, new WeakReference<Proxy>(proxy)); // 唤醒其他等待线程 cache.notifyAll(); } } return proxy; }
上面代码比较复杂,我们写了大量的注释。大家在阅读这段代码时,要搞清楚 ccp 和 ccm 的用途,不然会被搞晕。ccp 用于为服务接口生成代理类,比如我们有一个 DemoService 接口,这个接口代理类就是由 ccp 生成的。ccm 则是用于为 org.apache.dubbo.common.bytecode.Proxy 抽象类生成子类,主要是实现 Proxy 类的抽象方法。下面以 org.apache.dubbo.demo.DemoService 这个接口为例,来看一下该接口代理类代码大致是怎样的(忽略 EchoService 接口)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package org.apache.dubbo.common.bytecode;public class proxy0 implements org .apache .dubbo .demo .DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0 () { } public proxy0 (java.lang.reflect.InvocationHandler arg0) { handler = $1 ; } public java.lang.String sayHello (java.lang.String arg0) { Object[] args = new Object[1 ]; args[0 ] = ($w) $1 ; Object ret = handler.invoke(this , methods[0 ], args); return (java.lang.String) ret; } }
参考内容 [1]《深入理解Apache Dubbo与实战》 [2] dubbo 官方文档