From 39103e53c70898ea7ba0be3ec5f7251674514319 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 27 Mar 2024 18:23:22 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=82=B9=E6=92=AD?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=E9=87=8A=E6=94=BE=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/vmanager/gb28181/play/PlayController.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index ca8cbcf8..6404f4e6 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -157,7 +157,8 @@ public class PlayController { wvpResult.setMsg(msg); } requestMessage.setData(wvpResult); - resultHolder.invokeResult(requestMessage); + // 此处必须释放所有请求 + resultHolder.invokeAllResult(requestMessage); }); return result; } From 5743917439f3989a4aa6748d8498b129e0521643 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 3 Apr 2024 15:03:49 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E6=B7=BB=E5=8A=A0idea=20logo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index a50c36d8..a665157a 100644 --- a/README.md +++ b/README.md @@ -136,12 +136,6 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git [ydpd](https://github.com/ydpd) [szy833](https://github.com/szy833) [ydwxb](https://github.com/ydwxb) [Albertzhu666](https://github.com/Albertzhu666) [mk1990](https://github.com/mk1990) [SaltFish001](https://github.com/SaltFish001) +同时感谢JetBrains对开源项目的支持,本项目使用IntelliJ IDEA开发与调试: -ffmpeg -re -i 123.mp3 -acodec pcm_alaw -ar 8000 -ac 1 -f rtsp rtsp://192.168.1.3:30554/broadcast/34020000001320000101_34020000001310000001 - -ffmpeg -re -i 123.mp3 -acodec pcm_alaw -ar 8000 -ac 1 -f rtsp rtsp://192.168.1.3:30554/talk/34020000001320000011_34020000001370000001 - - - -ffmpeg -re -i 123.mp3 -acodec pcm_alaw -ar 8000 -ac 1 -f rtsp rtsp://192.168.1.3:30554/talk/34020000001320000101_34020000001310000001 - +[![JetBrains](https://resources.jetbrains.com/storage/products/company/brand/logos/IntelliJ_IDEA_icon.svg?_ga=2.143694769.529214288.1712023294-439039083.1711422571&_gl=1*102dv9n*_ga*NDM5MDM5MDgzLjE3MTE0MjI1NzE.*_ga_9J976DJZ68*MTcxMjEyNjg4NC45LjEuMTcxMjEyNzc2My4zMy4wLjA.) From 62f716be9cd1cee2a5fba566db3fcd43bd9fc4cc Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 3 Apr 2024 15:04:33 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E6=B7=BB=E5=8A=A0idea=20logo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a665157a..df71b6c0 100644 --- a/README.md +++ b/README.md @@ -138,4 +138,4 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git 同时感谢JetBrains对开源项目的支持,本项目使用IntelliJ IDEA开发与调试: -[![JetBrains](https://resources.jetbrains.com/storage/products/company/brand/logos/IntelliJ_IDEA_icon.svg?_ga=2.143694769.529214288.1712023294-439039083.1711422571&_gl=1*102dv9n*_ga*NDM5MDM5MDgzLjE3MTE0MjI1NzE.*_ga_9J976DJZ68*MTcxMjEyNjg4NC45LjEuMTcxMjEyNzc2My4zMy4wLjA.) +![JetBrains](https://resources.jetbrains.com/storage/products/company/brand/logos/IntelliJ_IDEA_icon.svg?_ga=2.143694769.529214288.1712023294-439039083.1711422571&_gl=1*102dv9n*_ga*NDM5MDM5MDgzLjE3MTE0MjI1NzE.*_ga_9J976DJZ68*MTcxMjEyNjg4NC45LjEuMTcxMjEyNzc2My4zMy4wLjA.) From 7d530099878dbcf474182bbbfc694ecfe74c9fbd Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 7 Apr 2024 20:06:48 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SendRtpItem.java | 29 ++++++ .../request/impl/AckRequestProcessor.java | 2 +- .../request/impl/InviteRequestProcessor.java | 22 +++-- .../vmp/media/zlm/ZLMHttpHookListener.java | 53 ++++++---- .../iot/vmp/service/impl/PlayServiceImpl.java | 2 +- .../service/impl/StreamProxyServiceImpl.java | 4 - .../redisMsg/RedisStreamMsgListener.java | 98 +++++++++---------- .../iot/vmp/storager/IRedisCatchStorage.java | 2 + .../storager/impl/RedisCatchStorageImpl.java | 6 ++ .../cloudRecord/CloudRecordController.java | 5 - 10 files changed, 132 insertions(+), 91 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 361bdc6d..30193d27 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -305,4 +305,33 @@ public class SendRtpItem { public void setReceiveStream(String receiveStream) { this.receiveStream = receiveStream; } + + @Override + public String toString() { + return "SendRtpItem{" + + "ip='" + ip + '\'' + + ", port=" + port + + ", ssrc='" + ssrc + '\'' + + ", platformId='" + platformId + '\'' + + ", deviceId='" + deviceId + '\'' + + ", app='" + app + '\'' + + ", channelId='" + channelId + '\'' + + ", status=" + status + + ", stream='" + stream + '\'' + + ", tcp=" + tcp + + ", tcpActive=" + tcpActive + + ", localPort=" + localPort + + ", mediaServerId='" + mediaServerId + '\'' + + ", serverId='" + serverId + '\'' + + ", CallId='" + CallId + '\'' + + ", fromTag='" + fromTag + '\'' + + ", toTag='" + toTag + '\'' + + ", pt=" + pt + + ", usePs=" + usePs + + ", onlyAudio=" + onlyAudio + + ", rtcp=" + rtcp + + ", playType=" + playType + + ", receiveStream='" + receiveStream + '\'' + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 70048209..242e5ef6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -116,7 +116,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (parentPlatform != null) { Map param = getSendRtpParam(sendRtpItem); - if (mediaInfo == null) { + if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 3205498f..74fad542 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -575,14 +575,20 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } if ("push".equals(gbStream.getStreamType())) { - if (streamPushItem != null && streamPushItem.isPushIng()) { - // 推流状态 - pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 未推流 拉起 - notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + if (streamPushItem != null) { + // 从redis查询是否正在接收这个推流 + OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); + if (pushListItem != null) { + StreamPushItem transform = streamPushService.transform(pushListItem); + transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId())); + // 推流状态 + pushStream(evt, request, gbStream, transform, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }else { + // 未推流 拉起 + notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } } } else if ("proxy".equals(gbStream.getStreamType())) { if (null != proxyByAppAndStream) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 5feb306b..3a7f4524 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -509,32 +509,43 @@ public class ZLMHttpHookListener { List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { - String platformId = sendRtpItem.getPlatformId(); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); - Device device = deviceService.getDevice(platformId); + if (sendRtpItem == null) { + continue; + } - try { - if (platform != null) { - commanderFroPlatform.streamByeCmd(platform, sendRtpItem); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - } else { - cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); - if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) - || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null) { - // 来自上级平台的停止对讲 - logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (sendRtpItem.getApp().equals(param.getApp())) { + logger.info(sendRtpItem.toString()); + if (userSetting.getServerId().equals(sendRtpItem.getServerId())) { + // 通知其他wvp停止发流 +// redisCatchStorage.sendRtp + }else { + String platformId = sendRtpItem.getPlatformId(); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); + Device device = deviceService.getDevice(platformId); + + try { + if (platform != null) { + commanderFroPlatform.streamByeCmd(platform, sendRtpItem); + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + } else { + cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId()); + if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null) { + // 来自上级平台的停止对讲 + logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + } } } + } catch (SipException | InvalidArgumentException | ParseException | + SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); } - } catch (SipException | InvalidArgumentException | ParseException | - SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 发送BYE: {}", e.getMessage()); } + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index c8d203f4..8c0e9b04 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1170,7 +1170,7 @@ public class PlayServiceImpl implements IPlayService { dynamicTask.startDelay(key, ()->{ logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId); stopAudioBroadcast(device.getDeviceId(), channelId); - }, 2000); + }, 10*1000); }, eventResultForError -> { // 发送失败 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 59c5acef..f0230f76 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; @@ -25,7 +24,6 @@ import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; import com.genersoft.iot.vmp.utils.DateUtil; @@ -333,8 +331,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(), param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); } - System.out.println("addStreamProxyToZlm===="); - System.out.println(result); if (result != null && result.getInteger("code") == 0) { JSONObject data = result.getJSONObject("data"); if (data == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java index f5f29487..0912f0b7 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java @@ -1,11 +1,7 @@ package com.genersoft.iot.vmp.service.redisMsg; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; - import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -41,52 +37,52 @@ public class RedisStreamMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); - if (steamMsgJson == null) { - logger.warn("[收到redis 流变化]消息解析失败"); - continue; - } - String serverId = steamMsgJson.getString("serverId"); - - if (userSetting.getServerId().equals(serverId)) { - // 自己发送的消息忽略即可 - continue; - } - logger.info("[收到redis 流变化]: {}", new String(message.getBody())); - String app = steamMsgJson.getString("app"); - String stream = steamMsgJson.getString("stream"); - boolean register = steamMsgJson.getBoolean("register"); - String mediaServerId = steamMsgJson.getString("mediaServerId"); - OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); - onStreamChangedHookParam.setSeverId(serverId); - onStreamChangedHookParam.setApp(app); - onStreamChangedHookParam.setStream(stream); - onStreamChangedHookParam.setRegist(register); - onStreamChangedHookParam.setMediaServerId(mediaServerId); - onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); - onStreamChangedHookParam.setAliveSecond(0L); - onStreamChangedHookParam.setTotalReaderCount("0"); - onStreamChangedHookParam.setOriginType(0); - onStreamChangedHookParam.setOriginTypeStr("0"); - onStreamChangedHookParam.setOriginTypeStr("unknown"); - if (register) { - zlmMediaListManager.addPush(onStreamChangedHookParam); - }else { - zlmMediaListManager.removeMedia(app, stream); - } - }catch (Exception e) { - logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS消息-流变化] 异常内容: ", e); - } - } - }); - } +// boolean isEmpty = taskQueue.isEmpty(); +// taskQueue.offer(message); +// if (isEmpty) { +// taskExecutor.execute(() -> { +// while (!taskQueue.isEmpty()) { +// Message msg = taskQueue.poll(); +// try { +// JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); +// if (steamMsgJson == null) { +// logger.warn("[收到redis 流变化]消息解析失败"); +// continue; +// } +// String serverId = steamMsgJson.getString("serverId"); +// +// if (userSetting.getServerId().equals(serverId)) { +// // 自己发送的消息忽略即可 +// continue; +// } +// logger.info("[收到redis 流变化]: {}", new String(message.getBody())); +// String app = steamMsgJson.getString("app"); +// String stream = steamMsgJson.getString("stream"); +// boolean register = steamMsgJson.getBoolean("register"); +// String mediaServerId = steamMsgJson.getString("mediaServerId"); +// OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); +// onStreamChangedHookParam.setSeverId(serverId); +// onStreamChangedHookParam.setApp(app); +// onStreamChangedHookParam.setStream(stream); +// onStreamChangedHookParam.setRegist(register); +// onStreamChangedHookParam.setMediaServerId(mediaServerId); +// onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); +// onStreamChangedHookParam.setAliveSecond(0L); +// onStreamChangedHookParam.setTotalReaderCount("0"); +// onStreamChangedHookParam.setOriginType(0); +// onStreamChangedHookParam.setOriginTypeStr("0"); +// onStreamChangedHookParam.setOriginTypeStr("unknown"); +// if (register) { +// zlmMediaListManager.addPush(onStreamChangedHookParam); +// }else { +// zlmMediaListManager.removeMedia(app, stream); +// } +// }catch (Exception e) { +// logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); +// logger.error("[REDIS消息-流变化] 异常内容: ", e); +// } +// } +// }); +// } } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 78fd2801..06b1d562 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -211,5 +211,7 @@ public interface IRedisCatchStorage { void addPushListItem(String app, String stream, OnStreamChangedHookParam param); + OnStreamChangedHookParam getPushListItem(String app, String stream); + void removePushListItem(String app, String stream, String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 18a037dd..a1a00336 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -656,6 +656,12 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { redisTemplate.opsForValue().set(key, param); } + @Override + public OnStreamChangedHookParam getPushListItem(String app, String stream) { + String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; + return (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key); + } + @Override public void removePushListItem(String app, String stream, String mediaServerId) { String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java index b3d19908..b37a3d93 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java @@ -1,12 +1,8 @@ package com.genersoft.iot.vmp.vmanager.cloudRecord; import com.alibaba.fastjson2.JSONArray; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; -import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.ICloudRecordService; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -22,7 +18,6 @@ import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; From aeb75c0df1998a268114b382083170184ad99324 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 7 Apr 2024 20:25:44 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E7=BB=93=E6=9D=9F=E5=90=8E=E5=8F=91=E9=80=81BYE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/media/zlm/ZLMHttpHookListener.java | 5 +++- .../RedisPushStreamCloseResponseListener.java | 23 ------------------- .../iot/vmp/storager/IRedisCatchStorage.java | 3 +++ .../storager/impl/RedisCatchStorageImpl.java | 7 ++++++ 4 files changed, 14 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 3a7f4524..6076db4b 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -516,8 +516,11 @@ public class ZLMHttpHookListener { if (sendRtpItem.getApp().equals(param.getApp())) { logger.info(sendRtpItem.toString()); if (userSetting.getServerId().equals(sendRtpItem.getServerId())) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId()); // 通知其他wvp停止发流 -// redisCatchStorage.sendRtp + redisCatchStorage.sendPushStreamClose(messageForPushChannel); }else { String platformId = sendRtpItem.getPlatformId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index fe0ccd2a..a0315732 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -2,12 +2,10 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; @@ -25,7 +23,6 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -86,26 +83,6 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } } - if (push.isSelf()) { - // 停止向上级推流 - String streamId = sendRtpItem.getStream(); - Map param = new HashMap<>(); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",streamId); - param.put("ssrc",sendRtpItem.getSsrc()); - logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream()); - zlmServerFactory.stopSendRtpStream(mediaInfo, param); - if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - } - } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 06b1d562..66db1039 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -214,4 +214,7 @@ public interface IRedisCatchStorage { OnStreamChangedHookParam getPushListItem(String app, String stream); void removePushListItem(String app, String stream, String mediaServerId); + + void sendPushStreamClose(MessageForPushChannel messageForPushChannel); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index a1a00336..1eac4dfd 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -671,4 +671,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } } + + @Override + public void sendPushStreamClose(MessageForPushChannel msg) { + String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED; + logger.info("[redis发送通知] 发送 停止向上级推流 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); + redisTemplate.convertAndSend(key, JSON.toJSON(msg)); + } } From 48351c875e37a5e2edd9645c52050894b66984b1 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 8 Apr 2024 18:13:15 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E5=9B=BD=E6=A0=87=E7=BA=A7=E8=81=94?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=88=B6=E5=AE=9A200OK=E4=B8=ADsdp=E7=9A=84I?= =?UTF-8?q?P?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/ParentPlatform.java | 11 +++++++++++ .../request/impl/InviteRequestProcessor.java | 18 +++++++++++++----- .../vmp/storager/dao/ParentPlatformMapper.java | 5 +++-- web_src/src/components/dialog/platformEdit.vue | 12 +++++++++--- 数据库/2.7.0/初始化-mysql-2.7.0.sql | 1 + .../2.7.0/初始化-postgresql-kingbase-2.7.0.sql | 1 + 数据库/2.7.0/更新-mysql-2.7.0.sql | 5 ++++- .../2.7.0/更新-postgresql-kingbase-2.7.0.sql | 5 ++++- 8 files changed, 46 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index 7de5098d..5de9761e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -189,6 +189,9 @@ public class ParentPlatform { @Schema(description = "是否作为消息通道") private boolean autoPushChannel; + @Schema(description = "点播回复200OK使用次IP") + private String sendStreamIp; + public Integer getId() { return id; } @@ -436,4 +439,12 @@ public class ParentPlatform { public void setAutoPushChannel(boolean autoPushChannel) { this.autoPushChannel = autoPushChannel; } + + public String getSendStreamIp() { + return sendStreamIp; + } + + public void setSendStreamIp(String sendStreamIp) { + this.sendStreamIp = sendStreamIp; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 74fad542..96b8b11e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -38,6 +38,7 @@ import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.URIField; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -404,12 +405,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // * 2 推流中 sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); - + String sdpIp = mediaServerItemInUSe.getSdpIp(); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sdpIp = platform.getSendStreamIp(); + } StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("s=" + sessionName + "\r\n"); - content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); if ("Playback".equalsIgnoreCase(sessionName)) { content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); } else { @@ -907,11 +911,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements public SIPResponse sendStreamAck(MediaServerItem mediaServerItem, SIPRequest request, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { + String sdpIp = mediaServerItem.getSdpIp(); + if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { + sdpIp = platform.getSendStreamIp(); + } StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("s=Play\r\n"); - content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("c=IN IP4 " + sdpIp + "\r\n"); content.append("t=0 0\r\n"); // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口 int localPort = sendRtpItem.getLocalPort(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index 9dc05034..63b19bb4 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -17,10 +17,10 @@ public interface ParentPlatformMapper { @Insert("INSERT INTO wvp_platform (enable, name, server_gb_id, server_gb_domain, server_ip, server_port,device_gb_id,device_ip,"+ "device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,as_message_channel,auto_push_channel,"+ - "status,start_offline_push,catalog_id,administrative_division,catalog_group,create_time,update_time) " + + "status,start_offline_push,catalog_id,administrative_division,catalog_group,create_time,update_time,send_stream_ip) " + " VALUES (#{enable}, #{name}, #{serverGBId}, #{serverGBDomain}, #{serverIP}, #{serverPort}, #{deviceGBId}, #{deviceIp}, " + " #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{asMessageChannel}, #{autoPushChannel}, " + - " #{status}, #{startOfflinePush}, #{catalogId}, #{administrativeDivision}, #{catalogGroup}, #{createTime}, #{updateTime})") + " #{status}, #{startOfflinePush}, #{catalogId}, #{administrativeDivision}, #{catalogGroup}, #{createTime}, #{updateTime}, #{sendStreamIp})") int addParentPlatform(ParentPlatform parentPlatform); @Update("UPDATE wvp_platform " + @@ -49,6 +49,7 @@ public interface ParentPlatformMapper { "administrative_division=#{administrativeDivision}, " + "create_time=#{createTime}, " + "update_time=#{updateTime}, " + + "send_stream_ip=#{sendStreamIp}, " + "catalog_id=#{catalogId} " + "WHERE id=#{id}") int updateParentPlatform(ParentPlatform parentPlatform); diff --git a/web_src/src/components/dialog/platformEdit.vue b/web_src/src/components/dialog/platformEdit.vue index f3f4255b..4c02dfcd 100755 --- a/web_src/src/components/dialog/platformEdit.vue +++ b/web_src/src/components/dialog/platformEdit.vue @@ -37,8 +37,8 @@ - - + + @@ -47,6 +47,9 @@ + + + @@ -159,7 +162,8 @@ export default { characterSet: "GB2312", startOfflinePush: false, catalogGroup: 1, - administrativeDivision: null, + administrativeDivision: "", + sendStreamIp: null, }, rules: { name: [{ required: true, message: "请输入平台名称", trigger: "blur" }], @@ -198,6 +202,7 @@ export default { that.platform.devicePort = res.data.data.devicePort; that.platform.username = res.data.data.username; that.platform.password = res.data.data.password; + that.platform.sendStreamIp = res.data.data.sendStreamIp; that.platform.administrativeDivision = res.data.data.username.substr(0, 6); } @@ -228,6 +233,7 @@ export default { this.platform.catalogId = platform.catalogId; this.platform.startOfflinePush = platform.startOfflinePush; this.platform.catalogGroup = platform.catalogGroup; + this.platform.sendStreamIp = platform.sendStreamIp; this.platform.administrativeDivision = platform.administrativeDivision; this.onSubmit_text = "保存"; this.saveUrl = "/api/platform/save"; diff --git a/数据库/2.7.0/初始化-mysql-2.7.0.sql b/数据库/2.7.0/初始化-mysql-2.7.0.sql index a3f4a1d9..2cd88e72 100644 --- a/数据库/2.7.0/初始化-mysql-2.7.0.sql +++ b/数据库/2.7.0/初始化-mysql-2.7.0.sql @@ -198,6 +198,7 @@ create table wvp_platform ( update_time character varying(50), as_message_channel bool default false, auto_push_channel bool default false, + send_stream_ip character varying(50), constraint uk_platform_unique_server_gb_id unique (server_gb_id) ); diff --git a/数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql b/数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql index 9f41667a..5cda9457 100644 --- a/数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql +++ b/数据库/2.7.0/初始化-postgresql-kingbase-2.7.0.sql @@ -198,6 +198,7 @@ create table wvp_platform ( update_time character varying(50), as_message_channel bool default false, auto_push_channel bool default false, + send_stream_ip character varying(50), constraint uk_platform_unique_server_gb_id unique (server_gb_id) ); diff --git a/数据库/2.7.0/更新-mysql-2.7.0.sql b/数据库/2.7.0/更新-mysql-2.7.0.sql index 141c26e6..c229fb1e 100644 --- a/数据库/2.7.0/更新-mysql-2.7.0.sql +++ b/数据库/2.7.0/更新-mysql-2.7.0.sql @@ -2,4 +2,7 @@ alter table wvp_device_channel add stream_identification character varying(50); alter table wvp_device - drop switch_primary_sub_stream; \ No newline at end of file + drop switch_primary_sub_stream; + +alter table wvp_platform + add send_stream_ip character varying(50); \ No newline at end of file diff --git a/数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql b/数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql index 141c26e6..c229fb1e 100644 --- a/数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql +++ b/数据库/2.7.0/更新-postgresql-kingbase-2.7.0.sql @@ -2,4 +2,7 @@ alter table wvp_device_channel add stream_identification character varying(50); alter table wvp_device - drop switch_primary_sub_stream; \ No newline at end of file + drop switch_primary_sub_stream; + +alter table wvp_platform + add send_stream_ip character varying(50); \ No newline at end of file