概要
分析的版本为nacos 2.3.0,nacos 2.x版本使用grpc通信,先看主要的proto文件
GRPC通信
获取配置
先从代码入口入手,nacos源码中有一个Example(com.alibaba.nacos.example.ConfigExample)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| String serverAddr = "localhost";
String dataId = "test";
String group = "DEFAULT_GROUP";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
// 创建一个NacosConfigService
ConfigService configService = NacosFactory.createConfigService(properties);
// 获取配置
String content = configService.getConfig(dataId, group, 5000);
System.out.println("[config content] " + content);
// 注册回调
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receive:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
|
进入getConfig方法,看看是如何获取配置的
1
2
3
4
5
6
7
8
9
10
| private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
// ...
// 从本地获取配置,这个是用于灾备场景的
String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
// ...
// 从服务端获取配置
ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
cr.setContent(response.getContent());
}
|
调用的是ClientWorker.getServerConfig() -> ConfigRpcTransportClient.queryConfig()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify) throws NacosException {
// 创建一个ConfigQueryRequest的请求
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
RpcClient rpcClient = getOneRunningClient();
...
// 向服务端请求
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
ConfigResponse configResponse = new ConfigResponse();
if (response.isSuccess()) {
// 写成文件
LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
configResponse.setContent(response.getContent());
...
}
|
跟踪代码,ConfigRpcTransportClient.requestProxy() -> rpcClientInner.request() -> GrpcConnection#request()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| public Response request(Request request, long timeouts) throws NacosException {
// 序列化请求
Payload grpcRequest = GrpcUtils.convert(request);
// 调用PB方法
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
Payload grpcResponse;
try {
if (timeouts <= 0) {
grpcResponse = requestFuture.get();
} else {
grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
// 反序列化结果
return (Response) GrpcUtils.parse(grpcResponse);
}
|
这里涉及到grpc的调用,从proto文件(src/main/proto/nacos_grpc_service.proto)看,主要有两个接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| message Payload {
Metadata metadata = 2;
google.protobuf.Any body = 3; // 主要的Request对象序列化成这个
}
service Request {
// 注册、获取配置
rpc request (Payload) returns (Payload) {
}
}
service BiRequestStream {
// stream接口,双向通信,用于接收服务端推送的更新
rpc requestBiStream (stream Payload) returns (stream Payload) {
}
}
|
客户端的请求是ConfigQueryRequest,对应服务端的代码在com.alibaba.nacos.config.server.remote.ConfigQueryRequestHandler
// 待填充
// 服务端这里主要是从磁盘里(直接文件获取或者通过rocksDB)获取内容
监听配置
监听采用的回调的思想,当服务端通知的时候,调用回调方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| public void addListener(String dataId, String group, Listener listener) throws NacosException {
worker.addTenantListeners(dataId, group, Collections.singletonList(listener));
}
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
group = blank2defaultGroup(group);
String tenant = agent.getTenant();
// 增加一个CacheData,会存放在ClientWorker.cacheMap里面
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
synchronized (cache) {
for (Listener listener : listeners) {
// 加入到CopyOnWriteArrayList<ManagerListenerWrap> listeners里面
cache.addListener(listener);
}
cache.setDiscard(false);
cache.setConsistentWithServer(false);
// 通知
agent.notifyListenConfig();
}
}
private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);
public void notifyListenConfig() {
// 放入一个BlockingQueue里面,有一个线程消费这个queue,执行executeConfigListen方法
listenExecutebell.offer(bellItem);
}
public void startInternal() {
executor.schedule(() -> {
while (!executor.isShutdown() && !executor.isTerminated()) {
try {
listenExecutebell.poll(5L, TimeUnit.SECONDS);
// 省略一部分代码...
executeConfigListen();
} catch (Throwable e) {
// 省略一部分代码...
notifyListenConfig();
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
public void executeConfigListen() {
// 省略一部分代码...
for (CacheData cache : cacheMap.get().values()) {
// consistentWithServer 在新加入进来时是false,在收到服务端一致时才是true
if (cache.isConsistentWithServer()) {
cache.checkListenerMd5();
if (!needAllSync) {
continue;
}
}
// 省略一部分代码...
// 检查数据有没有变化
boolean hasChangedKeys = checkListenCache(listenCachesMap);
}
// 省略一部分代码...
}
|
和服务端对比MD5,检查配置是否有变更
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) {
//...
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
// 按照taskId多线程执行
...
// 发起RPC请求,批量检查配置是否有变更
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
RpcClient rpcClient = ensureRpcClient(taskId);
ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);
List<ConfigChangeBatchListenResponse.ConfigContext> changedConfigs = listenResponse.getChangedConfigs();
//handle changed keys,notify listener
if (!CollectionUtils.isEmpty(changedConfigs)) {
hasChangedKeys.set(true);
for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : changedConfigs) {
String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(),
changeConfig.getGroup(), changeConfig.getTenant());
changeKeys.add(changeKey);
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
// 获取数据,并且执行用户回调
refreshContentAndCheck(changeKey, !isInitializing);
}
}
}
//...
}
|
从服务端获取配置,回调用户方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
try {
// 从服务端获取配置
ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
cacheData.setContent(response.getContent());
if (null != response.getConfigType()) {
cacheData.setType(response.getConfigType());
}
// ...
// 和listener最后执行的md5比较,如果不一样就调用用户注册的回调方法
cacheData.checkListenerMd5();
} catch (Exception e) {
// ...
}
}
|
配置变更推送
再回来看看服务端怎么处理ConfigBatchListenRequest请求,服务端代码在ConfigChangeBatchListenRequestHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| public ConfigChangeBatchListenResponse handle(ConfigBatchListenRequest configChangeListenRequest, RequestMeta meta)
throws NacosException {
for (ConfigBatchListenRequest.ConfigListenContext listenContext : configChangeListenRequest.getConfigListenContexts()) {
if (configChangeListenRequest.isListen()) {
// 加入监听列表
configChangeListenContext.addListen(groupKey, md5, connectionId);
boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
if (!isUptoDate) {
configChangeBatchListenResponse.addChangeConfig(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
}
} else {
// 从监听列表移除
configChangeListenContext.removeListen(groupKey, connectionId);
}
}
}
|
加入监控列表
1
2
3
4
5
6
7
8
9
10
| // com.alibaba.nacos.config.server.remote.ConfigChangeListenContext#addListen
// 维护一个 {connectionId:{groupKey:md5}} 的关系
private ConcurrentHashMap<String, HashMap<String, String>> connectionIdContext = new ConcurrentHashMap<>();
// 维护 groupKey-> connection 关系
private ConcurrentHashMap<String, HashSet<String>> groupKeyContext = new ConcurrentHashMap<>();
public synchronized void addListen(String groupKey, String md5, String connectionId) {
//...
}
|
RpcConfigChangeNotifier 是负责推送配置的类,在接收到LocalDataChangeEvent变更之后,触发推送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
List<String> betaIps, String tag) {
// 获取所有和这个groupKey相关的连接
Set<String> listeners = configChangeListenContext.getListeners(groupKey);
if (CollectionUtils.isEmpty(listeners)) {
return;
}
int notifyClientCount = 0;
for (final String client : listeners) {
Connection connection = connectionManager.getConnection(client);
if (connection == null) {
continue;
}
ConnectionMeta metaInfo = connection.getMetaInfo();
String clientIp = metaInfo.getClientIp();
String clientTag = metaInfo.getTag();
//tag check
if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
continue;
}
ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
// 构造RPC推送
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest,
ConfigCommonConfig.getInstance().getMaxPushRetryTimes(), client, clientIp, metaInfo.getAppName());
push(rpcPushRetryTask, connectionManager);
notifyClientCount++;
}
Loggers.REMOTE_PUSH.info("push [{}] clients, groupKey=[{}]", notifyClientCount, groupKey);
}
|
这里的grpc推送,从RpcPushTask跟进去,调用rpcPushService.pushWithCallback,GrpcConnection#sendRequestNoAck方法里面通过streamObserver来发送。
这个streamObserver是在com.alibaba.nacos.core.remote.grpc.GrpcBiStreamRequestAcceptor#requestBiStream调用时,前面看到的grpc方法,客户端在建立双向流后,使用streamObserver给客户端推送请求。
参考文章
Nacos源码分析
程序猿阿越 Nacos源码分东西