J.A.R.V.I.S

life is not just live

自己动手实现一个RPC框架(七)

rpc-client

消费者端,通过代理来进行调用。

与生产者端类型,首先定义配置类:

1
2
3
4
5
6
7
8
9
10
public class ClientConfig {

private Class<? extends Encoder> encoder = FastJsonEncoder.class;

private Class<? extends Decoder> decoder = FastJsonDecoder.class;

private Class<? extends TransportClient> transportClient = NettyClient.class;

private Class<? extends RpcRegister> rpcRegister = ZookeeperRegistry.class;
}

代理类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class RemoteInvoker implements InvocationHandler {

/**
* 请求的对象
*/
private Class clazz;
/**
* 编码
*/
private Encoder encoder;
/**
* 解码
*/
private Decoder decoder;
/**
* 网络传输
*/
private TransportClient transportClient;
/**
* 注册中心
*/
private RpcRegister rpcRegister;

private String version;


@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//首先从注册中心查询
ResponseServiceDescription responseServiceDescription = lookup(clazz, version, method);
//创建连接
Transport transport = transportClient.createTransport(new InetSocketAddress(responseServiceDescription.getUri().getHost(), responseServiceDescription.getUri().getPort()), 30000L);
//构建请求信息
Header header = new Header();
header.setRequestId(IDUtil.nextId());
header.setVersion(1);
RequestInfo requestInfo = new RequestInfo(responseServiceDescription, args);
Command requestCommand = new Command(header, encoder.encode(requestInfo));
//发送请求
CompletableFuture<Command> future = transport.sendRequest(requestCommand);
//获取响应
Command responseCommand = future.get();
Header respHeader = responseCommand.getHeader();
if (respHeader instanceof ResponseHeader) {
//对响应信息做判断
ResponseHeader responseHeader = (ResponseHeader) respHeader;
if (responseHeader.getCode() != ResponseHeader.SUCCESS_CODE) {
throw new IllegalStateException(responseHeader.getMsg());
}
}
//返回响应结果
return decoder.decode(responseCommand.getBytes(), method.getReturnType());
}

/**
* 向注册中心查询
*/
private ResponseServiceDescription lookup(Class clazz, String version, Method method) {
ServiceDescriptor serviceDescriptor = ServiceDescriptor.from(clazz, version, method);
ResponseServiceDescription responseServiceDescription = rpcRegister.lookup(serviceDescriptor);
if (responseServiceDescription == null) {
throw new IllegalStateException("provider not exist!");
}
return responseServiceDescription;
}
}

其他部分链接