1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
public class ZookeeperRegistry implements RpcRegister {
/**
* 注册的名称
*/
private static final String NAME_SPACE = "zk-rpc";
/**
* 节点信息
*/
private static final String RPC_PROVIDER_NODE = "/provider";
/**
* 保存多个生产者信息,作为缓存容器
*/
private final Map<ServiceDescriptor, List<ResponseServiceDescription>> remoteProviders = new ConcurrentHashMap<>();
/**
* 客户端
*/
private CuratorFramework zkClient;
/**
* 编解码,将节点信息编码后存到节点中
*/
private Encoder encoder;
private Decoder decoder;
public ZookeeperRegistry() {
this("localhost:2181");
}
public ZookeeperRegistry(String zkConnectString) {
// 设置重试次数和两次重试间隔时间
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
//获取客户端
this.zkClient = CuratorFrameworkFactory.builder()
.connectString(zkConnectString)
.sessionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.namespace(NAME_SPACE)
.build();
this.encoder = new FastJsonEncoder();
this.decoder = new FastJsonDecoder();
this.zkClient.start();
}
/**
* 注册服务
* @param serviceDescriptor 请求服务信息
* @param responseServiceDescription 响应信息,包括实现类和实例地址
*/
@Override
public void register(ServiceDescriptor serviceDescriptor, ResponseServiceDescription responseServiceDescription) {
String nodePath = RPC_PROVIDER_NODE + "/" + serviceDescriptor.toString();
try {
// 判断节点是否存在,如果不存在则创建
Stat stat = zkClient.checkExists().forPath(nodePath);
if (stat == null) {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
//创建节点,并且将信息写入节点中
.forPath(nodePath, encoder.encode(responseServiceDescription));
} else {
//这里对于多个实例的情况没有处理
System.out.println("the provider already exist," + serviceDescriptor.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 订阅服务
*/
public void subscribe(ServiceDescriptor serviceDescriptor) {
try {
List<String> providerIds = zkClient.getChildren().forPath(RPC_PROVIDER_NODE);
for (String providerId : providerIds) {
//如果与订阅服务相同,则获取节点信息
if (providerId.contains(serviceDescriptor.toString())) {
String nodePath = RPC_PROVIDER_NODE + "/" + providerId;
byte[] data = zkClient.getData().forPath(nodePath);
ResponseServiceDescription providerInfo = decoder.decode(data, ResponseServiceDescription.class);
//获取到服务信息后,将它放到缓存中
if (remoteProviders.containsKey(serviceDescriptor)) {
remoteProviders.get(serviceDescriptor).add(providerInfo);
} else {
List<ResponseServiceDescription> list = new ArrayList<>();
list.add(providerInfo);
remoteProviders.put(serviceDescriptor, list);
}
}
}
//添加监听事件
addProviderWatch(serviceDescriptor);
} catch (Exception e) {
e.printStackTrace();
}
}
public void addProviderWatch(ServiceDescriptor serviceDescriptor) {
try {
//创建子节点缓存
final PathChildrenCache childrenCache = new PathChildrenCache(this.zkClient, RPC_PROVIDER_NODE, true);
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
//添加子节点监听事件
childrenCache.getListenable().addListener((client, event) -> {
String nodePath = event.getData().getPath();
if (nodePath.contains(serviceDescriptor.toString())) {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
//节点移除
this.remoteProviders.remove(nodePath);
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
byte[] data = event.getData().getData();
ResponseServiceDescription providerInfo = decoder.decode(data, ResponseServiceDescription.class);
//添加节点
if (remoteProviders.containsKey(serviceDescriptor)) {
remoteProviders.get(serviceDescriptor).add(providerInfo);
} else {
List<ResponseServiceDescription> list = new ArrayList<>();
list.add(providerInfo);
remoteProviders.put(serviceDescriptor, list);
}
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 查找服务,先去缓存容器中查询,如果没有调用订阅的方法,
* 订阅后会将信息放到容器中。最后都从容器中返回。
*/
@Override
public ResponseServiceDescription lookup(ServiceDescriptor serviceDescriptor) {
if (!remoteProviders.containsKey(serviceDescriptor)) {
subscribe(serviceDescriptor);
}
List<ResponseServiceDescription> list = remoteProviders.get(serviceDescriptor);
return list.get(new Random().nextInt(list.size()));
}
}
|