diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceControl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceControl.java index 55017deb..a9daa543 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceControl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceControl.java @@ -24,6 +24,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; @@ -61,12 +62,8 @@ public class DeviceControl { log.debug("设备远程启动API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - try { - cmder.teleBootCmd(device); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 远程启动: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } + Assert.notNull(device, "设备不存在"); + deviceService.teleboot(device); } /** @@ -81,41 +78,18 @@ public class DeviceControl { @Parameter(name = "channelId", description = "通道国标编号", required = true) @Parameter(name = "recordCmdStr", description = "命令, 可选值:Record(手动录像),StopRecord(停止手动录像)", required = true) @GetMapping("/record/{deviceId}/{recordCmdStr}") - public DeferredResult>> recordApi(@PathVariable String deviceId, + public DeferredResult recordApi(@PathVariable String deviceId, @PathVariable String recordCmdStr, String channelId) { if (log.isDebugEnabled()) { log.debug("开始/停止录像API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + channelId; - DeferredResult>> result = new DeferredResult<>(3 * 1000L); + Assert.notNull(device, "设备不存在"); + DeferredResult result = deviceService.record(device, channelId, recordCmdStr); result.onTimeout(() -> { - log.warn(String.format("开始/停止录像操作超时, 设备未返回应答指令")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); - resultHolder.invokeAllResult(msg); + log.warn("[开始/停止录像] 操作超时, 设备未返回应答指令, {}", deviceId); + result.setResult("操作超时, 设备未应答"); }); - if (resultHolder.exist(key, null)){ - return result; - } - resultHolder.put(key, uuid, result); - try { - cmder.recordCmd(device, channelId, recordCmdStr, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("开始/停止录像操作失败,错误码: %s, %s", event.statusCode, event.msg))); - resultHolder.invokeAllResult(msg); - },null); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 开始/停止录像: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - return result; } @@ -134,32 +108,12 @@ public class DeviceControl { log.debug("布防/撤防API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + deviceId; - String uuid =UUID.randomUUID().toString(); - try { - cmder.guardCmd(device, guardCmdStr, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("布防/撤防操作失败,错误码: %s, %s", event.statusCode, event.msg))); - resultHolder.invokeResult(msg); - },null); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 布防/撤防操作: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); - } - DeferredResult> result = new DeferredResult<>(3 * 1000L); - resultHolder.put(key, uuid, result); + Assert.notNull(device, "设备不存在"); + DeferredResult> result = deviceService.guard(device, guardCmdStr); result.onTimeout(() -> { - log.warn(String.format("布防/撤防操作超时, 设备未返回应答指令")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setId(uuid); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); - resultHolder.invokeResult(msg); + log.warn("[布防/撤防] 操作超时, 设备未返回应答指令, {}", deviceId); + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); }); - return result; } @@ -176,38 +130,19 @@ public class DeviceControl { @Parameter(name = "alarmMethod", description = "报警方式") @Parameter(name = "alarmType", description = "报警类型") @GetMapping("/reset_alarm/{deviceId}") - public DeferredResult>> resetAlarmApi(@PathVariable String deviceId, String channelId, + public DeferredResult> resetAlarmApi(@PathVariable String deviceId, String channelId, @RequestParam(required = false) String alarmMethod, @RequestParam(required = false) String alarmType) { if (log.isDebugEnabled()) { log.debug("报警复位API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + channelId; - try { - cmder.alarmCmd(device, alarmMethod, alarmType, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("操作失败,错误码: %s, %s", event.statusCode, event.msg))); - resultHolder.invokeResult(msg); - },null); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 报警复位: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - DeferredResult>> result = new DeferredResult<>(3 * 1000L); + Assert.notNull(device, "设备不存在"); + DeferredResult> result = deviceService.resetAlarm(device, channelId, alarmMethod, alarmType); result.onTimeout(() -> { - log.warn(String.format("报警复位操作超时, 设备未返回应答指令")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); - resultHolder.invokeResult(msg); + log.warn("[布防/撤防] 操作超时, 设备未返回应答指令, {}", deviceId); + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); }); - resultHolder.put(key, uuid, result); return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index e9cc4945..43e262e9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -173,4 +173,12 @@ public interface IDeviceService { DeferredResult deviceBasicConfig(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount); DeferredResult deviceConfigQuery(Device device, String channelId, String configType); + + void teleboot(Device device); + + DeferredResult record(Device device, String channelId, String recordCmdStr); + + DeferredResult> guard(Device device, String guardCmdStr); + + DeferredResult> resetAlarm(Device device, String channelId, String alarmMethod, String alarmType); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index a55174e6..5d9c0b9c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -36,6 +36,7 @@ import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; @@ -683,4 +684,85 @@ public class DeviceServiceImpl implements IDeviceService { } return result; } + + @Override + public void teleboot(Device device) { + + if (!userSetting.getServerId().equals(device.getServerId())) { + redisRpcService.teleboot(device.getServerId(), device); + } + try { + sipCommander.teleBootCmd(device); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 远程启动: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + } + + @Override + public DeferredResult record(Device device, String channelId, String recordCmdStr) { + + if (!userSetting.getServerId().equals(device.getServerId())) { + String result = redisRpcService.recordControl(device.getServerId(), device, channelId, recordCmdStr); + DeferredResult deferredResult = new DeferredResult(3 * 1000L); + deferredResult.setResult(result); + return deferredResult; + } + + DeferredResult result = new DeferredResult<>(3 * 1000L); + try { + sipCommander.recordCmd(device, channelId, recordCmdStr, event -> { + result.setResult(String.format("开始/停止录像操作失败,错误码: %s, %s", event.statusCode, event.msg)); + },null); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 开始/停止录像: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + + return result; + } + + @Override + public DeferredResult> guard(Device device, String guardCmdStr) { + if (!userSetting.getServerId().equals(device.getServerId())) { + WVPResult result = redisRpcService.guard(device.getServerId(), device, guardCmdStr); + DeferredResult> deferredResult = new DeferredResult<>(3 * 1000L); + deferredResult.setResult(result); + return deferredResult; + } + + DeferredResult> result = new DeferredResult<>(3 * 1000L); + try { + sipCommander.guardCmd(device, guardCmdStr, event -> { + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("布防/撤防操作失败,错误码: %s, %s", event.statusCode, event.msg))); + },null); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 布防/撤防操作: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); + } + + return result; + } + + @Override + public DeferredResult> resetAlarm(Device device, String channelId, String alarmMethod, String alarmType) { + if (!userSetting.getServerId().equals(device.getServerId())) { + WVPResult result = redisRpcService.resetAlarm(device.getServerId(), device, channelId, alarmMethod, alarmType); + DeferredResult> deferredResult = new DeferredResult<>(3 * 1000L); + deferredResult.setResult(result); + return deferredResult; + } + + DeferredResult> result = new DeferredResult<>(3 * 1000L); + try { + sipCommander.alarmCmd(device, alarmMethod, alarmType, event -> { + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("操作失败,错误码: %s, %s", event.statusCode, event.msg))); + },null); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 布防/撤防操作: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); + } + + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 7a362db7..eb44b204 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -43,4 +43,10 @@ public interface IRedisRpcService { String deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount); String deviceConfigQuery(String serverId, Device device, String channelId, String configType); + + void teleboot(String serverId, Device device); + + String recordControl(String serverId, Device device, String channelId, String recordCmdStr); + + WVPResult guard(String serverId, Device device, String guardCmdStr); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java index 8180cd9c..10067fc1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.redisMsg.control; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; @@ -165,6 +166,110 @@ public class RedisRpcDeviceController extends RpcController { return response; } + @RedisRpcMapping("teleboot") + public RedisRpcResponse teleboot(RedisRpcRequest request) { + String deviceId = request.getParam().toString(); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + deviceService.teleboot(device); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(e.getMsg()); + return response; + } + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(ErrorCode.SUCCESS.getMsg()); + return response; + } + + @RedisRpcMapping("record") + public RedisRpcResponse record(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + String deviceId = paramJson.getString("deviceId"); + String channelId = paramJson.getString("channelId"); + String recordCmdStr = paramJson.getString("recordCmdStr"); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + DeferredResult deferredResult = deviceService.record(device, channelId, recordCmdStr); + deferredResult.onCompletion(() ->{ + String resultStr = (String)deferredResult.getResult(); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(resultStr); + // 手动发送结果 + sendResponse(response); + }); + deferredResult.onTimeout(() -> { + log.warn("设备录像控制操作超时, 设备未返回应答指令"); + JSONObject json = new JSONObject(); + json.put("DeviceID", device.getDeviceId()); + json.put("Status", "Timeout"); + json.put("Description", "设备录像控制操作超时, 设备未返回应答指令"); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(json); + // 手动发送结果 + sendResponse(response); + }); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(e.getMsg()); + sendResponse(response); + } + return null; + } + + @RedisRpcMapping("guard") + public RedisRpcResponse guard(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + String deviceId = paramJson.getString("deviceId"); + String guardCmdStr = paramJson.getString("guardCmdStr"); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + DeferredResult> deferredResult = deviceService.guard(device, guardCmdStr); + deferredResult.onCompletion(() ->{ + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(deferredResult.getResult()); + // 手动发送结果 + sendResponse(response); + }); + deferredResult.onTimeout(() -> { + log.warn("设备录像控制操作超时, 设备未返回应答指令"); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); + // 手动发送结果 + sendResponse(response); + }); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(e.getMsg()); + sendResponse(response); + } + return null; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index ca8eda9e..169d4c04 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; @@ -292,4 +293,37 @@ public class RedisRpcServiceImpl implements IRedisRpcService { RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); return response.getBody().toString(); } + + @Override + public void teleboot(String serverId, Device device) { + RedisRpcRequest request = buildRequest("device/teleboot", device.getDeviceId()); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) { + throw new ControllerException(response.getStatusCode(), response.getBody().toString()); + } + } + + @Override + public String recordControl(String serverId, Device device, String channelId, String recordCmdStr) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device", device.getDeviceId()); + jsonObject.put("channelId", channelId); + jsonObject.put("recordCmdStr", recordCmdStr); + RedisRpcRequest request = buildRequest("device/record", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + return response.getBody().toString(); + } + + @Override + public WVPResult guard(String serverId, Device device, String guardCmdStr) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device", device.getDeviceId()); + jsonObject.put("guardCmdStr", guardCmdStr); + RedisRpcRequest request = buildRequest("device/guard", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java index d6c24cb5..67fb1cfa 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/controller/StreamPushController.java @@ -260,4 +260,12 @@ public class StreamPushController { }, null, null); return result; } + + @GetMapping(value = "/forceClose") + @ResponseBody + @Operation(summary = "强制停止推流", security = @SecurityRequirement(name = JwtUtils.HEADER)) + public void stop(String app, String stream){ + + streamPushPlayService.stop(app, stream); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java index 9b75ffc7..0b42a2e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java @@ -6,5 +6,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback; public interface IStreamPushPlayService { void start(Integer id, ErrorCallback callback, String platformDeviceId, String platformName ); - void stop(Integer streamPushId); + void stop(String app, String stream); + } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java index 16fdc4cf..68e54343 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -117,7 +117,14 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { } @Override - public void stop(Integer streamPushId) { - // 推流无需主动停止 + public void stop(String app, String stream) { + StreamPush streamPush = streamPushMapper.selectByAppAndStream(app, stream); + if (streamPush == null || !streamPush.isPushing()) { + return; + } + String mediaServerId = streamPush.getMediaServerId(); + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + Assert.notNull(mediaServer, "未找到使用的节点"); + mediaServerService.closeStreams(mediaServer, app, stream); } }