优化sedRTP的管理

结构优化
648540858 2024-01-25 23:39:18 +08:00
parent dfb1b701d5
commit 993773860e
16 changed files with 162 additions and 447 deletions

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
@ -39,7 +40,7 @@ import java.util.Map;
public class SipRunner implements CommandLineRunner {
@Autowired
private IVideoManagerStorage storager;
private IStreamSendManager streamSendManager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@ -102,11 +103,11 @@ public class SipRunner implements CommandLineRunner {
// 查找国标推流
List<SendRtpItem> sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
List<SendRtpItem> sendRtpItems = streamSendManager.getAll();
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId());
streamSendManager.remove(sendRtpItem);
if (mediaServerItem != null) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
Map<String, Object> param = new HashMap<>();

View File

@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
@ -60,7 +61,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
private ZLMServerFactory zlmServerFactory;
@Autowired
private SipLayer sipLayer;
private IStreamSendManager streamSendManager;
@Autowired
private SIPSender sipSender;
@ -816,7 +817,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (platform == null) {
return;
}
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId);
SendRtpItem sendRtpItem = streamSendManager.getByCallId(callId);
if (sendRtpItem != null) {
streamByeCmd(platform, sendRtpItem);
}

View File

@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -70,7 +71,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
private ZLMServerFactory zlmServerFactory;
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
private IStreamSendManager streamSendManager;
@Autowired
private IMediaServerService mediaServerService;
@ -106,7 +107,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
// 取消设置的超时任务
dynamicTask.stop(callIdHeader.getCallId());
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId());
if (sendRtpItem == null) {
logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId);
return;

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting;
@ -10,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*;
@ -44,6 +46,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private ISIPCommander cmder;
@Autowired
private IStreamSendManager streamSendManager;
@Autowired
private ICommonGbChannelService commonGbChannelService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@ -77,6 +85,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private UserSetting userSetting;
@Autowired
private Map<String, IResourceService> resourceServiceMap;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@ -97,10 +108,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId());
if (sendRtpItem != null){
logger.info("[收到bye] 来自平台{} 停止通道:{}", sendRtpItem.getDestId(), sendRtpItem.getChannelId());
logger.info("[收到bye] 来自{} 停止通道:{}", sendRtpItem.getDestId(), sendRtpItem.getChannelId());
String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
@ -109,9 +119,13 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[收到bye] 停止向上级推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
streamSendManager.remove(sendRtpItem);
CommonGbChannel channel = commonGbChannelService.getChannel(sendRtpItem.getChannelId());
IResourceService resourceService = resourceServiceMap.get(channel.getType());
if (resourceService != null) {
resourceService.stopPlay(channel, null);
}
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getDestId());
if (platform != null) {
@ -124,25 +138,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("[上级平台停止观看] 未找到平台{}的信息发送redis消息失败", sendRtpItem.getDestId());
}
}
int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) {
logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
if (device == null) {
logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
}
try {
logger.info("[停止点播] {}", sendRtpItem.getChannelId());
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
}
}
}
}else {
// 可能是设备发送的停止

View File

@ -14,11 +14,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.zlm.service.ISendRtpService;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
@ -81,6 +83,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private IRedisCatchStorage redisCatchStorage;
private IStreamSendManager streamSendManager;
@Autowired
private SSRCFactory ssrcFactory;
@ -265,7 +269,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
}
sendRtpItem.setStatus(1);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
streamSendManager.update(sendRtpItem);
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
@ -347,191 +351,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
resourceService.startDownload(channel, gb28181Sdp.getStartTime(), gb28181Sdp.getStopTime(),
gb28181Sdp.getDownloadSpeed(), callback);
}
//
// Device device = null;
// // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
// if (channel != null) {
//
// ErrorCallback<Object> hookEvent = (code, msg, data) -> {
// StreamInfo streamInfo = (StreamInfo)data;
// MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId());
// logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP) {}/{}", streamInfo.getApp(), streamInfo.getStream());
// // * 0 等待设备推流上来
// // * 1 下级已经推流等待上级平台回复ack
// // * 2 推流中
// sendRtpItem.setStatus(1);
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
//
//
//
//
// try {
// // 超时未收到Ack应该回复bye,当前等待时间为10秒
// dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
// logger.info("Ack 等待超时");
// mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
// // 回复bye
// try {
// cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
// } catch (SipException | InvalidArgumentException | ParseException e) {
// logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
// }
// }, 60 * 1000);
// responseSdpAck(request, content.toString(), platform);
// // tcp主动模式回复sdp后开启监听
// if (sendRtpItem.isTcpActive()) {
// MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
// Map<String, Object> param = new HashMap<>(12);
// param.put("vhost","__defaultVhost__");
// param.put("app",sendRtpItem.getApp());
// param.put("stream",sendRtpItem.getStreamId());
// param.put("ssrc", sendRtpItem.getSsrc());
// if (!sendRtpItem.isTcpActive()) {
// param.put("dst_url",sendRtpItem.getIp());
// param.put("dst_port", sendRtpItem.getPort());
// }
// String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
// param.put("is_udp", is_Udp);
// param.put("src_port", localPort);
// param.put("pt", sendRtpItem.getPt());
// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
// if (!sendRtpItem.isTcp()) {
// // 开启rtcp保活
// param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
// }
// JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param);
// if (startSendRtpStreamResult != null) {
// startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
// }
// }
// } catch (SipException | InvalidArgumentException | ParseException e) {
// logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
// }
// };
// ErrorCallback<Object> errorEvent = ((statusCode, msg, data) -> {
// // 未知错误。直接转发设备点播的错误
// try {
// if (statusCode > 0) {
// Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
// sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
// }
// } catch (ParseException | SipException e) {
// logger.error("未处理的异常 ", e);
// }
// });
// sendRtpItem.setApp("rtp");
// if ("Playback".equalsIgnoreCase(sessionName)) {
// sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
// String startTimeStr = DateUtil.urlFormatter.format(start);
// String endTimeStr = DateUtil.urlFormatter.format(end);
// String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
// sendRtpItem.setStreamId(ssrcInfo.getStream());
// // 写入redis 超时时回复
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
// playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
// DateUtil.formatter.format(end),
// (code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()){
// hookEvent.run(code, msg, data);
// }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
// logger.info("[录像回放]超时, 用户:{} 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
// errorEvent.run(code, msg, data);
// }else {
// errorEvent.run(code, msg, data);
// }
// });
// } else if ("Download".equalsIgnoreCase(sessionName)) {
// // 获取指定的下载速度
// Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true);
// MediaDescription mediaDescription = null;
// String downloadSpeed = "1";
// if (sdpMediaDescriptions.size() > 0) {
// mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0);
// }
// if (mediaDescription != null) {
// downloadSpeed = mediaDescription.getAttribute("downloadspeed");
// }
//
// sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
// sendRtpItem.setStreamId(ssrcInfo.getStream());
// // 写入redis 超时时回复
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
// playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
// DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed),
// (code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()) {
// hookEvent.run(code, msg, data);
// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
// logger.info("[录像下载]超时, 用户:{} 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
// errorEvent.run(code, msg, data);
// } else {
// errorEvent.run(code, msg, data);
// }
// });
// } else {
//
// SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()) {
// hookEvent.run(code, msg, data);
// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
// logger.info("[上级点播]超时, 用户:{} 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
// errorEvent.run(code, msg, data);
// } else {
// errorEvent.run(code, msg, data);
// }
// }));
// sendRtpItem.setPlayType(InviteStreamType.PLAY);
// String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
// sendRtpItem.setStreamId(streamId);
// sendRtpItem.setSsrc(ssrcInfo.getSsrc());
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
//
// }
// } else if (gbStream != null) {
//
// String ssrc;
// if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
// // 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
// ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
// }else {
// ssrc = gb28181Sdp.getSsrc();
// }
//
// if("push".equals(gbStream.getStreamType())) {
// if (streamPushItem != null && streamPushItem.isPushIng()) {
// // 推流状态
// pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
// } else {
// // 未推流 拉起
// notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
// }
// }else if ("proxy".equals(gbStream.getStreamType())){
// if (null != proxyByAppAndStream) {
// if(proxyByAppAndStream.isStatus()){
// pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
// }else{
// //开启代理拉流
// notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
// }
// }
//
//
// }
// }
}
} catch (SdpParseException e) {
logger.error("sdp解析错误", e);
@ -587,7 +406,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
streamSendManager.update(sendRtpItem);
}
}
private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform,
@ -625,7 +444,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setToTag(response.getToTag());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
streamSendManager.update(sendRtpItem);
} else {
// 不在线 拉起
@ -740,7 +559,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
streamSendManager.update(sendRtpItem);
} else {
// 其他平台内容
otherWvpPushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
@ -803,7 +622,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
streamSendManager.update(sendRtpItem);
}, (wvpResult) -> {
// 错误

View File

@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@ -47,6 +48,9 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamSendManager streamSendManager;
@Autowired
private IInviteStreamService inviteStreamService;
@ -108,7 +112,8 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
String contentType = header.getContentType();
String contentSubType = header.getContentSubType();
if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId());
String streamId = sendRtpItem.getStreamId();
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo) {

View File

@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
@ -33,6 +34,7 @@ import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.List;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@ -57,6 +59,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamSendManager streamSendManager;
@Autowired
private IVideoManagerStorage storage;
@ -110,7 +115,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
subscribe.removeSubscribe(hookSubscribe);
// 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null);
SendRtpItem sendRtpItem = streamSendManager.getByCallId(ssrcTransaction.getCallId());
if (sendRtpItem != null) {
ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(sendRtpItem.getDestId());
if (parentPlatform == null) {

View File

@ -7,10 +7,10 @@ import java.util.List;
public interface IStreamSendManager {
void add(SendRtpItem sendRtpItem);
void update(SendRtpItem sendRtpItem);
List<SendRtpItem> getAll();
SendRtpItem getByCallId(String callId);
List<SendRtpItem> getByAppAndStream(String app, String stream);
@ -27,5 +27,8 @@ public interface IStreamSendManager {
void removeByCallID(String id);
void remove(SendRtpItem sendRtpItem);
void remove(List<SendRtpItem> sendRtpItemList);
}

View File

@ -73,6 +73,9 @@ public class ZLMHttpHookListener {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamSendManager streamSendManager;
@Autowired
private IInviteStreamService inviteStreamService;
@ -441,8 +444,8 @@ public class ZLMHttpHookListener {
}
}
if (!param.isRegist()) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
if (sendRtpItems.size() > 0) {
List<SendRtpItem> sendRtpItems = streamSendManager.getByAppAndStream(param.getApp(), param.getStream());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
String platformId = sendRtpItem.getDestId();
@ -452,8 +455,7 @@ public class ZLMHttpHookListener {
try {
if (platform != null) {
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId());
streamSendManager.remove(sendRtpItem);
} else {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
}
@ -495,10 +497,8 @@ public class ZLMHttpHookListener {
return ret;
}
// 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
inviteInfo.getChannelId());
if (sendRtpItems.size() > 0) {
List<SendRtpItem> sendRtpItems = streamSendManager.getByByChanelId(inviteInfo.getChannelId());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());
try {
@ -506,8 +506,7 @@ public class ZLMHttpHookListener {
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId());
streamSendManager.remove(sendRtpItem);
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
@ -517,7 +516,6 @@ public class ZLMHttpHookListener {
}
}
}
}
Device device = deviceService.getDevice(inviteInfo.getDeviceId());
if (device != null) {
try {
@ -726,8 +724,8 @@ public class ZLMHttpHookListener {
return HookResult.SUCCESS();
}
taskExecutor.execute(() -> {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
if (sendRtpItems.size() > 0) {
List<SendRtpItem> sendRtpItems = streamSendManager.getByAppAndStream(param.getApp(), param.getStream());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
@ -736,8 +734,7 @@ public class ZLMHttpHookListener {
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId());
streamSendManager.remove(sendRtpItem);
}
}
});

View File

@ -1,12 +1,15 @@
package com.genersoft.iot.vmp.media.zlm.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
@ -19,19 +22,12 @@ import java.util.UUID;
@Component
public class StreamSendManagerImpl implements IStreamSendManager {
private Logger logger = LoggerFactory.getLogger("StreamSendManagerImpl");
private final static String datePrefix = "VMP_SEND_STREAM:DATA:";
private final static String queryPrefix = "VMP_SEND_STREAM:QUERY:";
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Override
public void add(SendRtpItem sendRtpItem) {
}
@Override
public void update(SendRtpItem sendRtpItem) {
if (sendRtpItem.getId() != null) {
@ -84,6 +80,20 @@ public class StreamSendManagerImpl implements IStreamSendManager {
return queryPrefix + "CHANNEL:" + channelId;
}
@Override
public List<SendRtpItem> getAll() {
String key = datePrefix + "_*_";
List<Object> scan = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result = new ArrayList<>();
if (!scan.isEmpty()) {
for (Object keyStr : scan) {
SendRtpItem sendRtpItem = (SendRtpItem)redisTemplate.opsForValue().get(keyStr);
result.add(sendRtpItem);
}
}
return result;
}
@Override
public SendRtpItem getByCallId(String callId) {
String dateId = (String) redisTemplate.opsForValue().get(getCallIdKey(callId));
@ -135,6 +145,29 @@ public class StreamSendManagerImpl implements IStreamSendManager {
return getSendRtpItems(dateIds);
}
@Override
public void remove(SendRtpItem sendRtpItem) {
redisTemplate.delete(datePrefix);
if (sendRtpItem.getCallId() != null) {
redisTemplate.delete(getCallIdKey(sendRtpItem.getCallId()));
}
if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) {
redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()));
}
if (sendRtpItem.getMediaServerId() != null) {
redisTemplate.opsForSet().remove(getMediaServerIdKey(sendRtpItem.getMediaServerId()));
}
if (sendRtpItem.getDestId() != null) {
redisTemplate.opsForSet().remove(getDestIdKey(sendRtpItem.getDestId()));
}
if (sendRtpItem.getSourceId() != null) {
redisTemplate.opsForSet().remove(getSourceIdKey(sendRtpItem.getSourceId()));
}
if (sendRtpItem.getChannelId() != null) {
redisTemplate.opsForSet().remove(getChannelIdKey(sendRtpItem.getChannelId()));
}
}
@Override
public void remove(String id) {
if (id == null) {
@ -144,11 +177,28 @@ public class StreamSendManagerImpl implements IStreamSendManager {
if (sendRtpItem == null) {
return;
}
remove(sendRtpItem);
}
@Override
public void remove(List<SendRtpItem> sendRtpItemList) {
if (sendRtpItemList == null || sendRtpItemList.isEmpty()) {
return;
}
for (SendRtpItem sendRtpItem : sendRtpItemList) {
remove(sendRtpItem);
}
}
@Override
public void removeByCallID(String callId) {
if (ObjectUtils.isEmpty(callId)) {
return;
}
SendRtpItem sendRtpItem = getByCallId(callId);
if (sendRtpItem == null) {
return;
}
remove(sendRtpItem);
}
}

View File

@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
@ -63,6 +64,9 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamSendManager streamSendManager;
@Autowired
private SSRCFactory ssrcFactory;
@ -360,11 +364,12 @@ public class PlatformServiceImpl implements IPlatformService {
}
private void stopAllPush(String platformId) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(platformId);
if (sendRtpItems != null && sendRtpItems.size() > 0) {
List<SendRtpItem> sendRtpItems = streamSendManager.getByDestId(platformId);
if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
streamSendManager.remove(sendRtpItem);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>(3);
param.put("vhost", "__defaultVhost__");

View File

@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@ -72,6 +73,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamSendManager streamSendManager;
@Autowired
private IInviteStreamService inviteStreamService;
@ -882,8 +886,8 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void zlmServerOffline(String mediaServerId) {
// 处理正在向上推流的上级平台
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
if (sendRtpItems.size() > 0) {
List<SendRtpItem> sendRtpItems = streamSendManager.getByMediaServerId(mediaServerId);
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
@ -79,7 +80,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
private IMediaServerService mediaServerService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
private IStreamSendManager streamSendManager;
@Autowired
@ -326,7 +327,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
responseSendItemMsg.setSendRtpItem(sendRtpItem);
responseSendItemMsg.setMediaServerItem(mediaServerItem);
result.setData(responseSendItemMsg);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
streamSendManager.update(sendRtpItem);
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
@ -45,6 +46,9 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamSendManager streamSendManager;
@Autowired
private IVideoManagerStorage storager;
@ -73,13 +77,12 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
StreamPush push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
if (push != null) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
push.getGbId());
List<SendRtpItem> sendRtpItems = streamSendManager.getByAppAndStream(pushChannel.getApp(), pushChannel.getStream());
if (!sendRtpItems.isEmpty()) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getDestId());
if (parentPlatform != null) {
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
streamSendManager.remove(sendRtpItem);
try {
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
} catch (SipException | InvalidArgumentException | ParseException e) {
@ -96,7 +99,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getDestId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
streamSendManager.remove(sendRtpItem);
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,

View File

@ -41,31 +41,6 @@ public interface IRedisCatchStorage {
void delPlatformRegisterInfo(String callId);
void updateSendRTPSever(SendRtpItem sendRtpItem);
/**
* RTP
* @param platformGbId
* @param channelId
* @return sendRtpItem
*/
SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId);
List<SendRtpItem> querySendRTPServer(String platformGbId);
/**
* RTP
* @param platformGbId
* @param channelId
*/
void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId);
/**
* RTP
* @param channelId
*/
boolean isChannelSendingRTP(String channelId);
/**
* rediswvp
*/
@ -183,10 +158,6 @@ public interface IRedisCatchStorage {
*/
void sendStreamPushRequestedMsgForStatus();
List<SendRtpItem> querySendRTPServerByChnnelId(String channelId);
List<SendRtpItem> querySendRTPServerByStream(String stream);
SystemAllInfo getSystemInfo();
int getPushStreamCount(String id);
@ -197,8 +168,6 @@ public interface IRedisCatchStorage {
void addDiskInfo(List<Map<String, Object>> diskInfo);
List<SendRtpItem> queryAllSendRTPServer();
List<Device> getAllDevices();
void removeAllDevice();

View File

@ -142,150 +142,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
redisTemplate.delete(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId);
}
@Override
public void updateSendRTPSever(SendRtpItem sendRtpItem) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX +
userSetting.getServerId() + "_"
+ sendRtpItem.getMediaServerId() + "_"
+ sendRtpItem.getDestId() + "_"
+ sendRtpItem.getChannelId() + "_"
+ sendRtpItem.getStreamId() + "_"
+ sendRtpItem.getCallId();
redisTemplate.opsForValue().set(key, sendRtpItem);
}
@Override
public SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
if (platformGbId == null) {
platformGbId = "*";
}
if (channelId == null) {
channelId = "*";
}
if (streamId == null) {
streamId = "*";
}
if (callId == null) {
callId = "*";
}
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_"
+ channelId + "_"
+ streamId + "_"
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
if (scan.size() > 0) {
return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
}else {
return null;
}
}
@Override
public List<SendRtpItem> querySendRTPServerByChnnelId(String channelId) {
if (channelId == null) {
return null;
}
String platformGbId = "*";
String callId = "*";
String streamId = "*";
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_"
+ channelId + "_"
+ streamId + "_"
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result = new ArrayList<>();
for (Object o : scan) {
result.add((SendRtpItem) redisTemplate.opsForValue().get(o));
}
return result;
}
@Override
public List<SendRtpItem> querySendRTPServerByStream(String stream) {
if (stream == null) {
return null;
}
String platformGbId = "*";
String callId = "*";
String channelId = "*";
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_"
+ channelId + "_"
+ stream + "_"
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result = new ArrayList<>();
for (Object o : scan) {
result.add((SendRtpItem) redisTemplate.opsForValue().get(o));
}
return result;
}
@Override
public List<SendRtpItem> querySendRTPServer(String platformGbId) {
if (platformGbId == null) {
platformGbId = "*";
}
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_*" + "_*" + "_*";
List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result= new ArrayList<>();
for (Object o : queryResult) {
String keyItem = (String) o;
result.add((SendRtpItem) redisTemplate.opsForValue().get(keyItem));
}
return result;
}
/**
* RTP
*/
@Override
public void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId) {
if (streamId == null) {
streamId = "*";
}
if (callId == null) {
callId = "*";
}
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_"
+ channelId + "_"
+ streamId + "_"
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
if (scan.size() > 0) {
for (Object keyStr : scan) {
redisTemplate.delete(keyStr);
}
}
}
@Override
public List<SendRtpItem> queryAllSendRTPServer() {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*";
List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
List<SendRtpItem> result= new ArrayList<>();
for (Object o : queryResult) {
String keyItem = (String) o;
result.add((SendRtpItem) redisTemplate.opsForValue().get(keyItem));
}
return result;
}
/**
* RTP
*/