[集群] 增加设备基本配置和设备配置查询

master
lin 2025-02-08 10:45:47 +08:00
parent 0cde02c397
commit 8ca982485c
6 changed files with 185 additions and 47 deletions

View File

@ -22,6 +22,7 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
@ -74,36 +75,19 @@ public class DeviceConfig {
log.debug("报警复位API调用"); log.debug("报警复位API调用");
} }
Device device = deviceService.getDeviceByDeviceId(deviceId); Device device = deviceService.getDeviceByDeviceId(deviceId);
String uuid = UUID.randomUUID().toString(); Assert.notNull(device, "设备不存在");
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG + deviceId + channelId; DeferredResult<String> result = deviceService.deviceBasicConfig(device, channelId, name, expiration, heartBeatInterval, heartBeatCount);
try {
cmder.deviceBasicConfigCmd(device, channelId, name, expiration, heartBeatInterval, heartBeatCount, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData(String.format("设备配置操作失败,错误码: %s, %s", event.statusCode, event.msg));
resultHolder.invokeResult(msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 设备配置: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
DeferredResult<String> result = new DeferredResult<String>(3 * 1000L);
result.onTimeout(() -> { result.onTimeout(() -> {
log.warn(String.format("设备配置操作超时, 设备未返回应答指令")); log.warn(String.format("设备配置操作超时, 设备未返回应答指令"));
// 释放rtpserver
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("DeviceID", deviceId); json.put("DeviceID", device.getDeviceId());
json.put("Status", "Timeout"); json.put("Status", "Timeout");
json.put("Description", "设备配置操作超时, 设备未返回应答指令"); json.put("Description", "设备配置操作超时, 设备未返回应答指令");
msg.setData(json); //("看守位控制操作超时, 设备未返回应答指令"); result.setResult(json.toString());
resultHolder.invokeResult(msg);
}); });
resultHolder.put(key, uuid, result);
return result; return result;
} }
/** /**
@ -124,32 +108,19 @@ public class DeviceConfig {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("设备状态查询API调用"); log.debug("设备状态查询API调用");
} }
String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + (ObjectUtils.isEmpty(channelId) ? deviceId : channelId);
String uuid = UUID.randomUUID().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId); Device device = deviceService.getDeviceByDeviceId(deviceId);
try { Assert.notNull(device, "设备不存在");
cmder.deviceConfigQuery(device, channelId, configType, event -> {
RequestMessage msg = new RequestMessage(); DeferredResult<String> result = deviceService.deviceConfigQuery(device, channelId, configType);
msg.setId(uuid);
msg.setKey(key);
msg.setData(String.format("获取设备配置失败,错误码: %s, %s", event.statusCode, event.msg));
resultHolder.invokeResult(msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 获取设备配置: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
DeferredResult<String> result = new DeferredResult<String > (3 * 1000L);
result.onTimeout(() -> { result.onTimeout(() -> {
log.warn(String.format("获取设备配置超时")); log.warn("获取设备配置超时");
// 释放rtpserver JSONObject json = new JSONObject();
RequestMessage msg = new RequestMessage(); json.put("DeviceID", device.getDeviceId());
msg.setId(uuid); json.put("Status", "Timeout");
msg.setKey(key); json.put("Description", "操作超时");
msg.setData("Timeout. Device did not response to this command."); result.setResult(json.toString());
resultHolder.invokeResult(msg);
}); });
resultHolder.put(key, uuid, result);
return result; return result;
} }

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.List; import java.util.List;
@ -168,4 +169,8 @@ public interface IDeviceService {
void subscribeMobilePosition(int id, int cycle, int interval); void subscribeMobilePosition(int id, int cycle, int interval);
WVPResult<SyncStatus> devicesSync(Device device); WVPResult<SyncStatus> devicesSync(Device device);
DeferredResult<String> deviceBasicConfig(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount);
DeferredResult<String> deviceConfigQuery(Device device, String channelId, String configType);
} }

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service.impl; package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.common.enums.ChannelDataType;
@ -18,6 +19,8 @@ import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -36,12 +39,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import java.text.ParseException; import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -632,4 +638,49 @@ public class DeviceServiceImpl implements IDeviceService {
wvpResult.setMsg("开始同步"); wvpResult.setMsg("开始同步");
return wvpResult; return wvpResult;
} }
@Override
public DeferredResult<String> deviceBasicConfig(Device device, String channelId, String name, String expiration,
String heartBeatInterval, String heartBeatCount) {
if (!userSetting.getServerId().equals(device.getServerId())) {
String result = redisRpcService.deviceBasicConfig(device.getServerId(), device, channelId, name, expiration,
heartBeatInterval, heartBeatCount);
DeferredResult<String> deferredResult = new DeferredResult<String>(3 * 1000L);
deferredResult.setResult(result);
return deferredResult;
}
DeferredResult<String> result = new DeferredResult<String>(3 * 1000L);
try {
sipCommander.deviceBasicConfigCmd(device, channelId, name, expiration, heartBeatInterval, heartBeatCount, event -> {
result.setResult(String.format("设备配置操作失败,错误码: %s, %s", event.statusCode, event.msg));
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 设备配置: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
return result;
}
@Override
public DeferredResult<String> deviceConfigQuery(Device device, String channelId, String configType) {
if (!userSetting.getServerId().equals(device.getServerId())) {
String result = redisRpcService.deviceConfigQuery(device.getServerId(), device, channelId, configType);
DeferredResult<String> deferredResult = new DeferredResult<String>(3 * 1000L);
deferredResult.setResult(result);
return deferredResult;
}
DeferredResult<String> result = new DeferredResult<String>(3 * 1000L);
try {
sipCommander.deviceConfigQuery(device, channelId, configType, event -> {
result.setResult(String.format("获取设备配置失败,错误码: %s, %s", event.statusCode, event.msg));
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 获取设备配置: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
return result;
}
} }

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
@ -38,4 +39,8 @@ public interface IRedisRpcService {
WVPResult<SyncStatus> devicesSync(String serverId, String deviceId); WVPResult<SyncStatus> devicesSync(String serverId, String deviceId);
SyncStatus getChannelSyncStatus(String serverId, String deviceId); SyncStatus getChannelSyncStatus(String serverId, String deviceId);
String deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount);
String deviceConfigQuery(String serverId, Device device, String channelId, String configType);
} }

View File

@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.context.request.async.DeferredResult;
@Component @Component
@Slf4j @Slf4j
@ -88,6 +89,82 @@ public class RedisRpcDeviceController extends RpcController {
return response; return response;
} }
@RedisRpcMapping("deviceBasicConfig")
public RedisRpcResponse deviceBasicConfig(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
String name = paramJson.getString("configType");
String expiration = paramJson.getString("expiration");
String heartBeatInterval = paramJson.getString("heartBeatInterval");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
DeferredResult<String> deferredResult = deviceService.deviceBasicConfig(device, channelId, name, expiration, heartBeatInterval, heartBeatInterval);
deferredResult.onCompletion(() ->{
String resultStr = (String)deferredResult.getResult();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(resultStr);
// 手动发送结果
sendResponse(response);
});
deferredResult.onTimeout(() -> {
log.warn(String.format("设备配置操作超时, 设备未返回应答指令"));
JSONObject json = new JSONObject();
json.put("DeviceID", device.getDeviceId());
json.put("Status", "Timeout");
json.put("Description", "设备配置操作超时, 设备未返回应答指令");
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(json);
// 手动发送结果
sendResponse(response);
});
return response;
}
@RedisRpcMapping("deviceConfigQuery")
public RedisRpcResponse deviceConfigQuery(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
String configType = paramJson.getString("configType");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
DeferredResult<String> deferredResult = deviceService.deviceConfigQuery(device, channelId, configType);
deferredResult.onCompletion(() ->{
String resultStr = (String)deferredResult.getResult();
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(resultStr);
// 手动发送结果
sendResponse(response);
});
deferredResult.onTimeout(() -> {
log.warn(String.format("设备配置操作超时, 设备未返回应答指令"));
JSONObject json = new JSONObject();
json.put("DeviceID", device.getDeviceId());
json.put("Status", "Timeout");
json.put("Description", "设备配置操作超时, 设备未返回应答指令");
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(json);
// 手动发送结果
sendResponse(response);
});
return response;
}
} }

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
@ -263,4 +264,32 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
RedisRpcResponse response = redisRpcConfig.request(request, 100, TimeUnit.MILLISECONDS); RedisRpcResponse response = redisRpcConfig.request(request, 100, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), SyncStatus.class); return JSON.parseObject(response.getBody().toString(), SyncStatus.class);
} }
@Override
public String deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration,
String heartBeatInterval, String heartBeatCount) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("name", name);
jsonObject.put("expiration", expiration);
jsonObject.put("heartBeatInterval", heartBeatInterval);
jsonObject.put("heartBeatCount", heartBeatCount);
RedisRpcRequest request = buildRequest("device/deviceBasicConfig", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return response.getBody().toString();
}
@Override
public String deviceConfigQuery(String serverId, Device device, String channelId, String configType) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("configType", configType);
RedisRpcRequest request = buildRequest("device/deviceConfigQuery", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return response.getBody().toString();
}
} }