diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java index f6e7ddcb..c469dabc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java @@ -22,6 +22,7 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; @@ -74,36 +75,19 @@ public class DeviceConfig { log.debug("报警复位API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG + deviceId + channelId; - try { - cmder.deviceBasicConfigCmd(device, channelId, name, expiration, heartBeatInterval, heartBeatCount, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(String.format("设备配置操作失败,错误码: %s, %s", event.statusCode, event.msg)); - resultHolder.invokeResult(msg); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 设备配置: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - DeferredResult result = new DeferredResult(3 * 1000L); + Assert.notNull(device, "设备不存在"); + DeferredResult result = deviceService.deviceBasicConfig(device, channelId, name, expiration, heartBeatInterval, heartBeatCount); + result.onTimeout(() -> { log.warn(String.format("设备配置操作超时, 设备未返回应答指令")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); JSONObject json = new JSONObject(); - json.put("DeviceID", deviceId); + json.put("DeviceID", device.getDeviceId()); json.put("Status", "Timeout"); json.put("Description", "设备配置操作超时, 设备未返回应答指令"); - msg.setData(json); //("看守位控制操作超时, 设备未返回应答指令"); - resultHolder.invokeResult(msg); + result.setResult(json.toString()); }); - resultHolder.put(key, uuid, result); return result; + } /** @@ -124,32 +108,19 @@ public class DeviceConfig { if (log.isDebugEnabled()) { log.debug("设备状态查询API调用"); } - String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + (ObjectUtils.isEmpty(channelId) ? deviceId : channelId); - String uuid = UUID.randomUUID().toString(); Device device = deviceService.getDeviceByDeviceId(deviceId); - try { - cmder.deviceConfigQuery(device, channelId, configType, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(String.format("获取设备配置失败,错误码: %s, %s", event.statusCode, event.msg)); - resultHolder.invokeResult(msg); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 获取设备配置: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - DeferredResult result = new DeferredResult (3 * 1000L); - result.onTimeout(()->{ - log.warn(String.format("获取设备配置超时")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData("Timeout. Device did not response to this command."); - resultHolder.invokeResult(msg); + Assert.notNull(device, "设备不存在"); + + DeferredResult result = deviceService.deviceConfigQuery(device, channelId, configType); + + result.onTimeout(() -> { + log.warn("获取设备配置超时"); + JSONObject json = new JSONObject(); + json.put("DeviceID", device.getDeviceId()); + json.put("Status", "Timeout"); + json.put("Description", "操作超时"); + result.setResult(json.toString()); }); - resultHolder.put(key, uuid, result); return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index f25c7d83..e9cc4945 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; +import org.springframework.web.context.request.async.DeferredResult; import java.util.List; @@ -168,4 +169,8 @@ public interface IDeviceService { void subscribeMobilePosition(int id, int cycle, int interval); WVPResult devicesSync(Device device); + + DeferredResult deviceBasicConfig(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount); + + DeferredResult deviceConfigQuery(Device device, String channelId, String configType); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 80e4582e..a55174e6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.enums.ChannelDataType; @@ -18,6 +19,8 @@ import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -36,12 +39,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; +import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.time.Instant; import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; /** @@ -632,4 +638,49 @@ public class DeviceServiceImpl implements IDeviceService { wvpResult.setMsg("开始同步"); return wvpResult; } + + @Override + public DeferredResult deviceBasicConfig(Device device, String channelId, String name, String expiration, + String heartBeatInterval, String heartBeatCount) { + if (!userSetting.getServerId().equals(device.getServerId())) { + String result = redisRpcService.deviceBasicConfig(device.getServerId(), device, channelId, name, expiration, + heartBeatInterval, heartBeatCount); + DeferredResult deferredResult = new DeferredResult(3 * 1000L); + deferredResult.setResult(result); + return deferredResult; + } + + DeferredResult result = new DeferredResult(3 * 1000L); + try { + sipCommander.deviceBasicConfigCmd(device, channelId, name, expiration, heartBeatInterval, heartBeatCount, event -> { + result.setResult(String.format("设备配置操作失败,错误码: %s, %s", event.statusCode, event.msg)); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 设备配置: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + return result; + } + + @Override + public DeferredResult deviceConfigQuery(Device device, String channelId, String configType) { + + if (!userSetting.getServerId().equals(device.getServerId())) { + String result = redisRpcService.deviceConfigQuery(device.getServerId(), device, channelId, configType); + DeferredResult deferredResult = new DeferredResult(3 * 1000L); + deferredResult.setResult(result); + return deferredResult; + } + + DeferredResult result = new DeferredResult(3 * 1000L); + try { + sipCommander.deviceConfigQuery(device, channelId, configType, event -> { + result.setResult(String.format("获取设备配置失败,错误码: %s, %s", event.statusCode, event.msg)); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 获取设备配置: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 22ab933f..7a362db7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; @@ -38,4 +39,8 @@ public interface IRedisRpcService { WVPResult devicesSync(String serverId, String deviceId); SyncStatus getChannelSyncStatus(String serverId, String deviceId); + + String deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount); + + String deviceConfigQuery(String serverId, Device device, String channelId, String configType); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java index d4ac7a4b..8180cd9c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import org.springframework.web.context.request.async.DeferredResult; @Component @Slf4j @@ -88,6 +89,82 @@ public class RedisRpcDeviceController extends RpcController { return response; } + @RedisRpcMapping("deviceBasicConfig") + public RedisRpcResponse deviceBasicConfig(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + String deviceId = paramJson.getString("deviceId"); + String channelId = paramJson.getString("channelId"); + String name = paramJson.getString("configType"); + String expiration = paramJson.getString("expiration"); + String heartBeatInterval = paramJson.getString("heartBeatInterval"); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + DeferredResult deferredResult = deviceService.deviceBasicConfig(device, channelId, name, expiration, heartBeatInterval, heartBeatInterval); + deferredResult.onCompletion(() ->{ + String resultStr = (String)deferredResult.getResult(); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(resultStr); + // 手动发送结果 + sendResponse(response); + }); + deferredResult.onTimeout(() -> { + log.warn(String.format("设备配置操作超时, 设备未返回应答指令")); + JSONObject json = new JSONObject(); + json.put("DeviceID", device.getDeviceId()); + json.put("Status", "Timeout"); + json.put("Description", "设备配置操作超时, 设备未返回应答指令"); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(json); + // 手动发送结果 + sendResponse(response); + }); + return response; + } + + @RedisRpcMapping("deviceConfigQuery") + public RedisRpcResponse deviceConfigQuery(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + String deviceId = paramJson.getString("deviceId"); + String channelId = paramJson.getString("channelId"); + String configType = paramJson.getString("configType"); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + DeferredResult deferredResult = deviceService.deviceConfigQuery(device, channelId, configType); + deferredResult.onCompletion(() ->{ + String resultStr = (String)deferredResult.getResult(); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(resultStr); + // 手动发送结果 + sendResponse(response); + }); + deferredResult.onTimeout(() -> { + log.warn(String.format("设备配置操作超时, 设备未返回应答指令")); + JSONObject json = new JSONObject(); + json.put("DeviceID", device.getDeviceId()); + json.put("Status", "Timeout"); + json.put("Description", "设备配置操作超时, 设备未返回应答指令"); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(json); + // 手动发送结果 + sendResponse(response); + }); + return response; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 9aa84533..ca8eda9e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; @@ -263,4 +264,32 @@ public class RedisRpcServiceImpl implements IRedisRpcService { RedisRpcResponse response = redisRpcConfig.request(request, 100, TimeUnit.MILLISECONDS); return JSON.parseObject(response.getBody().toString(), SyncStatus.class); } + + @Override + public String deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration, + String heartBeatInterval, String heartBeatCount) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device", device.getDeviceId()); + jsonObject.put("channelId", channelId); + jsonObject.put("name", name); + jsonObject.put("expiration", expiration); + jsonObject.put("heartBeatInterval", heartBeatInterval); + jsonObject.put("heartBeatCount", heartBeatCount); + RedisRpcRequest request = buildRequest("device/deviceBasicConfig", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + return response.getBody().toString(); + } + + @Override + public String deviceConfigQuery(String serverId, Device device, String channelId, String configType) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device", device.getDeviceId()); + jsonObject.put("channelId", channelId); + jsonObject.put("configType", configType); + RedisRpcRequest request = buildRequest("device/deviceConfigQuery", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + return response.getBody().toString(); + } }