Merge branch '2.6.8' into wvp-28181-2.0

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
pull/894/head
648540858 2023-06-29 09:52:05 +08:00
commit f62bf7b2c6
21 changed files with 492 additions and 126 deletions

View File

@ -1,6 +1,7 @@
<!-- 侧边栏 --> <!-- 侧边栏 -->
* **编译与部署** * **编译与部署**
* [测试](_content/introduction/test.md)
* [编译](_content/introduction/compile.md) * [编译](_content/introduction/compile.md)
* [配置](_content/introduction/config.md) * [配置](_content/introduction/config.md)
* [部署](_content/introduction/deployment.md) * [部署](_content/introduction/deployment.md)

View File

@ -100,6 +100,21 @@ public class VideoManagerConstants {
*/ */
public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED"; public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
/**
* redis
*/
public static final String VM_MSG_STREAM_START_PLAY_NOTIFY = "VM_MSG_STREAM_START_PLAY_NOTIFY";
/**
* redis
*/
public static final String VM_MSG_STREAM_STOP_PLAY_NOTIFY = "VM_MSG_STREAM_STOP_PLAY_NOTIFY";
/**
* redis
*/
public static final String VM_MSG_STREAM_PUSH_CLOSE_REQUESTED = "VM_MSG_STREAM_PUSH_CLOSE_REQUESTED";
/** /**
* redis * redis

View File

@ -46,6 +46,9 @@ public class RedisMsgListenConfig {
@Autowired @Autowired
private RedisCloseStreamMsgListener redisCloseStreamMsgListener; private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
@Autowired
private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
/** /**
* redis redis * redis redis
@ -67,6 +70,7 @@ public class RedisMsgListenConfig {
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
return container; return container;
} }
} }

View File

@ -64,7 +64,7 @@ public class SipLayer implements CommandLineRunner {
try { try {
sipStack = (SipStackImpl)SipFactory.getInstance().createSipStack(DefaultProperties.getProperties(monitorIp, userSetting.getSipLog())); sipStack = (SipStackImpl)SipFactory.getInstance().createSipStack(DefaultProperties.getProperties(monitorIp, userSetting.getSipLog()));
} catch (PeerUnavailableException e) { } catch (PeerUnavailableException e) {
logger.error("[Sip Server] SIP服务启动失败 监听地址{}失败,请检查ip是否正确", monitorIp); logger.error("[SIP SERVER] SIP服务启动失败 监听地址{}失败,请检查ip是否正确", monitorIp);
return; return;
} }
@ -76,12 +76,12 @@ public class SipLayer implements CommandLineRunner {
tcpSipProvider.addSipListener(sipProcessorObserver); tcpSipProvider.addSipListener(sipProcessorObserver);
tcpSipProviderMap.put(monitorIp, tcpSipProvider); tcpSipProviderMap.put(monitorIp, tcpSipProvider);
logger.info("[Sip Server] tcp://{}:{} 启动成功", monitorIp, port); logger.info("[SIP SERVER] tcp://{}:{} 启动成功", monitorIp, port);
} catch (TransportNotSupportedException } catch (TransportNotSupportedException
| TooManyListenersException | TooManyListenersException
| ObjectInUseException | ObjectInUseException
| InvalidArgumentException e) { | InvalidArgumentException e) {
logger.error("[Sip Server] tcp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确" logger.error("[SIP SERVER] tcp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
, monitorIp, port); , monitorIp, port);
} }
@ -93,12 +93,12 @@ public class SipLayer implements CommandLineRunner {
udpSipProviderMap.put(monitorIp, udpSipProvider); udpSipProviderMap.put(monitorIp, udpSipProvider);
logger.info("[Sip Server] udp://{}:{} 启动成功", monitorIp, port); logger.info("[SIP SERVER] udp://{}:{} 启动成功", monitorIp, port);
} catch (TransportNotSupportedException } catch (TransportNotSupportedException
| TooManyListenersException | TooManyListenersException
| ObjectInUseException | ObjectInUseException
| InvalidArgumentException e) { | InvalidArgumentException e) {
logger.error("[Sip Server] udp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确" logger.error("[SIP SERVER] udp://{}:{} SIP服务启动失败,请检查端口是否被占用或者ip是否正确"
, monitorIp, port); , monitorIp, port);
} }
} }

View File

@ -283,7 +283,7 @@ public class SIPRequestHeaderPlarformProvider {
viaHeader.setRPort(); viaHeader.setRPort();
viaHeaders.add(viaHeader); viaHeaders.add(viaHeader);
// from // from
SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(platform.getDeviceGBId(), SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sendRtpItem.getChannelId(),
platform.getDeviceIp() + ":" + platform.getDevicePort()); platform.getDeviceIp() + ":" + platform.getDevicePort());
Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI); Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, sendRtpItem.getToTag()); FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, sendRtpItem.getToTag());
@ -296,13 +296,10 @@ public class SIPRequestHeaderPlarformProvider {
MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);
// ceq // ceq
CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.BYE); CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.BYE);
MessageFactoryImpl messageFactory = (MessageFactoryImpl) SipFactory.getInstance().createMessageFactory();
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset("gb2312");
CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sendRtpItem.getCallId()); CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sendRtpItem.getCallId());
request = (SIPRequest) messageFactory.createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader, request = (SIPRequest) SipFactory.getInstance().createMessageFactory().createRequest(requestURI, Request.BYE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards); toHeader, viaHeaders, maxForwards);
request.addHeader(SipUtils.createUserAgentHeader(gitUtil)); request.addHeader(SipUtils.createUserAgentHeader(gitUtil));
@ -310,6 +307,7 @@ public class SIPRequestHeaderPlarformProvider {
String sipAddress = platform.getDeviceIp() + ":" + platform.getDevicePort(); String sipAddress = platform.getDeviceIp() + ":" + platform.getDevicePort();
Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory() Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory()
.createSipURI(platform.getDeviceGBId(), sipAddress)); .createSipURI(platform.getDeviceGBId(), sipAddress));
request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress)); request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress));
return request; return request;

View File

@ -371,7 +371,6 @@ public class SIPCommander implements ISIPCommander {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
errorEvent.response(e); errorEvent.response(e);
}), e -> { }), e -> {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event; ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse(); SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,

View File

@ -214,6 +214,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
continue; continue;
}else { }else {
if (channel.getChannelId().length() != 20) { if (channel.getChannelId().length() != 20) {
catalogXml.append("</Item>\r\n");
logger.warn("[编号长度异常] {} 长度错误请使用20位长度的国标编号,当前长度:{}", channel.getChannelId(), channel.getChannelId().length());
catalogXml.append("</Item>\r\n"); catalogXml.append("</Item>\r\n");
continue; continue;
} }

View File

@ -3,6 +3,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@ -14,6 +16,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -57,6 +60,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@Autowired
private UserSetting userSetting;
@Autowired @Autowired
private IVideoManagerStorage storager; private IVideoManagerStorage storager;
@ -155,6 +161,13 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
} else if (jsonObject.getInteger("code") == 0) { } else if (jsonObject.getInteger("code") == 0) {
logger.info("调用ZLM推流接口, 结果: {}", jsonObject); logger.info("调用ZLM推流接口, 结果: {}", jsonObject);
logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); logger.info("RTP推流成功[ {}/{} ]{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
if (sendRtpItem.getPlayType() == InviteStreamType.PUSH) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStreamId(),
sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
}
} else { } else {
logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param)); logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
if (sendRtpItem.isOnlyAudio()) { if (sendRtpItem.isOnlyAudio()) {

View File

@ -2,12 +2,15 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@ -15,9 +18,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@ -32,10 +37,10 @@ import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.util.HashMap; import java.util.HashMap;
@ -59,9 +64,15 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired @Autowired
private IInviteStreamService inviteStreamService; private IInviteStreamService inviteStreamService;
@Autowired
private IPlatformService platformService;
@Autowired @Autowired
private IDeviceService deviceService; private IDeviceService deviceService;
@Autowired
private IDeviceChannelService channelService;
@Autowired @Autowired
private IVideoManagerStorage storager; private IVideoManagerStorage storager;
@ -80,6 +91,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired @Autowired
private VideoStreamSessionManager streamSession; private VideoStreamSessionManager streamSession;
@Autowired
private UserSetting userSetting;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅 // 添加消息处理的订阅
@ -92,18 +106,18 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
*/ */
@Override @Override
public void process(RequestEvent evt) { public void process(RequestEvent evt) {
SIPRequest request = (SIPRequest) evt.getRequest();
try { try {
responseAck((SIPRequest) evt.getRequest(), Response.OK); responseAck(request, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[回复BYE信息失败]{}", e.getMessage()); logger.error("[回复BYE信息失败]{}", e.getMessage());
} }
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId());
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
logger.info("[收到bye] {}/{}", platformGbId, channelId);
if (sendRtpItem != null){ if (sendRtpItem != null){
logger.info("[收到bye] 来自平台{} 停止通道:{}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId());
String streamId = sendRtpItem.getStreamId(); String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>(); Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__"); param.put("vhost","__defaultVhost__");
@ -112,9 +126,22 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
param.put("ssrc",sendRtpItem.getSsrc()); param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[收到bye] 停止向上级推流:{}", streamId); logger.info("[收到bye] 停止向上级推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); callIdHeader.getCallId(), null);
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
if (platform != null) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(platform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}else {
logger.info("[上级平台停止观看] 未找到平台{}的信息发送redis消息失败", sendRtpItem.getPlatformId());
}
}
int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) { if (totalReaderCount <= 0) {
logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
@ -124,61 +151,46 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
} }
try { try {
logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), channelId); logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
cmder.streamByeCmd(device, channelId, streamId, null); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
} catch (InvalidArgumentException | ParseException | SipException | } catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) { SsrcTransactionNotFoundException e) {
logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); logger.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
} }
} }
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId());
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
} }
}else {
// 可能是设备发送的停止
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null);
if (ssrcTransaction == null) {
logger.info("[收到bye] 但是无法获取推流信息和发流信息,忽略此请求");
logger.info(request.toString());
return;
} }
} logger.info("[收到bye] 来自设备:{}, 通道已停止推流: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
// 可能是设备主动停止
Device device = storager.queryVideoDeviceByChannelId(platformGbId);
if (device != null) {
storager.stopPlay(device.getDeviceId(), channelId);
SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
if (ssrcTransactionForPlay != null){
if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
// 释放ssrc
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc());
}
streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
}
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfo(inviteInfo);
if (inviteInfo != null) {
if (inviteInfo.getStreamInfo() != null) {
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());
}
}
}
SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null);
if (ssrcTransactionForPlayBack != null) {
// 释放ssrc
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId());
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc());
}
streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream());
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, device.getDeviceId(), channelId);
if (inviteInfo != null) { Device device = deviceService.getDevice(ssrcTransaction.getDeviceId());
inviteStreamService.removeInviteInfo(inviteInfo); if (device == null) {
if (inviteInfo.getStreamInfo() != null) { logger.info("[收到bye] 未找到设备:{} ", ssrcTransaction.getDeviceId());
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream()); return;
} }
DeviceChannel channel = channelService.getOne(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
if (channel == null) {
logger.info("[收到bye] 未找到通道,设备:{} 通道:{}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
return;
} }
storager.stopPlay(device.getDeviceId(), channel.getChannelId());
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channel.getChannelId());
if (streamInfo != null) {
redisCatchStorage.stopPlay(streamInfo);
mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream());
} }
// 释放ssrc
MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc());
}
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcTransaction.getStream());
} }
} }
} }

