Merge branch 'refs/heads/master' into develop-add-api-key

pull/1389/head
leesam 2024-04-10 22:18:40 +08:00
commit ce950dea4a
19 changed files with 207 additions and 226 deletions

View File

@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
public class SendRtpItem {
/**
@ -122,6 +124,23 @@ public class SendRtpItem {
*/
private String receiveStream;
public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
sendRtpItem.setApp(requestPushStreamMsg.getApp());
sendRtpItem.setStream(requestPushStreamMsg.getStream());
sendRtpItem.setIp(requestPushStreamMsg.getIp());
sendRtpItem.setPort(requestPushStreamMsg.getPort());
sendRtpItem.setSsrc(requestPushStreamMsg.getSsrc());
sendRtpItem.setTcp(requestPushStreamMsg.isTcp());
sendRtpItem.setLocalPort(requestPushStreamMsg.getSrcPort());
sendRtpItem.setPt(requestPushStreamMsg.getPt());
sendRtpItem.setUsePs(requestPushStreamMsg.isPs());
sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio());
return sendRtpItem;
}
public String getIp() {
return ip;
}

View File

@ -75,8 +75,6 @@ public class SIPCommander implements ISIPCommander {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ZLMServerFactory zlmServerFactory;
/**

View File

@ -13,11 +13,10 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@ -65,9 +64,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Autowired
private SipSubscribe sipSubscribe;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private SipLayer sipLayer;
@ -846,7 +842,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
zlmServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream());
mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
}
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem);
if (byeRequest == null) {

View File

@ -1,22 +1,17 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@ -28,17 +23,12 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
* SIP ACK
@ -71,12 +61,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IDeviceService deviceService;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private HookSubscribe hookSubscribe;
@Autowired
private IMediaServerService mediaServerService;
@ -122,11 +106,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (parentPlatform != null) {
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
});
} else {
@ -134,7 +115,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null);
} else {
mediaServerService.startSendRtpStream(mediaInfo, parentPlatform, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());
@ -159,7 +140,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null);
} else {
mediaServerService.startSendRtpStream(mediaInfo, null, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());

View File

@ -6,16 +6,15 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@ -36,8 +35,6 @@ import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
* SIP BYE
@ -75,12 +72,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IVideoManagerStorage storager;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@ -110,7 +101,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
/**
* BYE
* @param evt
*/
@Override
public void process(RequestEvent evt) {
@ -128,11 +118,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId());
String streamId = sendRtpItem.getStream();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
@ -149,7 +134,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
@ -169,13 +154,13 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
}
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaInfo != null) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer != null) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
// 来自上级平台的停止对讲
@ -183,8 +168,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
}
int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
if (mediaInfo.getReaderCount() <= 0) {
logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
Device device = deviceService.getDevice(sendRtpItem.getDeviceId());

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
@ -24,7 +25,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IInviteStreamService;
@ -61,7 +61,6 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
@ -113,9 +112,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private IMediaServerService mediaServerService;
@ -382,8 +378,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} else {
streamTypeStr = "UDP";
}
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}",
sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (tcpActive != null) {
@ -462,30 +459,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseSdpAck(request, content.toString(), platform);
// tcp主动模式回复sdp后开启监听
if (sendRtpItem.isTcpActive()) {
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
if (!sendRtpItem.isTcpActive()) {
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
}
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
param.put("is_udp", is_Udp);
param.put("src_port", localPort);
param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
if (!sendRtpItem.isTcp()) {
// 开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
}
JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
if (startSendRtpStreamResult != null) {
startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
}
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
try {
mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5);
}catch (ControllerException e) {}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
@ -638,13 +615,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
*
*/
private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServer mediaServerItem,
CallIdHeader callIdHeader, MediaServer mediaServer,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) {
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) {
// 自平台内容
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@ -665,7 +643,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setFromTag(request.getFromTag());
SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
@ -684,7 +662,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) {
// 自平台内容
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@ -794,7 +772,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
dynamicTask.stop(callIdHeader.getCallId());
redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
if (serverId.equals(userSetting.getServerId())) {
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@ -1074,7 +1052,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
device.getDeviceId(), broadcastCatch.getChannelId(),
mediaTransmissionTCP, false);

View File

@ -1,16 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -62,9 +61,6 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@ -155,12 +151,13 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}else {
// 发流
JSONObject jsonObject = zlmServerFactory.startSendRtp(hookData.getMediaServer(), sendRtpItem);
if (jsonObject != null && jsonObject.getInteger("code") == 0 ) {
logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
}else {
logger.info("[语音喊话] 推流失败, 结果: {}", jsonObject);
try {
mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem);
}catch (ControllerException e) {
logger.info("[语音喊话] 推流失败, 结果: {}", e.getMessage());
return;
}
logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
}
}
}else {

View File

@ -141,5 +141,10 @@ public interface IMediaServerService {
void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout);
void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp);
SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp);
}

View File

