diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java index fc72bfe9..8e4324ea 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java @@ -67,11 +67,6 @@ public class DeviceQuery { @Autowired private IRedisRpcService redisRpcService; - /** - * 使用ID查询国标设备 - * @param deviceId 国标ID - * @return 国标设备 - */ @Operation(summary = "查询国标设备", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @GetMapping("/devices/{deviceId}") @@ -80,12 +75,7 @@ public class DeviceQuery { return deviceService.getDeviceByDeviceId(deviceId); } - /** - * 分页查询国标设备 - * @param page 当前页 - * @param count 每页查询数量 - * @return 分页国标列表 - */ + @Operation(summary = "分页查询国标设备", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "page", description = "当前页", required = true) @Parameter(name = "count", description = "每页查询数量", required = true) @@ -100,9 +90,7 @@ public class DeviceQuery { return deviceService.getAll(page, count, query, status); } - /** - * 分页查询通道数 - */ + @GetMapping("/devices/{deviceId}/channels") @Operation(summary = "分页查询通道", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @@ -123,9 +111,7 @@ public class DeviceQuery { return deviceChannelService.queryChannelsByDeviceId(deviceId, query, channelType, online, page, count); } - /** - * 同步设备通道 - */ + @Operation(summary = "同步设备通道", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @GetMapping("/devices/{deviceId}/sync") @@ -135,19 +121,11 @@ public class DeviceQuery { log.debug("设备通道信息同步API调用,deviceId:" + deviceId); } Device device = deviceService.getDeviceByDeviceId(deviceId); - if (!userSetting.getServerId().equals(device.getServerId())) { - return redisRpcService.devicesSync(device.getServerId(), deviceId); - } return deviceService.devicesSync(device); } - /** - * 移除设备 - * @param deviceId 设备id - * @return - */ @Operation(summary = "移除设备", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @DeleteMapping("/devices/{deviceId}/delete") @@ -182,17 +160,6 @@ public class DeviceQuery { } } - /** - * 分页查询子目录通道 - * @param deviceId 通道id - * @param channelId 通道id - * @param page 当前页 - * @param count 每页条数 - * @param query 查询内容 - * @param online 是否在线 - * @param channelType 通道类型 - * @return 子通道列表 - */ @Operation(summary = "分页查询子目录通道", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "channelId", description = "通道国标编号", required = true) @@ -235,12 +202,7 @@ public class DeviceQuery { deviceChannelService.updateChannelStreamIdentification(channel); } - /** - * 修改数据流传输模式 - * @param deviceId 设备id - * @param streamMode 数据流传输模式 - * @return - */ + @Operation(summary = "修改数据流传输模式", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "streamMode", description = "数据流传输模式, 取值:" + @@ -252,11 +214,7 @@ public class DeviceQuery { deviceService.updateCustomDevice(device); } - /** - * 添加设备信息 - * @param device 设备信息 - * @return - */ + @Operation(summary = "添加设备信息", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "device", description = "设备", required = true) @PostMapping("/device/add/") @@ -274,11 +232,7 @@ public class DeviceQuery { deviceService.addDevice(device); } - /** - * 更新设备信息 - * @param device 设备信息 - * @return - */ + @Operation(summary = "更新设备信息", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "device", description = "设备", required = true) @PostMapping("/device/update/") @@ -289,11 +243,6 @@ public class DeviceQuery { deviceService.updateCustomDevice(device); } - /** - * 设备状态查询请求API接口 - * - * @param deviceId 设备id - */ @Operation(summary = "设备状态查询", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @GetMapping("/devices/{deviceId}/status") @@ -314,27 +263,17 @@ public class DeviceQuery { return result; } - /** - * 设备报警查询请求API接口 - * @param deviceId 设备id - * @param startPriority 报警起始级别(可选) - * @param endPriority 报警终止级别(可选) - * @param alarmMethod 报警方式条件(可选) - * @param alarmType 报警类型 - * @param startTime 报警发生起始时间(可选) - * @param endTime 报警发生终止时间(可选) - * @return true = 命令发送成功 - */ @Operation(summary = "设备报警查询", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) - @Parameter(name = "startPriority", description = "报警起始级别") - @Parameter(name = "endPriority", description = "报警终止级别") - @Parameter(name = "alarmMethod", description = "报警方式条件") + @Parameter(name = "startPriority", description = "报警起始级别, 0为全部,1为一级警情,2为二级警情,3为三级警情,4为四级警情") + @Parameter(name = "endPriority", description = "报警终止级别, ,0为全部,1为一级警情,2为二级警情,3为三级警情,4为四级警情") + @Parameter(name = "alarmMethod", description = "报警方式条件,取值0为全部,1为电话报警,2为设备报警,3为短信报警,4为GPS报警," + + "5为视频报警,6为设备故障报警,7其他报警;可以为直接组合如12为电话报警或设备报警") @Parameter(name = "alarmType", description = "报警类型") @Parameter(name = "startTime", description = "报警发生起始时间") @Parameter(name = "endTime", description = "报警发生终止时间") - @GetMapping("/alarm/{deviceId}") - public DeferredResult> alarmApi(@PathVariable String deviceId, + @GetMapping("/alarm") + public DeferredResult> alarmApi(String deviceId, @RequestParam(required = false) String startPriority, @RequestParam(required = false) String endPriority, @RequestParam(required = false) String alarmMethod, @@ -346,8 +285,8 @@ public class DeviceQuery { } Device device = deviceService.getDeviceByDeviceId(deviceId); Assert.notNull(device, "设备不存在"); - DeferredResult> result = new DeferredResult<>(); - deviceService.deviceStatus(device, (code, msg, data) -> { + DeferredResult> result = new DeferredResult<>(); + deviceService.alarm(device, startPriority,endPriority ,alarmMethod ,alarmType ,startTime ,endTime, (code, msg, data) -> { result.setResult(new WVPResult<>(code, msg, data)); }); result.onTimeout(() -> { @@ -355,34 +294,26 @@ public class DeviceQuery { result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); }); return result; + } - -// String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId; -// String uuid = UUID.randomUUID().toString(); -// try { -// cmder.alarmInfoQuery(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, event -> { -// RequestMessage msg = new RequestMessage(); -// msg.setId(uuid); -// msg.setKey(key); -// msg.setData(String.format("设备报警查询失败,错误码: %s, %s",event.statusCode, event.msg)); -// resultHolder.invokeResult(msg); -// }); -// } catch (InvalidArgumentException | SipException | ParseException e) { -// log.error("[命令发送失败] 设备报警查询: {}", e.getMessage()); -// throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); -// } -// DeferredResult> result = new DeferredResult> (3 * 1000L); -// result.onTimeout(()->{ -// log.warn(String.format("设备报警查询超时")); -// // 释放rtpserver -// RequestMessage msg = new RequestMessage(); -// msg.setId(uuid); -// msg.setKey(key); -// msg.setData("设备报警查询超时"); -// resultHolder.invokeResult(msg); -// }); -// resultHolder.put(DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId, uuid, result); -// return result; + @Operation(summary = "设备信息查询", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "deviceId", description = "设备国标编号", required = true) + @GetMapping("/info") + public DeferredResult> deviceInfo(String deviceId) { + if (log.isDebugEnabled()) { + log.debug("设备信息查询API调用"); + } + Device device = deviceService.getDeviceByDeviceId(deviceId); + Assert.notNull(device, "设备不存在"); + DeferredResult> result = new DeferredResult<>(); + deviceService.deviceInfo(device, (code, msg, data) -> { + result.setResult(new WVPResult<>(code, msg, data)); + }); + result.onTimeout(() -> { + log.warn("[设备信息查询] 操作超时, 设备未返回应答指令, {}", deviceId); + result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); + }); + return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java index a70abec8..6d5352fe 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java @@ -93,28 +93,20 @@ public class PlayController { DeviceChannel channel = deviceChannelService.getOne(deviceId, channelId); Assert.notNull(channel, "通道不存在"); - RequestMessage requestMessage = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; - requestMessage.setKey(key); - String uuid = UUID.randomUUID().toString(); - requestMessage.setId(uuid); DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); result.onTimeout(()->{ log.info("[点播等待超时] deviceId:{}, channelId:{}, ", deviceId, channelId); // 释放rtpserver - WVPResult wvpResult = new WVPResult<>(); + WVPResult wvpResult = new WVPResult<>(); wvpResult.setCode(ErrorCode.ERROR100.getCode()); wvpResult.setMsg("点播超时"); - requestMessage.setData(wvpResult); - resultHolder.invokeAllResult(requestMessage); + result.setResult(wvpResult); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); deviceChannelService.stopPlay(channel.getId()); }); - // 录像查询以channelId作为deviceId查询 - resultHolder.put(key, uuid, result); - ErrorCallback callback = (code, msg, streamInfo) -> { WVPResult wvpResult = new WVPResult<>(); if (code == InviteErrorCode.SUCCESS.getCode()) { @@ -145,9 +137,7 @@ public class PlayController { wvpResult.setCode(code); wvpResult.setMsg(msg); } - requestMessage.setData(wvpResult); - // 此处必须释放所有请求 - resultHolder.invokeAllResult(requestMessage); + result.setResult(wvpResult); }; playService.play(device, channel, callback); return result; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PtzController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PtzController.java index 233050b6..585889a7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PtzController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PtzController.java @@ -7,9 +7,9 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IPTZService; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.security.SecurityRequirement; @@ -17,18 +17,12 @@ import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.UUID; - @Tag(name = "前端设备控制") @Slf4j @RestController @@ -225,40 +219,22 @@ public class PtzController { @Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "channelId", description = "通道国标编号", required = true) @GetMapping("/preset/query/{deviceId}/{channelId}") - public DeferredResult queryPreset(@PathVariable String deviceId, @PathVariable String channelId) { + public DeferredResult> queryPreset(@PathVariable String deviceId, @PathVariable String channelId) { if (log.isDebugEnabled()) { log.debug("设备预置位查询API调用"); } Device device = deviceService.getDeviceByDeviceId(deviceId); - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + (ObjectUtils.isEmpty(channelId) ? deviceId : channelId); - DeferredResult result = new DeferredResult (3 * 1000L); - result.onTimeout(()->{ - log.warn(String.format("获取设备预置位超时")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData("获取设备预置位超时"); - resultHolder.invokeResult(msg); + Assert.notNull(device, "设备不存在"); + DeferredResult> deferredResult = new DeferredResult<> (3 * 1000L); + deviceService.queryPreset(device, channelId, (code, msg, data) -> { + deferredResult.setResult(new WVPResult<>(code, msg, data)); }); - if (resultHolder.exist(key, null)) { - return result; - } - resultHolder.put(key, uuid, result); - try { - cmder.presetQuery(device, channelId, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(String.format("获取设备预置位失败,错误码: %s, %s", event.statusCode, event.msg)); - resultHolder.invokeResult(msg); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 获取设备预置位: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - return result; + + deferredResult.onTimeout(()->{ + log.warn("[获取设备预置位] 超时, {}", device.getDeviceId()); + deferredResult.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时")); + }); + return deferredResult; } @Operation(summary = "预置位指令-设置预置位", security = @SecurityRequirement(name = JwtUtils.HEADER)) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index 738e0313..45a46ff5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -1,10 +1,7 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.common.CommonCallback; -import com.genersoft.iot.vmp.gb28181.bean.BasicParam; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; -import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -194,4 +191,10 @@ public interface IDeviceService { void deviceStatus(Device device, ErrorCallback callback); void updateDeviceHeartInfo(Device device); + + void alarm(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime, ErrorCallback callback); + + void deviceInfo(Device device, ErrorCallback callback); + + void queryPreset(Device device, String channelId, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index cb32f93c..05b27c88 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -37,7 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -140,7 +139,7 @@ public class DeviceServiceImpl implements IDeviceService { deviceMapper.add(device); redisCatchStorage.updateDevice(device); try { - commander.deviceInfoQuery(device); + commander.deviceInfoQuery(device, null); commander.deviceConfigQuery(device, null, "BasicParam", null); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 查询设备信息: {}", e.getMessage()); @@ -155,7 +154,7 @@ public class DeviceServiceImpl implements IDeviceService { if (userSetting.getSyncChannelOnDeviceOnline()) { log.info("[设备上线,离线状态下重新注册]: {},查询设备信息以及通道信息", device.getDeviceId()); try { - commander.deviceInfoQuery(device); + commander.deviceInfoQuery(device, null); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 查询设备信息: {}", e.getMessage()); } @@ -623,7 +622,9 @@ public class DeviceServiceImpl implements IDeviceService { @Override public WVPResult devicesSync(Device device) { - + if (!userSetting.getServerId().equals(device.getServerId())) { + return redisRpcService.devicesSync(device.getServerId(), device.getDeviceId()); + } // 已存在则返回进度 if (isSyncRunning(device.getDeviceId())) { SyncStatus channelSyncStatus = getChannelSyncStatus(device.getDeviceId()); @@ -742,7 +743,7 @@ public class DeviceServiceImpl implements IDeviceService { return; } try { - sipCommander.alarmCmd(device, alarmMethod, alarmType, callback); + sipCommander.alarmResetCmd(device, alarmMethod, alarmType, callback); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 布防/撤防操作: {}", e.getMessage()); callback.run(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage(), null); @@ -842,7 +843,6 @@ public class DeviceServiceImpl implements IDeviceService { return; } - DeferredResult> result = new DeferredResult<>(2*1000L); try { sipCommander.deviceStatusQuery(device, callback); } catch (InvalidArgumentException | SipException | ParseException e) { @@ -851,4 +851,65 @@ public class DeviceServiceImpl implements IDeviceService { throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); } } + + + @Override + public void alarm(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime, ErrorCallback callback) { + if (!userSetting.getServerId().equals(device.getServerId())) { + WVPResult result = redisRpcService.alarm(device.getServerId(), device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime); + callback.run(result.getCode(), result.getMsg(), result.getData()); + return; + } + + String startAlarmTime = ""; + if (startTime != null) { + startAlarmTime = DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(startTime); + } + String endAlarmTime = ""; + if (startTime != null) { + endAlarmTime = DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(endTime); + } + + try { + sipCommander.alarmInfoQuery(device, startPriority, endPriority, alarmMethod, alarmType, startAlarmTime, endAlarmTime, callback); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 获取设备状态: {}", e.getMessage()); + callback.run(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage(), null); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + } + + @Override + public void deviceInfo(Device device, ErrorCallback callback) { + if (!userSetting.getServerId().equals(device.getServerId())) { + WVPResult result = redisRpcService.deviceInfo(device.getServerId(), device); + callback.run(result.getCode(), result.getMsg(), result.getData()); + return; + } + + try { + sipCommander.deviceInfoQuery(device, callback); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 获取设备信息: {}", e.getMessage()); + callback.run(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage(), null); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + } + + @Override + public void queryPreset(Device device, String channelId, ErrorCallback callback) { + if (!userSetting.getServerId().equals(device.getServerId())) { + WVPResult result = redisRpcService.queryPreset(device.getServerId(), device, channelId); + callback.run(result.getCode(), result.getMsg(), result.getData()); + return; + } + + try { + sipCommander.presetQuery(device, channelId, callback); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 预制位查询: {}", e.getMessage()); + callback.run(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage(), null); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java deleted file mode 100755 index e89cedf2..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.session; - -import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEventListener; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.time.Instant; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -/** - * @author lin - */ -@Component -public class RecordDataCatch { - - public static Map data = new ConcurrentHashMap<>(); - - @Autowired - private DeferredResultHolder deferredResultHolder; - - @Autowired - private RecordInfoEventListener recordEndEventListener; - - - public int put(String deviceId,String channelId, String sn, int sumNum, List recordItems) { - String key = deviceId + sn; - RecordInfo recordInfo = data.get(key); - if (recordInfo == null) { - recordInfo = new RecordInfo(); - recordInfo.setDeviceId(deviceId); - recordInfo.setChannelId(channelId); - recordInfo.setSn(sn.trim()); - recordInfo.setSumNum(sumNum); - recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>())); - recordInfo.setLastTime(Instant.now()); - recordInfo.getRecordList().addAll(recordItems); - data.put(key, recordInfo); - }else { - // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 - if (!Objects.equals(sn.trim(), recordInfo.getSn())) { - return 0; - } - recordInfo.getRecordList().addAll(recordItems); - recordInfo.setLastTime(Instant.now()); - } - return recordInfo.getRecordList().size(); - } - - @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 - private void timerTask(){ - Set keys = data.keySet(); - // 获取五秒前的时刻 - Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); - for (String key : keys) { - RecordInfo recordInfo = data.get(key); - // 超过五秒收不到消息任务超时, 只更新这一部分数据 - if ( recordInfo.getLastTime().isBefore(instantBefore5S)) { - // 处理录像数据, 返回给前端 - String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn(); - - // 对数据进行排序 - Collections.sort(recordInfo.getRecordList()); - - RequestMessage msg = new RequestMessage(); - msg.setKey(msgKey); - msg.setData(recordInfo); - deferredResultHolder.invokeAllResult(msg); - recordEndEventListener.delEndEventHandler(recordInfo.getDeviceId(),recordInfo.getChannelId()); - data.remove(key); - } - } - } - - public boolean isComplete(String deviceId, String sn) { - RecordInfo recordInfo = data.get(deviceId + sn); - return recordInfo != null && recordInfo.getRecordList().size() == recordInfo.getSumNum(); - } - - public RecordInfo getRecordInfo(String deviceId, String sn) { - return data.get(deviceId + sn); - } - - public void remove(String deviceId, String sn) { - data.remove(deviceId + sn); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index 61aaac4a..d39ce28e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -18,20 +18,6 @@ import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Component public class DeferredResultHolder { - - public static final String CALLBACK_CMD_DEVICESTATUS = "CALLBACK_DEVICESTATUS"; - - public static final String CALLBACK_CMD_DEVICEINFO = "CALLBACK_DEVICEINFO"; - - public static final String CALLBACK_CMD_DEVICECONTROL = "CALLBACK_DEVICECONTROL"; - - public static final String CALLBACK_CMD_DEVICECONFIG = "CALLBACK_DEVICECONFIG"; - - public static final String CALLBACK_CMD_CONFIGDOWNLOAD = "CALLBACK_CONFIGDOWNLOAD"; - - public static final String CALLBACK_CMD_CATALOG = "CALLBACK_CATALOG"; - - public static final String CALLBACK_CMD_RECORDINFO = "CALLBACK_RECORDINFO"; public static final String CALLBACK_CMD_PLAY = "CALLBACK_PLAY"; @@ -39,20 +25,11 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_DOWNLOAD = "CALLBACK_DOWNLOAD"; - public static final String CALLBACK_CMD_PROXY = "CALLBACK_PROXY"; - - public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP"; public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL"; public static final String CALLBACK_CMD_MOBILE_POSITION = "CALLBACK_CMD_MOBILE_POSITION"; - public static final String CALLBACK_CMD_PRESETQUERY = "CALLBACK_PRESETQUERY"; - - public static final String CALLBACK_CMD_ALARM = "CALLBACK_ALARM"; - - public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST"; - public static final String CALLBACK_CMD_SNAP= "CALLBACK_SNAP"; private Map> map = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 6bead77e..b9fdbe8d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -205,7 +205,7 @@ public interface ISIPCommander { * @param alarmMethod 报警方式(可选) * @param alarmType 报警类型(可选) */ - void alarmCmd(Device device, String alarmMethod, String alarmType, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; + void alarmResetCmd(Device device, String alarmMethod, String alarmType, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; /** * 强制关键帧命令,设备收到此命令应立刻发送一个IDR帧 @@ -242,11 +242,12 @@ public interface ISIPCommander { /** * 查询设备信息 - * - * @param device 视频设备 - * @return + * + * @param device 视频设备 + * @param callback + * @return */ - void deviceInfoQuery(Device device) throws InvalidArgumentException, SipException, ParseException; + void deviceInfoQuery(Device device, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; /** * 查询目录列表 @@ -278,7 +279,7 @@ public interface ISIPCommander { * @return true = 命令发送成功 */ void alarmInfoQuery(Device device, String startPriority, String endPriority, String alarmMethod, - String alarmType, String startTime, String endTime, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + String alarmType, String startTime, String endTime, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; /** * 查询设备配置 @@ -294,7 +295,7 @@ public interface ISIPCommander { * * @param device 视频设备 */ - void presetQuery(Device device, String channelId, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void presetQuery(Device device, String channelId, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; /** * 查询移动设备位置数据 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 5b32e96c..791e1e26 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -771,7 +771,7 @@ public class SIPCommander implements ISIPCommander { * @param device 视频设备 */ @Override - public void alarmCmd(Device device, String alarmMethod, String alarmType, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException { + public void alarmResetCmd(Device device, String alarmMethod, String alarmType, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException { String cmdType = "DeviceControl"; int sn = (int) ((Math.random() * 9 + 1) * 100000); @@ -975,25 +975,35 @@ public class SIPCommander implements ISIPCommander { /** * 查询设备信息 * - * @param device 视频设备 + * @param device 视频设备 + * @param callback */ @Override - public void deviceInfoQuery(Device device) throws InvalidArgumentException, SipException, ParseException { + public void deviceInfoQuery(Device device, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException { + + String cmdType = "DeviceInfo"; + String sn = (int) ((Math.random() * 9 + 1) * 100000) + ""; StringBuffer catalogXml = new StringBuffer(200); String charset = device.getCharset(); catalogXml.append("\r\n"); catalogXml.append("\r\n"); - catalogXml.append("DeviceInfo\r\n"); - catalogXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); + catalogXml.append("" + cmdType +"\r\n"); + catalogXml.append("" + sn + "\r\n"); catalogXml.append("" + device.getDeviceId() + "\r\n"); catalogXml.append("\r\n"); - + MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn, device.getDeviceId(), 1000L, callback); + messageSubscribe.addSubscribe(messageEvent); Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> { + messageSubscribe.removeSubscribe(messageEvent.getKey()); + if (callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "失败," + eventResult.msg, null); + } + }); } @@ -1079,14 +1089,17 @@ public class SIPCommander implements ISIPCommander { */ @Override public void alarmInfoQuery(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, - String startTime, String endTime, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + String startTime, String endTime, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException { + + String cmdType = "Alarm"; + String sn = (int) ((Math.random() * 9 + 1) * 100000) + ""; StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); cmdXml.append("\r\n"); cmdXml.append("\r\n"); - cmdXml.append("Alarm\r\n"); - cmdXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); + cmdXml.append("" + cmdType + "\r\n"); + cmdXml.append("" + sn + "\r\n"); cmdXml.append("" + device.getDeviceId() + "\r\n"); if (!ObjectUtils.isEmpty(startPriority)) { cmdXml.append("" + startPriority + "\r\n"); @@ -1108,10 +1121,14 @@ public class SIPCommander implements ISIPCommander { } cmdXml.append("\r\n"); - + MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn, device.getDeviceId(), 1000L, callback); + messageSubscribe.addSubscribe(messageEvent); Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> { + messageSubscribe.removeSubscribe(messageEvent.getKey()); + callback.run(ErrorCode.ERROR100.getCode(), "失败," + eventResult.msg, null); + }); } /** @@ -1156,14 +1173,17 @@ public class SIPCommander implements ISIPCommander { * @param device 视频设备 */ @Override - public void presetQuery(Device device, String channelId, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + public void presetQuery(Device device, String channelId, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException { + + String cmdType = "PresetQuery"; + int sn = (int) ((Math.random() * 9 + 1) * 100000); StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); cmdXml.append("\r\n"); cmdXml.append("\r\n"); - cmdXml.append("PresetQuery\r\n"); - cmdXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); + cmdXml.append("" + cmdType + "\r\n"); + cmdXml.append("" + sn + "\r\n"); if (ObjectUtils.isEmpty(channelId)) { cmdXml.append("" + device.getDeviceId() + "\r\n"); } else { @@ -1171,9 +1191,14 @@ public class SIPCommander implements ISIPCommander { } cmdXml.append("\r\n"); + MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback); + messageSubscribe.addSubscribe(messageEvent); Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> { + messageSubscribe.removeSubscribe(messageEvent.getKey()); + callback.run(ErrorCode.ERROR100.getCode(), "失败," + eventResult.msg, null); + }); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java index 1e32ddb1..46048b78 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/control/cmd/DeviceControlQueryMessageHandler.java @@ -424,7 +424,7 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent } } try { - cmder.alarmCmd(device, alarmMethod, alarmType, (code, msg, data) -> { + cmder.alarmResetCmd(device, alarmMethod, alarmType, (code, msg, data) -> { if (code == ErrorCode.SUCCESS.getCode()) { onOk(request); }else { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/AlarmResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/AlarmResponseMessageHandler.java index 41853465..ffbe9075 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/AlarmResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/AlarmResponseMessageHandler.java @@ -4,18 +4,22 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; +import javax.sip.SipException; +import javax.sip.message.Response; +import java.text.ParseException; @Slf4j @Component @@ -36,18 +40,18 @@ public class AlarmResponseMessageHandler extends SIPRequestProcessorParent imple @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_ALARM + device.getDeviceId() + channelId; + // 回复200 OK + try { + responseAck((SIPRequest) evt.getRequest(), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 目录查询回复: {}", e.getMessage()); + } JSONObject json = new JSONObject(); XmlUtil.node2Json(rootElement, json); if (log.isDebugEnabled()) { log.debug(json.toJSONString()); } - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); + responseMessageHandler.handMessageEvent(rootElement, null); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceConfigResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceConfigResponseMessageHandler.java deleted file mode 100755 index 645fe63e..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceConfigResponseMessageHandler.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; - -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; -import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; -import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; -import gov.nist.javax.sip.message.SIPRequest; -import lombok.extern.slf4j.Slf4j; -import org.dom4j.Element; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.sip.InvalidArgumentException; -import javax.sip.RequestEvent; -import javax.sip.SipException; -import javax.sip.message.Response; - -import java.text.ParseException; - -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; - -@Slf4j -@Component -public class DeviceConfigResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { - - private final String cmdType = "DeviceConfig"; - - @Autowired - private ResponseMessageHandler responseMessageHandler; - - @Autowired - private DeferredResultHolder deferredResultHolder; - - @Override - public void afterPropertiesSet() throws Exception { - responseMessageHandler.addHandler(cmdType, this); - } - - @Override - public void handForDevice(RequestEvent evt, Device device, Element element) { - JSONObject json = new JSONObject(); - try { - // 回复200 OK - responseAck((SIPRequest) evt.getRequest(), Response.OK); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 设备配置查询: {}", e.getMessage()); - } - XmlUtil.node2Json(element, json); - String channelId = getText(element, "DeviceID"); - if (log.isDebugEnabled()) { - log.debug(json.toJSONString()); - } - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG + device.getDeviceId() + channelId; - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); - } - - @Override - public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) { - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java deleted file mode 100755 index f3f48978..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceControlResponseMessageHandler.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; - -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; -import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; -import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import gov.nist.javax.sip.message.SIPRequest; -import lombok.extern.slf4j.Slf4j; -import org.dom4j.Element; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.sip.InvalidArgumentException; -import javax.sip.RequestEvent; -import javax.sip.SipException; -import javax.sip.message.Response; -import java.text.ParseException; - -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; - -@Slf4j -@Component -public class DeviceControlResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { - - private final String cmdType = "DeviceControl"; - - @Autowired - private ResponseMessageHandler responseMessageHandler; - - @Autowired - private DeferredResultHolder deferredResultHolder; - - @Override - public void afterPropertiesSet() throws Exception { - responseMessageHandler.addHandler(cmdType, this); - } - - @Override - public void handForDevice(RequestEvent evt, Device device, Element element) { - // 此处是对本平台发出DeviceControl指令的应答 - try { - responseAck((SIPRequest) evt.getRequest(), Response.OK); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 设备控制: {}", e.getMessage()); - } - JSONObject json = new JSONObject(); - String channelId = getText(element, "DeviceID"); - String result = getText(element, "Result"); - - RequestMessage msg = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + device.getDeviceId() + channelId; - msg.setKey(key); - if ("OK".equalsIgnoreCase(result)) { - msg.setData(WVPResult.success()); - }else { - msg.setData(WVPResult.fail(ErrorCode.ERROR100)); - } - if (log.isDebugEnabled()) { - log.debug(json.toJSONString()); - } - deferredResultHolder.invokeAllResult(msg); - - } - - @Override - public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) { - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java index 1de3ff7a..24e7f580 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceInfoResponseMessageHandler.java @@ -2,12 +2,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.DocumentException; @@ -37,9 +35,6 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent @Autowired private ResponseMessageHandler responseMessageHandler; - @Autowired - private DeferredResultHolder deferredResultHolder; - @Autowired private IDeviceService deviceService; @@ -70,9 +65,6 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent } return; } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getTextTrim(); - String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + device.getDeviceId() + channelId; device.setName(getText(rootElement, "DeviceName")); device.setManufacturer(getText(rootElement, "Manufacturer")); @@ -82,11 +74,8 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent device.setStreamMode("TCP-PASSIVE"); } deviceService.updateDevice(device); + responseMessageHandler.handMessageEvent(rootElement, device); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(device); - deferredResultHolder.invokeAllResult(msg); } catch (DocumentException e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java index c6e36644..a3c376aa 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java @@ -3,14 +3,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; @@ -33,15 +30,9 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen @Autowired private ResponseMessageHandler responseMessageHandler; - @Autowired - private DeferredResultHolder deferredResultHolder; - @Autowired private IDeviceService deviceService; - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -60,9 +51,7 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage()); } - Element deviceIdElement = element.element("DeviceID"); Element onlineElement = element.element("Online"); - String channelId = deviceIdElement.getText(); JSONObject json = new JSONObject(); XmlUtil.node2Json(element, json); if (log.isDebugEnabled()) { @@ -74,10 +63,8 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen }else { deviceService.offline(device.getDeviceId(), "设备状态查询结果:" + text.trim()); } - RequestMessage msg = new RequestMessage(); - msg.setKey(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + device.getDeviceId()); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); + responseMessageHandler.handMessageEvent(element, text); + } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java index 21626d8b..5e154f6e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java @@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Preset; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; @@ -25,8 +24,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; - /** * 设备预置位查询应答 */ @@ -68,8 +65,6 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent Element presetListNumElement = rootElement.element("PresetList"); Element snElement = rootElement.element("SN"); //该字段可能为通道或则设备的id - String deviceId = getText(rootElement, "DeviceID"); - String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId; if (snElement == null || presetListNumElement == null) { try { responseAck(request, Response.BAD_REQUEST, "xml error"); @@ -98,10 +93,7 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent presetQuerySipReqList.add(presetQuerySipReq); } } - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setKey(key); - requestMessage.setData(presetQuerySipReqList); - deferredResultHolder.invokeAllResult(requestMessage); + responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList); try { responseAck(request, Response.OK); } catch (InvalidArgumentException | ParseException | SipException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index fffb3ba2..549af39b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface IRedisRpcService { @@ -59,4 +58,10 @@ public interface IRedisRpcService { void dragZoomOut(String serverId, Device device, String channelId, int length, int width, int midpointx, int midpointy, int lengthx, int lengthy); WVPResult deviceStatus(String serverId, Device device); + + WVPResult alarm(String serverId, Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime); + + WVPResult deviceInfo(String serverId, Device device); + + WVPResult queryPreset(String serverId, Device device, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java index 713cf66a..a6d727e1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java @@ -376,6 +376,41 @@ public class RedisRpcDeviceController extends RpcController { return null; } + @RedisRpcMapping("alarm") + public RedisRpcResponse alarm(RedisRpcRequest request) { + + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + String deviceId = paramJson.getString("deviceId"); + String startPriority = paramJson.getString("startPriority"); + String endPriority = paramJson.getString("endPriority"); + String alarmMethod = paramJson.getString("alarmMethod"); + String alarmType = paramJson.getString("alarmType"); + String startTime = paramJson.getString("startTime"); + String endTime = paramJson.getString("endTime"); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + deviceService.alarm(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, (code, msg, data) -> { + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(new WVPResult<>(code, msg, data)); + // 手动发送结果 + sendResponse(response); + }); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg())); + sendResponse(response); + } + return null; + } + @RedisRpcMapping("deviceStatus") public RedisRpcResponse deviceStatus(RedisRpcRequest request) { String deviceId = request.getParam().toString(); @@ -403,5 +438,61 @@ public class RedisRpcDeviceController extends RpcController { return null; } + @RedisRpcMapping("info") + public RedisRpcResponse info(RedisRpcRequest request) { + String deviceId = request.getParam().toString(); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + deviceService.deviceInfo(device, (code, msg, data) -> { + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(new WVPResult<>(code, msg, data)); + // 手动发送结果 + sendResponse(response); + }); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg())); + sendResponse(response); + } + return null; + } + + @RedisRpcMapping("info") + public RedisRpcResponse queryPreset(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + String deviceId = paramJson.getString("deviceId"); + String channelId = paramJson.getString("channelId"); + + Device device = deviceService.getDeviceByDeviceId(deviceId); + + RedisRpcResponse response = request.getResponse(); + if (device == null || !userSetting.getServerId().equals(device.getServerId())) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + try { + deviceService.queryPreset(device, channelId, (code, msg, data) -> { + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + response.setBody(new WVPResult<>(code, msg, data)); + // 手动发送结果 + sendResponse(response); + }); + }catch (ControllerException e) { + response.setStatusCode(e.getCode()); + response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg())); + sendResponse(response); + } + return null; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index d12f8ff9..b2d65963 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -393,4 +393,41 @@ public class RedisRpcServiceImpl implements IRedisRpcService { RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } + + @Override + public WVPResult deviceInfo(String serverId, Device device) { + RedisRpcRequest request = buildRequest("device/info", device.getDeviceId()); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } + + @Override + public WVPResult queryPreset(String serverId, Device device, String channelId) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device", device.getDeviceId()); + jsonObject.put("channelId", channelId); + RedisRpcRequest request = buildRequest("device/queryPreset", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 60000, TimeUnit.MILLISECONDS); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } + + @Override + public WVPResult alarm(String serverId, Device device, String startPriority, String endPriority, + String alarmMethod, String alarmType, String startTime, String endTime) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device", device.getDeviceId()); +// jsonObject.put("channelId", channelId); + jsonObject.put("startPriority", startPriority); + jsonObject.put("endPriority", endPriority); + jsonObject.put("alarmMethod", alarmMethod); + jsonObject.put("alarmType", alarmType); + jsonObject.put("startTime", startTime); + jsonObject.put("endTime", endTime); + RedisRpcRequest request = buildRequest("device/alarm", jsonObject); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); + return JSON.parseObject(response.getBody().toString(), WVPResult.class); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index f6f60d7f..24a6b378 100755 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.utils; import org.apache.commons.lang3.ObjectUtils; +import javax.validation.constraints.NotNull; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -66,8 +67,7 @@ public class DateUtil { public static final DateTimeFormatter DateFormatter = DateTimeFormatter.ofPattern(date_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter urlFormatter = DateTimeFormatter.ofPattern(URL_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); - public static String yyyy_MM_dd_HH_mm_ssToISO8601(String formatTime) { - + public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) { return formatterISO8601.format(formatter.parse(formatTime)); } diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java index c8d04952..b0cf09b3 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java @@ -2,20 +2,19 @@ package com.genersoft.iot.vmp.web.gb28181; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Preset; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -23,9 +22,6 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; import java.util.*; /** @@ -185,7 +181,7 @@ public class ApiDeviceController { * @return */ @GetMapping(value = "/fetchpreset") - private DeferredResult list(String serial, + private DeferredResult> list(String serial, @RequestParam(required = false)Integer channel, @RequestParam(required = false)String code, @RequestParam(required = false)Boolean fill, @@ -197,55 +193,34 @@ public class ApiDeviceController { } Device device = deviceService.getDeviceByDeviceId(serial); - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + (ObjectUtils.isEmpty(code) ? serial : code); - DeferredResult result = new DeferredResult<> (timeout * 1000L); - DeferredResultEx deferredResultEx = new DeferredResultEx<>(result); - result.onTimeout(()->{ - log.warn("<模拟接口> 获取设备预置位超时"); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData("wait for presetquery timeout["+timeout+"s]"); - resultHolder.invokeResult(msg); - }); - if (resultHolder.exist(key, null)) { - return result; - } - - deferredResultEx.setFilter(filterResult->{ - List presetQuerySipReqList = (List)filterResult; - HashMap resultMap = new HashMap<>(); - resultMap.put("DeviceID", code); - resultMap.put("Result", "OK"); - resultMap.put("SumNum", presetQuerySipReqList.size()); - ArrayList> presetItemList = new ArrayList<>(presetQuerySipReqList.size()); - for (Preset presetQuerySipReq : presetQuerySipReqList) { - Map item = new HashMap<>(); - item.put("PresetID", presetQuerySipReq.getPresetId()); - item.put("PresetName", presetQuerySipReq.getPresetName()); - item.put("PresetEnable", true); - presetItemList.add(item); + Assert.notNull(device, "设备不存在"); + DeferredResult> deferredResult = new DeferredResult<> (timeout * 1000L); + deviceService.queryPreset(device, code, (resultCode, msg, data) -> { + if (resultCode == ErrorCode.SUCCESS.getCode()) { + List presetQuerySipReqList = (List)data; + HashMap resultMap = new HashMap<>(); + resultMap.put("DeviceID", code); + resultMap.put("Result", "OK"); + resultMap.put("SumNum", presetQuerySipReqList.size()); + ArrayList> presetItemList = new ArrayList<>(presetQuerySipReqList.size()); + for (Preset presetQuerySipReq : presetQuerySipReqList) { + Map item = new HashMap<>(); + item.put("PresetID", presetQuerySipReq.getPresetId()); + item.put("PresetName", presetQuerySipReq.getPresetName()); + item.put("PresetEnable", true); + presetItemList.add(item); + } + resultMap.put("PresetItemList",presetItemList ); + deferredResult.setResult(new WVPResult<>(resultCode, msg, resultMap)); + }else { + deferredResult.setResult(new WVPResult<>(resultCode, msg, null)); } - resultMap.put("PresetItemList",presetItemList ); - return resultMap; }); - resultHolder.put(key, uuid, deferredResultEx); - - try { - cmder.presetQuery(device, code, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(String.format("获取设备预置位失败,错误码: %s, %s", event.statusCode, event.msg)); - resultHolder.invokeResult(msg); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 获取设备预置位: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - return result; + deferredResult.onTimeout(()->{ + log.warn("[获取设备预置位] 超时, {}", device.getDeviceId()); + deferredResult.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "wait for presetquery timeout["+timeout+"s]")); + }); + return deferredResult; } }