View File

@ -181,16 +181,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
return; return;
} else { } else {
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) { if (streamPushItem != null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
try {
responseAck(request, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
} }
return; if (mediaServerItem == null) {
}else { mediaServerItem = mediaServerService.getDefaultMediaServer();
// TODO 可能漏回复消息
} }
} }
} else { } else {
@ -351,7 +346,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (tcpActive != null) { if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive); sendRtpItem.setTcpActive(tcpActive);
@ -557,7 +554,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (streamReady != null && streamReady) { if (streamReady != null && streamReady) {
// 自平台内容 // 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("服务器端口资源不足"); logger.warn("服务器端口资源不足");
@ -596,7 +595,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (streamReady != null && streamReady) { if (streamReady != null && streamReady) {
// 自平台内容 // 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("服务器端口资源不足"); logger.warn("服务器端口资源不足");
@ -712,7 +713,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
dynamicTask.stop(callIdHeader.getCallId()); dynamicTask.stop(callIdHeader.getCallId());
if (serverId.equals(userSetting.getServerId())) { if (serverId.equals(userSetting.getServerId())) {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> {
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("上级点时创建sendRTPItem失败可能是服务器端口资源不足"); logger.warn("上级点时创建sendRTPItem失败可能是服务器端口资源不足");

View File

@ -22,6 +22,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -468,6 +469,13 @@ public class ZLMHttpHookListener {
} }
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId()); sendRtpItem.getCallId(), sendRtpItem.getStreamId());
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}
} }
} }
} }
@ -513,7 +521,7 @@ public class ZLMHttpHookListener {
} }
return ret; return ret;
} }
// 推流具有主动性,暂时不做处理 // TODO 推流具有主动性,暂时不做处理
// StreamPushItem streamPushItem = streamPushService.getPush(app, streamId); // StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
// if (streamPushItem != null) { // if (streamPushItem != null) {
// // TODO 发送停止 // // TODO 发送停止

