diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/BasicParam.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/BasicParam.java new file mode 100644 index 00000000..1ec7a721 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/BasicParam.java @@ -0,0 +1,49 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * 基础配置 + */ +@Data +@Schema(description = "基础配置") +public class BasicParam { + + @Schema(description = "设备ID") + private String deviceId; + + @Schema(description = "通道ID,如果时对设备配置直接设置同设备ID一样即可") + private String channelId; + + @Schema(description = "名称") + private String name; + + @Schema(description = "注册过期时间") + private String expiration; + + @Schema(description = "心跳间隔时间") + private Integer heartBeatInterval; + + @Schema(description = "心跳超时次数") + private Integer heartBeatCount; + + @Schema(description = "定位功能支持情况。取值:0-不支持;1-支持 GPS定位;2-支持北斗定位(可选,默认取值为0)," + + "用于接受配置查询结果, 基础配置时无效") + private Integer positionCapability; + + @Schema(description = "经度(可选),用于接受配置查询结果, 基础配置时无效") + private Double longitude; + + @Schema(description = "纬度(可选),用于接受配置查询结果, 基础配置时无效") + private Double latitude; + + public static BasicParam getInstance(String name, String expiration, Integer heartBeatInterval, Integer heartBeatCount) { + BasicParam basicParam = new BasicParam(); + basicParam.setName(name); + basicParam.setExpiration(expiration); + basicParam.setHeartBeatInterval(heartBeatInterval); + basicParam.setHeartBeatCount(heartBeatCount); + return basicParam; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java index bf9ec95d..15ae1b20 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java @@ -10,12 +10,14 @@ package com.genersoft.iot.vmp.gb28181.controller; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; +import com.genersoft.iot.vmp.gb28181.bean.BasicParam; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -49,45 +51,30 @@ public class DeviceConfig { private DeferredResultHolder resultHolder; /** - * 看守位控制命令API接口 - * @param deviceId 设备ID - * @param channelId 通道ID - * @param name 名称 - * @param expiration 到期时间 - * @param heartBeatInterval 心跳间隔 - * @param heartBeatCount 心跳计数 - * @return + * 基本配置设置命令 */ - @GetMapping("/basicParam/{deviceId}") + @GetMapping("/basicParam") @Operation(summary = "基本配置设置命令", security = @SecurityRequirement(name = JwtUtils.HEADER)) - @Parameter(name = "deviceId", description = "设备国标编号", required = true) - @Parameter(name = "channelId", description = "通道国标编号", required = true) - @Parameter(name = "name", description = "名称") - @Parameter(name = "expiration", description = "到期时间") - @Parameter(name = "heartBeatInterval", description = "心跳间隔") - @Parameter(name = "heartBeatCount", description = "心跳计数") - public DeferredResult homePositionApi(@PathVariable String deviceId, - @RequestParam(required = false) String channelId, - @RequestParam(required = false) String name, - @RequestParam(required = false) String expiration, - @RequestParam(required = false) String heartBeatInterval, - @RequestParam(required = false) String heartBeatCount) { + @Parameter(name = "basicParam", description = "基础配置参数", required = true) + public DeferredResult> homePositionApi(BasicParam basicParam) { if (log.isDebugEnabled()) { log.debug("报警复位API调用"); } - Device device = deviceService.getDeviceByDeviceId(deviceId); - Assert.notNull(device, "设备不存在"); - DeferredResult result = deviceService.deviceBasicConfig(device, channelId, name, expiration, heartBeatInterval, heartBeatCount); + Assert.notNull(basicParam.getDeviceId(), "设备ID必须存在"); - result.onTimeout(() -> { - log.warn(String.format("设备配置操作超时, 设备未返回应答指令")); - JSONObject json = new JSONObject(); - json.put("DeviceID", device.getDeviceId()); - json.put("Status", "Timeout"); - json.put("Description", "设备配置操作超时, 设备未返回应答指令"); - result.setResult(json.toString()); + Device device = deviceService.getDeviceByDeviceId(basicParam.getDeviceId()); + Assert.notNull(device, "设备不存在"); + + DeferredResult> deferredResult = new DeferredResult<>(); + deviceService.deviceBasicConfig(device, basicParam, (code, msg, data) -> { + deferredResult.setResult(new WVPResult<>(code, msg, data)); }); - return result; + + deferredResult.onTimeout(() -> { + log.warn("[设备配置] 超时, {}", device.getDeviceId()); + deferredResult.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时")); + }); + return deferredResult; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java index b768a7ac..1a3c4fca 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java @@ -368,32 +368,32 @@ public class DeviceQuery { return result; - String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId; - String uuid = UUID.randomUUID().toString(); - try { - cmder.alarmInfoQuery(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(String.format("设备报警查询失败,错误码: %s, %s",event.statusCode, event.msg)); - resultHolder.invokeResult(msg); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 设备报警查询: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - DeferredResult> result = new DeferredResult> (3 * 1000L); - result.onTimeout(()->{ - log.warn(String.format("设备报警查询超时")); - // 释放rtpserver - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData("设备报警查询超时"); - resultHolder.invokeResult(msg); - }); - resultHolder.put(DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId, uuid, result); - return result; +// String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId; +// String uuid = UUID.randomUUID().toString(); +// try { +// cmder.alarmInfoQuery(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, event -> { +// RequestMessage msg = new RequestMessage(); +// msg.setId(uuid); +// msg.setKey(key); +// msg.setData(String.format("设备报警查询失败,错误码: %s, %s",event.statusCode, event.msg)); +// resultHolder.invokeResult(msg); +// }); +// } catch (InvalidArgumentException | SipException | ParseException e) { +// log.error("[命令发送失败] 设备报警查询: {}", e.getMessage()); +// throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); +// } +// DeferredResult> result = new DeferredResult> (3 * 1000L); +// result.onTimeout(()->{ +// log.warn(String.format("设备报警查询超时")); +// // 释放rtpserver +// RequestMessage msg = new RequestMessage(); +// msg.setId(uuid); +// msg.setKey(key); +// msg.setData("设备报警查询超时"); +// resultHolder.invokeResult(msg); +// }); +// resultHolder.put(DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId, uuid, result); +// return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/MessageSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/MessageSubscribe.java new file mode 100755 index 00000000..11f9e7f8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/MessageSubscribe.java @@ -0,0 +1,84 @@ +package com.genersoft.iot.vmp.gb28181.event; + +import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; +import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; +import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.sip.DialogTerminatedEvent; +import javax.sip.ResponseEvent; +import javax.sip.TimeoutEvent; +import javax.sip.TransactionTerminatedEvent; +import javax.sip.header.WarningHeader; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; + +/** + * @author lin + */ +@Slf4j +@Component +public class MessageSubscribe { + + private final Map> subscribes = new ConcurrentHashMap<>(); + + private final DelayQueue> delayQueue = new DelayQueue<>(); + + @Scheduled(fixedDelay = 200) //每200毫秒执行 + public void execute(){ + if (delayQueue.isEmpty()) { + return; + } + try { + MessageEvent take = delayQueue.take(); + // 出现超时异常 + if(take.getCallback() != null) { + take.getCallback().run(ErrorCode.ERROR486.getCode(), "消息超时未回复", null); + } + subscribes.remove(take.getKey()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + + public void addSubscribe(MessageEvent event) { + MessageEvent messageEvent = subscribes.get(event.getKey()); + if (messageEvent != null) { + subscribes.remove(event.getKey()); + delayQueue.remove(messageEvent); + } + subscribes.put(event.getKey(), event); + delayQueue.offer(event); + } + + public MessageEvent getSubscribe(String key) { + return subscribes.get(key); + } + + public void removeSubscribe(String key) { + if(key == null){ + return; + } + MessageEvent messageEvent = subscribes.get(key); + if (messageEvent != null) { + subscribes.remove(key); + delayQueue.remove(messageEvent); + } + } + + public boolean isEmpty(){ + return subscribes.isEmpty(); + } + + public Integer size() { + return subscribes.size(); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index a2034c72..d29c77ea 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -78,7 +78,9 @@ public class SipSubscribe { // 消息发送失败 cmdSendFailEvent, // 消息发送失败 - failedToGetPort + failedToGetPort, + // 收到失败的回复 + failedResult } public static class EventResult{ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/MessageEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/MessageEvent.java new file mode 100644 index 00000000..67f135bd --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/MessageEvent.java @@ -0,0 +1,56 @@ +package com.genersoft.iot.vmp.gb28181.event.sip; + +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import lombok.Data; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +@Data +public class MessageEvent implements Delayed { + /** + * 超时时间(单位: 毫秒) + */ + private long delay; + + private String cmdType; + + private String sn; + + private String deviceId; + + private String result; + + private T t; + + private ErrorCallback callback; + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(delay, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } + + public String getKey(){ + return cmdType + sn; + } + + public static MessageEvent getInstance(String cmdType, String sn, String deviceId, Long delay, ErrorCallback callback){ + MessageEvent messageEvent = new MessageEvent<>(); + messageEvent.cmdType = cmdType; + messageEvent.sn = sn; + messageEvent.deviceId = deviceId; + messageEvent.callback = callback; + if (delay == null) { + messageEvent.delay = 1000; + }else { + messageEvent.delay = delay; + } + return messageEvent; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index dfcaa993..023dad9d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -1,9 +1,11 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.gb28181.bean.BasicParam; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; @@ -170,7 +172,7 @@ public interface IDeviceService { WVPResult devicesSync(Device device); - DeferredResult> deviceBasicConfig(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount); + void deviceBasicConfig(Device device, BasicParam basicParam, ErrorCallback callback); DeferredResult> deviceConfigQuery(Device device, String channelId, String configType); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java index 2c8bca33..1d6e1ebe 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java @@ -13,14 +13,12 @@ public interface IGbChannelPlayService { void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream); - void play(CommonGBChannel channel, Platform platform, ErrorCallback callback); void play(CommonGBChannel channel, Platform platform, Boolean record, ErrorCallback callback); void playGbDeviceChannel(CommonGBChannel channel, Boolean record, ErrorCallback callback); void stopPlayDeviceChannel(InviteSessionType type, CommonGBChannel channel, String stream); - void playProxy(CommonGBChannel channel, ErrorCallback callback); void playProxy(CommonGBChannel channel, Boolean record, ErrorCallback callback); void stopPlayProxy(CommonGBChannel channel); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 7ddbc270..8d8579ac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; +import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; @@ -26,6 +27,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respons import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -655,26 +657,23 @@ public class DeviceServiceImpl implements IDeviceService { } @Override - public DeferredResult> deviceBasicConfig(Device device, String channelId, String name, String expiration, - String heartBeatInterval, String heartBeatCount) { + public void deviceBasicConfig(Device device, BasicParam basicParam, ErrorCallback callback) { if (!userSetting.getServerId().equals(device.getServerId())) { - WVPResult result = redisRpcService.deviceBasicConfig(device.getServerId(), device, channelId, name, expiration, - heartBeatInterval, heartBeatCount); - DeferredResult> deferredResult = new DeferredResult<>(3 * 1000L); - deferredResult.setResult(result); - return deferredResult; + WVPResult result = redisRpcService.deviceBasicConfig(device.getServerId(), device, basicParam); + if (result.getCode() == ErrorCode.SUCCESS.getCode()) { + callback.run(result.getCode(), result.getMsg(), result.getData()); + } + return; } - DeferredResult> result = new DeferredResult<>(3 * 1000L); try { - sipCommander.deviceBasicConfigCmd(device, channelId, name, expiration, heartBeatInterval, heartBeatCount, event -> { - result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时")); + sipCommander.deviceBasicConfigCmd(device, basicParam, event -> { + callback.run(ErrorCode.ERROR100.getCode(), "操作超时", null); }); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 设备配置: {}", e.getMessage()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); } - return result; } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index c9089b4b..e443f68b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -302,7 +302,7 @@ public class PlayServiceImpl implements IPlayService { log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } - play(mediaServerItem, device, channel, null, callback); + play(mediaServerItem, device, channel, null, userSetting.getRecordSip(), callback); } @Override @@ -1661,11 +1661,11 @@ public class PlayServiceImpl implements IPlayService { } return; } - inviteStreamService.removeInviteInfo( "rtp", inviteInfo); + inviteStreamService.removeInviteInfo(inviteInfo); if (InviteSessionStatus.ok == inviteInfo.getStatus()) { try { log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId()); - cmder.streamByeCmd(device, channel.getDeviceId(), inviteInfo.getStream(), null, null); + cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null); } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index b21dba9c..c9f4cb1b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -229,15 +229,8 @@ public interface ISIPCommander { /** * 设备配置命令:basicParam - * - * @param device 视频设备 - * @param channelId 通道编码(可选) - * @param name 设备/通道名称(可选) - * @param expiration 注册过期时间(可选) - * @param heartBeatInterval 心跳间隔时间(可选) - * @param heartBeatCount 心跳超时次数(可选) - */ - void deviceBasicConfigCmd(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + */ + void deviceBasicConfigCmd(Device device, BasicParam basicParam, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 查询设备状态 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index eed06646..55625ef8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.SipConfig; @@ -7,7 +8,9 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -19,8 +22,10 @@ import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; @@ -71,6 +76,9 @@ public class SIPCommander implements ISIPCommander { @Autowired private IMediaServerService mediaServerService; + @Autowired + private MessageSubscribe messageSubscribe; + /** @@ -861,52 +869,53 @@ public class SIPCommander implements ISIPCommander { /** * 设备配置命令:basicParam - * - * @param device 视频设备 - * @param channelId 通道编码(可选) - * @param name 设备/通道名称(可选) - * @param expiration 注册过期时间(可选) - * @param heartBeatInterval 心跳间隔时间(可选) - * @param heartBeatCount 心跳超时次数(可选) */ @Override - public void deviceBasicConfigCmd(Device device, String channelId, String name, String expiration, - String heartBeatInterval, String heartBeatCount, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + public void deviceBasicConfigCmd(Device device, BasicParam basicParam, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + int sn = (int) ((Math.random() * 9 + 1) * 100000); + String cmdType = "DeviceConfig"; StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); cmdXml.append("\r\n"); cmdXml.append("\r\n"); - cmdXml.append("DeviceConfig\r\n"); - cmdXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); + cmdXml.append("" + cmdType + "\r\n"); + cmdXml.append("" + sn + "\r\n"); + String channelId = basicParam.getChannelId(); if (ObjectUtils.isEmpty(channelId)) { - cmdXml.append("" + device.getDeviceId() + "\r\n"); - } else { - cmdXml.append("" + channelId + "\r\n"); + channelId = device.getDeviceId(); } + cmdXml.append("" + channelId + "\r\n"); cmdXml.append("\r\n"); - if (!ObjectUtils.isEmpty(name)) { - cmdXml.append("" + name + "\r\n"); + if (!ObjectUtils.isEmpty(basicParam.getName())) { + cmdXml.append("" + basicParam.getName() + "\r\n"); } - if (NumericUtil.isInteger(expiration)) { - if (Integer.valueOf(expiration) > 0) { - cmdXml.append("" + expiration + "\r\n"); + if (NumericUtil.isInteger(basicParam.getExpiration())) { + if (Integer.parseInt(basicParam.getExpiration()) > 0) { + cmdXml.append("" + basicParam.getExpiration() + "\r\n"); } } - if (NumericUtil.isInteger(heartBeatInterval)) { - if (Integer.valueOf(heartBeatInterval) > 0) { - cmdXml.append("" + heartBeatInterval + "\r\n"); - } + if (basicParam.getHeartBeatInterval() != null && basicParam.getHeartBeatInterval() > 0) { + cmdXml.append("" + basicParam.getHeartBeatInterval() + "\r\n"); } - if (NumericUtil.isInteger(heartBeatCount)) { - if (Integer.valueOf(heartBeatCount) > 0) { - cmdXml.append("" + heartBeatCount + "\r\n"); - } + if (basicParam.getHeartBeatCount() != null && basicParam.getHeartBeatCount() > 0) { + cmdXml.append("" + basicParam.getHeartBeatCount() + "\r\n"); } cmdXml.append("\r\n"); cmdXml.append("\r\n"); + ErrorCallback errorCallback = (code, msg, data) -> { + if (code != ErrorCode.SUCCESS.getCode()) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(); + eventResult.type = SipSubscribe.EventResultType.failedResult; + eventResult.msg = msg; + eventResult.statusCode = code; + errorEvent.response(eventResult); + } + }; + MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, errorCallback); + messageSubscribe.addSubscribe(messageEvent); Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java index 18da9cde..e806cbdb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java @@ -1,11 +1,21 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe; +import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.sip.RequestEvent; + +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; + /** * 命令类型: 请求动作的应答 * 命令类型: 设备控制, 报警通知, 设备目录信息查询, 目录信息查询, 目录收到, 设备信息查询, 设备状态信息查询 ...... @@ -18,8 +28,34 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In @Autowired private MessageRequestProcessor messageRequestProcessor; + @Autowired + private MessageSubscribe messageSubscribe; + @Override public void afterPropertiesSet() throws Exception { messageRequestProcessor.addHandler(messageType, this); } + + @Override + public void handForDevice(RequestEvent evt, Device device, Element element) { + super.handForDevice(evt, device, element); + handMessageEvent(element, null); + } + + public void handMessageEvent(Element element, Object data) { + String cmd = getText(element, "CmdType"); + IMessageHandler messageHandler = messageHandlerMap.get(cmd); + if (messageHandler == null) { + String sn = getText(element, "SN"); + MessageEvent subscribe = (MessageEvent)messageSubscribe.getSubscribe(cmd + sn); + if (subscribe != null) { + String result = getText(element, "Result"); + if ("OK".equalsIgnoreCase(result)) { + subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); + }else { + subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result); + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java index d124f236..d7765132 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/ConfigDownloadResponseMessageHandler.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; @@ -49,12 +50,7 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar @Override public void handForDevice(RequestEvent evt, Device device, Element element) { String channelId = getText(element, "DeviceID"); - String key; - if (device.getDeviceId().equals(channelId)) { - key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + device.getDeviceId(); - }else { - key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + device.getDeviceId() + channelId; - } + try { // 回复200 OK responseAck((SIPRequest) evt.getRequest(), Response.OK); @@ -76,13 +72,7 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar device.setPositionCapability(positionCapability); deviceService.updateDeviceHeartInfo(device); - - - - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); + responseMessageHandler.handMessageEvent(element, basicParam); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index e7f9d6e8..fffb3ba2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -2,11 +2,9 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface IRedisRpcService { @@ -40,7 +38,7 @@ public interface IRedisRpcService { SyncStatus getChannelSyncStatus(String serverId, String deviceId); - WVPResult deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount); + WVPResult deviceBasicConfig(String serverId, Device device, BasicParam basicParam); WVPResult deviceConfigQuery(String serverId, Device device, String channelId, String configType); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java index 0a4867e6..3d836977 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; +import com.genersoft.iot.vmp.gb28181.bean.BasicParam; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; @@ -92,14 +93,9 @@ public class RedisRpcDeviceController extends RpcController { @RedisRpcMapping("deviceBasicConfig") public RedisRpcResponse deviceBasicConfig(RedisRpcRequest request) { - JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); - String deviceId = paramJson.getString("deviceId"); - String channelId = paramJson.getString("channelId"); - String name = paramJson.getString("configType"); - String expiration = paramJson.getString("expiration"); - String heartBeatInterval = paramJson.getString("heartBeatInterval"); + BasicParam basicParam = JSONObject.parseObject(request.getParam().toString(), BasicParam.class); - Device device = deviceService.getDeviceByDeviceId(deviceId); + Device device = deviceService.getDeviceByDeviceId(basicParam.getDeviceId()); RedisRpcResponse response = request.getResponse(); if (device == null || !userSetting.getServerId().equals(device.getServerId())) { @@ -107,22 +103,13 @@ public class RedisRpcDeviceController extends RpcController { response.setBody("param error"); return response; } - DeferredResult> deferredResult = deviceService.deviceBasicConfig(device, channelId, name, - expiration, heartBeatInterval, heartBeatInterval); - deferredResult.onCompletion(() ->{ - response.setStatusCode(ErrorCode.SUCCESS.getCode()); - response.setBody(deferredResult.getResult()); - // 手动发送结果 - sendResponse(response); - }); - deferredResult.onTimeout(() -> { - log.warn(String.format("设备配置操作超时, 设备未返回应答指令")); - response.setStatusCode(ErrorCode.SUCCESS.getCode()); - response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答")); - // 手动发送结果 - sendResponse(response); - }); - return response; + deviceService.deviceBasicConfig(device, basicParam, (code, msg, data) -> { + response.setStatusCode(code); + response.setBody(new WVPResult<>(code, msg, data)); + // 手动发送结果 + sendResponse(response); + }); + return null; } @RedisRpcMapping("deviceConfigQuery") diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index d365d47e..d12f8ff9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -9,10 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.event.hook.Hook; @@ -267,16 +264,8 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public WVPResult deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration, - String heartBeatInterval, String heartBeatCount) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("device", device.getDeviceId()); - jsonObject.put("channelId", channelId); - jsonObject.put("name", name); - jsonObject.put("expiration", expiration); - jsonObject.put("heartBeatInterval", heartBeatInterval); - jsonObject.put("heartBeatCount", heartBeatCount); - RedisRpcRequest request = buildRequest("device/deviceBasicConfig", jsonObject); + public WVPResult deviceBasicConfig(String serverId, Device device, BasicParam basicParam) { + RedisRpcRequest request = buildRequest("device/deviceBasicConfig", JSONObject.toJSONString(basicParam)); request.setToId(serverId); RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS); return JSON.parseObject(response.getBody().toString(), WVPResult.class); diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java index 0b42a2e3..657e5cb2 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushPlayService.java @@ -8,4 +8,5 @@ public interface IStreamPushPlayService { void stop(String app, String stream); + void stop(Integer id); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java index 68e54343..6f44f36e 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -127,4 +127,16 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { Assert.notNull(mediaServer, "未找到使用的节点"); mediaServerService.closeStreams(mediaServer, app, stream); } + + @Override + public void stop(Integer id) { + StreamPush streamPush = streamPushMapper.queryOne(id); + if (streamPush == null || !streamPush.isPushing()) { + return; + } + String mediaServerId = streamPush.getMediaServerId(); + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + Assert.notNull(mediaServer, "未找到使用的节点"); + mediaServerService.closeStreams(mediaServer, streamPush.getApp(), streamPush.getStream()); + } }