修复多wvp推流时,收到停止消息数据处理失败问题
parent
9b62fbe467
commit
47526e241e
|
@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||||
import com.genersoft.iot.vmp.service.bean.*;
|
import com.genersoft.iot.vmp.service.bean.*;
|
||||||
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -77,6 +78,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||||
@Autowired
|
@Autowired
|
||||||
private IMediaServerService mediaServerService;
|
private IMediaServerService mediaServerService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IRedisCatchStorage redisCatchStorage;
|
||||||
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private DynamicTask dynamicTask;
|
private DynamicTask dynamicTask;
|
||||||
|
@ -322,6 +326,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
|
||||||
responseSendItemMsg.setSendRtpItem(sendRtpItem);
|
responseSendItemMsg.setSendRtpItem(sendRtpItem);
|
||||||
responseSendItemMsg.setMediaServerItem(mediaServerItem);
|
responseSendItemMsg.setMediaServerItem(mediaServerItem);
|
||||||
result.setData(responseSendItemMsg);
|
result.setData(responseSendItemMsg);
|
||||||
|
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
||||||
|
|
||||||
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
|
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(
|
||||||
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
|
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, JSON.toJSONString(result)
|
||||||
|
|
|
@ -73,12 +73,20 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
|
||||||
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
|
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
|
||||||
StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
|
StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
|
||||||
if (push != null) {
|
if (push != null) {
|
||||||
if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) {
|
|
||||||
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
|
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(
|
||||||
push.getGbId());
|
push.getGbId());
|
||||||
if (sendRtpItems.size() > 0) {
|
if (!sendRtpItems.isEmpty()) {
|
||||||
for (SendRtpItem sendRtpItem : sendRtpItems) {
|
for (SendRtpItem sendRtpItem : sendRtpItems) {
|
||||||
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
|
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
|
||||||
|
if (parentPlatform != null) {
|
||||||
|
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
|
||||||
|
try {
|
||||||
|
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
|
||||||
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
|
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (push.isSelf()) {
|
||||||
// 停止向上级推流
|
// 停止向上级推流
|
||||||
String streamId = sendRtpItem.getStreamId();
|
String streamId = sendRtpItem.getStreamId();
|
||||||
Map<String, Object> param = new HashMap<>();
|
Map<String, Object> param = new HashMap<>();
|
||||||
|
@ -90,12 +98,6 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
|
||||||
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||||
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
|
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId());
|
||||||
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
|
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
|
||||||
|
|
||||||
try {
|
|
||||||
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
|
|
||||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
|
||||||
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
|
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
|
||||||
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
|
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
|
||||||
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
|
sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
|
||||||
|
|
Loading…
Reference in New Issue