优化sendRtp管理

结构优化
648540858 2024-01-25 19:58:59 +08:00
parent 932eff1d73
commit dfb1b701d5
12 changed files with 234 additions and 52 deletions

View File

@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.gb28181.bean;
public class SendRtpItem {
private String id;
/**
* ip
*/
@ -18,14 +20,14 @@ public class SendRtpItem {
private String ssrc;
/**
* id
* /id
*/
private String platformId;
private String destId;
/**
* id
/**
* /id
*/
private String deviceId;
private String sourceId;
/**
*
@ -141,20 +143,12 @@ public class SendRtpItem {
this.ssrc = ssrc;
}
public String getPlatformId() {
return platformId;
public String getDestId() {
return destId;
}
public void setPlatformId(String platformId) {
this.platformId = platformId;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
public void setDestId(String destId) {
this.destId = destId;
}
public String getChannelId() {
@ -292,4 +286,20 @@ public class SendRtpItem {
public void setRtcp(boolean rtcp) {
this.rtcp = rtcp;
}
public String getSourceId() {
return sourceId;
}
public void setSourceId(String sourceId) {
this.sourceId = sourceId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}

View File

@ -106,7 +106,7 @@ public class SipRunner implements CommandLineRunner {
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId());
if (mediaServerItem != null) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
Map<String, Object> param = new HashMap<>();
@ -116,7 +116,7 @@ public class SipRunner implements CommandLineRunner {
param.put("ssrc",sendRtpItem.getSsrc());
JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getDestId());
if (platform != null) {
try {
commanderForPlatform.streamByeCmd(platform, sendRtpItem.getCallId());

View File

@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@ -31,7 +30,6 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -67,9 +65,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@ -105,7 +100,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
if (sendRtpItem != null){
logger.info("[收到bye] 来自平台{} 停止通道:{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId());
logger.info("[收到bye] 来自平台{} 停止通道:{}", sendRtpItem.getDestId(), sendRtpItem.getChannelId());
String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
@ -114,19 +109,19 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[收到bye] 停止向上级推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getDestId());
if (platform != null) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
sendRtpItem.getDestId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(platform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}else {
logger.info("[上级平台停止观看] 未找到平台{}的信息发送redis消息失败", sendRtpItem.getPlatformId());
logger.info("[上级平台停止观看] 未找到平台{}的信息发送redis消息失败", sendRtpItem.getDestId());
}
}
@ -134,6 +129,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
if (totalReaderCount <= 0) {
logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
if (device == null) {
logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);

View File

@ -112,9 +112,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
// 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null);
if (sendRtpItem != null) {
ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(sendRtpItem.getDestId());
if (parentPlatform == null) {
logger.warn("[级联消息发送]发送MediaStatus发现上级平台{}不存在", sendRtpItem.getPlatformId());
logger.warn("[级联消息发送]发送MediaStatus发现上级平台{}不存在", sendRtpItem.getDestId());
return;
}
try {

View File

@ -0,0 +1,31 @@
package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import java.util.List;
public interface IStreamSendManager {
void add(SendRtpItem sendRtpItem);
void update(SendRtpItem sendRtpItem);
SendRtpItem getByCallId(String callId);
List<SendRtpItem> getByAppAndStream(String app, String stream);
List<SendRtpItem> getByMediaServerId(String mediaServerId);
List<SendRtpItem> getBySourceId(String sourceId);
List<SendRtpItem> getByDestId(String destId);
List<SendRtpItem> getByByChanelId(String channelId);
void remove(String id);
void removeByCallID(String id);
void remove(List<SendRtpItem> sendRtpItemList);
}

View File

@ -445,7 +445,7 @@ public class ZLMHttpHookListener {
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
String platformId = sendRtpItem.getPlatformId();
String platformId = sendRtpItem.getDestId();
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
Device device = deviceService.getDevice(platformId);
@ -500,7 +500,7 @@ public class ZLMHttpHookListener {
inviteInfo.getChannelId());
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());
try {
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
@ -511,7 +511,7 @@ public class ZLMHttpHookListener {
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
sendRtpItem.getDestId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}
@ -729,7 +729,7 @@ public class ZLMHttpHookListener {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
try {
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());

View File

@ -172,7 +172,7 @@ public class ZLMServerFactory {
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setPlatformId(platformId);
sendRtpItem.setDestId(platformId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setRtcp(rtcp);
@ -206,7 +206,7 @@ public class ZLMServerFactory {
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setApp(app);
sendRtpItem.setStreamId(stream);
sendRtpItem.setPlatformId(platformId);
sendRtpItem.setDestId(platformId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setLocalPort(localPort);

View File

@ -0,0 +1,154 @@
package com.genersoft.iot.vmp.media.zlm.impl;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
*
*/
@Component
public class StreamSendManagerImpl implements IStreamSendManager {
private Logger logger = LoggerFactory.getLogger("StreamSendManagerImpl");
private final static String datePrefix = "VMP_SEND_STREAM:DATA:";
private final static String queryPrefix = "VMP_SEND_STREAM:QUERY:";
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Override
public void add(SendRtpItem sendRtpItem) {
}
@Override
public void update(SendRtpItem sendRtpItem) {
if (sendRtpItem.getId() != null) {
sendRtpItem.setId(UUID.randomUUID().toString());
}
String dateId = datePrefix + sendRtpItem.getId();
redisTemplate.opsForValue().set(datePrefix + sendRtpItem.getId(), sendRtpItem);
if (sendRtpItem.getCallId() != null) {
redisTemplate.opsForValue().set(getCallIdKey(sendRtpItem.getCallId()), dateId);
}
if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) {
redisTemplate.opsForSet().add(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()), dateId);
}
if (sendRtpItem.getMediaServerId() != null) {
redisTemplate.opsForSet().add(getMediaServerIdKey(sendRtpItem.getMediaServerId()), dateId);
}
if (sendRtpItem.getDestId() != null) {
redisTemplate.opsForSet().add(getDestIdKey(sendRtpItem.getDestId()), dateId);
}
if (sendRtpItem.getSourceId() != null) {
redisTemplate.opsForSet().add(getSourceIdKey(sendRtpItem.getSourceId()), dateId);
}
if (sendRtpItem.getChannelId() != null) {
redisTemplate.opsForSet().add(getChannelIdKey(sendRtpItem.getChannelId()), dateId);
}
}
private String getCallIdKey(String callId) {
return queryPrefix + "CALL_ID:" + callId;
}
private String getAppAndStreamKey(String app, String stream) {
return queryPrefix + "APP_STREAM:" + app + "_" + stream;
}
private String getMediaServerIdKey(String mediaServerId) {
return queryPrefix + "MEDIA_SERVER:" + mediaServerId;
}
private String getDestIdKey(String destId) {
return queryPrefix + "DEST:" + destId;
}
private String getSourceIdKey(String sourceId) {
return queryPrefix + "SOURCE:" + sourceId;
}
private String getChannelIdKey(String channelId) {
return queryPrefix + "CHANNEL:" + channelId;
}
@Override
public SendRtpItem getByCallId(String callId) {
String dateId = (String) redisTemplate.opsForValue().get(getCallIdKey(callId));
if (dateId == null) {
return null;
}
return (SendRtpItem)redisTemplate.opsForValue().get(dateId);
}
@Override
public List<SendRtpItem> getByAppAndStream(String app, String stream) {
Set<Object> dateIds = redisTemplate.opsForSet().members(getAppAndStreamKey(app, stream));
return getSendRtpItems(dateIds);
}
private List<SendRtpItem> getSendRtpItems(Set<Object> dateIds) {
if (dateIds == null || dateIds.isEmpty()) {
return null;
}
List<SendRtpItem> result = new ArrayList<>();
for (Object dateId : dateIds) {
result.add((SendRtpItem)redisTemplate.opsForValue().get(dateId));
}
return result;
}
@Override
public List<SendRtpItem> getByMediaServerId(String mediaServerId) {
Set<Object> dateIds = redisTemplate.opsForSet().members(getMediaServerIdKey(mediaServerId));
return getSendRtpItems(dateIds);
}
@Override
public List<SendRtpItem> getBySourceId(String sourceId) {
Set<Object> dateIds = redisTemplate.opsForSet().members(getSourceIdKey(sourceId));
return getSendRtpItems(dateIds);
}
@Override
public List<SendRtpItem> getByDestId(String destId) {
Set<Object> dateIds = redisTemplate.opsForSet().members(getDestIdKey(destId));
return getSendRtpItems(dateIds);
}
@Override
public List<SendRtpItem> getByByChanelId(String channelId) {
Set<Object> dateIds = redisTemplate.opsForSet().members(getChannelIdKey(channelId));
return getSendRtpItems(dateIds);
}
@Override
public void remove(String id) {
if (id == null) {
return;
}
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(datePrefix);
if (sendRtpItem == null) {
return;
}
}
@Override
public void remove(List<SendRtpItem> sendRtpItemList) {
}
}

