diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 77ff9b01..a5b78ed8 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -241,98 +241,9 @@ public class ZLMHttpHookListener { logger.info("[ZLM HOOK]流无人观看:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); JSONObject ret = new JSONObject(); - ret.put("code", 0); - // 国标类型的流 - if ("rtp".equals(param.getApp())) { - ret.put("close", userSetting.getStreamOnDemand()); - // 国标流, 点播/录像回放/录像下载 - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); - // 点播 - if (inviteInfo != null) { - // 录像下载 - if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) { - ret.put("close", false); - return ret; - } - // 收到无人观看说明流也没有在往上级推送 - if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( - inviteInfo.getChannelId()); - if (!sendRtpItems.isEmpty()) { - for (SendRtpItem sendRtpItem : sendRtpItems) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); - try { - commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), - sendRtpItem.getCallId(), sendRtpItem.getStream()); - if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { - MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), - sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); - redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); - } - } - } - } - Device device = deviceService.getDevice(inviteInfo.getDeviceId()); - if (device != null) { - try { - // 多查询一次防止已经被处理了 - InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), - inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); - if (info != null) { - cmder.streamByeCmd(device, inviteInfo.getChannelId(), - inviteInfo.getStream(), null); - } else { - logger.info("[无人观看] 未找到设备的点播信息: {}, 流:{}", inviteInfo.getDeviceId(), param.getStream()); - } - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage()); - } - } else { - logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), param.getStream()); - } - inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), - inviteInfo.getChannelId(), inviteInfo.getStream()); - storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); - return ret; - } - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, param.getStream(), null); - if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) { - ret.put("close", false); - return ret; - } - } else if ("talk".equals(param.getApp()) || "broadcast".equals(param.getApp())) { - ret.put("close", false); - } else { - // 非国标流 推流/拉流代理 - // 拉流代理 - StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); - if (streamProxyItem != null) { - if (streamProxyItem.isEnableRemoveNoneReader()) { - // 无人观看自动移除 - ret.put("close", true); - streamProxyService.del(param.getApp(), param.getStream()); - String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getSrcUrl(); - logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", param.getApp(), param.getStream(), url); - } else if (streamProxyItem.isEnableDisableNoneReader()) { - // 无人观看停用 - ret.put("close", true); - // 修改数据 - streamProxyService.stop(param.getApp(), param.getStream()); - } else { - // 无人观看不做处理 - ret.put("close", false); - } - return ret; - } - } + boolean close = mediaService.closeStreamOnNoneReader(param.getMediaServerId(), param.getApp(), param.getStream(), param.getSchema()); + ret.put("code", close); return ret; } @@ -346,8 +257,8 @@ public class ZLMHttpHookListener { DeferredResult defaultResult = new DeferredResult<>(); - MediaServer mediaInfo = mediaServerService.getOne(param.getMediaServerId()); - if (!userSetting.isAutoApplyPlay() || mediaInfo == null) { + MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId()); + if (!userSetting.isAutoApplyPlay() || mediaServer == null) { defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); return defaultResult; } @@ -392,7 +303,7 @@ public class ZLMHttpHookListener { resultHolder.put(key, uuid, result); if (!exist) { - playService.play(mediaInfo, deviceId, channelId, null, (code, message, data) -> { + playService.play(mediaServer, deviceId, channelId, null, (code, message, data) -> { msg.setData(new HookResult(code, message)); resultHolder.invokeResult(msg); }); @@ -431,9 +342,9 @@ public class ZLMHttpHookListener { resultHolder.put(key, uuid, result); if (!exist) { - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaInfo, param.getStream(), null, + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, param.getStream(), null, device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam()); - playService.playBack(mediaInfo, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> { + playService.playBack(mediaServer, ssrcInfo, deviceId, channelId, startTime, endTime, (code, message, data) -> { msg.setData(new HookResult(code, message)); resultHolder.invokeResult(msg); }); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java index 49054559..d3a57f7a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java @@ -49,4 +49,6 @@ public interface IMediaService { boolean authenticatePlay(String app, String stream, String callId); ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params); + + boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 21c3472c..e57c7eaf 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -7,9 +7,11 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.service.IMediaServerService; @@ -22,6 +24,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResultForOnPublish; import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -35,6 +38,9 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -77,6 +83,15 @@ public class MediaServiceImpl implements IMediaService { @Autowired private ZLMMediaListManager zlmMediaListManager; + @Autowired + private IDeviceService deviceService; + + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + + @Autowired + private ISIPCommander commander; + @Override @@ -291,4 +306,98 @@ public class MediaServiceImpl implements IMediaService { } return map; } + + @Override + public boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) { + boolean result = false; + // 国标类型的流 + if ("rtp".equals(app)) { + result = userSetting.getStreamOnDemand(); + // 国标流, 点播/录像回放/录像下载 + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream); + // 点播 + if (inviteInfo != null) { + // 录像下载 + if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) { + return false; + } + // 收到无人观看说明流也没有在往上级推送 + if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { + List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( + inviteInfo.getChannelId()); + if (!sendRtpItems.isEmpty()) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + try { + commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(), + sendRtpItem.getCallId(), sendRtpItem.getStream()); + if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); + } + } + } + } + Device device = deviceService.getDevice(inviteInfo.getDeviceId()); + if (device != null) { + try { + // 多查询一次防止已经被处理了 + InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), + inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + if (info != null) { + commander.streamByeCmd(device, inviteInfo.getChannelId(), + inviteInfo.getStream(), null); + } else { + logger.info("[无人观看] 未找到设备的点播信息: {}, 流:{}", inviteInfo.getDeviceId(), stream); + } + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage()); + } + } else { + logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), stream); + } + + inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), + inviteInfo.getChannelId(), inviteInfo.getStream()); + storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); + return result; + } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null); + if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) { + return false; + } + } else if ("talk".equals(app) || "broadcast".equals(app)) { + return false; + } else { + // 非国标流 推流/拉流代理 + // 拉流代理 + StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream); + if (streamProxyItem != null) { + if (streamProxyItem.isEnableRemoveNoneReader()) { + // 无人观看自动移除 + result = true; + streamProxyService.del(app, stream); + String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getSrcUrl(); + logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, url); + } else if (streamProxyItem.isEnableDisableNoneReader()) { + // 无人观看停用 + result = true; + // 修改数据 + streamProxyService.stop(app, stream); + } else { + // 无人观看不做处理 + result = false; + } + } + } + return result; + } }