@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
@ -83,6 +84,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private MediaConfig mediaConfig;
@Autowired
private SendRtpPortManager sendRtpPortManager;
/**
@ -812,7 +816,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
logger.info("[startSendRtpStream] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
@ -821,7 +825,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
logger.info("[开始推流] rtp/{}, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
sendPlatformStartPlayMsg(platform, sendRtpItem);
if (platform != null) {
sendPlatformStartPlayMsg(platform, sendRtpItem);
}
}
@ -834,4 +841,50 @@ public class MediaServerServiceImpl implements IMediaServerService {
redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
}
}
@Override
public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) {
int localPort = sendRtpPortManager.getNextPort(mediaServer);
if (localPort == 0) {
return null;
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setPlatformId(deviceId);
sendRtpItem.setDeviceId(deviceId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(isTcp);
sendRtpItem.setRtcp(rtcp);
sendRtpItem.setApp("rtp");
sendRtpItem.setLocalPort(localPort);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setMediaServerId(mediaServer.getId());
return sendRtpItem;
}
@Override
public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp){
int localPort = sendRtpPortManager.getNextPort(serverItem);
if (localPort == 0) {
return null;
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setApp(app);
sendRtpItem.setStream(stream);
sendRtpItem.setPlatformId(platformId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setLocalPort(localPort);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setMediaServerId(serverItem.getId());
sendRtpItem.setRtcp(rtcp);
return sendRtpItem;
}
}

View File

@ -3,15 +3,13 @@ package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
@ -20,7 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -37,26 +35,16 @@ public class ZLMMediaListManager {
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private IStreamPushService streamPushService;
@Autowired
private IStreamProxyService streamProxyService;
@Autowired
private StreamPushMapper streamPushMapper;
@Autowired
private HookSubscribe subscribe;
@Autowired
private UserSetting userSetting;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private IMediaServerService mediaServerService;

View File

@ -316,11 +316,23 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
if (timeout != null) {
param.put("close_delay_ms", timeout);
}
if (!sendRtpItem.isTcp()) {
// 开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
}
if (!sendRtpItem.isTcpActive()) {
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
}
JSONObject jsonObject = zlmServerFactory.startSendRtpPassive(mediaServer, param, null);
if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
logger.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}
logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject);
logger.info("启动监听TCP被动推流成功[ {}/{} ]{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(),
jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
}
@Override

View File

@ -5,7 +5,6 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,9 +25,6 @@ public class ZLMServerFactory {
@Autowired
private UserSetting userSetting;
@Autowired
private HookSubscribe hookSubscribe;
@Autowired
private SendRtpPortManager sendRtpPortManager;

View File

@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
/**
* redis
* @author lin
@ -80,6 +82,22 @@ public class RequestPushStreamMsg {
return requestPushStreamMsg;
}
public static RequestPushStreamMsg getInstance(SendRtpItem sendRtpItem) {
RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg();
requestPushStreamMsg.setMediaServerId(sendRtpItem.getMediaServerId());
requestPushStreamMsg.setApp(sendRtpItem.getApp());
requestPushStreamMsg.setStream(sendRtpItem.getStream());
requestPushStreamMsg.setIp(sendRtpItem.getIp());
requestPushStreamMsg.setPort(sendRtpItem.getPort());
requestPushStreamMsg.setSsrc(sendRtpItem.getSsrc());
requestPushStreamMsg.setTcp(sendRtpItem.isTcp());
requestPushStreamMsg.setSrcPort(sendRtpItem.getLocalPort());
requestPushStreamMsg.setPt(sendRtpItem.getPt());
requestPushStreamMsg.setPs(sendRtpItem.isUsePs());
requestPushStreamMsg.setOnlyAudio(sendRtpItem.isOnlyAudio());
return requestPushStreamMsg;
}
public String getMediaServerId() {
return mediaServerId;
}

View File

@ -1,7 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@ -11,13 +13,12 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
@ -41,7 +42,9 @@ import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
import java.util.List;
import java.util.UUID;
import java.util.Vector;
/**
* @author lin
@ -75,9 +78,6 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private DynamicTask dynamicTask;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private SubscribeHolder subscribeHolder;
@ -87,9 +87,6 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private UserSetting userSetting;
@Autowired
private HookSubscribe subscribe;
@Autowired
private VideoStreamSessionManager streamSession;
@ -437,11 +434,7 @@ public class PlatformServiceImpl implements IPlatformService {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>(3);
param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStream());
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
}
}
}

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.conf.DynamicTask;
@ -25,7 +24,6 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
@ -1421,11 +1419,8 @@ public class PlayServiceImpl implements IPlayService {
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaInfo == null) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
});
} else {
@ -1433,7 +1428,7 @@ public class PlayServiceImpl implements IPlayService {
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
} else {
mediaServerService.startSendRtpStream(mediaInfo, platform, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());

View File

@ -9,15 +9,14 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IGbStreamService;
@ -64,9 +63,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private IVideoManagerStorage videoManagerStorager;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private StreamProxyMapper streamProxyMapper;

View File

@ -5,12 +5,12 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -27,7 +27,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -72,9 +71,6 @@ public class RedisGbPlayMsgListener implements MessageListener {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private IMediaServerService mediaServerService;
@ -101,7 +97,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
}
public interface PlayMsgCallbackForStartSendRtpStream{
void handler(JSONObject jsonObject);
void handler();
}
public interface PlayMsgErrorCallback{
@ -181,11 +177,10 @@ public class RedisGbPlayMsgListener implements MessageListener {
String serial = wvpRedisMsg.getSerial();
switch (wvpResult.getCode()) {
case 0:
JSONObject jsonObject = (JSONObject)wvpResult.getData();
PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
if (playMsgCallback != null) {
callbacksForError.remove(serial);
playMsgCallback.handler(jsonObject);
playMsgCallback.handler();
}
break;
case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
@ -219,36 +214,24 @@ public class RedisGbPlayMsgListener implements MessageListener {
*
*/
private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
MediaServer mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
if (mediaInfo == null) {
MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
if (mediaServer == null) {
// TODO 回复错误
return;
}
String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",requestPushStreamMsg.getApp());
param.put("stream",requestPushStreamMsg.getStream());
param.put("ssrc", requestPushStreamMsg.getSsrc());
param.put("dst_url",requestPushStreamMsg.getIp());
param.put("dst_port", requestPushStreamMsg.getPort());
param.put("is_udp", is_Udp);
param.put("src_port", requestPushStreamMsg.getSrcPort());
param.put("pt", requestPushStreamMsg.getPt());
param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param);
SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg);
try {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
}catch (ControllerException e) {
return;
}
// 回复消息
responsePushStream(jsonObject, fromId, serial);
}
private void responsePushStream(JSONObject content, String toId, String serial) {
WVPResult<JSONObject> result = new WVPResult<>();
result.setCode(0);
result.setData(content);
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId,
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@ -317,7 +300,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
* sendItem
*/
private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(),
content.getPort(), content.getSsrc(), content.getPlatformId(),
content.getApp(), content.getStream(), content.getChannelId(),
content.getTcp(), content.getRtcp());
@ -453,13 +436,8 @@ public class RedisGbPlayMsgListener implements MessageListener {
// TODO 回复错误
return;
}
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) {
logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 发送redis消息
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,

View File

@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@ -25,7 +24,6 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -57,9 +55,6 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ZLMServerFactory zlmServerFactory;
private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
@ -88,16 +83,10 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
}
if (push.isSelf()) {
// 停止向上级推流
String streamId = sendRtpItem.getStream();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", sendRtpItem.getStream());
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),

