引言
在 Dubbo 系列文章的最后,我们回过头来看一下整个 RPC 过程是如何运作起来的,本文着重介绍整个调用链路中 Consumer 发送请求的执行过程,其他 Dubbo 相关文章均收录于 <Dubbo系列文章>。
消费方发送请求
本节我们来看一下同步调用模式下,服务消费方是如何发送调用请求的。在深入分析源码前,我们先来看一张图。

这张图展示了服务消费方发送请求过程的部分调用栈,略为复杂。从上图可以看出,经过多次调用后,才将请求数据送至 Netty NioClientSocketChannel。这样做的原因是通过 Exchange 层为框架引入 Request 和 Response 语义,这一点会在接下来的源码分析过程中会看到。其他的就不多说了,下面开始进行分析。首先分析 ReferenceCountExchangeClient 的源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| final class ReferenceCountExchangeClient implements ExchangeClient {
private final URL url; private final AtomicInteger referenceCount = new AtomicInteger(0);
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) { this.client = client; referenceCount.incrementAndGet(); this.url = client.getUrl();
}
@Override public ResponseFuture request(Object request) throws RemotingException { return client.request(request); }
@Override public ResponseFuture request(Object request, int timeout) throws RemotingException { return client.request(request, timeout); }
public void incrementAndGetCount() { referenceCount.incrementAndGet(); }
@Override public void close(int timeout) { if (referenceCount.decrementAndGet() <= 0) { if (timeout == 0) { client.close(); } else { client.close(timeout); } client = replaceWithLazyClient(); } }
}
|
ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。所以这里就不多说了,继续向下分析,这次是 HeaderExchangeClient。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| public class HeaderExchangeClient implements ExchangeClient {
private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true)); private final Client client; private final ExchangeChannel channel; private ScheduledFuture<?> heartbeatTimer; private int heartbeat; private int heartbeatTimeout;
public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } if (needHeartbeat) { startHeartbeatTimer(); } }
@Override public ResponseFuture request(Object request) throws RemotingException { return channel.request(request); }
@Override public ResponseFuture request(Object request, int timeout) throws RemotingException { return channel.request(request, timeout); }
@Override public void close() { doClose(); channel.close(); }
private void doClose() { stopHeartbeatTimer(); }
private void startHeartbeatTimer() { stopHeartbeatTimer(); if (heartbeat > 0) { heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { return Collections.<Channel>singletonList(HeaderExchangeClient.this); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
private void stopHeartbeatTimer() { if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) { try { heartbeatTimer.cancel(true); scheduled.purge(); } catch (Throwable e) { if (logger.isWarnEnabled()) { logger.warn(e.getMessage(), e); } } } heartbeatTimer = null; }
}
|
HeaderExchangeClient 中很多方法只有一行代码,即调用 HeaderExchangeChannel 对象的同签名方法。那 HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑。心跳检测并非本文所关注的点,因此就不多说了,继续向下看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| final class HeaderExchangeChannel implements ExchangeChannel {
private final Channel channel;
HeaderExchangeChannel(Channel channel) { if (channel == null) { throw new IllegalArgumentException("channel == null"); } this.channel = channel; }
@Override public ResponseFuture request(Object request) throws RemotingException { return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); }
@Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(..., "Failed to send request ...); } // 创建 Request 对象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); // 设置双向通信标志为 true req.setTwoWay(true); // 这里的 request 变量类型为 RpcInvocation req.setData(request); // 创建 DefaultFuture 对象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try { // 调用 NettyClient 的 send 方法发送请求 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } // 返回 DefaultFuture 对象 return future; } }
|
到这里大家终于看到了 Request 语义了,上面的方法首先定义了一个 Request 对象,然后再将该对象传给 NettyClient 的 send 方法,进行后续的调用。需要说明的是,NettyClient 中并未实现 send 方法,该方法继承自父类 AbstractPeer,下面直接分析 AbstractPeer 的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public abstract class AbstractPeer implements Endpoint, ChannelHandler {
@Override public void send(Object message) throws RemotingException { send(message, url.getParameter(Constants.SENT_KEY, false)); }
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
@Override public void send(Object message, boolean sent) throws RemotingException { if (send_reconnect && !isConnected()) { connect(); }
Channel channel = getChannel(); if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send ..."); }
channel.send(message, sent); }
protected abstract Channel getChannel();
}
|
默认情况下,Dubbo 使用 Netty 作为底层的通信框架,因此下面我们到 NettyClient 类中看一下 getChannel 方法的实现逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| public class NettyClient extends AbstractClient {
private volatile Channel channel;
@Override protected com.alibaba.dubbo.remoting.Channel getChannel() { Channel c = channel; if (c == null || !c.isConnected()) return null; return NettyChannel.getOrAddChannel(c, getUrl(), this); } }
final class NettyChannel extends AbstractChannel {
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
private final org.jboss.netty.channel.Channel channel;
private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) { super(url, handler); if (channel == null) { throw new IllegalArgumentException("netty channel == null;"); } this.channel = channel; }
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; }
NettyChannel ret = channelMap.get(ch); if (ret == null) { NettyChannel nc = new NettyChannel(ch, url, handler); if (ch.isConnected()) { ret = channelMap.putIfAbsent(ch, nc); } if (ret == null) { ret = nc; } } return ret; } }
|
获取到 NettyChannel 实例后,即可进行后续的调用。下面看一下 NettyChannel 的 send 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent);
boolean success = true; int timeout = 0; try { ChannelFuture future = channel.write(message);
if (sent) { timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message ..."); }
if (!success) { throw new RemotingException(this, "Failed to send message ..."); } }
|
经历多次调用,到这里请求数据的发送过程就结束了,过程漫长。为了便于大家阅读代码,这里以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| proxy0#sayHello(String) —> InvokerInvocationHandler#invoke(Object, Method, Object[]) —> MockClusterInvoker#invoke(Invocation) —> AbstractClusterInvoker#invoke(Invocation) —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) —> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用 —> ListenerInvokerWrapper#invoke(Invocation) —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation) —> ReferenceCountExchangeClient#request(Object, int) —> HeaderExchangeClient#request(Object, int) —> HeaderExchangeChannel#request(Object, int) —> AbstractPeer#send(Object) —> AbstractClient#send(Object, boolean) —> NettyChannel#send(Object, boolean) —> NioClientSocketChannel#write(Object)
|
在 Netty 中,出站数据在发出之前还需要进行编码操作,接下来我们来分析一下请求数据的编码逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| public class ExchangeCodec extends TelnetCodec {
protected static final int HEADER_LENGTH = 16; protected static final short MAGIC = (short) 0xdabb; protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0]; protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1]; protected static final byte FLAG_REQUEST = (byte) 0x80; protected static final byte FLAG_TWOWAY = (byte) 0x40; protected static final byte FLAG_EVENT = (byte) 0x20; protected static final int SERIALIZATION_MASK = 0x1f; private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
public Short getMagicCode() { return MAGIC; }
@Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { encodeResponse(channel, buffer, (Response) msg); } else { super.encode(channel, buffer, msg); } }
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel);
byte[] header = new byte[HEADER_LENGTH];
Bytes.short2bytes(MAGIC, header);
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) { header[2] |= FLAG_TWOWAY; }
if (req.isEvent()) { header[2] |= FLAG_EVENT; }
Bytes.long2bytes(req.getId(), header, 4);
int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { encodeEventData(channel, out, req.getData()); } else { encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close();
int len = bos.writtenBytes(); checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }
}
|
以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。本节的最后,我们再来看一下 Request 对象的 data 字段序列化过程,也就是 encodeRequestData 方法的逻辑,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class DubboCodec extends ExchangeCodec implements Codec2 {
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(version); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
out.writeUTF(inv.getMethodName()); out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++) { out.writeObject(encodeInvocationArgument(channel, inv, i)); }
out.writeObject(inv.getAttachments()); } }
|
至此,关于服务消费方发送请求的过程就分析完了,接下来我们来看一下服务提供方是如何接收请求的。
参考内容
[1]《深入理解Apache Dubbo与实战》
[2] dubbo 官方文档