自己动手实现一个RPC框架(六)

发布于 — 2020 年 03 月 25 日
#RPC

rpc-server

消费者的部分,这里使用配置类,将各种实现的部分在配置类中进行定义。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ServerConfig {
	/**
	 * 监听端口
	 */
	private int port = 9090;
	/**
	 * 网络传输
	 */
	private Class<? extends TransportServer> transportClass = NettyServer.class;
	/**
	 * 注册中心
	 */
	private Class<? extends RpcRegister> rpcRegister = ZookeeperRegistry.class;
	/**
	 * 编码
	 */
	private Class<? extends Encoder> encoder = FastJsonEncoder.class;
	/**
	 * 解码
	 */
	private Class<? extends Decoder> decoder = FastJsonDecoder.class;

}

这这个配置中,定义了服务启动的端口,网络传输,注册中心,编解码的各种实现,当我们需要更换实现时只需要在这里修改即可。

请求的实际处理类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class RpcRequestHandler implements RequestHandler {
   private final Decoder decoder;
   private final Encoder encoder;
   public RpcRequestHandler(Decoder decoder, Encoder encoder) {
      this.decoder = decoder;
      this.encoder = encoder;
   }

   @Override
   public Command handle(Command requestCommand) {
      Header header = requestCommand.getHeader();
      //反序列化RpcRequest
      RequestInfo requestInfo = decoder.decode(requestCommand.getBytes(), RequestInfo.class);
      try {
         //客户端在注册中心获取到实现类和地址
         ResponseServiceDescription responseServiceDescription = requestInfo.getResponseServiceDescription();
         //通过反射进行调用
         Class implClass = Class.forName(responseServiceDescription.getImplName());
         Object implInstance = implClass.newInstance();
         Method method = implClass.getMethod(requestInfo.getResponseServiceDescription().getMethod(), requestInfo.getResponseServiceDescription().getParameterTypes());
         Object result = method.invoke(implInstance, requestInfo.getParameters());
         //将结果封装成响应进行返回
         return new Command(new ResponseHeader(header.getRequestId(), header.getVersion(), ResponseHeader.SUCCESS_CODE, ResponseHeader.SUCCESS_MSG), encoder.encode(result));
      } catch (Throwable t) {
         //发生异常,返回错误信息
         log.warn("Exception:", t);
         return new Command(new ResponseHeader(header.getRequestId(), header.getVersion(), -1, t.getMessage()), new byte[0]);
      }
   }
}

这个类实现自rpc-transport中的RequestHandler接口

主类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class RpcServer {
	/**
	 * 服务配置类
	 */
	private ServerConfig serverConfig;
	/**
	 * 网络传输服务端
	 */
	private TransportServer transportServer;
	/**
	 * 注册中心
	 */
	private RpcRegister rpcRegister;

	public RpcServer() {
		this(new ServerConfig());
	}

	public RpcServer(ServerConfig serverConfig) {
		this.serverConfig = serverConfig;
		this.transportServer = ReflectionUtils.newInstance(serverConfig.getTransportClass());
		Encoder encoder = ReflectionUtils.newInstance(serverConfig.getEncoder());
		Decoder decoder = ReflectionUtils.newInstance(serverConfig.getDecoder());
		this.transportServer.init(new RpcRequestHandler(decoder, encoder));
		this.rpcRegister = ReflectionUtils.newInstance(serverConfig.getRpcRegister());
	}

	/**
	 * 注册服务
	 * @param interfaceClass 接口类
	 * @param impl           实现类
	 * @param version        实现的版本号
	 * @param <T>            接口类型
	 */
	public <T> void register(Class<T> interfaceClass, Class<? extends T> impl, String version) {
		Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass);
		for (Method method : methods) {
			ServiceDescriptor serviceDescriptor = ServiceDescriptor.from(interfaceClass, version, method);
			ResponseServiceDescription responseServiceDescription = formResponseServiceDescription(interfaceClass, method, version, impl);
			rpcRegister.register(serviceDescriptor, responseServiceDescription);
			log.info("register service:{}{} ", serviceDescriptor.getClazz(), serviceDescriptor.getMethod());
		}
	}

	/**
	 * 启动服务
	 */
	public void start() {
		try {
			this.transportServer.start(serverConfig.getPort());
		} catch (InterruptedException e) {
			e.printStackTrace();
			log.error("server start failed:{}", e.getMessage());
		}
	}

	public void stop() {
		this.transportServer.stop();
	}


	private <T> ResponseServiceDescription formResponseServiceDescription(Class<T> interfaceClass, Method method, String version, Class<? extends T> impl) {
		return ResponseServiceDescription.from(interfaceClass, version, method, impl, getURI());
	}

	/**
	 * 返回这个实例的地址和端口,由于本地调用所以就直接返回了localhost
	 * @return
	 */
	private URI getURI() {
		String host = "localhost";
		return URI.create("rpc://" + host + ":" + serverConfig.getPort());
	}
}

生产者调用流程:

  1. 初始化new RpcServer()

  2. 注册服务register()

  3. 启动服务start()

其他部分链接