背景 当单体项目逐渐扩大后,一个项目编译,发布可能需要很久的时间,如果其中一个文件出现 bug,那么需要对整个项目进行打包发布。微服务就是对项目进行拆分,拆分成多个小项目,由这些小项目组成大项目。并且拆分成小项目后,其他单体项目中需要相同功能的地方就不用再次编写,直接用这个就可以了。有种分治
的思想。
那么原来单体项目拆分后,随之而来就会出现一些问题。原来一个项目中直接调用即可,现在请求的类被拆分到其他项目中,如何进行请求,是采用 http 这种请求还是rpc?多个模块如果进行管理等等一系列问题。这里主要写一下 rpc 的理解。
在微服务中rpc又是重要的一环,现在主流的rpc框架有很多,比如阿里的dubbo
,微博的Motan
,谷歌的gRpc
,还有Thrift
,现在主流的应该就这几种吧。按照文档学习了一下dubbo
如何使用后,发现并没有了解rpc是如何具体实现的。所以这篇文章记录了自己对rpc的一些理解与实战代码。
代码链接
HTTP 与 RPC 的对比 其实就像一些技术一样,没有绝对好的技术,不然大家都去使用它了。都只是在不同场景下有各自的优势。
rpc一般是自带负载均衡策略,而 http 一般是通过 nginx 这种来实现负载均衡
rpc可以使用 tcp 协议也可以使用 http 协议,而 http 就只能使用 http 协议
rpc可以自定义传输信息和序列化方法,减少传输报文大小。
所以RPC主要用于公司内部的服务调用。HTTP主要用于对外的环境,浏览器接口调用,APP接口调用,第三方接口调用等。
RPC的主要实现步骤
各个模块的作用
封装rpc请求
我们在调用其他代码的时候要让别人知道我们调用的是什么类的什么方法,传递了什么参数等信息。所以我们需要将这些信息进行封装起来,然后进行传输。
序列化与反序列化
这个主要是传输数据的大小和跨语言的实现,不同的序列化方式会导致我们在网络传输中传输大小不同的信息,所以这个也是影响性能的一部分
跨语言:这个就像我们说话的方言一样,我们说的方言怎么能让其他人听懂呢,那我们就都说普通话吧,这样大家就比较好理解你的意思了。在编程语言的世界里,java
说的话怎么能让go
,php
等语言听懂呢?我们就定一个协议吧,大家都遵守这个协议,就能明白干什么了,所以gRpc
和Thrift
就使用了这种的序列化规则。那么有人就说了,既然能使用*普通话*
这个标准,为什么其他的框架不用呢?其实这就看公司使用的技术了,如果各个部门都使用的是同一种技术框架,也没有发展其他语言的项目(都是一个地方的人说同一种方言),就没有必要非去弄*普通话*
了。
网络传输
序列化完成的数据在网络进行传输,比如现在大部分都在使用的netty
技术。
负载均衡
消费者从注册中心(如果有)获取生产者的ip地址要进行通信了,但是这些生产者的性能可能不一样,我们可以对性能好的多访问几次,性能差的少访问几次。最简单的方式就是轮询,有几个生产者就轮着来,这个基本上都是针对自己公司情况来实现。
动态代理
在消费者调用生产者时,我们只需调用接口就能接收到返回信息,那么什么时候封装rpc请求了呢,怎么从注册中心找的节点信息等。这里就用到了aop的原理。并且我们不可能仅仅调用一个方法,如果使用静态代理,那么我们有多少个类就要有多个的代理,并且框架也不知道我们会有什么类,所以就需要使用动态代理。动态代理的作用主要是:在不改变目标对象方法的情况下对方法进行增强
反射
动态代理其实也是基于反射实现的,常见的动态代理有:JDK动态代理 ,Cglib 。
jdk动态代理就是基于反射机制实现的。这些东西我也没有具体去理解去看这一块,我想应该是类似这样的:并不清楚调用的具体类是什么,使用一个Object
类型来接收,只有当你真正调用的时候才知道这个类是student
还是teacher
,知道了之后再去调用。
项目模块 这里在网络传输使用bio
的方式,序列化就使用java
默认的序列化方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 learn-demo-rpc ├── bio-socket ├── nio-socket ├── simple-api ├── rpc-common ├── simple-rpc ├── simple-rpc-consumer ├── simple-rpc-provider └── simple-rpc-core └── zookeeper-register-rpc ├── zookeeper-register-consumer ├── zookeeper-register-provider └── zookeeper-register-core
bio-socket 以bio的方式实现生产者与消费者之间的通信模块
nio-socket 以nio的方式实现生产者与消费者之间的通信模块
simple-api 是定义的公共接口模块
rpc-common 对rpc请求和响应包装的一些实体类
simple-rpc 无注册中心的简单rpc调用实现
Zookeeper-register-rpc 通过zookeeper注册中心的rpc调用实现
-core 是核心实现模块
-provider 是生产者的实现模块
-consumer 是消费者的调用模块
定义接口模块和对应请求响应的包装类 接口模块 这个模块就比较简单了,写个接口,写个方法就可以了
1 2 3 4 5 public interface ISayHello { String sayHello (String name) ; }
包装请求和响应信息 这里就要考虑了,我们如果调用一个方法,需要知道一些什么东西才能调用呢。我们在本地调用一个的时候是这样调用的ClassA.methodB(paramC,paramD)
,那现在我们知道了classA,所以其他的就是下面这几个了:方法名,参数类型,参数。
返回信息的时候需要返回一些什么信息呢。首先肯定有个请求方法的返回值,其他的还需要什么呢,参考了一下网络调用的返回信息,我又添加了状态码,提示信息这两个字段。
然后序列化方式就用默认的方式即可,实现个接口然后定义id就可以了。
请求信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class RpcRequest implements Serializable { private static final long serialVersionUID = 5837872617706737632L ; private String methodName; private Object[] parameters; private Class<?>[] parameterTypes; public String getMethodName () { return methodName; } public RpcRequest setMethodName (String methodName) { this .methodName = methodName; return this ; } public Object[] getParameters() { return parameters; } public RpcRequest setParameters (Object[] parameters) { this .parameters = parameters; return this ; } public Class<?>[] getParameterTypes() { return parameterTypes; } public RpcRequest setParameterTypes (Class<?>[] parameterTypes) { this .parameterTypes = parameterTypes; return this ; } @Override public String toString () { return "RpcRequest{" + "methodName='" + methodName + '\'' + ", parameters=" + Arrays.toString(parameters) + ", parameterTypes=" + Arrays.toString(parameterTypes) + '}' ; } }
响应信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public class RpcResponse implements Serializable { private static final long serialVersionUID = -4129585144798112980L ; public static int SUCCEED = 200 ; public static int FAILED = 500 ; private int status = 200 ; private String message; private Object data; public int getStatus () { return status; } public RpcResponse setStatus (int status) { this .status = status; return this ; } public String getMessage () { return message; } public RpcResponse setMessage (String message) { this .message = message; return this ; } public Object getData () { return data; } public RpcResponse setData (Object data) { this .data = data; return this ; } @Override public String toString () { return "RpcResponse{" + "status='" + status + '\'' + ", message='" + message + '\'' + ", data=" + data + '}' ; } }
定义bio的socket通信模块 也试过使用nio的方式,但是返回信息在子线程里面返回了,需要使用线程通知机制,后面研究后会再更新。
服务端代码 提供了一个开启服务端的方法,传入端口,请求的类名和实现类即可
收到客户端发送的消息后先将其转换为对象,判断客户端发送的信息是不是我们包装好的rpc请求信息,如果是rpc请求那么我们再进行处理,并将结果进行返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class BioServer <T > { private static final Logger log = LoggerFactory.getLogger(BioServer.class ) ; public void export (int port, Class<?> interfaceClass, T ref) { try { log.info(" bio rpc server is starting,address:{},port:{} " , InetAddress.getLocalHost().getHostAddress(), port); ServerSocket serverSocket = new ServerSocket(port); while (true ) { Socket client = serverSocket.accept(); ObjectInputStream objectInputStream = new ObjectInputStream(client.getInputStream()); Object object = objectInputStream.readObject(); if (object instanceof RpcRequest) { RpcRequest request = (RpcRequest) object; log.info("bio rpc server get the client request:{}" , request); RpcResponse response = handleRequest(request, interfaceClass, ref); ObjectOutputStream objectOutputStream = new ObjectOutputStream(client.getOutputStream()); objectOutputStream.writeObject(response); } } } catch (Exception e) { log.error("bio rpc server start failed:{}" , e.toString()); } } public RpcResponse handleRequest (RpcRequest request, Class<?> interfaceClass, T ref) { RpcResponse response = new RpcResponse(); try { Method method = interfaceClass.getMethod(request.getMethodName(), request.getParameterTypes()); Object data = method.invoke(ref, request.getParameters()); response.setData(data); } catch (Exception e) { response.setStatus(RpcResponse.FAILED).setMessage(e.getMessage()); } return response; } }
客户端代码 客户端的调用在设置好服务端的ip和端口后就可以直接发送数据了,发送的数据格式也是我们进行封装过的,返回信息格式也是我们进行封装完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class BioClient { private static final Logger log = LoggerFactory.getLogger(BioClient.class ) ; private String address; private int port; public String getAddress () { return address; } public void setAddress (String address) { this .address = address; } public int getPort () { return port; } public void setPort (int port) { this .port = port; } public RpcResponse send (RpcRequest rpcRequest) { try { Socket socket = new Socket(address, port); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(rpcRequest); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); Object object = objectInputStream.readObject(); if (object instanceof RpcResponse) { return (RpcResponse) object; } } catch (Exception e) { log.error("the rpc client start failed:{}" , e.toString()); } return null ; } }
无注册中心的简单rpc调用 核心实现 生产者: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private static final Logger log = LoggerFactory.getLogger(RpcProvider.class ) ; private Class<?> interfaceClass; private T interfaceImpl; public void setInterfaceImpl (T ref) { this .interfaceImpl = ref; } public RpcProvider<T> setInterfaceClass (Class<?> interfaceClass) { this .interfaceClass = interfaceClass; return this ; } public void export (int port) { BioServer bioServer = new BioServer(); bioServer.export(port, interfaceClass, interfaceImpl); } }
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class RpcConsumer { private String address; private int port; private Class<?> interfaceClass; public RpcConsumer setAddress (String address) { this .address = address; return this ; } public RpcConsumer setPort (int port) { this .port = port; return this ; } public RpcConsumer setInterface (Class<?> interfaceClass) { this .interfaceClass = interfaceClass; return this ; } public <T> T get () { BioClient client = new BioClient(); client.setAddress(address); client.setPort(port); RpcInvocationHandler handler = new RpcInvocationHandler(client); return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler); } }
代理类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class RpcInvocationHandler implements InvocationHandler { private BioClient bioClient; public RpcInvocationHandler (BioClient bioClient) { this .bioClient = bioClient; } @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setMethodName(method.getName()) .setParameterTypes(method.getParameterTypes()) .setParameters(args); RpcResponse rpcResponse = bioClient.send(rpcRequest); if (RpcResponse.SUCCEED == rpcResponse.getStatus()) { return rpcResponse.getData(); } throw new RuntimeException(rpcResponse.getMessage()); } }
生产者 实现接口 1 2 3 4 5 6 7 8 public class SimpleRpcProviderImpl implements ISayHello { @Override public String sayHello (String name) { return "hello," + name + "\n I am simple rpc provider." ; } }
调用类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class SimpleRpcBioProvider { public static void main (String[] args) { SimpleRpcProviderImpl simpleRpcProvider = new SimpleRpcProviderImpl(); RpcProvider<ISayHello> provider = new RpcProvider<>(); provider.setInterfaceClass(ISayHello.class ) .setInterfaceImpl (simpleRpcProvider ) ; provider.export(9090 ); } }
消费者 调用类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class SimpleRpcBioConsumer { public static void main (String[] args) { RpcConsumer rpcConsumer = new RpcConsumer(); rpcConsumer.setAddress("127.0.0.1" ) .setPort(9090 ) .setInterface(ISayHello.class ) ; ISayHello iSayHello = rpcConsumer.get(); System.out.println(iSayHello.sayHello("niki" )); } }
这样就实现了一个简单的rpc调用。
使用zookeeper作为注册中心的简单实现 使用zookeeper后主要添加的东西是:
生产者启动后向注册中心注册
消费者调用时先向注册中心请求节点信息(没有加缓存)
负载就使用随机访问。
核心实现 生产者 定义了几个参数,主要是实现类,接口类,注册中心的类。
定义了启动bio socket
通信的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class RpcProvider <T > { private static final Logger log = LoggerFactory.getLogger(RpcProvider.class ) ; private T interfaceImpl; private Class<?> interfaceClass; private RpcZKRegistryService rpcZKRegistryService; public void setInterfaceImpl (T interfaceImpl) { this .interfaceImpl = interfaceImpl; } public RpcProvider<T> setInterfaceClass (Class<?> interfaceClass) { this .interfaceClass = interfaceClass; return this ; } public RpcProvider<T> setRpcZKRegistryService (String zkConnectString) { this .rpcZKRegistryService = new RpcZKRegistryService(zkConnectString); return this ; } public void export (int port) { ProviderInfo providerInfo = new ProviderInfo(); try { providerInfo.setAddress(InetAddress.getLocalHost().getHostAddress()) .setPort(port) .setId(interfaceClass.getName()); rpcZKRegistryService.register(providerInfo); BioServer bioServer = new BioServer(); bioServer.export(port, interfaceClass, interfaceImpl); } catch (Exception e) { log.error(" zookeeper server start failed:{}" , e.toString()); } } }
消费者 定义了要请求的接口类,注册中心的调用类
定义了请求,获取节点,负载等方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class RpcConsumer { private String azConnectString; private Class<?> interfaceClass; private RpcZKRegistryService rpcZKRegistryService; public RpcConsumer setZKConnectString (String zkConnectString) { this .rpcZKRegistryService = new RpcZKRegistryService(zkConnectString); return this ; } public RpcConsumer setInterface (Class<?> interfaceClass) { this .interfaceClass = interfaceClass; return this ; } public <T> T get () { List<ProviderInfo> providers = getProviders(); ProviderInfo provider = chooseTarget(providers); BioClient bioClient = new BioClient(); bioClient.setAddress(provider.getAddress()); bioClient.setPort(provider.getPort()); RpcInvocationHandler handler = new RpcInvocationHandler(bioClient); return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler); } private List<ProviderInfo> getProviders () { rpcZKRegistryService.subscribe(interfaceClass.getName()); Map<String, ProviderInfo> providers = rpcZKRegistryService.getRemoteProviders(); return new ArrayList<>(providers.values()); } private static ProviderInfo chooseTarget (List<ProviderInfo> providerInfos) { if (providerInfos == null || providerInfos.isEmpty()) { throw new RuntimeException("providers is empty" ); } int index = new Random().nextInt(providerInfos.size()); return providerInfos.get(index); } }
代理类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class RpcInvocationHandler implements InvocationHandler { private BioClient bioClient; public RpcInvocationHandler (BioClient bioClient) { this .bioClient = bioClient; } @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setMethodName(method.getName()) .setParameterTypes(method.getParameterTypes()) .setParameters(args); RpcResponse rpcResponse = bioClient.send(rpcRequest); if (RpcResponse.SUCCEED == rpcResponse.getStatus()) { return rpcResponse.getData(); } throw new RuntimeException(rpcResponse.getMessage()); } }
注册中心 每个节点的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class ProviderInfo { private String id; private String address; private int port; public String getId () { return id; } public ProviderInfo setId (String id) { this .id = id; return this ; } public String getAddress () { return address; } public ProviderInfo setAddress (String address) { this .address = address; return this ; } public int getPort () { return port; } public ProviderInfo setPort (int port) { this .port = port; return this ; } public String toJsonString () { return JSON.toJSONString(this ); } @Override public String toString () { return "ProviderInfo{" + "id='" + id + '\'' + ", address='" + address + '\'' + ", port=" + port + '}' ; } }
java调用zk的类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 public class RpcZKRegistryService { private static final Logger log = LoggerFactory.getLogger(RpcZKRegistryService.class ) ; private static final String NAME_SPACE = "zk-rpc" ; private static final String RPC_PROVIDER_NODE = "/provider" ; private final Map<String, ProviderInfo> remoteProviders = new HashMap<>(); private CuratorFramework zkClient; public RpcZKRegistryService (String zkConnectString) { RetryPolicy retryPolicy = new RetryNTimes(3 , 5000 ); this .zkClient = CuratorFrameworkFactory.builder() .connectString(zkConnectString) .sessionTimeoutMs(10000 ) .retryPolicy(retryPolicy) .namespace(NAME_SPACE) .build(); this .zkClient.start(); } public void register (ProviderInfo providerInfo) { String nodePath = RPC_PROVIDER_NODE + "/" + providerInfo.getId(); try { Stat stat = zkClient.checkExists().forPath(nodePath); if (stat == null ) { zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(nodePath, providerInfo.toJsonString().getBytes()); } else { log.error(" thr provider already exists,{} " , providerInfo); } } catch (Exception e) { log.error("zookeeper register provider failed,{}" , e.toString()); } } public void subscribe (String id) { try { List<String> providerIds = zkClient.getChildren().forPath(RPC_PROVIDER_NODE); for (String providerId : providerIds) { if (providerId.contains(id)) { String nodePath = RPC_PROVIDER_NODE + "/" + providerId; byte [] data = zkClient.getData().forPath(nodePath); ProviderInfo providerInfo = JSON.parseObject(data, ProviderInfo.class ) ; this .remoteProviders.put(providerId, providerInfo); } } addProviderWatch(id); } catch (Exception e) { e.printStackTrace(); } } public void addProviderWatch (String id) { try { final PathChildrenCache childrenCache = new PathChildrenCache(this .zkClient, RPC_PROVIDER_NODE, true ); childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); childrenCache.getListenable().addListener((client, event) -> { String nodePath = event.getData().getPath(); if (nodePath.contains(id)) { if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { this .remoteProviders.remove(nodePath); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { byte [] data = event.getData().getData(); ProviderInfo providerInfo = JSON.parseObject(data, ProviderInfo.class ) ; this .remoteProviders.put(nodePath, providerInfo); } } }); } catch (Exception e) { e.printStackTrace(); } } public Map<String, ProviderInfo> getRemoteProviders () { return remoteProviders; } }
生产者 实现接口 1 2 3 4 5 6 public class ZkProviderImpl implements ISayHello { @Override public String sayHello (String name) { return "hello " + name + "\n i am zookeeper provider" ; } }
调用类 通过传入调用的类,zk的地址,实现的类进行注册中心注册。然后启动连接。
1 2 3 4 5 6 7 8 9 10 11 12 public class ZKProvider { public static void main (String[] args) { ZkProviderImpl zkProviderImpl = new ZkProviderImpl(); RpcProvider<ISayHello> provider = new RpcProvider<>(); provider.setInterfaceClass(ISayHello.class ) .setRpcZKRegistryService("localhost:2181") .setInterfaceImpl(zkProviderImpl); provider.export(9090 ); } }
消费者 调用类 传入zk的地址,接口类。然后调用方法即可。
1 2 3 4 5 6 7 8 9 10 11 12 public class ZKConsumer { public static void main (String[] args) { RpcConsumer rpcConsumer = new RpcConsumer(); rpcConsumer.setZKConnectString("localhost:2181" ); rpcConsumer.setInterface(ISayHello.class ) ; ISayHello iSayHello = rpcConsumer.get(); System.out.println(iSayHello.sayHello("zookeeper" )); } }