Dubbo Provider 接收请求

引言

在 Dubbo 系列文章的最后,我们回过头来看一下整个 RPC 过程是如何运作起来的,本文着重介绍整个调用链路中 Provider 接收请求的执行过程,其他 Dubbo 相关文章均收录于 <Dubbo系列文章>

提供方接收请求

前面说过,默认情况下 Dubbo 使用 Netty 作为底层的通信框架。Netty 检测到有数据入站后,首先会通过解码器对数据进行解码,并将解码后的数据传递给下一个入站处理器的指定方法。所以在进行后续的分析之前,我们先来看一下数据解码过程。

这里直接分析请求数据的解码逻辑,忽略中间过程,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class ExchangeCodec extends TelnetCodec {

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
// 创建消息头字节数组
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
// 读取消息头数据
buffer.readBytes(header);
// 调用重载方法进行后续解码工作
return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 检查魔数是否相等
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 通过 telnet 命令行发送的数据包不包含消息头,所以这里
// 调用 TelnetCodec 的 decode 方法对数据包进行解码
return super.decode(channel, buffer, readable, header);
}

// 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}

// 从消息头中获取消息体长度
int len = Bytes.bytes2int(header, 12);
// 检测消息体长度是否超出限制,超出则抛出异常
checkPayload(channel, len);

int tt = len + HEADER_LENGTH;
// 检测可读的字节数是否小于实际的字节数
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}

ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
// 继续进行解码工作
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}

上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet 命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody 方法进行后续的解码工作,ExchangeCodec 中实现了 decodeBody 方法,但因其子类 DubboCodec 覆写了该方法,所以在运行时 DubboCodec 中的 decodeBody 方法会被调用。下面我们来看一下该方法的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class DubboCodec extends ExchangeCodec implements Codec2 {

@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 获取调用编号
long id = Bytes.bytes2long(header, 4);
// 通过逻辑与运算得到调用类型,0 - Response,1 - Request
if ((flag & FLAG_REQUEST) == 0) {
// 对响应结果进行解码,得到 Response 对象。这个非本节内容,后面再分析
// ...
} else {
// 创建 Request 对象
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
// 通过逻辑与运算得到通信方式,并设置到 Request 对象中
req.setTwoWay((flag & FLAG_TWOWAY) != 0);

// 通过位运算检测数据包是否为事件类型
if ((flag & FLAG_EVENT) != 0) {
// 设置心跳事件到 Request 对象中
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {
// 对心跳包进行解码,该方法已被标注为废弃
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) {
// 对事件数据进行解码
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
// 根据 url 参数判断是否在 IO 线程上对消息体进行解码
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
// 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将
// 调用方法名、attachment、以及调用参数解析出来
inv.decode();
} else {
// 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}

// 设置 data 到 Request 对象中
req.setData(data);
} catch (Throwable t) {
// 若解码过程中出现异常,则将 broken 字段设为 true,
// 并将异常对象设置到 Reqeust 对象中
req.setBroken(true);
req.setData(t);
}
return req;
}
}
}

如上,decodeBody 对部分字段进行了解码,并将解码得到的字段封装到 Request 中。随后会调用 DecodeableRpcInvocation 的 decode 方法进行后续的解码工作。此工作完成后,可将调用方法名、attachment、以及调用参数解析出来。下面我们来看一下 DecodeableRpcInvocation 的 decode 方法逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {

@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);

// 通过反序列化得到 dubbo version,并保存到 attachments 变量中
String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

// 通过反序列化得到 path,version,并保存到 attachments 变量中
setAttachment(Constants.PATH_KEY, in.readUTF());
setAttachment(Constants.VERSION_KEY, in.readUTF());

// 通过反序列化得到调用方法名
setMethodName(in.readUTF());
try {
Object[] args;
Class<?>[] pts;
// 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;
String desc = in.readUTF();
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
// 将 desc 解析为参数类型数组
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
// 解析运行时参数
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}

// 设置参数类型数组
setParameterTypes(pts);

// 通过反序列化得到原 attachment 的内容
Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
// 将 map 与当前对象中的 attachment 集合进行融合
attachment.putAll(map);
setAttachments(attachment);
}

// 对 callback 类型的参数进行处理
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}

// 设置参数列表
setArguments(args);

} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
}