View File

@ -211,15 +211,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
getAllParentGroup(parentGroup, allGroupMap, resultGroupList);
}
private void getAllParentRegion(Region region, Map<String, Region> allRegionMap, List<Region> resultRegionList) {
if (region.getCommonRegionDeviceId().length() == 2) {
return;
}
Region parentRegion = allRegionMap.get(region.getCommonRegionDeviceId());
resultRegionList.add(parentRegion);
getAllParentRegion(parentRegion, allRegionMap, resultRegionList);
}
@Override
public CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(Integer platformId, String channelId) {
return platformChannelMapper.queryChannelByPlatformIdAndChannelDeviceId(platformId, channelId);

View File

@ -886,7 +886,7 @@ public class PlayServiceImpl implements IPlayService {
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());
try {
sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {

View File

@ -77,9 +77,9 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
push.getGbId());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());
if (parentPlatform != null) {
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
try {
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
} catch (SipException | InvalidArgumentException | ParseException e) {
@ -96,12 +96,12 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
sendRtpItem.getDestId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}

View File

@ -148,7 +148,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
userSetting.getServerId() + "_"
+ sendRtpItem.getMediaServerId() + "_"
+ sendRtpItem.getPlatformId() + "_"
+ sendRtpItem.getDestId() + "_"
+ sendRtpItem.getChannelId() + "_"
+ sendRtpItem.getStreamId() + "_"
+ sendRtpItem.getCallId();