View File

@ -221,13 +221,14 @@ public class ZLMRTPServerFactory {
* @param tcp tcp * @param tcp tcp
* @return SendRtpItem * @return SendRtpItem
*/ */
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
// 默认为随机端口 // 默认为随机端口
int localPort = 0; int localPort = 0;
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc, localPort); localPort = keepPort(serverItem, ssrc, localPort, callback);
if (localPort == 0) { if (localPort == 0) {
return null; return null;
} }
@ -259,11 +260,12 @@ public class ZLMRTPServerFactory {
* @param tcp tcp * @param tcp tcp
* @return SendRtpItem * @return SendRtpItem
*/ */
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
// 默认为随机端口 // 默认为随机端口
int localPort = 0; int localPort = 0;
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc, localPort); localPort = keepPort(serverItem, ssrc, localPort, callback);
if (localPort == 0) { if (localPort == 0) {
return null; return null;
} }
@ -284,10 +286,14 @@ public class ZLMRTPServerFactory {
return sendRtpItem; return sendRtpItem;
} }
public interface KeepPortCallback{
Boolean keep(String ssrc);
}
/** /**
* *
*/ */
public int keepPort(MediaServerItem serverItem, String ssrc, Integer localPort) { public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) {
Map<String, Object> param = new HashMap<>(3); Map<String, Object> param = new HashMap<>(3);
param.put("port", localPort); param.put("port", localPort);
param.put("enable_tcp", 1); param.put("enable_tcp", 1);
@ -296,18 +302,20 @@ public class ZLMRTPServerFactory {
if (jsonObject.getInteger("code") == 0) { if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port"); localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
Integer finalLocalPort = localPort; Integer finalLocalPort = localPort;
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, HookParam hookParam)->{ (MediaServerItem mediaServerItem, HookParam hookParam)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort); logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam; OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
if (!ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) { if (ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) {
return; if (keepPortCallback.keep(ssrc)) {
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback);
}else {
logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc);
releasePort(serverItem, ssrc);
} }
int port = keepPort(serverItem, ssrc, finalLocalPort);
if (port == 0) {
logger.info("[上级点播] {}->监听端口失败,移除监听", ssrc);
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
} }
}); });
logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort); logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);