上面的方法通过反序列化将诸如 path、version、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中,最终得到一个具有完整调用信息的 DecodeableRpcInvocation 对象。

到这里,请求数据解码的过程就分析完了。此时我们得到了一个 Request 对象,这个对象会被传送到下一个入站处理器中,我们继续往下看。

解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个对象,并将这个对象继续向下传递。这期间该对象会被依次传递给 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 将该对象封装到 Runnable 实现类对象中,并将 Runnable 放入线程池中执行后续的调用逻辑。整个调用栈如下:

1
2
3
4
5
6
NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
—> AbstractPeer#received(Channel, Object)
—> MultiMessageHandler#received(Channel, Object)
—> HeartbeatHandler#received(Channel, Object)
—> AllChannelHandler#received(Channel, Object)
—> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑

这里我们直接分析调用栈中的分析第一个和最后一个调用方法逻辑。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Sharable
public class NettyHandler extends SimpleChannelHandler {

private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();

private final URL url;

private final ChannelHandler handler;

public NettyHandler(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;

// 这里的 handler 类型为 NettyServer
this.handler = handler;
}

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// 获取 NettyChannel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
// 继续向下调用
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
}

如上,NettyHandler 中的 messageReceived 逻辑比较简单。首先根据一些信息获取 NettyChannel 实例,然后将 NettyChannel 实例以及 Request 对象向下传递。下面再来看看 AllChannelHandler 的逻辑,在详细分析代码之前,我们先来了解一下 Dubbo 中的线程派发模型。

Dubbo 将底层通信框架中接收请求的线程称为 IO 线程。如果一些事件处理逻辑可以很快执行完,比如只在内存打一个标记,此时直接在 IO 线程上执行该段逻辑即可。但如果事件的处理逻辑比较耗时,比如该段逻辑会发起数据库查询或者 HTTP 请求。此时我们就不应该让事件处理逻辑在 IO 线程上执行,而是应该派发到线程池中去执行。原因也很简单,IO 线程主要用于接收请求,如果 IO 线程被占满,将导致它不能接收新的请求。
dubbo-thread-model
如上图,Dispatcher 就是线程派发器。需要说明的是,Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具备线程派发能力。Dubbo 支持 5 种不同的线程派发策略,下面通过一个表格列举一下。
execution-policy
默认配置下,Dubbo 使用 all 派发策略,即将所有的消息都派发到线程池中。下面我们来分析一下 AllChannelHandler 的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class AllChannelHandler extends WrappedChannelHandler {

public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}

/** 处理连接事件 */
@Override
public void connected(Channel channel) throws RemotingException {
// 获取线程池
ExecutorService cexecutor = getExecutorService();
try {
// 将连接事件派发到线程池中处理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException(..., " error when process connected event .", t);
}
}

/** 处理断开事件 */
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException(..., "error when process disconnected event .", t);
}
}

/** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
// 将请求和响应消息派发到线程池中处理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
// 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted
// 错误信息封装到 Response 中,并返回给服务消费方。
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort()
+ ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
// 返回包含错误信息的 Response 对象
channel.send(response);
return;
}
}
throw new ExecutionException(..., " error when process received event .", t);
}
}

/** 处理异常信息 */
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException(..., "error when process caught event ...");
}
}
}

如上,请求对象会被封装 ChannelEventRunnable 中,ChannelEventRunnable 将会是服务调用过程的新起点。所以接下来我们以 ChannelEventRunnable 为起点向下探索。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ChannelEventRunnable implements Runnable {

private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;

@Override
public void run() {
// 检测通道状态,对于请求或响应消息,此时 state = RECEIVED
if (state == ChannelState.RECEIVED) {
try {
// 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
handler.received(channel, message);
} catch (Exception e) {
logger.warn("... operation error, channel is ... message is ...");
}
}

// 其他消息类型通过 switch 进行处理
else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("... operation error, channel is ...");
}
break;
case DISCONNECTED:
// ...
case SENT:
// ...
case CAUGHT:
// ...
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}

}
}

