From dfb1b701d55611ac122d0e8b3c92f7a186e4dd7a Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 25 Jan 2024 19:58:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96sendRtp=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SendRtpItem.java | 44 +++-- .../iot/vmp/gb28181/task/SipRunner.java | 4 +- .../request/impl/ByeRequestProcessor.java | 16 +- .../cmd/MediaStatusNotifyMessageHandler.java | 4 +- .../iot/vmp/media/zlm/IStreamSendManager.java | 31 ++++ .../vmp/media/zlm/ZLMHttpHookListener.java | 8 +- .../iot/vmp/media/zlm/ZLMServerFactory.java | 4 +- .../media/zlm/impl/StreamSendManagerImpl.java | 154 ++++++++++++++++++ .../impl/PlatformChannelServiceImpl.java | 9 - .../iot/vmp/service/impl/PlayServiceImpl.java | 2 +- .../RedisPushStreamCloseResponseListener.java | 8 +- .../storager/impl/RedisCatchStorageImpl.java | 2 +- 12 files changed, 234 insertions(+), 52 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index c1fe2c1f..847f3824 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.gb28181.bean; public class SendRtpItem { + private String id; + /** * 推流ip */ @@ -18,14 +20,14 @@ public class SendRtpItem { private String ssrc; /** - * 平台id + * 目标设备/平台id */ - private String platformId; + private String destId; - /** - * 对应设备id + /** + * 源设备/平台id */ - private String deviceId; + private String sourceId; /** * 直播流的应用名 @@ -141,20 +143,12 @@ public class SendRtpItem { this.ssrc = ssrc; } - public String getPlatformId() { - return platformId; + public String getDestId() { + return destId; } - public void setPlatformId(String platformId) { - this.platformId = platformId; - } - - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; + public void setDestId(String destId) { + this.destId = destId; } public String getChannelId() { @@ -292,4 +286,20 @@ public class SendRtpItem { public void setRtcp(boolean rtcp) { this.rtcp = rtcp; } + + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java index 83418be0..9d5843ba 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java @@ -106,7 +106,7 @@ public class SipRunner implements CommandLineRunner { if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId()); if (mediaServerItem != null) { ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); Map param = new HashMap<>(); @@ -116,7 +116,7 @@ public class SipRunner implements CommandLineRunner { param.put("ssrc",sendRtpItem.getSsrc()); JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param); if (jsonObject != null && jsonObject.getInteger("code") == 0) { - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getDestId()); if (platform != null) { try { commanderForPlatform.streamByeCmd(platform, sendRtpItem.getCallId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 8fcec354..15489114 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -31,7 +30,6 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -67,9 +65,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private ZLMServerFactory zlmServerFactory; - @Autowired - private SSRCFactory ssrcFactory; - @Autowired private IMediaServerService mediaServerService; @@ -105,7 +100,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); if (sendRtpItem != null){ - logger.info("[收到bye] 来自平台{}, 停止通道:{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId()); + logger.info("[收到bye] 来自平台{}, 停止通道:{}", sendRtpItem.getDestId(), sendRtpItem.getChannelId()); String streamId = sendRtpItem.getStreamId(); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); @@ -114,19 +109,19 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[收到bye] 停止向上级推流:{}", streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), callIdHeader.getCallId(), null); zlmServerFactory.stopSendRtpStream(mediaInfo, param); if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getDestId()); if (platform != null) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + sendRtpItem.getDestId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormIndex(platform.getId()); redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); }else { - logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); + logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getDestId()); } } @@ -134,6 +129,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (totalReaderCount <= 0) { logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { + Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); if (device == null) { logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index fdcd5e4b..30d4370d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -112,9 +112,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null); if (sendRtpItem != null) { - ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(sendRtpItem.getDestId()); if (parentPlatform == null) { - logger.warn("[级联消息发送]:发送MediaStatus发现上级平台{}不存在", sendRtpItem.getPlatformId()); + logger.warn("[级联消息发送]:发送MediaStatus发现上级平台{}不存在", sendRtpItem.getDestId()); return; } try { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java new file mode 100644 index 00000000..72add757 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java @@ -0,0 +1,31 @@ +package com.genersoft.iot.vmp.media.zlm; + +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; + +import java.util.List; + +public interface IStreamSendManager { + + void add(SendRtpItem sendRtpItem); + + void update(SendRtpItem sendRtpItem); + + SendRtpItem getByCallId(String callId); + + List getByAppAndStream(String app, String stream); + + List getByMediaServerId(String mediaServerId); + + List getBySourceId(String sourceId); + + List getByDestId(String destId); + + List getByByChanelId(String channelId); + + void remove(String id); + + void removeByCallID(String id); + + void remove(List sendRtpItemList); +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index ea6a1a8d..807569b9 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -445,7 +445,7 @@ public class ZLMHttpHookListener { if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { - String platformId = sendRtpItem.getPlatformId(); + String platformId = sendRtpItem.getDestId(); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); Device device = deviceService.getDevice(platformId); @@ -500,7 +500,7 @@ public class ZLMHttpHookListener { inviteInfo.getChannelId()); if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); try { commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -511,7 +511,7 @@ public class ZLMHttpHookListener { if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + sendRtpItem.getDestId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); } @@ -729,7 +729,7 @@ public class ZLMHttpHookListener { List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); try { commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 7d7ebc93..0ecbe396 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -172,7 +172,7 @@ public class ZLMServerFactory { sendRtpItem.setIp(ip); sendRtpItem.setPort(port); sendRtpItem.setSsrc(ssrc); - sendRtpItem.setPlatformId(platformId); + sendRtpItem.setDestId(platformId); sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(tcp); sendRtpItem.setRtcp(rtcp); @@ -206,7 +206,7 @@ public class ZLMServerFactory { sendRtpItem.setSsrc(ssrc); sendRtpItem.setApp(app); sendRtpItem.setStreamId(stream); - sendRtpItem.setPlatformId(platformId); + sendRtpItem.setDestId(platformId); sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(tcp); sendRtpItem.setLocalPort(localPort); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java new file mode 100644 index 00000000..a9cdbe92 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java @@ -0,0 +1,154 @@ +package com.genersoft.iot.vmp.media.zlm.impl; + +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * 管理流的发送 + */ +@Component +public class StreamSendManagerImpl implements IStreamSendManager { + + private Logger logger = LoggerFactory.getLogger("StreamSendManagerImpl"); + + private final static String datePrefix = "VMP_SEND_STREAM:DATA:"; + private final static String queryPrefix = "VMP_SEND_STREAM:QUERY:"; + + @Autowired + private RedisTemplate redisTemplate; + + @Override + public void add(SendRtpItem sendRtpItem) { + + } + + @Override + public void update(SendRtpItem sendRtpItem) { + if (sendRtpItem.getId() != null) { + sendRtpItem.setId(UUID.randomUUID().toString()); + } + String dateId = datePrefix + sendRtpItem.getId(); + redisTemplate.opsForValue().set(datePrefix + sendRtpItem.getId(), sendRtpItem); + + if (sendRtpItem.getCallId() != null) { + redisTemplate.opsForValue().set(getCallIdKey(sendRtpItem.getCallId()), dateId); + } + if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) { + redisTemplate.opsForSet().add(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()), dateId); + } + if (sendRtpItem.getMediaServerId() != null) { + redisTemplate.opsForSet().add(getMediaServerIdKey(sendRtpItem.getMediaServerId()), dateId); + } + if (sendRtpItem.getDestId() != null) { + redisTemplate.opsForSet().add(getDestIdKey(sendRtpItem.getDestId()), dateId); + } + if (sendRtpItem.getSourceId() != null) { + redisTemplate.opsForSet().add(getSourceIdKey(sendRtpItem.getSourceId()), dateId); + } + if (sendRtpItem.getChannelId() != null) { + redisTemplate.opsForSet().add(getChannelIdKey(sendRtpItem.getChannelId()), dateId); + } + } + + private String getCallIdKey(String callId) { + return queryPrefix + "CALL_ID:" + callId; + } + + private String getAppAndStreamKey(String app, String stream) { + return queryPrefix + "APP_STREAM:" + app + "_" + stream; + } + + private String getMediaServerIdKey(String mediaServerId) { + return queryPrefix + "MEDIA_SERVER:" + mediaServerId; + } + + private String getDestIdKey(String destId) { + return queryPrefix + "DEST:" + destId; + } + + private String getSourceIdKey(String sourceId) { + return queryPrefix + "SOURCE:" + sourceId; + } + + private String getChannelIdKey(String channelId) { + return queryPrefix + "CHANNEL:" + channelId; + } + + @Override + public SendRtpItem getByCallId(String callId) { + String dateId = (String) redisTemplate.opsForValue().get(getCallIdKey(callId)); + if (dateId == null) { + return null; + } + return (SendRtpItem)redisTemplate.opsForValue().get(dateId); + } + + + @Override + public List getByAppAndStream(String app, String stream) { + Set dateIds = redisTemplate.opsForSet().members(getAppAndStreamKey(app, stream)); + return getSendRtpItems(dateIds); + } + + private List getSendRtpItems(Set dateIds) { + if (dateIds == null || dateIds.isEmpty()) { + return null; + } + List result = new ArrayList<>(); + for (Object dateId : dateIds) { + result.add((SendRtpItem)redisTemplate.opsForValue().get(dateId)); + } + return result; + } + + @Override + public List getByMediaServerId(String mediaServerId) { + Set dateIds = redisTemplate.opsForSet().members(getMediaServerIdKey(mediaServerId)); + return getSendRtpItems(dateIds); + } + + @Override + public List getBySourceId(String sourceId) { + Set dateIds = redisTemplate.opsForSet().members(getSourceIdKey(sourceId)); + return getSendRtpItems(dateIds); + } + + @Override + public List getByDestId(String destId) { + Set dateIds = redisTemplate.opsForSet().members(getDestIdKey(destId)); + return getSendRtpItems(dateIds); + } + + @Override + public List getByByChanelId(String channelId) { + Set dateIds = redisTemplate.opsForSet().members(getChannelIdKey(channelId)); + return getSendRtpItems(dateIds); + } + + @Override + public void remove(String id) { + if (id == null) { + return; + } + SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(datePrefix); + if (sendRtpItem == null) { + return; + } + + } + + @Override + public void remove(List sendRtpItemList) { + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java index 55060b79..1b3bfad2 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformChannelServiceImpl.java @@ -211,15 +211,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { getAllParentGroup(parentGroup, allGroupMap, resultGroupList); } - private void getAllParentRegion(Region region, Map allRegionMap, List resultRegionList) { - if (region.getCommonRegionDeviceId().length() == 2) { - return; - } - Region parentRegion = allRegionMap.get(region.getCommonRegionDeviceId()); - resultRegionList.add(parentRegion); - getAllParentRegion(parentRegion, allRegionMap, resultRegionList); - } - @Override public CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(Integer platformId, String channelId) { return platformChannelMapper.queryChannelByPlatformIdAndChannelDeviceId(platformId, channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 3066ce56..51085f00 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -886,7 +886,7 @@ public class PlayServiceImpl implements IPlayService { if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItem.getMediaServerId().equals(mediaServerId)) { - ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); try { sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId()); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index f5603869..90a98cfe 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -77,9 +77,9 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { push.getGbId()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId()); if (parentPlatform != null) { - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); try { commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -96,12 +96,12 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { param.put("ssrc",sendRtpItem.getSsrc()); logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); zlmServerFactory.stopSendRtpStream(mediaInfo, param); if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + sendRtpItem.getDestId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index d44bf834..6c1ff13c 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -148,7 +148,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_" + sendRtpItem.getMediaServerId() + "_" - + sendRtpItem.getPlatformId() + "_" + + sendRtpItem.getDestId() + "_" + sendRtpItem.getChannelId() + "_" + sendRtpItem.getStreamId() + "_" + sendRtpItem.getCallId();