View File

@ -1,7 +1,5 @@
package com.genersoft.iot.vmp.service.bean; package com.genersoft.iot.vmp.service.bean;
import java.util.stream.Stream;
/** /**
* *
* @author lin * @author lin
@ -29,10 +27,15 @@ public class MessageForPushChannel {
private String gbId; private String gbId;
/** /**
* ID *
*/ */
private String platFormId; private String platFormId;
/**
* ID
*/
private int platFormIndex;
/** /**
* *
*/ */
@ -128,4 +131,12 @@ public class MessageForPushChannel {
public void setMediaServerId(String mediaServerId) { public void setMediaServerId(String mediaServerId) {
this.mediaServerId = mediaServerId; this.mediaServerId = mediaServerId;
} }
public int getPlatFormIndex() {
return platFormIndex;
}
public void setPlatFormIndex(int platFormIndex) {
this.platFormIndex = platFormIndex;
}
} }

View File

@ -105,6 +105,7 @@ public class PlatformServiceImpl implements IPlatformService {
// 行政区划默认去编号的前6位 // 行政区划默认去编号的前6位
parentPlatform.setAdministrativeDivision(parentPlatform.getServerGBId().substring(0,6)); parentPlatform.setAdministrativeDivision(parentPlatform.getServerGBId().substring(0,6));
} }
parentPlatform.setTreeType("CivilCode");
parentPlatform.setCatalogId(parentPlatform.getDeviceGBId()); parentPlatform.setCatalogId(parentPlatform.getDeviceGBId());
int result = platformMapper.addParentPlatform(parentPlatform); int result = platformMapper.addParentPlatform(parentPlatform);
// 添加缓存 // 添加缓存

View File

@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -26,6 +27,7 @@ import org.springframework.stereotype.Component;
import java.text.ParseException; import java.text.ParseException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -127,6 +129,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: case WvpRedisMsgCmd.REQUEST_PUSH_STREAM:
RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent());
requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial());
break; break;
default: default:
break; break;
@ -311,7 +314,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
content.getPort(), content.getSsrc(), content.getPlatformId(), content.getPort(), content.getSsrc(), content.getPlatformId(),
content.getApp(), content.getStream(), content.getChannelId(), content.getApp(), content.getStream(), content.getChannelId(),
content.getTcp(), content.getRtcp()); content.getTcp(), content.getRtcp(), ssrcFromCallback -> {
return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null;
});
WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
result.setCode(0); result.setCode(0);
@ -388,4 +393,31 @@ public class RedisGbPlayMsgListener implements MessageListener {
}); });
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
} }
private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) {
if (platformGbId == null) {
platformGbId = "*";
}
if (channelId == null) {
channelId = "*";
}
if (streamId == null) {
streamId = "*";
}
if (callId == null) {
callId = "*";
}
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX
+ userSetting.getServerId() + "_*_"
+ platformGbId + "_"
+ channelId + "_"
+ streamId + "_"
+ callId;
List<Object> scan = RedisUtil.scan(redisTemplate, key);
if (scan.size() > 0) {
return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0));
}else {
return null;
}
}
} }

