diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 4f3732e8..e8b0d27b 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -18,18 +18,9 @@ public class VideoManagerConstants { public static final String DEVICE_PREFIX = "VMP_DEVICE_"; - // 设备同步完成 - public static final String DEVICE_SYNC_PREFIX = "VMP_DEVICE_SYNC_"; - - public static final String CACHEKEY_PREFIX = "VMP_CHANNEL_"; - - public static final String KEEPLIVEKEY_PREFIX = "VMP_KEEPALIVE_"; // TODO 此处多了一个_,暂不修改 public static final String INVITE_PREFIX = "VMP_INVITE"; - public static final String PLAYER_PREFIX = "VMP_INVITE_PLAY_"; - public static final String PLAY_BLACK_PREFIX = "VMP_INVITE_PLAYBACK_"; - public static final String DOWNLOAD_PREFIX = "VMP_INVITE_DOWNLOAD_"; public static final String PLATFORM_KEEPALIVE_PREFIX = "VMP_PLATFORM_KEEPALIVE_"; @@ -41,16 +32,6 @@ public class VideoManagerConstants { public static final String PLATFORM_SEND_RTP_INFO_PREFIX = "VMP_PLATFORM_SEND_RTP_INFO_"; - public static final String EVENT_ONLINE_REGISTER = "1"; - - public static final String EVENT_ONLINE_MESSAGE = "3"; - - public static final String EVENT_OUTLINE_UNREGISTER = "1"; - - public static final String EVENT_OUTLINE_TIMEOUT = "2"; - - public static final String MEDIA_SSRC_USED_PREFIX = "VMP_MEDIA_USED_SSRC_"; - public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_"; public static final String MEDIA_STREAM_AUTHORITY = "VMP_MEDIA_STREAM_AUTHORITY_"; diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java index c05f1977..a4a14593 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java @@ -41,4 +41,14 @@ public interface IMediaService { * @return */ StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr, String callId); + + /** + * 查看流是否已经注册 + */ + boolean isReady(MediaServerItem mediaInfo, String app, String stream); + + /** + * 关闭zlm的流 + */ + boolean closeStream(MediaServerItem mediaInfo, String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index f9aec2ea..4f042d16 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -110,4 +110,15 @@ public class MediaServiceImpl implements IMediaService { return streamInfoResult; } + @Override + public boolean isReady(MediaServerItem mediaInfo, String app, String stream) { + JSONObject jsonObject = zlmresTfulUtils.getMediaInfo(mediaInfo, app, "rtsp", stream); + return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getBoolean("online"); + } + + @Override + public boolean closeStream(MediaServerItem mediaInfo, String app, String stream) { + JSONObject jsonObject = zlmresTfulUtils.closeStreams(mediaInfo, app, stream); + return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getInteger("count_hit") > 0; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 3e8abf12..34e8bb67 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -5,11 +5,10 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; @@ -18,6 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.ICommonGbChannelService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; @@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -35,11 +34,11 @@ import com.github.pagehelper.PageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; import org.springframework.util.CollectionUtils; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -63,7 +62,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { private final static Logger logger = LoggerFactory.getLogger(StreamProxyServiceImpl.class); @Autowired - private IVideoManagerStorage videoManagerStorager; + private RedisTemplate redisTemplate; @Autowired private IMediaService mediaService; @@ -162,28 +161,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { // 更新 StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream()); if (streamProxyInDb != null) { - if (streamProxyInDb.getCommonGbChannelId() == 0 && !ObjectUtils.isEmpty(param.getGbId()) ) { - // 新增通用通道 - CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param); - commonGbChannelService.add(commonGbChannel); - param.setCommonGbChannelId(commonGbChannel.getCommonGbId()); - } - if (streamProxyInDb.getCommonGbChannelId() > 0 && ObjectUtils.isEmpty(param.getGbId()) ) { - // 移除通用通道 - commonGbChannelService.deleteById(streamProxyInDb.getCommonGbChannelId()); - } - param.setUpdateTime(DateUtil.getNow()); - streamProxyMapper.update(param); + param.setId(streamProxyInDb.getId()); + updateProxyToDb(param); }else { // 新增 - if (!ObjectUtils.isEmpty(param.getGbId())) { - // 新增通用通道 - CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param); - commonGbChannelService.add(commonGbChannel); - param.setCommonGbChannelId(commonGbChannel.getCommonGbId()); - } - param.setCreateTime(DateUtil.getNow()); - param.setUpdateTime(DateUtil.getNow()); - streamProxyMapper.add(param); + addProxyToDb(param); } HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { @@ -192,40 +173,23 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); }); if (param.isEnable()) { - String talkKey = UUID.randomUUID().toString(); - String delayTalkKey = UUID.randomUUID().toString(); - dynamicTask.startDelay(delayTalkKey, ()->{ - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); - if (streamInfo != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + startProxy(param, mediaInfo, (code, msg, data) -> { + if (code != ErrorCode.SUCCESS.getCode()) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); + param.setStatus(true); + streamProxyMapper.update(param); }else { - dynamicTask.stop(talkKey); - callback.run(ErrorCode.ERROR100.getCode(), "超时", null); + callback.run(code, msg, null); + param.setEnable(false); + // 直接移除 + if (param.isEnableRemoveNoneReader()) { + del(param.getApp(), param.getStream()); + }else { + updateStreamProxy(param); + } } - }, 7000); - JSONObject jsonObject = addStreamProxyToZlm(param); - if (jsonObject != null && jsonObject.getInteger("code") == 0) { - hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); - dynamicTask.stop(talkKey); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }else { - param.setEnable(false); - // 直接移除 - if (param.isEnableRemoveNoneReader()) { - del(param.getApp(), param.getStream()); - }else { - updateStreamProxy(param); - } - if (jsonObject == null){ - callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); - }else { - callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); - } - } - } - else{ + }); + } else{ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); @@ -276,12 +240,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (!param.isEnable()) { param.setStatus(false); - saveProxyToDb(param); + addProxyToDb(param); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); return; } + startProxy(param, mediaInfo, (code, msg, data) -> { + callback.run(code, msg, data); + if (code == ErrorCode.SUCCESS.getCode()) { + param.setStatus(true); + addProxyToDb(param); + } else { + + } + }); + String talkKey = UUID.randomUUID().toString(); String delayTalkKey = UUID.randomUUID().toString(); @@ -289,7 +263,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { dynamicTask.stop(talkKey); param.setStatus(true); - saveProxyToDb(param); + addProxyToDb(param); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); @@ -304,7 +278,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } param.setProxyError("启用超时"); param.setStatus(false); - saveProxyToDb(param); + addProxyToDb(param); }, 10000); JSONObject jsonObject = addStreamProxyToZlm(param); if (jsonObject != null && jsonObject.getInteger("code") != 0) { @@ -316,7 +290,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } param.setProxyError("启用失败: " + jsonObject.getString("msg")); param.setStatus(false); - saveProxyToDb(param); + addProxyToDb(param); } } @@ -390,7 +364,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (!param.isEnable()) { param.setStatus(false); - saveProxyToDb(param); + addProxyToDb(param); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); @@ -403,7 +377,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { dynamicTask.stop(talkKey); param.setStatus(true); - saveProxyToDb(param); + addProxyToDb(param); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, param.getApp(), param.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); @@ -418,7 +392,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } param.setProxyError("启用超时"); param.setStatus(false); - saveProxyToDb(param); + addProxyToDb(param); }, 10000); JSONObject jsonObject = addStreamProxyToZlm(param); if (jsonObject != null && jsonObject.getInteger("code") != 0) { @@ -430,11 +404,29 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } param.setProxyError("启用失败: " + jsonObject.getString("msg")); param.setStatus(false); - saveProxyToDb(param); + addProxyToDb(param); } } - public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { + public void startProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { + // 检测是否在线 + boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); + if (ready) { + // 检查redis内容是否正确 + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + + OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_" + + mediaInfo.getId(); + + if (redisTemplate.opsForValue().get(key) == null) { + logger.info("[拉起代理] 发现redis的流信息不存在,但是流存在。关闭流"); + mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); + }else { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } + return; + } String talkKey = UUID.randomUUID().toString(); String delayTalkKey = UUID.randomUUID().toString(); @@ -442,7 +434,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { dynamicTask.stop(talkKey); streamProxy.setStatus(true); - saveProxyToDb(streamProxy); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); @@ -452,33 +443,60 @@ public class StreamProxyServiceImpl implements IStreamProxyService { hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); dynamicTask.stop(talkKey); callback.run(ErrorCode.ERROR100.getCode(), "启用超时,请检查源地址是否可用", null); - if (streamProxy.isEnableRemoveNoneReader()) { - return; - } streamProxy.setProxyError("启用超时"); - streamProxy.setStatus(false); - saveProxyToDb(streamProxy); }, 10000); - JSONObject jsonObject = addStreamProxyToZlm(streamProxy); - if (jsonObject != null && jsonObject.getInteger("code") != 0) { + JSONObject result; + if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ + result = zlmresTfulUtils.addFFmpegSource(mediaInfo, streamProxy.getSrcUrl().trim(), streamProxy.getDstUrl(), + streamProxy.getTimeoutMs() + "", streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), + streamProxy.getFfmpegCmdKey()); + }else { + result = zlmresTfulUtils.addStreamProxy(mediaInfo, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getUrl().trim(), + streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtpType()); + } + if (result == null) { + callback.run(ErrorCode.ERROR100.getCode(), "接口调用失败", null); + return; + } + if (result.getInteger("code") != 0) { hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); dynamicTask.stop(talkKey); - callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); - if (streamProxy.isEnableRemoveNoneReader()) { + callback.run(result.getInteger("code"), result.getString("msg"), null); + }else { + JSONObject data = result.getJSONObject("data"); + if (data == null) { + logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result ); + callback.run(result.getInteger("code"), result.getString("msg"), null); return; } - streamProxy.setProxyError("启用失败: " + jsonObject.getString("msg")); - streamProxy.setStatus(false); - saveProxyToDb(streamProxy); + String key = data.getString("key"); + if (key == null) { + logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result ); + callback.run(ErrorCode.ERROR100.getCode(), "获取代理流结果中的KEY失败", null); + return; + } + streamProxy.setStreamKey(key); } } - public void stopProxy(StreamProxy streamProxy, GeneralCallback callback) { + public void stopProxy(StreamProxy streamProxy, MediaServerItem mediaInfo, GeneralCallback callback) { + boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); + if (ready) { + mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); + } + // 检查redis内容是否正确 + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + + OriginType.PULL + "_" + streamProxy.getApp() + "_" + streamProxy.getStream() + "_" + + mediaInfo.getId(); + if (redisTemplate.opsForValue().get(key) == null) { + redisTemplate.delete(key); + } + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); } - private void saveProxyToDb(StreamProxy param) { + private void addProxyToDb(StreamProxy param) { // 未启用的数据可以直接保存了 if (!ObjectUtils.isEmpty(param.getGbId())) { CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param); @@ -496,6 +514,39 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } + @Transactional + public void updateProxyToDb(StreamProxy param) { + if (param.getId() <= 0) { + logger.error("[更新代理存储到数据库] 错误, 缺少ID"); + return; + } + StreamProxy streamProxyInDb = streamProxyMapper.selectOneById(param.getId()); + if (streamProxyInDb == null) { + logger.error("[更新代理存储到数据库] 错误,ID: {} 不在数据库中", param.getId()); + return; + } + if (!ObjectUtils.isEmpty(streamProxyInDb.getGbId().trim()) && ObjectUtils.isEmpty(param.getGbId().trim())) { + // 国标ID已经移除 + if (streamProxyInDb.getCommonGbChannelId() > 0) { + commonGbChannelService.deleteById(streamProxyInDb.getCommonGbChannelId()); + } + } + if (!ObjectUtils.isEmpty(param.getGbId().trim()) && ObjectUtils.isEmpty(streamProxyInDb.getGbId().trim())) { + CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param); + // 国标ID已经添加 + if (commonGbChannelService.add(commonGbChannel) > 0) { + param.setCommonGbChannelId(commonGbChannel.getCommonGbId()); + } + } + param.setUpdateTime(DateUtil.getNow()); + param.setStatus(streamProxyInDb.isStatus()); + int addStreamProxyResult = streamProxyMapper.add(param); + if (addStreamProxyResult <= 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "添加拉流代理失败"); + } + } + + private String getSchemaFromFFmpegCmd(String ffmpegCmd) { ffmpegCmd = ffmpegCmd.replaceAll(" + ", " "); String[] paramArray = ffmpegCmd.split(" "); @@ -776,7 +827,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { MediaServerItem mediaServer = mediaServerService.getOne(mediaServerId); if (mediaServer != null) { List allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL"); - if (allPullStream.size() > 0) { + if (!allPullStream.isEmpty()) { zlmresTfulUtils.getMediaList(mediaServer, jsonObject->{ Map stringStreamInfoMap = new HashMap<>(); if (jsonObject.getInteger("code") == 0) {