如上,请求和响应消息出现频率明显比其他类型消息高,所以这里对该类型的消息进行了针对性判断。ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class DecodeHandler extends AbstractChannelHandlerDelegate {

public DecodeHandler(ChannelHandler handler) {
super(handler);
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
// 对 Decodeable 接口实现类对象进行解码
decode(message);
}

if (message instanceof Request) {
// 对 Request 的 data 字段进行解码
decode(((Request) message).getData());
}

if (message instanceof Response) {
// 对 Request 的 result 字段进行解码
decode(((Response) message).getResult());
}

// 执行后续逻辑
handler.received(channel, message);
}

private void decode(Object message) {
// Decodeable 接口目前有两个实现类,
// 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult
if (message != null && message instanceof Decodeable) {
try {
// 执行解码逻辑
((Decodeable) message).decode();
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
}
}
}
}

DecodeHandler 主要是包含了一些解码逻辑。前面分析请求解码时说过,请求解码可在 IO 线程上执行,也可在线程池中执行,这个取决于运行时配置。DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class HeaderExchangeHandler implements ChannelHandlerDelegate {

private final ExchangeHandler handler;

public HeaderExchangeHandler(ExchangeHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// 处理请求对象
if (message instanceof Request) {
Request request = (Request) message;
if (request.isEvent()) {
// 处理事件
handlerEvent(channel, request);
}
// 处理普通的请求
else {
// 双向通信
if (request.isTwoWay()) {
// 向后调用服务,并得到调用结果
Response response = handleRequest(exchangeChannel, request);
// 将调用结果返回给服务消费端
channel.send(response);
}
// 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
else {
handler.received(exchangeChannel, request.getData());
}
}
}
// 处理响应对象,服务消费方会执行此处逻辑,后面分析
else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// telnet 相关,忽略
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
if (req.isBroken()) {
Object data = req.getData();

String msg;
if (data == null)
msg = null;
else if
(data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else
msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
// 设置 BAD_REQUEST 状态
res.setStatus(Response.BAD_REQUEST);

return res;
}

// 获取 data 字段值,也就是 RpcInvocation 对象
Object msg = req.getData();
try {
// 继续向下调用
Object result = handler.reply(channel, msg);
// 设置 OK 状态码
res.setStatus(Response.OK);
// 设置调用结果
res.setResult(result);
} catch (Throwable e) {
// 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
}

到这里,我们看到了比较清晰的请求和响应逻辑。对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。接下来我们继续向后分析,把剩余的调用过程分析完。下面分析定义在 DubboProtocol 类中的匿名类对象逻辑,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class DubboProtocol extends AbstractProtocol {

public static final String NAME = "dubbo";

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 获取 Invoker 实例
Invoker<?> invoker = getInvoker(channel, inv);
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
// 回调相关,忽略
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 通过 Invoker 调用具体的服务
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: ...");
}

// 忽略其他方法
}

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
// 忽略回调和本地存根相关逻辑
// ...

int port = channel.getLocalAddress().getPort();

// 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如:
// dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

// 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
// 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

if (exporter == null)
throw new RemotingException(channel, "Not found exported service ...");

// 获取 Invoker 对象,并返回
return exporter.getInvoker();
}

// 忽略其他方法
}

以上逻辑用于获取与指定服务对应的 Invoker 实例,并通过 Invoker 的 invoke 方法调用服务逻辑。invoke 方法定义在 AbstractProxyInvoker 中,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
// 调用 doInvoke 执行后续的调用,并将调用结果封装到 RpcResult 中,并
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method ...");
}
}

protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

如上,doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,创建逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class JavassistProxyFactory extends AbstractProxyFactory {

// 省略其他方法

@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建匿名类对象
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 invokeMethod 方法进行后续的调用
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}

Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;

// 省略其他方法

public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoService demoService;
try {
// 类型转换
demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
// 根据方法名调用指定的方法
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoService.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
}
}

到这里,整个服务调用过程就分析完了。最后把调用过程贴出来,如下:

1
2
3
4
5
6
7
8
9
ChannelEventRunnable#run()
—> DecodeHandler#received(Channel, Object)
—> HeaderExchangeHandler#received(Channel, Object)
—> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
—> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
—> Filter#invoke(Invoker, Invocation)
—> AbstractProxyInvoker#invoke(Invocation)
—> Wrapper0#invokeMethod(Object, String, Class[], Object[])
—> DemoServiceImpl#sayHello(String)

参考内容

[1]《深入理解Apache Dubbo与实战》
[2] dubbo 官方文档

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/middleware/dubbo/provider-receive-request/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。