diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java index 29d40db7..988c2d44 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java @@ -2,8 +2,6 @@ package com.genersoft.iot.vmp.gb28181.bean; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; import lombok.Data; @Data @@ -34,6 +32,11 @@ public class SendRtpInfo { */ private String targetName; + /** + * 是否是发送给上级平台 + */ + private boolean sendToPlatform; + /** * 直播流的应用名 */ @@ -182,8 +185,13 @@ public class SendRtpInfo { sendRtpItem.setIp(ip); sendRtpItem.setPort(port); sendRtpItem.setSsrc(ssrc); - sendRtpItem.setDeviceId(deviceId); - sendRtpItem.setPlatformId(platformId); + if (deviceId != null) { + sendRtpItem.setTargetId(deviceId); + sendRtpItem.setSendToPlatform(false); + }else { + sendRtpItem.setTargetId(platformId); + sendRtpItem.setSendToPlatform(true); + } sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(isTcp); sendRtpItem.setRtcp(rtcp); @@ -200,9 +208,7 @@ public class SendRtpInfo { "ip='" + ip + '\'' + ", port=" + port + ", ssrc='" + ssrc + '\'' + - ", platformId='" + platformId + '\'' + - ", platformName='" + platformName + '\'' + - ", deviceId='" + deviceId + '\'' + + ", targetId='" + targetId + '\'' + ", app='" + app + '\'' + ", channelId='" + channelId + '\'' + ", status=" + status + diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 46654cb3..1bdfbf86 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -10,6 +10,9 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; +import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; +import com.genersoft.iot.vmp.gb28181.service.IDeviceService; +import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; @@ -20,9 +23,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; -import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -32,9 +32,7 @@ import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; -import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -234,10 +232,10 @@ public class DeviceServiceImpl implements IDeviceService { removeMobilePositionSubscribe(device, null); List audioBroadcastCatches = audioBroadcastManager.getByDeviceId(deviceId); - if (audioBroadcastCatches.size() > 0) { + if (!audioBroadcastCatches.isEmpty()) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { - SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId()); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId(), deviceId); if (sendRtpItem != null) { sendRtpServerService.delete(sendRtpItem); MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index e19b453f..d530b71f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; @@ -111,20 +110,21 @@ public class PlatformServiceImpl implements IPlatformService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { - SendRtpInfo sendRtpItems = sendRtpServerService.queryByStream(event.getStream()); - if (sendRtpItems != null) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { - String platformId = sendRtpItem.getPlatformId(); - Platform platform = platformMapper.getParentPlatByServerGBId(platformId); - CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); - try { - if (platform != null && channel != null) { - commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); - redisCatchStorage.deleteSendRTPServer(platformId, channel.getGbDeviceId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); + List sendRtpItems = sendRtpServerService.queryByStream(event.getStream()); + if (!sendRtpItems.isEmpty()) { + for (SendRtpInfo sendRtpItem : sendRtpItems) { + if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp()) && sendRtpItem.isSendToPlatform()) { + String platformId = sendRtpItem.getTargetId(); + Platform platform = platformMapper.getParentPlatByServerGBId(platformId); + CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); + try { + if (platform != null && channel != null) { + commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); + sendRtpServerService.delete(sendRtpItem); + } + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 发送BYE: {}", e.getMessage()); } - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 发送BYE: {}", e.getMessage()); } } } @@ -137,19 +137,20 @@ public class PlatformServiceImpl implements IPlatformService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaSendRtpStoppedEvent event) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); + List sendRtpItems = sendRtpServerService.queryByStream(event.getStream()); if (sendRtpItems != null && !sendRtpItems.isEmpty()) { for (SendRtpInfo sendRtpItem : sendRtpItems) { - Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getPlatformId()); - CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); - ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); - try { - commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp()) && sendRtpItem.isSendToPlatform()) { + Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getTargetId()); + CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); + try { + commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + sendRtpServerService.delete(sendRtpItem); } - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); } } } @@ -419,11 +420,11 @@ public class PlatformServiceImpl implements IPlatformService { } private void stopAllPush(String platformId) { - List sendRtpItems = redisCatchStorage.querySendRTPServer(platformId); + List sendRtpItems = sendRtpServerService.queryForPlatform(platformId); if (sendRtpItems != null && sendRtpItems.size() > 0) { for (SendRtpInfo sendRtpItem : sendRtpItems) { ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); - redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null); + sendRtpServerService.delete(sendRtpItem); MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index a353ae7a..6d36b8ab 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; @@ -126,6 +127,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private IGbChannelService channelService; + @Autowired + private ISendRtpServerService sendRtpServerService; + /** * 流到来的处理 */ @@ -180,23 +184,23 @@ public class PlayServiceImpl implements IPlayService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); - if (!sendRtpItems.isEmpty()) { - for (SendRtpInfo sendRtpItem : sendRtpItems) { - if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { - String platformId = sendRtpItem.getPlatformId(); + List sendRtpInfos = sendRtpServerService.queryByStream(event.getStream()); + if (!sendRtpInfos.isEmpty()) { + for (SendRtpInfo sendRtpInfo : sendRtpInfos) { + if (sendRtpInfo != null && sendRtpInfo.isSendToPlatform() && sendRtpInfo.getApp().equals(event.getApp())) { + String platformId = sendRtpInfo.getTargetId(); Device device = deviceService.getDeviceByDeviceId(platformId); - DeviceChannel channel = deviceChannelService.getOneById(sendRtpItem.getChannelId()); + DeviceChannel channel = deviceChannelService.getOneById(sendRtpInfo.getChannelId()); try { if (device != null && channel != null) { - cmder.streamByeCmd(device, channel.getDeviceId(), event.getStream(), sendRtpItem.getCallId()); - if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) - || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { + cmder.streamByeCmd(device, channel.getDeviceId(), event.getStream(), sendRtpInfo.getCallId()); + if (sendRtpInfo.getPlayType().equals(InviteStreamType.BROADCAST) + || sendRtpInfo.getPlayType().equals(InviteStreamType.TALK)) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(channel.getId()); if (audioBroadcastCatch != null) { // 来自上级平台的停止对讲 - log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getChannelId()); + log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpInfo.getTargetId(), sendRtpInfo.getChannelId()); + audioBroadcastManager.del(sendRtpInfo.getChannelId()); } } } @@ -362,57 +366,47 @@ public class PlayServiceImpl implements IPlayService { audioEvent.call("ssrc已经用尽"); return; } - SendRtpInfo sendRtpItem = new SendRtpInfo(); - sendRtpItem.setApp("talk"); - sendRtpItem.setStream(stream); - sendRtpItem.setSsrc(playSsrc); - sendRtpItem.setDeviceId(device.getDeviceId()); - sendRtpItem.setPlatformId(device.getDeviceId()); - sendRtpItem.setChannelId(channel.getId()); - sendRtpItem.setRtcp(false); - sendRtpItem.setMediaServerId(mediaServerItem.getId()); - sendRtpItem.setOnlyAudio(true); - sendRtpItem.setPlayType(InviteStreamType.TALK); - sendRtpItem.setPt(8); - sendRtpItem.setStatus(1); - sendRtpItem.setTcpActive(false); - sendRtpItem.setTcp(true); - sendRtpItem.setUsePs(false); - sendRtpItem.setReceiveStream(stream + "_talk"); - - String callId = SipUtils.getNewCallId(); - int port = sendRtpPortManager.getNextPort(mediaServerItem); - //端口获取失败的ssrcInfo 没有必要发送点播指令 - if (port <= 0) { - log.info("[语音对讲] 端口分配异常,deviceId={},channelId={}", device.getDeviceId(), channel.getDeviceId()); - audioEvent.call("端口分配异常"); + SendRtpInfo sendRtpInfo; + try { + sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), "talk", stream, + channel.getId(), true, false); + }catch (PlayException e) { + log.info("[语音对讲]开始 获取发流端口失败 deviceId: {}, channelId: {},", device.getDeviceId(), channel.getDeviceId()); return; } - sendRtpItem.setLocalPort(port); - sendRtpItem.setPort(port); - log.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channel.getDeviceId(), sendRtpItem.getLocalPort(), device.getStreamMode(), sendRtpItem.getSsrc(), false); + + + sendRtpInfo.setOnlyAudio(true); + sendRtpInfo.setPt(8); + sendRtpInfo.setStatus(1); + sendRtpInfo.setTcpActive(false); + sendRtpInfo.setUsePs(false); + sendRtpInfo.setReceiveStream(stream + "_talk"); + + String callId = SipUtils.getNewCallId(); + log.info("[语音对讲]开始 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channel.getDeviceId(), sendRtpInfo.getLocalPort(), device.getStreamMode(), sendRtpInfo.getSsrc(), false); // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(timeOutTaskKey, () -> { - log.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channel.getDeviceId(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); + log.info("[语音对讲] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channel.getDeviceId(), sendRtpInfo.getPort(), sendRtpInfo.getSsrc()); timeoutCallback.run(); // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 try { - cmder.streamByeCmd(device, channel.getDeviceId(), sendRtpItem.getStream(), null); + cmder.streamByeCmd(device, channel.getDeviceId(), stream, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { log.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage()); } finally { timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - sessionManager.removeByStream(sendRtpItem.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); + sessionManager.removeByStream(sendRtpInfo.getStream()); } }, userSetting.getPlayTimeout()); try { - mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpItem, userSetting.getPlayTimeout() * 1000); + mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpInfo, userSetting.getPlayTimeout() * 1000); }catch (ControllerException e) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId()); audioEvent.call("失败, " + e.getMessage()); // 查看是否已经建立了通道,存在则发送bye @@ -422,7 +416,7 @@ public class PlayServiceImpl implements IPlayService { // 查看设备是否已经在推流 try { - cmder.talkStreamCmd(mediaServerItem, sendRtpItem, device, channel, callId, (hookData) -> { + cmder.talkStreamCmd(mediaServerItem, sendRtpInfo, device, channel, callId, (hookData) -> { log.info("[语音对讲] 流已生成, 开始推流: " + hookData); dynamicTask.stop(timeOutTaskKey); // TODO 暂不做处理 @@ -437,13 +431,13 @@ public class PlayServiceImpl implements IPlayService { ResponseEvent responseEvent = (ResponseEvent) event.event; if (responseEvent.getResponse() instanceof SIPResponse) { SIPResponse response = (SIPResponse) responseEvent.getResponse(); - sendRtpItem.setFromTag(response.getFromTag()); - sendRtpItem.setToTag(response.getToTag()); - sendRtpItem.setCallId(response.getCallIdHeader().getCallId()); - redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendRtpInfo.setFromTag(response.getFromTag()); + sendRtpInfo.setToTag(response.getToTag()); + sendRtpInfo.setCallId(response.getCallIdHeader().getCallId()); + sendRtpServerService.update(sendRtpInfo); - SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpItem.getChannelId(), "talk", - sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), + SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpInfo.getChannelId(), "talk", + sendRtpInfo.getStream(), sendRtpInfo.getSsrc(), sendRtpInfo.getMediaServerId(), response, InviteSessionType.TALK); sessionManager.put(ssrcTransaction); @@ -456,21 +450,21 @@ public class PlayServiceImpl implements IPlayService { }, (event) -> { dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - sessionManager.removeByStream(sendRtpItem.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); + sessionManager.removeByStream(sendRtpInfo.getStream()); errorEvent.response(event); }); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 对讲消息: {}", e.getMessage()); dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); - sessionManager.removeByStream(sendRtpItem.getStream()); + sessionManager.removeByStream(sendRtpInfo.getStream()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent; eventResult.statusCode = -1; @@ -1138,14 +1132,14 @@ public class PlayServiceImpl implements IPlayService { @Override public void zlmServerOffline(String mediaServerId) { // 处理正在向上推流的上级平台 - List sendRtpItems = redisCatchStorage.querySendRTPServer(null); - if (sendRtpItems.size() > 0) { - for (SendRtpInfo sendRtpItem : sendRtpItems) { - if (sendRtpItem.getMediaServerId().equals(mediaServerId)) { - Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); - CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); + List sendRtpInfos = sendRtpServerService.queryAll(); + if (!sendRtpInfos.isEmpty()) { + for (SendRtpInfo sendRtpInfo : sendRtpInfos) { + if (sendRtpInfo.getMediaServerId().equals(mediaServerId) && sendRtpInfo.isSendToPlatform()) { + Platform platform = platformService.queryPlatformByServerGBId(sendRtpInfo.getTargetId()); + CommonGBChannel channel = channelService.getOne(sendRtpInfo.getChannelId()); try { - sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem, channel); + sipCommanderFroPlatform.streamByeCmd(platform, sendRtpInfo, channel); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } @@ -1216,10 +1210,10 @@ public class PlayServiceImpl implements IPlayService { } // 查询通道使用状态 if (audioBroadcastManager.exit(deviceChannel.getId())) { - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null); - if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId()); + if (sendRtpInfo != null && sendRtpInfo.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 - Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); if (streamReady) { log.warn("语音广播已经开启: {}", channel.getDeviceId()); event.call("语音广播已经开启"); @@ -1256,11 +1250,11 @@ public class PlayServiceImpl implements IPlayService { @Override public boolean audioBroadcastInUse(Device device, DeviceChannel channel) { if (audioBroadcastManager.exit(channel.getId())) { - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null); - if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId()); + if (sendRtpInfo != null && sendRtpInfo.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 - MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpItem.getApp(), sendRtpItem.getStream()); + MediaServer mediaServerServiceOne = mediaServerService.getOne(sendRtpInfo.getMediaServerId()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServerServiceOne, sendRtpInfo.getApp(), sendRtpInfo.getStream()); if (streamReady) { log.warn("语音广播通道使用中: {}", channel.getDeviceId()); return true; @@ -1285,11 +1279,11 @@ public class PlayServiceImpl implements IPlayService { if (audioBroadcastCatch == null) { continue; } - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null); - if (sendRtpItem != null) { - redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null); - MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), null); + SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId()); + if (sendRtpInfo != null) { + sendRtpServerService.delete(sendRtpInfo); + MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId()); + mediaServerService.stopSendRtp(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream(), null); try { cmder.streamByeCmdForDeviceInvite(device, channel.getDeviceId(), audioBroadcastCatch.getSipTransactionInfo(), null); } catch (InvalidArgumentException | ParseException | SipException | @@ -1305,54 +1299,6 @@ public class PlayServiceImpl implements IPlayService { @Override public void zlmServerOnline(String mediaServerId) { - // TODO 查找之前的点播,流如果不存在则给下级发送bye -// MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); -// zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{ -// Integer code = mediaList.getInteger("code"); -// if (code == 0) { -// JSONArray data = mediaList.getJSONArray("data"); -// if (data == null || data.size() == 0) { -// zlmServerOffline(mediaServerId); -// }else { -// Map mediaListMap = new HashMap<>(); -// for (int i = 0; i < data.size(); i++) { -// JSONObject json = data.getJSONObject(i); -// String app = json.getString("app"); -// if ("rtp".equals(app)) { -// String stream = json.getString("stream"); -// if (mediaListMap.get(stream) != null) { -// continue; -// } -// mediaListMap.put(stream, json); -// // 处理正在观看的国标设备 -// List ssrcTransactions = streamSession.getSsrcTransactionForAll(null, null, null, stream); -// if (ssrcTransactions.size() > 0) { -// for (SsrcTransaction ssrcTransaction : ssrcTransactions) { -// if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) { -// cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), -// ssrcTransaction.getStream(), null); -// } -// } -// } -// } -// } -// if (mediaListMap.size() > 0 ) { -// // 处理正在向上推流的上级平台 -// List sendRtpItems = redisCatchStorage.querySendRTPServer(null); -// if (sendRtpItems.size() > 0) { -// for (SendRtpItem sendRtpItem : sendRtpItems) { -// if (sendRtpItem.getMediaServerId().equals(mediaServerId)) { -// if (mediaListMap.get(sendRtpItem.getStreamId()) == null) { -// ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); -// sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId()); -// } -// } -// } -// } -// } -// } -// } -// })); } @Override @@ -1414,36 +1360,36 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void startPushStream(SendRtpInfo sendRtpItem, DeviceChannel channel, SIPResponse sipResponse, Platform platform, CallIdHeader callIdHeader) { + public void startPushStream(SendRtpInfo sendRtpInfo, DeviceChannel channel, SIPResponse sipResponse, Platform platform, CallIdHeader callIdHeader) { // 开始发流 - MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + MediaServer mediaInfo = mediaServerService.getOne(sendRtpInfo.getMediaServerId()); if (mediaInfo != null) { try { - if (sendRtpItem.isTcpActive()) { - mediaServerService.startSendRtpPassive(mediaInfo, sendRtpItem, null); + if (sendRtpInfo.isTcpActive()) { + mediaServerService.startSendRtpPassive(mediaInfo, sendRtpInfo, null); } else { - mediaServerService.startSendRtp(mediaInfo, sendRtpItem); + mediaServerService.startSendRtp(mediaInfo, sendRtpInfo); } - redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, channel, platform); + redisCatchStorage.sendPlatformStartPlayMsg(sendRtpInfo, channel, platform); }catch (ControllerException e) { log.error("RTP推流失败: {}", e.getMessage()); - startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader); + startSendRtpStreamFailHand(sendRtpInfo, platform, callIdHeader); return; } - log.info("RTP推流成功[ {}/{} ],{}, ", sendRtpItem.getApp(), sendRtpItem.getStream(), - sendRtpItem.isTcpActive()?"被动发流": sendRtpItem.getIp() + ":" + sendRtpItem.getPort()); + log.info("RTP推流成功[ {}/{} ],{}, ", sendRtpInfo.getApp(), sendRtpInfo.getStream(), + sendRtpInfo.isTcpActive()?"被动发流": sendRtpInfo.getIp() + ":" + sendRtpInfo.getPort()); } } @Override - public void startSendRtpStreamFailHand(SendRtpInfo sendRtpItem, Platform platform, CallIdHeader callIdHeader) { - if (sendRtpItem.isOnlyAudio()) { - Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getDeviceId()); - DeviceChannel deviceChannel = deviceChannelService.getOneById(sendRtpItem.getChannelId()); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId()); + public void startSendRtpStreamFailHand(SendRtpInfo sendRtpInfo, Platform platform, CallIdHeader callIdHeader) { + if (sendRtpInfo.isOnlyAudio()) { + Device device = deviceService.getDeviceByDeviceId(sendRtpInfo.getTargetId()); + DeviceChannel deviceChannel = deviceChannelService.getOneById(sendRtpInfo.getChannelId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpInfo.getChannelId()); if (audioBroadcastCatch != null) { try { cmder.streamByeCmd(device, deviceChannel.getDeviceId(), audioBroadcastCatch.getSipTransactionInfo(), null); @@ -1455,9 +1401,9 @@ public class PlayServiceImpl implements IPlayService { } else { if (platform != null) { // 向上级平台 - CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); + CommonGBChannel channel = channelService.getOne(sendRtpInfo.getChannelId()); try { - commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); + commanderForPlatform.streamByeCmd(platform, sendRtpInfo, channel); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } @@ -1475,11 +1421,11 @@ public class PlayServiceImpl implements IPlayService { log.info("[语音对讲] device: {}, channel: {}", device.getDeviceId(), channel.getDeviceId()); // 查询通道使用状态 if (audioBroadcastManager.exit(channel.getId())) { - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null); - if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId()); + if (sendRtpInfo != null && sendRtpInfo.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 - MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); + MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream()); if (streamReady) { log.warn("[语音对讲] 正在语音广播,无法开启语音通话: {}", channel.getDeviceId()); event.call("正在语音广播"); @@ -1490,10 +1436,10 @@ public class PlayServiceImpl implements IPlayService { } } - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), stream, null); - if (sendRtpItem != null) { - MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream()); + SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId()); + if (sendRtpInfo != null) { + MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpInfo.getReceiveStream()); if (streamReady) { log.warn("[语音对讲] 进行中: {}", channel.getDeviceId()); event.call("语音对讲进行中"); @@ -1526,13 +1472,13 @@ public class PlayServiceImpl implements IPlayService { @Override public void stopTalk(Device device, DeviceChannel channel, Boolean streamIsReady) { log.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channel.getDeviceId()); - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channel.getDeviceId(), null, null); - if (sendRtpItem == null) { + SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId()); + if (sendRtpInfo == null) { log.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止"); return; } // 停止向设备推流 - String mediaServerId = sendRtpItem.getMediaServerId(); + String mediaServerId = sendRtpInfo.getMediaServerId(); if (mediaServerId == null) { return; } @@ -1540,20 +1486,20 @@ public class PlayServiceImpl implements IPlayService { MediaServer mediaServer = mediaServerService.getOne(mediaServerId); if (streamIsReady == null || streamIsReady) { - mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); + mediaServerService.stopSendRtp(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream(), sendRtpInfo.getSsrc()); } - ssrcFactory.releaseSsrc(mediaServerId, sendRtpItem.getSsrc()); + ssrcFactory.releaseSsrc(mediaServerId, sendRtpInfo.getSsrc()); - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpItem.getStream()); + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpInfo.getStream()); if (ssrcTransaction != null) { try { - cmder.streamByeCmd(device, channel.getDeviceId(), sendRtpItem.getStream(), null); + cmder.streamByeCmd(device, channel.getDeviceId(), sendRtpInfo.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { log.info("[语音对讲] 停止消息发送失败,可能已经停止"); } } - redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channel.getDeviceId(),null, null); + sendRtpServerService.deleteByChannel(channel.getId(), device.getDeviceId()); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java index 27e5f560..102f9d0c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java @@ -4,15 +4,14 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; +import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; +import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.service.ISendRtpServerService; -import com.genersoft.iot.vmp.service.impl.SendRtpServerServiceImpl; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -109,7 +108,7 @@ public class SipRunner implements CommandLineRunner { ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (stopResult) { - Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId()); if (platform != null) { try { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index ede7dbf4..ab2bc9fc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.bean.MediaInfo; @@ -118,14 +117,14 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In // 收流端发送的停止 if (sendRtpItem != null){ CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); - log.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), channel.getGbDeviceId(), sendRtpItem.getPlayType(), callIdHeader.getCallId()); + log.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getTargetId(), channel.getGbDeviceId(), sendRtpItem.getPlayType(), callIdHeader.getCallId()); String streamId = sendRtpItem.getStream(); log.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId()); if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { // 不是本平台的就发送redis消息让其他wvp停止发流 - Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId()); if (platform != null) { redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform, channel); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { @@ -140,7 +139,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In } } }else { - log.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); + log.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getTargetId()); } }else { MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); @@ -155,7 +154,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId()); if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { // 来自上级平台的停止对讲 - log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId()); audioBroadcastManager.del(sendRtpItem.getChannelId()); } @@ -164,7 +163,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (mediaInfo.getReaderCount() <= 0) { log.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { - Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getDeviceId()); + Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getTargetId()); if (device == null) { log.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); return; @@ -175,7 +174,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In return; } try { - log.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId()); cmder.streamByeCmd(device, deviceChannel.getDeviceId(), streamId, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index d8973dc3..f4d01f5b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.URIField; @@ -893,7 +892,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlayType(InviteStreamType.BROADCAST); sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setPlatformId(requesterId); sendRtpItem.setStatus(1); sendRtpItem.setApp(broadcastCatch.getApp()); sendRtpItem.setStream(broadcastCatch.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index f2276072..b8ad279b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -151,7 +151,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp broadcastCatch.setMediaServerItem(hookData.getMediaServer()); audioBroadcastManager.update(broadcastCatch); // 推流到设备 - SendRtpInfo sendRtpItem = sendRtpServerService.queryByStream(null, targetId, hookData.getStream(), null); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByStream(hookData.getStream(), targetId); if (sendRtpItem == null) { log.warn("[国标级联] 语音喊话 异常,未找到发流信息, channelId: {}, stream: {}", targetId, hookData.getStream()); log.info("[国标级联] 语音喊话 重新开始,channelId: {}, stream: {}", targetId, hookData.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index 5e06c65b..fd37b94f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -114,11 +114,11 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId()); subscribe.removeSubscribe(hook); // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定 - SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(ssrcTransaction.getChannelId()); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(ssrcTransaction.getChannelId(), ssrcTransaction.getPlatformId()); if (sendRtpItem != null) { - Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId()); if (parentPlatform == null) { - log.warn("[级联消息发送]:发送MediaStatus发现上级平台{}不存在", sendRtpItem.getPlatformId()); + log.warn("[级联消息发送]:发送MediaStatus发现上级平台{}不存在", sendRtpItem.getTargetId()); return; } try { diff --git a/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java index 7c3590c8..8b05d3a4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ISendRtpServerService.java @@ -7,29 +7,37 @@ import java.util.List; public interface ISendRtpServerService { - SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, - String deviceId, Integer channelId, boolean isTcp, boolean rtcp); + SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String requesterId, + String deviceId, Integer channelId, Boolean isTcp, Boolean rtcp); - SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String platformId, - String app, String stream, Integer channelId, boolean tcp, boolean rtcp); + SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String platformId, + String app, String stream, Integer channelId, Boolean tcp, Boolean rtcp); void update(SendRtpInfo sendRtpItem); - SendRtpInfo queryByChannelId(Integer channelId); + SendRtpInfo queryByChannelId(Integer channelId, String targetId); SendRtpInfo queryByCallId(String callId); - SendRtpInfo queryByStream(String stream); + List queryByStream(String stream); + + SendRtpInfo queryByStream(String stream, String targetId); void delete(SendRtpInfo sendRtpInfo); void deleteByCallId(String callId); - void deleteByStream(String Stream); + void deleteByStream(String Stream, String targetId); - void deleteByChannel(Integer channelId); + void deleteByChannel(Integer channelId, String targetId); List queryAll(); boolean isChannelSendingRTP(Integer channelId); + + List queryForPlatform(String platformId); + + List queryByChannelId(int id); + + void deleteByStream(String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 9df65333..639a4e43 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.IMediaService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.IUserService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; @@ -78,6 +79,9 @@ public class MediaServiceImpl implements IMediaService { @Autowired private ISIPCommander commander; + @Autowired + private ISendRtpServerService sendRtpServerService; + @Override public boolean authenticatePlay(String app, String stream, String callId) { if (app == null || stream == null) { @@ -234,11 +238,11 @@ public class MediaServiceImpl implements IMediaService { return false; } // 收到无人观看说明流也没有在往上级推送 - if (redisCatchStorage.isChannelSendingRTP(deviceChannel.getDeviceId())) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(deviceChannel.getDeviceId()); + if (sendRtpServerService.isChannelSendingRTP(deviceChannel.getId())) { + List sendRtpItems = sendRtpServerService.queryByChannelId(deviceChannel.getId()); if (!sendRtpItems.isEmpty()) { for (SendRtpInfo sendRtpItem : sendRtpItems) { - Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); + Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId()); CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); if (channel == null) { continue; @@ -248,8 +252,7 @@ public class MediaServiceImpl implements IMediaService { } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); } - redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), channel.getGbDeviceId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); + sendRtpServerService.delete(sendRtpItem); if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform, channel); redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, parentPlatform, channel); @@ -280,8 +283,8 @@ public class MediaServiceImpl implements IMediaService { deviceChannelService.stopPlay(inviteInfo.getChannelId()); return result; } - SendRtpInfo sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null); - if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) { + List sendRtpItemList = sendRtpServerService.queryByStream(stream); + if (!sendRtpItemList.isEmpty()) { return false; } } else if ("talk".equals(app) || "broadcast".equals(app)) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java index d2975dc9..1a50c64f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.PlayException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; +import com.genersoft.iot.vmp.gb28181.conf.StackLoggerImpl; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.ISendRtpServerService; @@ -29,10 +30,12 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { @Autowired private RedisTemplate redisTemplate; + @Autowired + private StackLoggerImpl stackLoggerImpl; @Override - public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, - String deviceId, Integer channelId, boolean isTcp, boolean rtcp) { + public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String requesterId, + String deviceId, Integer channelId, Boolean isTcp, Boolean rtcp) { int localPort = sendRtpPortManager.getNextPort(mediaServer); if (localPort == 0) { return null; @@ -42,8 +45,8 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { } @Override - public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, int port, String ssrc, String platformId, - String app, String stream, Integer channelId, boolean tcp, boolean rtcp){ + public SendRtpInfo createSendRtpInfo(MediaServer mediaServer, String ip, Integer port, String ssrc, String platformId, + String app, String stream, Integer channelId, Boolean tcp, Boolean rtcp){ int localPort = sendRtpPortManager.getNextPort(mediaServer); if (localPort <= 0) { @@ -56,13 +59,13 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { @Override public void update(SendRtpInfo sendRtpItem) { redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CALLID + sendRtpItem.getCallId(), sendRtpItem); - redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpItem.getStream(), sendRtpItem); - redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpItem.getChannelId(), sendRtpItem); + redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpItem.getStream() + ":" + sendRtpItem.getTargetId(), sendRtpItem); + redisTemplate.opsForValue().set(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpItem.getChannelId() + ":" + sendRtpItem.getTargetId(), sendRtpItem); } @Override - public SendRtpInfo queryByChannelId(Integer channelId) { - String key = VideoManagerConstants.SEND_RTP_INFO_CHANNEL + channelId; + public SendRtpInfo queryByChannelId(Integer channelId, String targetId) { + String key = VideoManagerConstants.SEND_RTP_INFO_CHANNEL + channelId + ":" + targetId; return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class); } @@ -74,10 +77,24 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { @Override public SendRtpInfo queryByStream(String stream, String targetId) { - String key = VideoManagerConstants.SEND_RTP_INFO_STREAM + stream; + String key = VideoManagerConstants.SEND_RTP_INFO_STREAM + stream + ":" + targetId; return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpInfo.class); } + @Override + public List queryByStream(String stream) { + String key = VideoManagerConstants.SEND_RTP_INFO_STREAM + stream + ":*"; + List queryResult = RedisUtil.scan(redisTemplate, key); + List result= new ArrayList<>(); + + for (Object o : queryResult) { + String keyItem = (String) o; + result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem)); + } + + return result; + } + /** * 删除RTP推送信息缓存 */ @@ -87,8 +104,8 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { return; } redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CALLID + sendRtpInfo.getCallId()); - redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream()); - redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId()); + redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_STREAM + sendRtpInfo.getStream() + ":" + sendRtpInfo.getTargetId()); + redisTemplate.delete(VideoManagerConstants.SEND_RTP_INFO_CHANNEL + sendRtpInfo.getChannelId() + ":" + sendRtpInfo.getTargetId()); } @Override public void deleteByCallId(String callId) { @@ -99,22 +116,45 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { delete(sendRtpInfo); } @Override - public void deleteByStream(String Stream) { - SendRtpInfo sendRtpInfo = queryByStream(Stream); + public void deleteByStream(String stream, String targetId) { + SendRtpInfo sendRtpInfo = queryByStream(stream, targetId); if (sendRtpInfo == null) { return; } delete(sendRtpInfo); } + @Override - public void deleteByChannel(Integer channelId) { - SendRtpInfo sendRtpInfo = queryByChannelId(channelId); + public void deleteByStream(String stream) { + List sendRtpInfos = queryByStream(stream); + for (SendRtpInfo sendRtpInfo : sendRtpInfos) { + delete(sendRtpInfo); + } + } + + @Override + public void deleteByChannel(Integer channelId, String targetId) { + SendRtpInfo sendRtpInfo = queryByChannelId(channelId, targetId); if (sendRtpInfo == null) { return; } delete(sendRtpInfo); } + @Override + public List queryByChannelId(int channelId) { + String key = VideoManagerConstants.SEND_RTP_INFO_CHANNEL + channelId + ":*"; + List queryResult = RedisUtil.scan(redisTemplate, key); + List result= new ArrayList<>(); + + for (Object o : queryResult) { + String keyItem = (String) o; + result.add((SendRtpInfo) redisTemplate.opsForValue().get(keyItem)); + } + + return result; + } + @Override public List queryAll() { String key = VideoManagerConstants.SEND_RTP_INFO_CALLID @@ -135,8 +175,16 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { */ @Override public boolean isChannelSendingRTP(Integer channelId) { - SendRtpInfo sendRtpInfo = queryByChannelId(channelId); - return sendRtpInfo != null; + List sendRtpInfoList = queryByChannelId(channelId); + return !sendRtpInfoList.isEmpty(); } + @Override + public List queryForPlatform(String platformId) { + List sendRtpInfos = queryAll(); + if (!sendRtpInfos.isEmpty()) { + sendRtpInfos.removeIf(sendRtpInfo -> !sendRtpInfo.isSendToPlatform() || !sendRtpInfo.getTargetId().equals(platformId)); + } + return sendRtpInfos; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 29bb5cbf..31e16e5a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -13,7 +13,7 @@ public interface IRedisRpcService { WVPResult stopSendRtp(Integer sendRtpChannelId); - long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback callback); + long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback callback); void stopWaitePushStreamOnline(SendRtpInfo sendRtpItem); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 1bf6d6d0..6131eadb 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -25,6 +26,7 @@ import org.springframework.stereotype.Service; @Service public class RedisRpcServiceImpl implements IRedisRpcService { + @Autowired private RedisRpcConfig redisRpcConfig; @@ -40,10 +42,12 @@ public class RedisRpcServiceImpl implements IRedisRpcService { @Autowired private RedisTemplate redisTemplate; - @Autowired private IMediaServerService mediaServerService; + @Autowired + private ISendRtpServerService sendRtpServerService; + private RedisRpcRequest buildRequest(String uri, Object param) { RedisRpcRequest request = new RedisRpcRequest(); request.setFromId(userSetting.getServerId()); @@ -53,7 +57,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public SendRtpInfo getSendRtpItem(String sendRtpItemKey) { + public SendRtpInfo getSendRtpItem(Integer sendRtpItemKey) { RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey); RedisRpcResponse response = redisRpcConfig.request(request, 10); if (response.getBody() == null) { @@ -63,7 +67,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public WVPResult startSendRtp(String sendRtpItemKey, SendRtpInfo sendRtpItem) { + public WVPResult startSendRtp(Integer sendRtpItemKey, SendRtpInfo sendRtpItem) { log.info("[请求其他WVP] 开始推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey); request.setToId(sendRtpItem.getServerId()); @@ -72,7 +76,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public WVPResult stopSendRtp(String sendRtpItemKey) { + public WVPResult stopSendRtp(Integer sendRtpItemKey) { SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey); if (sendRtpItem == null) { log.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); @@ -86,7 +90,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback callback) { + public long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback callback) { log.info("[请求所有WVP监听流上线] {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream()); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), null); @@ -103,9 +107,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService { sendRtpItem.setMediaServerId(hookData.getMediaServer().getId()); sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); - redisTemplate.opsForValue().set(sendRtpItem.getRedisKey(), sendRtpItem); + sendRtpServerService.update(sendRtpItem); if (callback != null) { - callback.run(sendRtpItem.getRedisKey()); + callback.run(sendRtpItem.getChannelId()); } hookSubscribe.removeSubscribe(hook); redisRpcConfig.removeCallback(request.getSn()); @@ -119,7 +123,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { log.info("[请求所有WVP监听流上线] 流上线 {}/{}->{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.toString()); if (callback != null) { - callback.run(response.getBody().toString()); + callback.run(Integer.parseInt(response.getBody().toString())); } hookSubscribe.removeSubscribe(hook); }); @@ -137,7 +141,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public void rtpSendStopped(String sendRtpItemKey) { + public void rtpSendStopped(Integer sendRtpItemKey) { SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey); if (sendRtpItem == null) { log.info("[停止WVP监听流上线] 未找到redis中的发流信息, key:{}", sendRtpItemKey); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 33002365..2425c49c 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -511,7 +511,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { MessageForPushChannel msg = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(), channel.getGbDeviceId(), - sendRtpItem.getPlatformId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + sendRtpItem.getTargetId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); msg.setPlatFormIndex(platform.getId()); String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY; @@ -562,14 +562,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void sendStartSendRtp(SendRtpInfo sendRtpItem) { String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream(); - log.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + log.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getTargetId()); redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); } @Override public void sendPushStreamOnline(SendRtpInfo sendRtpItem) { String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED; - log.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId()); + log.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getTargetId()); redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index c0e84cdc..6e31bd23 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; +import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; @@ -29,10 +30,8 @@ import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -57,10 +56,7 @@ public class StreamPushServiceImpl implements IStreamPushService { private IMediaServerService mediaServerService; @Autowired - DataSourceTransactionManager dataSourceTransactionManager; - - @Autowired - TransactionDefinition transactionDefinition; + private ISendRtpServerService sendRtpServerService; @Autowired private IGbChannelService gbChannelService; @@ -290,7 +286,7 @@ public class StreamPushServiceImpl implements IStreamPushService { if (userSetting.isUsePushingAsStatus()) { gbChannelService.offline(streamPush.buildCommonGBChannel()); } - redisCatchStorage.deleteSendRTPServer(null, streamPush.getGbDeviceId(), null, streamPush.getStream()); + sendRtpServerService.deleteByStream(streamPush.getStream()); mediaServerService.stopSendRtp(mediaServer, streamPush.getApp(), streamPush.getStream(), null); streamPush.setUpdateTime(DateUtil.getNow()); streamPushMapper.update(streamPush);