View File

@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
@ -81,8 +82,8 @@ public class PsController {
logger.info("[第三方PS服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
if (mediaServerItem == null) {
MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
}
if (stream == null) {
@ -100,13 +101,14 @@ public class PsController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, false, tcpMode);
if (localPort == 0) {
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode);
if (ssrcInfo.getPort() == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId());
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
@ -128,8 +130,8 @@ public class PsController {
});
}
OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo();
otherPsSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
otherPsSendInfo.setReceivePort(localPort);
otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp());
otherPsSendInfo.setReceivePort(ssrcInfo.getPort());
otherPsSendInfo.setCallId(callId);
otherPsSendInfo.setStream(stream);
@ -138,9 +140,9 @@ public class PsController {
if (isSend != null && isSend) {
String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
// 预创建发流信息
int port = sendRtpPortManager.getNextPort(mediaServerItem);
int port = sendRtpPortManager.getNextPort(mediaServer);
otherPsSendInfo.setSendLocalIp(mediaServerItem.getSdpIp());
otherPsSendInfo.setSendLocalIp(mediaServer.getSdpIp());
otherPsSendInfo.setSendLocalPort(port);
// 将信息写入redis中以备后用
redisTemplate.opsForValue().set(key, otherPsSendInfo, 300, TimeUnit.SECONDS);
@ -156,7 +158,7 @@ public class PsController {
public void closeRtpServer(String stream) {
logger.info("[第三方PS服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
zlmServerFactory.closeRtpServer(mediaServerItem,stream);
mediaServerService.closeRTPServer(mediaServerItem, stream);
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (!scan.isEmpty()) {
@ -198,7 +200,7 @@ public class PsController {
app,
stream,
callId);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
if (sendInfo == null) {
@ -224,9 +226,10 @@ public class PsController {
param.put("src_port", sendInfo.getSendLocalPort());
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, app, stream);
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
if (streamReady) {
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param);
// mediaServerService.startSendRtp(mediaServer, );
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方PS服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, param);
redisTemplate.opsForValue().set(key, sendInfo);
@ -238,7 +241,7 @@ public class PsController {
}else {
logger.info("[第三方PS服务对接->发送流] 流不存在等待流上线callId->{}", callId);
String uuid = UUID.randomUUID().toString();
Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServerItem.getId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
dynamicTask.startDelay(uuid, ()->{
logger.info("[第三方PS服务对接->发送流] 等待流上线超时 callId->{}", callId);
redisTemplate.delete(key);
@ -257,7 +260,7 @@ public class PsController {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param);
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方PS服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, param);
redisTemplate.opsForValue().set(key, finalSendInfo);