rpc-register
注册中心,这里使用 ZooKeeper 来实现。
生产者在启动服务时,将自己实现的服务注册到注册中心。
消费者调用服务时,来注册中心查找,返回调用服务实例的地址信息。
并且为了适应不同的注册实现,我们将功能定义为接口,在替换实现时在配置文件中进行替换即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public interface RpcRegister {
void register(ServiceDescriptor serviceDescriptor, ResponseServiceDescription responseServiceDescription);
ResponseServiceDescription lookup(ServiceDescriptor serviceDescriptor); }
|
zookeeper实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
| public class ZookeeperRegistry implements RpcRegister {
private static final String NAME_SPACE = "zk-rpc";
private static final String RPC_PROVIDER_NODE = "/provider";
private final Map<ServiceDescriptor, List<ResponseServiceDescription>> remoteProviders = new ConcurrentHashMap<>();
private CuratorFramework zkClient;
private Encoder encoder; private Decoder decoder;
public ZookeeperRegistry() { this("localhost:2181"); }
public ZookeeperRegistry(String zkConnectString) { RetryPolicy retryPolicy = new RetryNTimes(3, 5000); this.zkClient = CuratorFrameworkFactory.builder() .connectString(zkConnectString) .sessionTimeoutMs(10000) .retryPolicy(retryPolicy) .namespace(NAME_SPACE) .build(); this.encoder = new FastJsonEncoder(); this.decoder = new FastJsonDecoder(); this.zkClient.start(); }
@Override public void register(ServiceDescriptor serviceDescriptor, ResponseServiceDescription responseServiceDescription) { String nodePath = RPC_PROVIDER_NODE + "/" + serviceDescriptor.toString(); try { Stat stat = zkClient.checkExists().forPath(nodePath); if (stat == null) { zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(nodePath, encoder.encode(responseServiceDescription)); } else { System.out.println("the provider already exist," + serviceDescriptor.toString()); } } catch (Exception e) { e.printStackTrace(); } }
public void subscribe(ServiceDescriptor serviceDescriptor) { try { List<String> providerIds = zkClient.getChildren().forPath(RPC_PROVIDER_NODE); for (String providerId : providerIds) { if (providerId.contains(serviceDescriptor.toString())) { String nodePath = RPC_PROVIDER_NODE + "/" + providerId; byte[] data = zkClient.getData().forPath(nodePath); ResponseServiceDescription providerInfo = decoder.decode(data, ResponseServiceDescription.class); if (remoteProviders.containsKey(serviceDescriptor)) { remoteProviders.get(serviceDescriptor).add(providerInfo); } else { List<ResponseServiceDescription> list = new ArrayList<>(); list.add(providerInfo); remoteProviders.put(serviceDescriptor, list); } } } addProviderWatch(serviceDescriptor); } catch (Exception e) { e.printStackTrace(); } }
public void addProviderWatch(ServiceDescriptor serviceDescriptor) { try { final PathChildrenCache childrenCache = new PathChildrenCache(this.zkClient, RPC_PROVIDER_NODE, true); childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); childrenCache.getListenable().addListener((client, event) -> { String nodePath = event.getData().getPath(); if (nodePath.contains(serviceDescriptor.toString())) { if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { this.remoteProviders.remove(nodePath); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { byte[] data = event.getData().getData(); ResponseServiceDescription providerInfo = decoder.decode(data, ResponseServiceDescription.class); if (remoteProviders.containsKey(serviceDescriptor)) { remoteProviders.get(serviceDescriptor).add(providerInfo); } else { List<ResponseServiceDescription> list = new ArrayList<>(); list.add(providerInfo); remoteProviders.put(serviceDescriptor, list); } } } }); } catch (Exception e) { e.printStackTrace(); } }
@Override public ResponseServiceDescription lookup(ServiceDescriptor serviceDescriptor) { if (!remoteProviders.containsKey(serviceDescriptor)) { subscribe(serviceDescriptor); } List<ResponseServiceDescription> list = remoteProviders.get(serviceDescriptor); return list.get(new Random().nextInt(list.size())); } }
|
这里有一个问题:如果同一个服务有多个实现类,我这里只是随机返回其中一个。这种情况在 Spring 中也需要手动声明要使用的实现,
所以暂时没有想到什么好的解决方法。
在注册中心维护了一个容器作为客户端调用的缓存。并且对节点进行监听,如果有变动会更改容器的内容。
rpc-codec
编解码模块,将对象序列化成字节数组从而进行网络传输,
再将字节数组解析成对象,从而进行业务处理。
这里使用了阿里的 Fastjson 来进行实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/**
 * Turns bytes received from the network back into objects.
 */
public interface Decoder {

    /**
     * Decodes a byte array into an instance of the requested type.
     *
     * @param bytes encoded representation of the object
     * @param clazz type the bytes should be decoded into
     * @param <T>   type of the decoded object
     * @return the decoded object
     */
    <T> T decode(byte[] bytes, Class<T> clazz);
}
/**
 * Turns objects into bytes so they can be sent over the network.
 */
public interface Encoder {

    /**
     * Encodes an object into a byte array.
     *
     * @param obj object to encode
     * @return encoded representation of the object
     */
    byte[] encode(Object obj);
}
|
而实现对象也直接调用fastjson的方法即可。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class FastJsonDecoder implements Decoder { @Override public <T> T decode(byte[] bytes, Class<T> calzz) { return JSON.parseObject(bytes, calzz); } }
/**
 * {@link Encoder} that delegates to Fastjson's {@code JSON.toJSONBytes}.
 */
public class FastJsonEncoder implements Encoder {

    /**
     * Serializes the given object to bytes.
     *
     * @param obj object handed to {@code JSON.toJSONBytes}
     * @return the bytes produced by {@code JSON.toJSONBytes}
     */
    @Override
    public byte[] encode(final Object obj) {
        final byte[] encoded = JSON.toJSONBytes(obj);
        return encoded;
    }
}
|
其他部分链接