修复多wvp国标级联机制
parent
f357bece62
commit
bf6e09d231
|
@ -42,6 +42,9 @@ public class RedisMsgListenConfig {
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisRpcConfig redisRpcConfig;
|
private RedisRpcConfig redisRpcConfig;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RedisPushStreamResponseListener redisPushStreamCloseResponseListener;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
|
* redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
|
||||||
|
@ -61,6 +64,7 @@ public class RedisMsgListenConfig {
|
||||||
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
|
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
|
||||||
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
|
container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
|
||||||
container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
|
container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
|
||||||
|
container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,9 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
|
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
|
||||||
import com.genersoft.iot.vmp.service.*;
|
import com.genersoft.iot.vmp.service.*;
|
||||||
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
|
||||||
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||||
|
@ -135,29 +133,22 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
|
||||||
logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
|
logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
|
||||||
|
|
||||||
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
|
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
|
||||||
// 查询这路流是否是本平台的
|
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
|
||||||
StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
|
if (platform != null) {
|
||||||
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
|
redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
|
||||||
redisRpcService.stopSendRtp(sendRtpItem);
|
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
|
||||||
}else {
|
redisRpcService.stopSendRtp(sendRtpItem);
|
||||||
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
|
||||||
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
|
|
||||||
callIdHeader.getCallId(), null);
|
|
||||||
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
|
|
||||||
if (userSetting.getUseCustomSsrcForParentInvite()) {
|
|
||||||
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
|
|
||||||
}
|
|
||||||
|
|
||||||
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
|
|
||||||
if (platform != null) {
|
|
||||||
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
|
|
||||||
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
|
|
||||||
sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
|
|
||||||
messageForPushChannel.setPlatFormIndex(platform.getId());
|
|
||||||
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
|
|
||||||
}else {
|
}else {
|
||||||
logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
|
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||||
|
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
|
||||||
|
callIdHeader.getCallId(), null);
|
||||||
|
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
|
||||||
|
if (userSetting.getUseCustomSsrcForParentInvite()) {
|
||||||
|
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}else {
|
||||||
|
logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
|
||||||
}
|
}
|
||||||
}else {
|
}else {
|
||||||
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||||
|
|
|
@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||||
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
||||||
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
||||||
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
||||||
|
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
|
@ -126,6 +127,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
@Autowired
|
@Autowired
|
||||||
private SendRtpPortManager sendRtpPortManager;
|
private SendRtpPortManager sendRtpPortManager;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RedisPushStreamResponseListener redisPushStreamResponseListener;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterPropertiesSet() throws Exception {
|
public void afterPropertiesSet() throws Exception {
|
||||||
|
@ -759,6 +763,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
|
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
|
||||||
// 设置超时
|
// 设置超时
|
||||||
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
|
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
|
||||||
|
redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
|
||||||
logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
|
logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
|
||||||
try {
|
try {
|
||||||
responseAck(request, Response.REQUEST_TIMEOUT); // 超时
|
responseAck(request, Response.REQUEST_TIMEOUT); // 超时
|
||||||
|
@ -801,7 +806,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
// 其他平台内容
|
// 其他平台内容
|
||||||
otherWvpPushStream(sendRtpItemFromRedis, request, platform);
|
otherWvpPushStream(sendRtpItemFromRedis, request, platform);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
// 添加回复的拒绝或者错误的通知
|
||||||
|
redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
|
||||||
|
if (response.getCode() != 0) {
|
||||||
|
dynamicTask.stop(sendRtpItem.getCallId());
|
||||||
|
redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
|
||||||
|
try {
|
||||||
|
responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
|
||||||
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
|
logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,19 +10,22 @@ import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
|
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||||
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
||||||
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
||||||
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
|
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.*;
|
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
||||||
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
|
||||||
import com.genersoft.iot.vmp.service.*;
|
import com.genersoft.iot.vmp.service.*;
|
||||||
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
|
||||||
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
||||||
|
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
|
@ -78,6 +81,10 @@ public class ZLMHttpHookListener {
|
||||||
@Autowired
|
@Autowired
|
||||||
private IRedisCatchStorage redisCatchStorage;
|
private IRedisCatchStorage redisCatchStorage;
|
||||||
|
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IRedisRpcService redisRpcService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private IInviteStreamService inviteStreamService;
|
private IInviteStreamService inviteStreamService;
|
||||||
|
|
||||||
|
@ -511,41 +518,33 @@ public class ZLMHttpHookListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sendRtpItem.getApp().equals(param.getApp())) {
|
if (sendRtpItem.getApp().equals(param.getApp())) {
|
||||||
logger.info(sendRtpItem.toString());
|
ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
|
||||||
if (userSetting.getServerId().equals(sendRtpItem.getServerId())) {
|
Device device = deviceService.getDevice(sendRtpItem.getPlatformId());
|
||||||
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
|
try {
|
||||||
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
|
if (platform != null) {
|
||||||
sendRtpItem.getPlatformId(), null, userSetting.getServerId(), param.getMediaServerId());
|
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
|
||||||
// 通知其他wvp停止发流
|
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
|
||||||
redisCatchStorage.sendPushStreamClose(messageForPushChannel);
|
sendRtpItem.getCallId(), sendRtpItem.getStream());
|
||||||
}else {
|
redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
|
||||||
String platformId = sendRtpItem.getPlatformId();
|
} else if (device != null) {
|
||||||
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
|
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
|
||||||
Device device = deviceService.getDevice(platformId);
|
if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
|
||||||
|
|| sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
|
||||||
try {
|
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
||||||
if (platform != null) {
|
if (audioBroadcastCatch != null) {
|
||||||
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
|
// 来自上级平台的停止对讲
|
||||||
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
|
logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
||||||
sendRtpItem.getCallId(), sendRtpItem.getStream());
|
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
||||||
} else {
|
|
||||||
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), param.getStream(), sendRtpItem.getCallId());
|
|
||||||
if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
|
|
||||||
|| sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
|
|
||||||
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
|
||||||
if (audioBroadcastCatch != null) {
|
|
||||||
// 来自上级平台的停止对讲
|
|
||||||
logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
|
||||||
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SipException | InvalidArgumentException | ParseException |
|
}else {
|
||||||
SsrcTransactionNotFoundException e) {
|
// 通知其他wvp停止发流
|
||||||
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
|
redisRpcService.rtpSendStopped(sendRtpItem);
|
||||||
}
|
}
|
||||||
|
} catch (SipException | InvalidArgumentException | ParseException |
|
||||||
|
SsrcTransactionNotFoundException e) {
|
||||||
|
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -593,11 +592,7 @@ public class ZLMHttpHookListener {
|
||||||
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
|
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
|
||||||
sendRtpItem.getCallId(), sendRtpItem.getStream());
|
sendRtpItem.getCallId(), sendRtpItem.getStream());
|
||||||
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
|
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
|
||||||
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
|
redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform);
|
||||||
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
|
|
||||||
sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
|
|
||||||
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
|
|
||||||
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -372,6 +372,6 @@ public class ZLMServerFactory {
|
||||||
param.put("app", sendRtpItem.getApp());
|
param.put("app", sendRtpItem.getApp());
|
||||||
param.put("stream", sendRtpItem.getStream());
|
param.put("stream", sendRtpItem.getStream());
|
||||||
param.put("ssrc", sendRtpItem.getSsrc());
|
param.put("ssrc", sendRtpItem.getSsrc());
|
||||||
return zlmresTfulUtils.startSendRtp(mediaServerItem, param);
|
return zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,4 +13,9 @@ public interface IRedisRpcService {
|
||||||
void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback);
|
void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback);
|
||||||
|
|
||||||
WVPResult stopSendRtp(SendRtpItem sendRtpItem);
|
WVPResult stopSendRtp(SendRtpItem sendRtpItem);
|
||||||
|
|
||||||
|
void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
|
||||||
|
|
||||||
|
void rtpSendStopped(SendRtpItem sendRtpItem);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
package com.genersoft.iot.vmp.service.redisMsg;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.data.redis.connection.Message;
|
||||||
|
import org.springframework.data.redis.connection.MessageListener;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.ObjectUtils;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 接收redis返回的推流结果
|
||||||
|
* @author lin
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class RedisPushStreamResponseListener implements MessageListener {
|
||||||
|
|
||||||
|
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
|
||||||
|
|
||||||
|
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
|
@Qualifier("taskExecutor")
|
||||||
|
@Autowired
|
||||||
|
private ThreadPoolTaskExecutor taskExecutor;
|
||||||
|
|
||||||
|
|
||||||
|
private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public interface PushStreamResponseEvent{
|
||||||
|
void run(MessageForPushChannelResponse response);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message, byte[] bytes) {
|
||||||
|
logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
|
||||||
|
boolean isEmpty = taskQueue.isEmpty();
|
||||||
|
taskQueue.offer(message);
|
||||||
|
if (isEmpty) {
|
||||||
|
taskExecutor.execute(() -> {
|
||||||
|
while (!taskQueue.isEmpty()) {
|
||||||
|
Message msg = taskQueue.poll();
|
||||||
|
try {
|
||||||
|
MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
|
||||||
|
if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
|
||||||
|
logger.info("[REDIS消息-请求推流结果]:参数不全");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// 查看正在等待的invite消息
|
||||||
|
if (responseEvents.get(response.getApp() + response.getStream()) != null) {
|
||||||
|
responseEvents.get(response.getApp() + response.getStream()).run(response);
|
||||||
|
}
|
||||||
|
}catch (Exception e) {
|
||||||
|
logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
|
||||||
|
logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
|
||||||
|
responseEvents.put(app + stream, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeEvent(String app, String stream) {
|
||||||
|
responseEvents.remove(app + stream);
|
||||||
|
}
|
||||||
|
}
|
|
@ -7,8 +7,10 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
||||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
|
||||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
||||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
|
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
|
||||||
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
||||||
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
|
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
|
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
|
||||||
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
||||||
|
@ -18,6 +20,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
|
||||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -25,6 +28,10 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.sip.InvalidArgumentException;
|
||||||
|
import javax.sip.SipException;
|
||||||
|
import java.text.ParseException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
|
* 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
|
||||||
*/
|
*/
|
||||||
|
@ -59,6 +66,14 @@ public class RedisRpcController {
|
||||||
private RedisTemplate<Object, Object> redisTemplate;
|
private RedisTemplate<Object, Object> redisTemplate;
|
||||||
|
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ISIPCommanderForPlatform commanderFroPlatform;
|
||||||
|
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IVideoManagerStorage storager;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取发流的信息
|
* 获取发流的信息
|
||||||
*/
|
*/
|
||||||
|
@ -133,6 +148,20 @@ public class RedisRpcController {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 停止监听流上线
|
||||||
|
*/
|
||||||
|
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
|
||||||
|
SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
|
||||||
|
logger.info("[redis-rpc] 停止监听流上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
|
||||||
|
|
||||||
|
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
||||||
|
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
|
||||||
|
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
|
||||||
|
hookSubscribe.removeSubscribe(hook);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 开始发流
|
* 开始发流
|
||||||
|
@ -194,6 +223,34 @@ public class RedisRpcController {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 其他wvp通知推流已经停止了
|
||||||
|
*/
|
||||||
|
public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
|
||||||
|
SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
|
||||||
|
logger.info("[redis-rpc] 推流已经停止: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
|
||||||
|
SendRtpItem sendRtpItemInCatch = redisCatchStorage.querySendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getStream(), sendRtpItem.getCallId());
|
||||||
|
RedisRpcResponse response = request.getResponse();
|
||||||
|
response.setStatusCode(200);
|
||||||
|
if (sendRtpItemInCatch == null) {
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
String platformId = sendRtpItem.getPlatformId();
|
||||||
|
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
|
||||||
|
if (platform == null) {
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
|
||||||
|
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
|
||||||
|
sendRtpItem.getCallId(), sendRtpItem.getStream());
|
||||||
|
redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
|
||||||
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
|
logger.error("[命令发送失败] 发送BYE: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
private void sendResponse(RedisRpcResponse response){
|
private void sendResponse(RedisRpcResponse response){
|
||||||
response.setToId(userSetting.getServerId());
|
response.setToId(userSetting.getServerId());
|
||||||
RedisRpcMessage message = new RedisRpcMessage();
|
RedisRpcMessage message = new RedisRpcMessage();
|
||||||
|
|
|
@ -97,4 +97,21 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
|
||||||
|
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
|
||||||
|
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
|
||||||
|
hookSubscribe.removeSubscribe(hook);
|
||||||
|
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
|
||||||
|
request.setToId(sendRtpItem.getServerId());
|
||||||
|
redisRpcConfig.request(request, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void rtpSendStopped(SendRtpItem sendRtpItem) {
|
||||||
|
RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItem);
|
||||||
|
request.setToId(sendRtpItem.getServerId());
|
||||||
|
redisRpcConfig.request(request, 10);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.storager;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.genersoft.iot.vmp.common.SystemAllInfo;
|
import com.genersoft.iot.vmp.common.SystemAllInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
|
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
|
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||||
|
@ -207,7 +204,7 @@ public interface IRedisCatchStorage {
|
||||||
|
|
||||||
void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
|
void sendPlatformStartPlayMsg(MessageForPushChannel messageForPushChannel);
|
||||||
|
|
||||||
void sendPlatformStopPlayMsg(MessageForPushChannel messageForPushChannel);
|
void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform);
|
||||||
|
|
||||||
void addPushListItem(String app, String stream, OnStreamChangedHookParam param);
|
void addPushListItem(String app, String stream, OnStreamChangedHookParam param);
|
||||||
|
|
||||||
|
|
|
@ -5,10 +5,7 @@ import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.genersoft.iot.vmp.common.SystemAllInfo;
|
import com.genersoft.iot.vmp.common.SystemAllInfo;
|
||||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
|
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
|
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||||
|
@ -644,9 +641,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
|
public void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform) {
|
||||||
|
|
||||||
|
MessageForPushChannel msg = MessageForPushChannel.getInstance(0,
|
||||||
|
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
|
||||||
|
sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
|
||||||
|
msg.setPlatFormIndex(platform.getId());
|
||||||
|
|
||||||
String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
|
String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
|
||||||
logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
|
logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), platform.getServerGBId());
|
||||||
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
|
redisTemplate.convertAndSend(key, JSON.toJSON(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,4 +2,4 @@ spring:
|
||||||
application:
|
application:
|
||||||
name: wvp
|
name: wvp
|
||||||
profiles:
|
profiles:
|
||||||
active: local
|
active: local2
|
Loading…
Reference in New Issue