diff --git a/sql/2.6.6-2.6.7更新.sql b/sql/2.6.6-2.6.7更新.sql new file mode 100755 index 00000000..df595a81 --- /dev/null +++ b/sql/2.6.6-2.6.7更新.sql @@ -0,0 +1,6 @@ +alter table device + add asMessageChannel int default 0; + +alter table parent_platform + add asMessageChannel int default 0; + diff --git a/src/main/resources/db/migration/V2.6.7_20230201__初始化.sql b/sql/初始化.sql similarity index 99% rename from src/main/resources/db/migration/V2.6.7_20230201__初始化.sql rename to sql/初始化.sql index 7a15f905..a8dfaff1 100644 --- a/src/main/resources/db/migration/V2.6.7_20230201__初始化.sql +++ b/sql/初始化.sql @@ -47,6 +47,7 @@ CREATE TABLE `device` ( `mobilePositionSubmissionInterval` int DEFAULT '5', `subscribeCycleForAlarm` int DEFAULT NULL, `ssrcCheck` int DEFAULT '0', + `asMessageChannel` int DEFAULT '0', `geoCoordSys` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `treeType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `custom_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, @@ -329,6 +330,7 @@ CREATE TABLE `parent_platform` ( `catalogId` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `ptz` int DEFAULT NULL, `rtcp` int DEFAULT NULL, + `asMessageChannel` int DEFAULT '0', `status` bit(1) DEFAULT NULL, `startOfflinePush` int DEFAULT '0', `administrativeDivision` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index e944476f..130d147d 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -48,6 +48,7 @@ public class UserSetting { private Boolean syncChannelOnDeviceOnline = Boolean.FALSE; private Boolean sipLog = Boolean.FALSE; + private Boolean sendToPlatformsWhenIdLost = Boolean.FALSE; private String serverId = "000000"; @@ -228,4 +229,12 @@ public class UserSetting { public void setAllowedOrigins(List allowedOrigins) { this.allowedOrigins = allowedOrigins; } + + public Boolean getSendToPlatformsWhenIdLost() { + return sendToPlatformsWhenIdLost; + } + + public void setSendToPlatformsWhenIdLost(Boolean sendToPlatformsWhenIdLost) { + this.sendToPlatformsWhenIdLost = sendToPlatformsWhenIdLost; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index e454896e..83b9c6af 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -188,6 +188,9 @@ public class Device { @Schema(description = "SIP交互IP(设备访问平台的IP)") private String localIp; + @Schema(description = "是否作为消息通道") + private boolean asMessageChannel; + public String getDeviceId() { return deviceId; @@ -428,4 +431,12 @@ public class Device { public void setKeepaliveIntervalTime(int keepaliveIntervalTime) { this.keepaliveIntervalTime = keepaliveIntervalTime; } + + public boolean isAsMessageChannel() { + return asMessageChannel; + } + + public void setAsMessageChannel(boolean asMessageChannel) { + this.asMessageChannel = asMessageChannel; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index b056cc71..01a11eb6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -189,6 +189,9 @@ public class ParentPlatform { @Schema(description = "树类型 国标规定了两种树的展现方式 行政区划 CivilCode 和业务分组:BusinessGrou") private String treeType; + @Schema(description = "是否作为消息通道") + private boolean asMessageChannel; + public Integer getId() { return id; } @@ -428,4 +431,12 @@ public class ParentPlatform { public void setTreeType(String treeType) { this.treeType = treeType; } + + public boolean isAsMessageChannel() { + return asMessageChannel; + } + + public void setAsMessageChannel(boolean asMessageChannel) { + this.asMessageChannel = asMessageChannel; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java index 41aebf5d..7ff52830 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java @@ -1,6 +1,8 @@ package com.genersoft.iot.vmp.gb28181.bean; +import io.swagger.v3.oas.annotations.media.Schema; + import java.time.Instant; import java.util.List; @@ -9,22 +11,29 @@ import java.util.List; * @author: swwheihei * @date: 2020年5月8日 下午2:05:56 */ +@Schema(description = "设备录像查询结果信息") public class RecordInfo { + @Schema(description = "设备编号") private String deviceId; + @Schema(description = "通道编号") private String channelId; + @Schema(description = "命令序列号") private String sn; + @Schema(description = "设备名称") private String name; - + + @Schema(description = "列表总数") private int sumNum; private int count; private Instant lastTime; - + + @Schema(description = "") private List recordList; public String getDeviceId() { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java index a47147a1..07e559c8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java @@ -2,9 +2,9 @@ package com.genersoft.iot.vmp.gb28181.bean; import com.genersoft.iot.vmp.utils.DateUtil; +import io.swagger.v3.oas.annotations.media.Schema; import org.jetbrains.annotations.NotNull; -import java.text.ParseException; import java.time.Instant; import java.time.temporal.TemporalAccessor; @@ -13,26 +13,37 @@ import java.time.temporal.TemporalAccessor; * @author: swwheihei * @date: 2020年5月8日 下午2:06:54 */ +@Schema(description = "设备录像详情") public class RecordItem implements Comparable{ + @Schema(description = "设备编号") private String deviceId; - + + @Schema(description = "名称") private String name; - + + @Schema(description = "文件路径名 (可选)") private String filePath; + @Schema(description = "录像文件大小,单位:Byte(可选)") private String fileSize; + @Schema(description = "录像地址(可选)") private String address; - + + @Schema(description = "录像开始时间(可选)") private String startTime; - + + @Schema(description = "录像结束时间(可选)") private String endTime; - + + @Schema(description = "保密属性(必选)缺省为0;0:不涉密,1:涉密") private int secrecy; - + + @Schema(description = "录像产生类型(可选)time或alarm 或 manua") private String type; - + + @Schema(description = "录像触发者ID(可选)") private String recorderId; public String getDeviceId() { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 1f1d10a6..fdd2288f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -122,7 +122,7 @@ public interface ISIPCommander { */ void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent, - SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; /** * 视频流停止 @@ -221,7 +221,6 @@ public interface ISIPCommander { * * @param device 视频设备 * @param channelId 通道id,非通道则是设备本身 - * @param frontCmd 上级平台的指令,如果存在则直接下发 * @param enabled 看守位使能:1 = 开启,0 = 关闭 * @param resetTime 自动归位时间间隔,开启看守位时使用,单位:秒(s) * @param presetIndex 调用预置位编号,开启看守位时使用,取值范围0~255 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index dabe1c87..ecd23202 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -470,8 +470,9 @@ public class SIPCommander implements ISIPCommander { */ @Override public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, - String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent, - SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + String startTime, String endTime, int downloadSpeed, + InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent, + SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException { logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); String sdpIp; @@ -540,11 +541,14 @@ public class SIPCommander implements ISIPCommander { content.append("a=downloadspeed:" + downloadSpeed + "\r\n"); content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc - + logger.debug("此时请求下载信令的ssrc===>{}",ssrcInfo.getSsrc()); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId()); // 添加订阅 + CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); + String callId=newCallIdHeader.getCallId(); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { - hookEvent.call(new InviteStreamInfo(mediaServerItem, json,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream())); + logger.debug("sipc 添加订阅===callId {}",callId); + hookEvent.call(new InviteStreamInfo(mediaServerItem, json,callId, "rtp", ssrcInfo.getStream())); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("regist", false); hookSubscribe.getContent().put("schema", "rtsp"); @@ -553,7 +557,7 @@ public class SIPCommander implements ISIPCommander { (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> { logger.info("[录像]下载结束, 发送BYE"); try { - streamByeCmd(device, channelId, ssrcInfo.getStream(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId()); + streamByeCmd(device, channelId, ssrcInfo.getStream(),callId); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage()); @@ -561,15 +565,24 @@ public class SIPCommander implements ISIPCommander { }); }); - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); if (inviteStreamCallback != null) { - inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream())); + inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,callId, "rtp", ssrcInfo.getStream())); } - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent -> { - ResponseEvent responseEvent = (ResponseEvent) okEvent.event; + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { + ResponseEvent responseEvent = (ResponseEvent) event.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); - streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download); + String contentString =new String(response.getRawContent()); + int ssrcIndex = contentString.indexOf("y="); + String ssrc=ssrcInfo.getSsrc(); + if (ssrcIndex >= 0) { + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + logger.debug("接收到的下载响应ssrc====>{}",ssrcInfo.getSsrc()); + logger.debug("接收到的下载响应ssrc====>{}",ssrc); + streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download); + okEvent.response(event); }); } @@ -801,7 +814,6 @@ public class SIPCommander implements ISIPCommander { * * @param device 视频设备 * @param channelId 通道id,非通道则是设备本身 - * @param frontCmd 上级平台的指令,如果存在则直接下发 * @param enabled 看守位使能:1 = 开启,0 = 关闭 * @param resetTime 自动归位时间间隔,开启看守位时使用,单位:秒(s) * @param presetIndex 调用预置位编号,开启看守位时使用,取值范围0~255 @@ -1376,7 +1388,7 @@ public class SIPCommander implements ISIPCommander { if (device == null) { return; } - logger.info("[发送 报警通知] {}/{}->{},{}", device.getDeviceId(), deviceAlarm.getChannelId(), + logger.info("[发送报警通知]设备: {}/{}->{},{}", device.getDeviceId(), deviceAlarm.getChannelId(), deviceAlarm.getLongitude(), deviceAlarm.getLatitude()); String characterSet = device.getCharset(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 64152415..b9d2c4cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -402,7 +402,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { if (parentPlatform == null) { return; } - logger.info("[发送报警通知] {}/{}->{},{}: {}", parentPlatform.getServerGBId(), deviceAlarm.getChannelId(), + logger.info("[发送报警通知]平台: {}/{}->{},{}: {}", parentPlatform.getServerGBId(), deviceAlarm.getChannelId(), deviceAlarm.getLongitude(), deviceAlarm.getLatitude(), JSON.toJSONString(deviceAlarm)); String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceStatusXml = new StringBuffer(600); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 6d8d8f56..93c9d2f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -163,7 +163,11 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){ String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; // 对数据进行排序 - Collections.sort(recordInfo.getRecordList()); + if(recordInfo!=null && recordInfo.getRecordList()!=null) { + Collections.sort(recordInfo.getRecordList()); + }else{ + recordInfo.setRecordList(new ArrayList<>()); + } RequestMessage msg = new RequestMessage(); msg.setKey(key); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index a07a47cd..7a8ac98d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -256,6 +256,7 @@ public class ZLMHttpHookListener { return result; } + /** * rtsp/rtmp流注册或注销时触发此事件;此事件对回复不敏感。 */ @@ -279,9 +280,12 @@ public class ZLMHttpHookListener { subscribe.response(mediaInfo, json); } } - // 流消失移除redis play + List tracks = param.getTracks(); + // TODO 重构此处逻辑 + if (param.isRegist()) { + // 处理流注册的鉴权信息 if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { @@ -300,16 +304,15 @@ public class ZLMHttpHookListener { } if ("rtsp".equals(param.getSchema())) { + // 更新流媒体负载信息 if (param.isRegist()) { mediaServerService.addCount(param.getMediaServerId()); } else { mediaServerService.removeCount(param.getMediaServerId()); } - if (param.getOriginType() == OriginType.PULL.ordinal() - || param.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) { - // 设置拉流代理上线/离线 - streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); - } + // 设置拉流代理上线/离线 + streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); + if ("rtp".equals(param.getApp()) && !param.isRegist()) { StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(param.getStream()); if (streamInfo != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index b8e7c8e5..8b460861 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -635,23 +635,75 @@ public class PlayServiceImpl implements IPlayService { hookCallBack.call(downloadResult); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); }; - + InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> { + logger.info("收到订阅消息: " + inviteStreamInfo.getCallId()); + dynamicTask.stop(downLoadTimeOutTaskKey); + StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId); + streamInfo.setStartTime(startTime); + streamInfo.setEndTime(endTime); + redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId()); + downloadResult.setCode(ErrorCode.SUCCESS.getCode()); + downloadResult.setMsg(ErrorCode.SUCCESS.getMsg()); + downloadResult.setData(streamInfo); + downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem()); + downloadResult.setResponse(inviteStreamInfo.getResponse()); + hookCallBack.call(downloadResult); + }; try { cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack, - inviteStreamInfo -> { - logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString()); - dynamicTask.stop(downLoadTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId); - streamInfo.setStartTime(startTime); - streamInfo.setEndTime(endTime); - redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId()); - downloadResult.setCode(ErrorCode.SUCCESS.getCode()); - downloadResult.setMsg(ErrorCode.SUCCESS.getMsg()); - downloadResult.setData(streamInfo); - downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem()); - downloadResult.setResponse(inviteStreamInfo.getResponse()); - hookCallBack.call(downloadResult); - }, errorEvent); + hookEvent, errorEvent, eventResult -> + { + if (eventResult.type == SipSubscribe.EventResultType.response) { + ResponseEvent responseEvent = (ResponseEvent) eventResult.event; + String contentString = new String(responseEvent.getResponse().getRawContent()); + // 获取ssrc + int ssrcIndex = contentString.indexOf("y="); + // 检查是否有y字段 + if (ssrcIndex >= 0) { + //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 + String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 + if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { + return; + } + logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); + if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { + logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + + if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { + // ssrc 不可用 + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用"; + eventResult.statusCode = 400; + errorEvent.response(eventResult); + return; + } + + // 单端口模式streamId也有变化,需要重新设置监听 + if (!mediaServerItem.isRtpEnable()) { + // 添加订阅 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); + hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { + logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); + dynamicTask.stop(downLoadTimeOutTaskKey); + // hook响应 + onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, hookCallBack); + hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); + }); + } + // 关闭rtp server + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + // 重新开启ssrc server + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); + } + } + } + + }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[命令发送失败] 录像下载: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java index 1cc754d2..43528ecc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -44,8 +45,12 @@ public class RedisAlarmMsgListener implements MessageListener { @Autowired private ThreadPoolTaskExecutor taskExecutor; + @Autowired + private UserSetting userSetting; + @Override public void onMessage(@NotNull Message message, byte[] bytes) { + // 消息示例: PUBLISH alarm_receive '{ "gbId": "", "alarmSn": 1, "alarmType": "111", "alarmDescription": "222", }' logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody())); boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); @@ -74,17 +79,44 @@ public class RedisAlarmMsgListener implements MessageListener { deviceAlarm.setLatitude(0); if (ObjectUtils.isEmpty(gbId)) { - // 发送给所有的上级 - List parentPlatforms = storage.queryEnableParentPlatformList(true); - if (parentPlatforms.size() > 0) { - for (ParentPlatform parentPlatform : parentPlatforms) { + if (userSetting.getSendToPlatformsWhenIdLost()) { + // 发送给所有的上级 + List parentPlatforms = storage.queryEnableParentPlatformList(true); + if (parentPlatforms.size() > 0) { + for (ParentPlatform parentPlatform : parentPlatforms) { + try { + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); + } + } + } + }else { + // 获取开启了消息推送的设备和平台 + List parentPlatforms = storage.queryEnablePlatformListWithAsMessageChannel(); + if (parentPlatforms.size() > 0) { + for (ParentPlatform parentPlatform : parentPlatforms) { + try { + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); + } + } + } + + } + // 获取开启了消息推送的设备和平台 + List devices = storage.queryDeviceWithAsMessageChannel(); + if (devices.size() > 0) { + for (Device device : devices) { try { - commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); + commander.sendAlarmMessage(device, deviceAlarm); + } catch (InvalidArgumentException | SipException | ParseException e) { + logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); } } } + }else { Device device = storage.queryVideoDevice(gbId); ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index 5965678e..b53c2d31 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -378,4 +378,7 @@ public interface IVideoManagerStorage { List queryChannelsByDeviceId(String serial, List channelIds, Boolean online); + List queryEnablePlatformListWithAsMessageChannel(); + + List queryDeviceWithAsMessageChannel(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index e33a63ed..93f2a097 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -344,10 +344,10 @@ public interface DeviceChannelMapper { "select * " + "from device_channel " + "where deviceId=#{deviceId}" + - " and parentId = #{parentId} or left(channelId, #{parentId.length()}) = #{parentId} and length(channelId)=#{length} " + + " and parentId = #{parentId} or left(channelId, LENGTH(#{parentId})) = #{parentId} and length(channelId)=#{length} " + " and parentId = #{parentId} or length(channelId)=#{length} " + " and parentId = #{parentId} " + - " and parentId = #{parentId} or left(channelId, #{parentId.length()}) = #{parentId} " + + " and parentId = #{parentId} or left(channelId, LENGTH(#{parentId})) = #{parentId} " + " "}) List getChannelsWithCivilCodeAndLength(String deviceId, String parentId, Integer length); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java index 0aed8207..8f158e29 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java @@ -39,6 +39,7 @@ public interface DeviceMapper { "mobilePositionSubmissionInterval," + "subscribeCycleForAlarm," + "ssrcCheck," + + "asMessageChannel," + "geoCoordSys," + "treeType," + "online" + @@ -70,6 +71,7 @@ public interface DeviceMapper { "mobilePositionSubmissionInterval," + "subscribeCycleForAlarm," + "ssrcCheck," + + "asMessageChannel," + "geoCoordSys," + "treeType," + "online" + @@ -98,6 +100,7 @@ public interface DeviceMapper { "#{mobilePositionSubmissionInterval}," + "#{subscribeCycleForAlarm}," + "#{ssrcCheck}," + + "#{asMessageChannel}," + "#{geoCoordSys}," + "#{treeType}," + "#{online}" + @@ -152,6 +155,7 @@ public interface DeviceMapper { "mobilePositionSubmissionInterval," + "subscribeCycleForAlarm," + "ssrcCheck," + + "asMessageChannel," + "geoCoordSys," + "treeType," + "online," + @@ -192,6 +196,7 @@ public interface DeviceMapper { "mobilePositionSubmissionInterval," + "subscribeCycleForAlarm," + "ssrcCheck," + + "asMessageChannel," + "geoCoordSys," + "treeType," + "online " + @@ -222,6 +227,7 @@ public interface DeviceMapper { "mobilePositionSubmissionInterval," + "subscribeCycleForAlarm," + "ssrcCheck," + + "asMessageChannel," + "geoCoordSys," + "treeType," + "online" + @@ -243,6 +249,7 @@ public interface DeviceMapper { ", mobilePositionSubmissionInterval=#{mobilePositionSubmissionInterval}" + ", subscribeCycleForAlarm=#{subscribeCycleForAlarm}" + ", ssrcCheck=#{ssrcCheck}" + + ", asMessageChannel=#{asMessageChannel}" + ", geoCoordSys=#{geoCoordSys}" + ", treeType=#{treeType}" + ", mediaServerId=#{mediaServerId}" + @@ -259,6 +266,7 @@ public interface DeviceMapper { "updateTime," + "charset," + "ssrcCheck," + + "asMessageChannel," + "geoCoordSys," + "treeType," + "online" + @@ -271,6 +279,7 @@ public interface DeviceMapper { "#{updateTime}," + "#{charset}," + "#{ssrcCheck}," + + "#{asMessageChannel}," + "#{geoCoordSys}," + "#{treeType}," + "#{online}" + @@ -282,4 +291,7 @@ public interface DeviceMapper { @Select("select * from device") List getAll(); + + @Select("select * from device where asMessageChannel = 1") + List queryDeviceWithAsMessageChannel(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index 52025eb5..89ebd69f 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -15,10 +15,10 @@ import java.util.List; public interface ParentPlatformMapper { @Insert("INSERT INTO parent_platform (enable, name, serverGBId, serverGBDomain, serverIP, serverPort, deviceGBId, deviceIp, " + - " devicePort, username, password, expires, keepTimeout, transport, characterSet, ptz, rtcp, " + + " devicePort, username, password, expires, keepTimeout, transport, characterSet, ptz, rtcp, asMessageChannel, " + " status, startOfflinePush, catalogId, administrativeDivision, catalogGroup, createTime, updateTime, treeType) " + " VALUES (#{enable}, #{name}, #{serverGBId}, #{serverGBDomain}, #{serverIP}, #{serverPort}, #{deviceGBId}, #{deviceIp}, " + - " #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, " + + " #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{asMessageChannel}, " + " #{status}, #{startOfflinePush}, #{catalogId}, #{administrativeDivision}, #{catalogGroup}, #{createTime}, #{updateTime}, #{treeType})") int addParentPlatform(ParentPlatform parentPlatform); @@ -40,6 +40,7 @@ public interface ParentPlatformMapper { "characterSet=#{characterSet}, " + "ptz=#{ptz}, " + "rtcp=#{rtcp}, " + + "asMessageChannel=#{asMessageChannel}, " + "status=#{status}, " + "startOfflinePush=#{startOfflinePush}, " + "catalogGroup=#{catalogGroup}, " + @@ -68,9 +69,12 @@ public interface ParentPlatformMapper { "FROM parent_platform pp ") List getParentPlatformList(); - @Select("SELECT * FROM parent_platform WHERE enable=#{enable}") + @Select("SELECT * FROM parent_platform WHERE enable=#{enable} ") List getEnableParentPlatformList(boolean enable); + @Select("SELECT * FROM parent_platform WHERE enable=1 and asMessageChannel = 1") + List queryEnablePlatformListWithAsMessageChannel(); + @Select("SELECT * FROM parent_platform WHERE serverGBId=#{platformGbId}") ParentPlatform getParentPlatByServerGBId(String platformGbId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 4f229d78..0997fe11 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -177,12 +177,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public boolean startDownload(StreamInfo stream, String callId) { boolean result; + String key=String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX, + userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId); if (stream.getProgress() == 1) { - result = RedisUtil.set(String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX, - userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream); + logger.debug("添加下载缓存==已完成下载=》{}",key); + result = RedisUtil.set(key, stream); }else { - result = RedisUtil.set(String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX, - userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream, 60*60); + logger.debug("添加下载缓存==未完成下载=》{}",key); + result = RedisUtil.set(key, stream, 60*60); } return result; } @@ -617,7 +619,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { stream, callId ); - List streamInfoScan = RedisUtil.scan(key); + List streamInfoScan = RedisUtil.scan2(key); if (streamInfoScan.size() > 0) { return (StreamInfo) RedisUtil.get((String) streamInfoScan.get(0)); }else { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 9475abcd..3920fc7e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -524,6 +524,16 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return platformMapper.getEnableParentPlatformList(enable); } + @Override + public List queryEnablePlatformListWithAsMessageChannel() { + return platformMapper.queryEnablePlatformListWithAsMessageChannel(); + } + + @Override + public List queryDeviceWithAsMessageChannel() { + return deviceMapper.queryDeviceWithAsMessageChannel(); + } + @Override public void outlineForAllParentPlatform() { platformMapper.outlineForAllParentPlatform(); diff --git a/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java index a50553d4..e54ed539 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java @@ -881,7 +881,13 @@ public class RedisUtil { return new ArrayList<>(resultKeys); } - + public static List scan2(String query) { + if (redisTemplate == null) { + redisTemplate = SpringBeanFactory.getBean("redisTemplate"); + } + Set keys = redisTemplate.keys(query); + return new ArrayList<>(keys); + } // ============================== 消息发送与订阅 ============================== public static void convertAndSend(String channel, JSONObject msg) { if (redisTemplate == null) { diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index 3844971c..8b9b7b03 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -199,8 +199,8 @@ user-settings: sip-use-source-ip-as-remote-address: false # 是否开启sip日志 sip-log: true - # 自动数据库升级,保证表结构完整 - sync-db: true + # 消息通道功能-缺少国标ID是否给所有上级发送消息 + send-to-platforms-when-id-lost: true # 跨域配置,配置你访问前端页面的地址即可, 可以配置多个 allowed-origins: - http://localhost:8008 diff --git a/web_src/index.html b/web_src/index.html index 778591ed..bd66d392 100644 --- a/web_src/index.html +++ b/web_src/index.html @@ -15,5 +15,6 @@
+ diff --git a/web_src/src/components/common/jessibuca.vue b/web_src/src/components/common/jessibuca.vue index 4049721c..77150761 100644 --- a/web_src/src/components/common/jessibuca.vue +++ b/web_src/src/components/common/jessibuca.vue @@ -1,5 +1,6 @@