diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java index 15ae1b20..c94eb437 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java @@ -7,17 +7,11 @@ package com.genersoft.iot.vmp.gb28181.controller; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.bean.BasicParam; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -26,15 +20,9 @@ import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.UUID; - @Slf4j @Tag(name = "国标设备配置") @RestController @@ -44,21 +32,12 @@ public class DeviceConfig { @Autowired private IDeviceService deviceService; - @Autowired - private SIPCommander cmder; - - @Autowired - private DeferredResultHolder resultHolder; - - /** - * 基本配置设置命令 - */ @GetMapping("/basicParam") @Operation(summary = "基本配置设置命令", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "basicParam", description = "基础配置参数", required = true) public DeferredResult> homePositionApi(BasicParam basicParam) { if (log.isDebugEnabled()) { - log.debug("报警复位API调用"); + log.debug("基本配置设置命令API调用"); } Assert.notNull(basicParam.getDeviceId(), "设备ID必须存在"); @@ -78,36 +57,36 @@ public class DeviceConfig { } - /** - * 设备配置查询请求API接口 - * @param deviceId 设备ID - * @param configType 配置类型 - * @param channelId 通道ID - * @return - */ - @Operation(summary = "设备配置查询请求", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Operation(summary = "设备配置查询", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "channelId", description = "通道国标编号", required = true) - @Parameter(name = "configType", description = "配置类型") + @Parameter(name = "configType", description = "配置类型, 可选值," + + "基本参数配置:BasicParam," + + "视频参数范围:VideoParamOpt, " + + "SVAC编码配置:SVACEncodeConfig, " + + "SVAC解码配置:SVACDecodeConfig。" + + "可同时查询多个配置类型,各类型以“/”分隔,") @GetMapping("/query/{deviceId}/{configType}") - public DeferredResult> configDownloadApi(@PathVariable String deviceId, + public DeferredResult> configDownloadApi(@PathVariable String deviceId, @PathVariable String configType, @RequestParam(required = false) String channelId) { if (log.isDebugEnabled()) { - log.debug("设备状态查询API调用"); + log.debug("设备配置查询请求API调用"); } - String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + (ObjectUtils.isEmpty(channelId) ? deviceId : deviceId + channelId); - String uuid = UUID.randomUUID().toString(); Device device = deviceService.getDeviceByDeviceId(deviceId); Assert.notNull(device, "设备不存在"); - DeferredResult> result = deviceService.deviceConfigQuery(device, channelId, configType); + DeferredResult> deferredResult = new DeferredResult<>(); - result.onTimeout(() -> { - log.warn("[获取设备配置] 超时, {}", device.getDeviceId()); - result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时")); + deviceService.deviceConfigQuery(device, channelId, configType, (code, msg, data) -> { + deferredResult.setResult(new WVPResult<>(code, msg, data)); }); - return result; + + deferredResult.onTimeout(() -> { + log.warn("[获取设备配置] 超时, {}", device.getDeviceId()); + deferredResult.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时")); + }); + return deferredResult; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/MessageSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/MessageSubscribe.java index 11f9e7f8..f86c8786 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/MessageSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/MessageSubscribe.java @@ -1,21 +1,11 @@ package com.genersoft.iot.vmp.gb28181.event; -import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; -import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.ObjectUtils; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import javax.sip.DialogTerminatedEvent; -import javax.sip.ResponseEvent; -import javax.sip.TimeoutEvent; -import javax.sip.TransactionTerminatedEvent; -import javax.sip.header.WarningHeader; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; @@ -33,18 +23,17 @@ public class MessageSubscribe { @Scheduled(fixedDelay = 200) //每200毫秒执行 public void execute(){ - if (delayQueue.isEmpty()) { - return; - } - try { - MessageEvent take = delayQueue.take(); - // 出现超时异常 - if(take.getCallback() != null) { - take.getCallback().run(ErrorCode.ERROR486.getCode(), "消息超时未回复", null); + while (!delayQueue.isEmpty()) { + try { + MessageEvent take = delayQueue.take(); + // 出现超时异常 + if(take.getCallback() != null) { + take.getCallback().run(ErrorCode.ERROR486.getCode(), "消息超时未回复", null); + } + subscribes.remove(take.getKey()); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - subscribes.remove(take.getKey()); - } catch (InterruptedException e) { - throw new RuntimeException(e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index d29c77ea..02797003 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -29,24 +29,24 @@ public class SipSubscribe { private final DelayQueue delayQueue = new DelayQueue<>(); + @Scheduled(fixedDelay = 200) //每200毫秒执行 public void execute(){ - if (delayQueue.isEmpty()) { - return; - } - try { - SipEvent take = delayQueue.take(); - // 出现超时异常 - if(take.getErrorEvent() != null) { - EventResult eventResult = new EventResult<>(); - eventResult.type = EventResultType.timeout; - eventResult.msg = "消息超时未回复"; - eventResult.statusCode = -1024; - take.getErrorEvent().response(eventResult); + while (!delayQueue.isEmpty()) { + try { + SipEvent take = delayQueue.take(); + // 出现超时异常 + if(take.getErrorEvent() != null) { + EventResult eventResult = new EventResult<>(); + eventResult.type = EventResultType.timeout; + eventResult.msg = "消息超时未回复"; + eventResult.statusCode = -1024; + take.getErrorEvent().response(eventResult); + } + subscribes.remove(take.getKey()); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - subscribes.remove(take.getKey()); - } catch (InterruptedException e) { - throw new RuntimeException(e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/MessageEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/MessageEvent.java index 67f135bd..e55fce36 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/MessageEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/MessageEvent.java @@ -28,7 +28,7 @@ public class MessageEvent implements Delayed { @Override public long getDelay(@NotNull TimeUnit unit) { - return unit.convert(delay, TimeUnit.MILLISECONDS); + return unit.convert(delay - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override @@ -47,9 +47,9 @@ public class MessageEvent implements Delayed { messageEvent.deviceId = deviceId; messageEvent.callback = callback; if (delay == null) { - messageEvent.delay = 1000; + messageEvent.delay = System.currentTimeMillis() + 1000; }else { - messageEvent.delay = delay; + messageEvent.delay = System.currentTimeMillis() + delay; } return messageEvent; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java index e6af35ca..be20a54e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java @@ -32,13 +32,13 @@ public class SipEvent implements Delayed { sipEvent.setKey(key); sipEvent.setOkEvent(okEvent); sipEvent.setErrorEvent(errorEvent); - sipEvent.setDelay(delay); + sipEvent.setDelay(System.currentTimeMillis() + delay); return sipEvent; } @Override public long getDelay(@NotNull TimeUnit unit) { - return unit.convert(delay, TimeUnit.MILLISECONDS); + return unit.convert(delay - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index 023dad9d..7e25d708 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -174,7 +174,7 @@ public interface IDeviceService { void deviceBasicConfig(Device device, BasicParam basicParam, ErrorCallback callback); - DeferredResult> deviceConfigQuery(Device device, String channelId, String configType); + void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback callback); void teleboot(Device device); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 8d8579ac..481e84ed 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.enums.ChannelDataType; @@ -12,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; -import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; @@ -20,8 +18,6 @@ import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -38,12 +34,9 @@ import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; @@ -51,7 +44,6 @@ import javax.sip.SipException; import java.text.ParseException; import java.time.Instant; import java.util.List; -import java.util.UUID; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -667,9 +659,7 @@ public class DeviceServiceImpl implements IDeviceService { } try { - sipCommander.deviceBasicConfigCmd(device, basicParam, event -> { - callback.run(ErrorCode.ERROR100.getCode(), "操作超时", null); - }); + sipCommander.deviceBasicConfigCmd(device, basicParam, callback); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 设备配置: {}", e.getMessage()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); @@ -677,25 +667,20 @@ public class DeviceServiceImpl implements IDeviceService { } @Override - public DeferredResult> deviceConfigQuery(Device device, String channelId, String configType) { + public void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback callback) { if (!userSetting.getServerId().equals(device.getServerId())) { WVPResult result = redisRpcService.deviceConfigQuery(device.getServerId(), device, channelId, configType); - DeferredResult> deferredResult = new DeferredResult<>(3 * 1000L); - deferredResult.setResult(result); - return deferredResult; + callback.run(result.getCode(), result.getMsg(), result.getData()); + return; } - DeferredResult> result = new DeferredResult<>(3 * 1000L); try { - sipCommander.deviceConfigQuery(device, channelId, configType, event -> { - result.setResult(WVPResult.fail(ErrorCode.ERROR100)); - }); + sipCommander.deviceConfigQuery(device, channelId, configType, callback); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 获取设备配置: {}", e.getMessage()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); } - return result; } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index c9f4cb1b..97c35dc8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import gov.nist.javax.sip.message.SIPRequest; @@ -230,7 +231,7 @@ public interface ISIPCommander { /** * 设备配置命令:basicParam */ - void deviceBasicConfigCmd(Device device, BasicParam basicParam, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void deviceBasicConfigCmd(Device device, BasicParam basicParam, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; /** * 查询设备状态 @@ -286,7 +287,7 @@ public interface ISIPCommander { * @param channelId 通道编码(可选) * @param configType 配置类型: */ - void deviceConfigQuery(Device device, String channelId, String configType, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; /** * 查询设备预置位置 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 55625ef8..cc58bc96 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.SipConfig; @@ -871,7 +870,7 @@ public class SIPCommander implements ISIPCommander { * 设备配置命令:basicParam */ @Override - public void deviceBasicConfigCmd(Device device, BasicParam basicParam, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + public void deviceBasicConfigCmd(Device device, BasicParam basicParam, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException { int sn = (int) ((Math.random() * 9 + 1) * 100000); String cmdType = "DeviceConfig"; @@ -904,21 +903,14 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("\r\n"); cmdXml.append("\r\n"); - ErrorCallback errorCallback = (code, msg, data) -> { - if (code != ErrorCode.SUCCESS.getCode()) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(); - eventResult.type = SipSubscribe.EventResultType.failedResult; - eventResult.msg = msg; - eventResult.statusCode = code; - errorEvent.response(eventResult); - } - }; - MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, errorCallback); + MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback); messageSubscribe.addSubscribe(messageEvent); Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> { + callback.run(ErrorCode.ERROR100.getCode(), "消息发送失败", null); + }); } /** @@ -1095,14 +1087,16 @@ public class SIPCommander implements ISIPCommander { * @param configType 配置类型: */ @Override - public void deviceConfigQuery(Device device, String channelId, String configType, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + public void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException { + String cmdType = "ConfigDownload"; + int sn = (int) ((Math.random() * 9 + 1) * 100000); StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); cmdXml.append("\r\n"); cmdXml.append("\r\n"); - cmdXml.append("ConfigDownload\r\n"); - cmdXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); + cmdXml.append("" + cmdType + "\r\n"); + cmdXml.append("" + sn + "\r\n"); if (ObjectUtils.isEmpty(channelId)) { cmdXml.append("" + device.getDeviceId() + "\r\n"); } else { @@ -1111,10 +1105,13 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("" + configType + "\r\n"); cmdXml.append("\r\n"); - + MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback); + messageSubscribe.addSubscribe(messageEvent); Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> { + callback.run(ErrorCode.ERROR100.getCode(), "消息发送失败", null); + }); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java index e806cbdb..6801622c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java @@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe; import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; -import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -44,18 +43,16 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In public void handMessageEvent(Element element, Object data) { String cmd = getText(element, "CmdType"); - IMessageHandler messageHandler = messageHandlerMap.get(cmd); - if (messageHandler == null) { - String sn = getText(element, "SN"); - MessageEvent subscribe = (MessageEvent)messageSubscribe.getSubscribe(cmd + sn); - if (subscribe != null) { - String result = getText(element, "Result"); - if ("OK".equalsIgnoreCase(result)) { - subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); - }else { - subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result); - } + String sn = getText(element, "SN"); + MessageEvent subscribe = (MessageEvent)messageSubscribe.getSubscribe(cmd + sn); + if (subscribe != null) { + String result = getText(element, "Result"); + if (result == null || "OK".equalsIgnoreCase(result) || data != null) { + subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); + }else { + subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result); } + messageSubscribe.removeSubscribe(cmd + sn); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java index d7765132..6aa5cfa4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java @@ -5,12 +5,10 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; @@ -24,8 +22,6 @@ import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; - @Slf4j @Component public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { @@ -49,8 +45,6 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar @Override public void handForDevice(RequestEvent evt, Device device, Element element) { - String channelId = getText(element, "DeviceID"); - try { // 回复200 OK responseAck((SIPRequest) evt.getRequest(), Response.OK); @@ -63,17 +57,33 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar if (log.isDebugEnabled()) { log.debug(json.toJSONString()); } + JSONObject jsonObject = new JSONObject(); + if (json.get("BasicParam") != null) { + jsonObject.put("BasicParam", json.getJSONObject("BasicParam")); + } + if (json.get("VideoParamOpt") != null) { + jsonObject.put("VideoParamOpt", json.getJSONObject("VideoParamOpt")); + } + if (json.get("SVACEncodeConfig") != null) { + jsonObject.put("SVACEncodeConfig", json.getJSONObject("SVACEncodeConfig")); + } + if (json.get("SVACDecodeConfig") != null) { + jsonObject.put("SVACDecodeConfig", json.getJSONObject("SVACDecodeConfig")); + } + + responseMessageHandler.handMessageEvent(element, jsonObject); + JSONObject basicParam = json.getJSONObject("BasicParam"); - Integer heartBeatInterval = basicParam.getInteger("HeartBeatInterval"); - Integer heartBeatCount = basicParam.getInteger("HeartBeatCount"); - Integer positionCapability = basicParam.getInteger("PositionCapability"); - device.setHeartBeatInterval(heartBeatInterval); - device.setHeartBeatCount(heartBeatCount); - device.setPositionCapability(positionCapability); - - deviceService.updateDeviceHeartInfo(device); - responseMessageHandler.handMessageEvent(element, basicParam); + if (basicParam != null) { + Integer heartBeatInterval = basicParam.getInteger("HeartBeatInterval"); + Integer heartBeatCount = basicParam.getInteger("HeartBeatCount"); + Integer positionCapability = basicParam.getInteger("PositionCapability"); + device.setHeartBeatInterval(heartBeatInterval); + device.setHeartBeatCount(heartBeatCount); + device.setPositionCapability(positionCapability); + deviceService.updateDeviceHeartInfo(device); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceConfigResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceConfigResponseMessageHandler.java index 453d8066..645fe63e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceConfigResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceConfigResponseMessageHandler.java @@ -9,13 +9,19 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; +import javax.sip.SipException; +import javax.sip.message.Response; + +import java.text.ParseException; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -39,6 +45,12 @@ public class DeviceConfigResponseMessageHandler extends SIPRequestProcessorParen @Override public void handForDevice(RequestEvent evt, Device device, Element element) { JSONObject json = new JSONObject(); + try { + // 回复200 OK + responseAck((SIPRequest) evt.getRequest(), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 设备配置查询: {}", e.getMessage()); + } XmlUtil.node2Json(element, json); String channelId = getText(element, "DeviceID"); if (log.isDebugEnabled()) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java index 3d836977..e18d4516 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.redisMsg.control; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; @@ -114,6 +113,7 @@ public class RedisRpcDeviceController extends RpcController { @RedisRpcMapping("deviceConfigQuery") public RedisRpcResponse deviceConfigQuery(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); String deviceId = paramJson.getString("deviceId"); String channelId = paramJson.getString("channelId"); @@ -127,21 +127,13 @@ public class RedisRpcDeviceController extends RpcController { response.setBody("param error"); return response; } - DeferredResult> deferredResult = deviceService.deviceConfigQuery(device, channelId, configType); - deferredResult.onCompletion(() ->{ - response.setStatusCode(ErrorCode.SUCCESS.getCode()); - response.setBody(deferredResult.getResult()); + deviceService.deviceConfigQuery(device, channelId, configType, (code, msg, data) -> { + response.setStatusCode(code); + response.setBody(new WVPResult<>(code, msg, data)); // 手动发送结果 sendResponse(response); }); - deferredResult.onTimeout(() -> { - log.warn("[设备配置]操作超时, 设备未返回应答指令, {}", deviceId); - response.setStatusCode(ErrorCode.SUCCESS.getCode()); - response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时")); - // 手动发送结果 - sendResponse(response); - }); - return response; + return null; } @RedisRpcMapping("teleboot")