View File

@ -0,0 +1,120 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* redis
* @author lin
*/
@Component
public class RedisPushStreamCloseResponseListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
@Autowired
private IStreamPushService streamPushService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorage storager;
@Autowired
private ISIPCommanderForPlatform commanderFroPlatform;
@Autowired
private UserSetting userSetting;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
public interface PushStreamResponseEvent{
void run(MessageForPushChannelResponse response);
}
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("[REDIS消息-推流结束] {}", new String(message.getBody()));
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
if (push != null) {
if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
push.getGbId());
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
// 停止向上级推流
String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
try {
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
}
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}
}
}
}
}
}
public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
responseEvents.put(app + stream, callback);
}
public void removeEvent(String app, String stream) {
responseEvents.remove(app + stream);
}
}

View File

@ -202,5 +202,10 @@ public interface IRedisCatchStorage {
void removeAllDevice(); void removeAllDevice();
void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online); void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online);
void sendChannelAddOrDelete(String deviceId, String channelId, boolean add); void sendChannelAddOrDelete(String deviceId, String channelId, boolean add);
void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
} }

View File

@ -622,4 +622,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
// 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号 // 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
stringRedisTemplate.convertAndSend(key, msg.toString()); stringRedisTemplate.convertAndSend(key, msg.toString());
} }
@Override
public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
logger.info("[redis发送通知] 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
}
@Override
public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
logger.info("[redis发送通知] 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
}
} }

View File

@ -97,6 +97,9 @@ public class PtzController {
cmdCode = 32; cmdCode = 32;
break; break;
case "stop": case "stop":
horizonSpeed = 0;
verticalSpeed = 0;
zoomSpeed = 0;
break; break;
default: default:
break; break;

View File

@ -0,0 +1,117 @@
package com.genersoft.iot.vmp.vmanager.rtp;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
@SuppressWarnings("rawtypes")
@Tag(name = "第三方服务对接")
@RestController
@RequestMapping("/api/rtp")
public class RtpController {
@Autowired
private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private VersionInfo versionInfo;
@Autowired
private SipConfig sipConfig;
@Autowired
private UserSetting userSetting;
@Autowired
private IDeviceService deviceService;
@Autowired
private IDeviceChannelService channelService;
@Autowired
private IStreamPushService pushService;
@Autowired
private IStreamProxyService proxyService;
@Value("${server.port}")
private int serverPort;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@GetMapping(value = "/receive/open")
@ResponseBody
@Operation(summary = "开启收流和获取发流信息")
@Parameter(name = "isSend", description = "是否发送false时只开启收流 true同时返回推流信息", required = true)
@Parameter(name = "callId", description = "整个过程的唯一标识,为了与后续接口关联", required = true)
@Parameter(name = "ssrc", description = "来源流的SSRC不传则不校验来源ssrc", required = false)
@Parameter(name = "stream", description = "形成的流的ID", required = true)
@Parameter(name = "tcpMode", description = "收流模式, 0为UDP 1为TCP被动", required = true)
@Parameter(name = "callBack", description = "回调地址如果收流超时会通道回调通知回调为get请求参数为callId", required = true)
public SendRtpItem openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
}
return null;
}
@GetMapping(value = "/receive/close")
@ResponseBody
@Operation(summary = "关闭收流")
@Parameter(name = "stream", description = "流的ID", required = true)
public void closeRtpServer(String stream) {
}
@GetMapping(value = "/send/start")
@ResponseBody
@Operation(summary = "发送流")
@Parameter(name = "ssrc", description = "发送流的SSRC", required = true)
@Parameter(name = "ip", description = "目标IP", required = true)
@Parameter(name = "port", description = "目标端口", required = true)
@Parameter(name = "app", description = "待发送应用名", required = true)
@Parameter(name = "stream", description = "待发送流Id", required = true)
@Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
@Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
@Parameter(name = "streamType", description = "流类型1为es流2为ps流 默认es流", required = false)
public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Integer streamType) {
}
@GetMapping(value = "/send/stop")
@ResponseBody
@Operation(summary = "关闭发送流")
@Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
public void closeSendRTP(String callId) {
}
}