From 8fdd10d1c2723ead1d2c3b232f64f292ba3ce1cc Mon Sep 17 00:00:00 2001 From: che_shuai Date: Mon, 24 Jul 2023 14:10:46 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=9C=A8=E5=9B=BD?= =?UTF-8?q?=E6=A0=87=E7=BA=A7=E8=81=94=E9=80=89=E6=8B=A9=E9=80=9A=E9=81=93?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E5=A6=82=E6=9E=9C=E6=B7=BB=E5=8A=A0=E9=80=9A?= =?UTF-8?q?=E9=81=93=E5=88=B0=E8=B7=9F=E5=B9=B3=E5=8F=B0=E6=A0=B9=E7=9B=AE?= =?UTF-8?q?=E5=BD=95=EF=BC=88=E5=8D=B3=E5=B9=B3=E5=8F=B0=E6=9C=AC=E8=BA=AB?= =?UTF-8?q?=EF=BC=89=EF=BC=8C=E6=97=A0=E6=B3=95=E8=A7=A6=E5=8F=91=E7=9B=AE?= =?UTF-8?q?=E5=BD=95=E5=8F=98=E6=9B=B4=E4=BA=8B=E4=BB=B6=E9=97=AE=E9=A2=98?= =?UTF-8?q?.=E8=AF=A6=E8=A7=81=20https://github.com/648540858/wvp-GB28181-?= =?UTF-8?q?pro/issues/958?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/service/impl/PlatformChannelServiceImpl.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java index fa164289..517cb046 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java @@ -126,7 +126,15 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { List deviceChannelList = new ArrayList<>(); if (channelReduces.size() > 0){ PlatformCatalog catalog = catalogManager.selectByPlatFormAndCatalogId(platform.getServerGBId(),catalogId); - if (catalog == null || !catalogId.equals(platform.getDeviceGBId())) { + if (catalog == null && catalogId.equals(platform.getDeviceGBId())) { + for (ChannelReduce channelReduce : channelReduces) { + DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId()); + deviceChannel.setParental(0); + deviceChannel.setCivilCode(platform.getServerGBDomain()); + deviceChannelList.add(deviceChannel); + } + return deviceChannelList; + } else if (catalog == null || !catalogId.equals(platform.getDeviceGBId())) { logger.warn("未查询到目录{}的信息", catalogId); return null; } From 7fd34cfda51657549667be84369c7f6794eeb6a3 Mon Sep 17 00:00:00 2001 From: wanghui <13282027326@163.com> Date: Fri, 21 Jul 2023 16:20:39 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=E6=94=AF=E6=8C=81=E7=BA=A7=E8=81=94tc?= =?UTF-8?q?p=E4=B8=BB=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/AckRequestProcessor.java | 5 ++- .../request/impl/InviteRequestProcessor.java | 45 ++++++++++++++++++- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 4 ++ .../iot/vmp/media/zlm/ZLMServerFactory.java | 7 +++ 4 files changed, 59 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 07946eb8..61a60041 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -130,7 +130,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In // 开启rtcp保活 param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); } - + // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤 + if (sendRtpItem.isTcpActive()) { + return; + } if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index df87e463..397a4a0f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -47,6 +49,8 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.Vector; @@ -396,6 +400,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); content.append("f=\r\n"); + try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 dynamicTask.startDelay(callIdHeader.getCallId(), () -> { @@ -409,7 +414,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } }, 60 * 1000); responseSdpAck(request, content.toString(), platform); - + // tcp主动模式,回复sdp后开启监听 + if (sendRtpItem.isTcpActive()) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + if (!sendRtpItem.isTcpActive()) { + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + } + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + param.put("is_udp", is_Udp); + param.put("src_port", localPort); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + if (!sendRtpItem.isTcp()) { + // 开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param); + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader); + } + } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 回复SdpAck", e); } @@ -543,6 +574,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("下级TCP被动启动监听失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject); + logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + } else { + logger.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); + } + } + /** * 安排推流 */ diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 054e5fcc..e58f2aeb 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -289,6 +289,10 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "startSendRtp",param, null); } + public JSONObject startSendRtpPassive(MediaServerItem mediaServerItem, Map param) { + return sendPost(mediaServerItem, "startSendRtpPassive",param, null); + } + public JSONObject stopSendRtp(MediaServerItem mediaServerItem, Map param) { return sendPost(mediaServerItem, "stopSendRtp",param, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 34acf152..c190de18 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -231,6 +231,13 @@ public class ZLMServerFactory { return zlmresTfulUtils.startSendRtp(mediaServerItem, param); } + /** + * 调用zlm RESTFUL API —— startSendRtpPassive + */ + public JSONObject startSendRtpStreamForPassive(MediaServerItem mediaServerItem, Mapparam) { + return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param); + } + /** * 查询待转推的流是否就绪 */ From 59d8f2f9152b3c3106abae64bef343cde842f225 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 25 Jul 2023 11:14:43 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=BD=95=E5=83=8F?= =?UTF-8?q?=E5=9B=9E=E6=94=BE=E4=BD=BF=E7=94=A8=E5=9B=BA=E5=AE=9A=E6=B5=81?= =?UTF-8?q?=E5=9C=B0=E5=9D=80=E4=BB=A5=E5=8F=8A=E8=87=AA=E5=8A=A8=E7=82=B9?= =?UTF-8?q?=E6=92=AD=E5=BD=95=E5=83=8F=E5=9B=9E=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/2.6.9更新.sql | 2 + sql/初始化.sql | 2 +- .../cmd/KeepaliveNotifyMessageHandler.java | 1 + .../vmp/media/zlm/ZLMHttpHookListener.java | 94 ++++++++++++++----- .../iot/vmp/service/IPlayService.java | 3 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 14 ++- .../com/genersoft/iot/vmp/utils/DateUtil.java | 5 + 7 files changed, 95 insertions(+), 26 deletions(-) create mode 100644 sql/2.6.9更新.sql diff --git a/sql/2.6.9更新.sql b/sql/2.6.9更新.sql new file mode 100644 index 00000000..2e047948 --- /dev/null +++ b/sql/2.6.9更新.sql @@ -0,0 +1,2 @@ +alter table wvp_device_channel + change stream_id stream_id varying(255) \ No newline at end of file diff --git a/sql/初始化.sql b/sql/初始化.sql index f2952ba1..93eef4ed 100644 --- a/sql/初始化.sql +++ b/sql/初始化.sql @@ -79,7 +79,7 @@ create table wvp_device_channel ( custom_longitude double precision, latitude double precision, custom_latitude double precision, - stream_id character varying(50), + stream_id character varying(255), device_id character varying(50) not null, parental character varying(50), has_audio bool default false, diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 729eec39..5c577ba6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -59,6 +59,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp // 未注册的设备不做处理 return; } + logger.info("[收到心跳], device: {}", device.getDeviceId()); SIPRequest request = (SIPRequest) evt.getRequest(); // 回复200 OK try { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index d19d98b5..9b2864fd 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -24,8 +24,10 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; @@ -559,7 +561,7 @@ public class ZLMHttpHookListener { if ("rtp".equals(param.getApp())) { String[] s = param.getStream().split("_"); - if (!mediaInfo.isRtpEnable() || s.length != 2) { + if (!mediaInfo.isRtpEnable() || (s.length != 2 && s.length != 4)) { defaultResult.setResult(HookResult.SUCCESS()); return defaultResult; } @@ -575,33 +577,79 @@ public class ZLMHttpHookListener { defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); return defaultResult; } - logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + if (s.length == 2) { + logger.info("[ZLM HOOK] 预览流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); - RequestMessage msg = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; - boolean exist = resultHolder.exist(key, null); - msg.setKey(key); - String uuid = UUID.randomUUID().toString(); - msg.setId(uuid); - DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; + boolean exist = resultHolder.exist(key, null); + msg.setKey(key); + String uuid = UUID.randomUUID().toString(); + msg.setId(uuid); + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - result.onTimeout(() -> { - logger.info("[ZLM HOOK] 自动点播, 等待超时"); - // 释放rtpserver - msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); - resultHolder.invokeResult(msg); - }); - - // 录像查询以channelId作为deviceId查询 - resultHolder.put(key, uuid, result); - - if (!exist) { - playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> { - msg.setData(new HookResult(code, message)); + result.onTimeout(() -> { + logger.info("[ZLM HOOK] 预览流自动点播, 等待超时"); + // 释放rtpserver + msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); resultHolder.invokeResult(msg); }); + + resultHolder.put(key, uuid, result); + + if (!exist) { + playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> { + msg.setData(new HookResult(code, message)); + resultHolder.invokeResult(msg); + }); + } + return result; + }else if(s.length == 4){ + // 此时为录像回放, 录像回放格式为> 设备ID_通道ID_开始时间_结束时间 + String startTimeStr = s[2]; + String endTimeStr = s[3]; + if (startTimeStr == null || endTimeStr == null || startTimeStr.length() != 14 || endTimeStr.length() != 14) { + defaultResult.setResult(HookResult.SUCCESS()); + return defaultResult; + } + String startTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(startTimeStr); + String endTime = DateUtil.urlToyyyy_MM_dd_HH_mm_ss(endTimeStr); + logger.info("[ZLM HOOK] 回放流未找到, 发起自动点播:{}->{}->{}/{}-{}-{}", + param.getMediaServerId(), param.getSchema(), + param.getApp(), param.getStream(), + startTime, endTime + ); + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; + boolean exist = resultHolder.exist(key, null); + msg.setKey(key); + String uuid = UUID.randomUUID().toString(); + msg.setId(uuid); + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + + result.onTimeout(() -> { + logger.info("[ZLM HOOK] 回放流自动点播, 等待超时"); + // 释放rtpserver + msg.setData(new HookResult(ErrorCode.ERROR100.getCode(), "点播超时")); + resultHolder.invokeResult(msg); + }); + + resultHolder.put(key, uuid, result); + + if (!exist) { + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaInfo, param.getStream(), null, + device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); + playService.playBack(mediaInfo, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> { + msg.setData(new HookResult(code, message)); + resultHolder.invokeResult(msg); + }); + } + return result; + }else { + defaultResult.setResult(HookResult.SUCCESS()); + return defaultResult; } - return result; + } else { // 拉流代理 StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index bd2bc885..1effe96c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -29,7 +29,6 @@ public interface IPlayService { void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); - void zlmServerOffline(String mediaServerId); void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); @@ -44,4 +43,6 @@ public interface IPlayService { void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 4f487ad9..0c287a9a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -539,7 +539,19 @@ public class PlayServiceImpl implements IPlayService { return; } MediaServerItem newMediaServerItem = getNewMediaServerItem(device); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); + String stream = null; + if (newMediaServerItem.isRtpEnable()) { + String startTimeStr = startTime.replace("-", "") + .replace(":", "") + .replace(" ", ""); + System.out.println(startTimeStr); + String endTimeTimeStr = endTime.replace("-", "") + .replace(":", "") + .replace(" ", ""); + System.out.println(endTimeTimeStr); + stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr; + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index 7a65a610..923f8346 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -53,6 +53,10 @@ public class DateUtil { return formatter.format(formatterCompatibleISO8601.parse(formatTime)); } + public static String urlToyyyy_MM_dd_HH_mm_ss(String formatTime) { + return formatter.format(urlFormatter.parse(formatTime)); + } + /** * yyyy_MM_dd_HH_mm_ss 转时间戳 * @param formatTime @@ -82,6 +86,7 @@ public class DateUtil { return urlFormatter.format(nowDateTime); } + /** * 格式校验 * @param timeStr 时间字符串