[集群] 增加通道刷新状态消息

master
648540858 2025-01-21 17:04:51 +08:00
parent 01f491c026
commit 4e66e2bc6d
5 changed files with 38 additions and 2 deletions

View File

@ -154,7 +154,6 @@ public class RedisRpcConfig implements MessageListener {
}catch (InvocationTargetException | IllegalAccessException e) {
log.error("[redis-rpc ] 处理请求失败 ", e);
}
}
private void sendResponse(RedisRpcResponse response){
@ -172,7 +171,6 @@ public class RedisRpcConfig implements MessageListener {
redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
}
private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();

View File

@ -346,6 +346,13 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public SyncStatus getChannelSyncStatus(String deviceId) {
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "设备不存在");
}
if (!userSetting.getServerId().equals(device.getServerId())) {
return redisRpcService.getChannelSyncStatus(device.getServerId(), deviceId);
}
return catalogResponseMessageHandler.getChannelSyncProgress(deviceId);
}

View File

@ -36,4 +36,6 @@ public interface IRedisRpcService {
void catalogEventPublish(String serverId, CatalogEvent catalogEvent);
WVPResult<SyncStatus> devicesSync(String serverId, String deviceId);
SyncStatus getChannelSyncStatus(String serverId, String deviceId);
}

View File

@ -66,4 +66,25 @@ public class RedisRpcDeviceController extends RpcController {
return response;
}
/**
*
*/
@RedisRpcMapping("getChannelSyncStatus")
public RedisRpcResponse getChannelSyncStatus(RedisRpcRequest request) {
String deviceId = request.getParam().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
SyncStatus channelSyncStatus = deviceService.getChannelSyncStatus(deviceId);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(channelSyncStatus));
return response;
}
}

View File

@ -255,4 +255,12 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
RedisRpcResponse response = redisRpcConfig.request(request, 100, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public SyncStatus getChannelSyncStatus(String serverId, String deviceId) {
RedisRpcRequest request = buildRequest("device/getChannelSyncStatus", deviceId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 100, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), SyncStatus.class);
}
}