diff --git a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java index 2dd92eb5..dcd4944f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java @@ -4,12 +4,14 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; import java.util.List; /** * 视频信息 */ +@Data @Schema(description = "视频信息") public class MediaInfo { @Schema(description = "应用名") @@ -41,6 +43,8 @@ public class MediaInfo { private Boolean online; @Schema(description = "unknown = 0,rtmp_push=1,rtsp_push=2,rtp_push=3,pull=4,ffmpeg_pull=5,mp4_vod=6,device_chn=7") private Integer originType; + @Schema(description = "产生流的源流地址") + private String originUrl; @Schema(description = "存活时间,单位秒") private Long aliveSecond; @Schema(description = "数据产生速度,单位byte/s") @@ -60,6 +64,7 @@ public class MediaInfo { Integer totalReaderCount = jsonObject.getInteger("totalReaderCount"); Boolean online = jsonObject.getBoolean("online"); Integer originType = jsonObject.getInteger("originType"); + String originUrl = jsonObject.getString("originUrl"); Long aliveSecond = jsonObject.getLong("aliveSecond"); Long bytesSpeed = jsonObject.getLong("bytesSpeed"); if (totalReaderCount != null) { @@ -71,6 +76,10 @@ public class MediaInfo { if (originType != null) { mediaInfo.setOriginType(originType); } + if (originUrl != null) { + mediaInfo.setOriginUrl(originUrl); + } + if (aliveSecond != null) { mediaInfo.setAliveSecond(aliveSecond); } @@ -138,6 +147,7 @@ public class MediaInfo { mediaInfo.setReaderCount(param.getTotalReaderCount()); mediaInfo.setOnline(param.isRegist()); mediaInfo.setOriginType(param.getOriginType()); + mediaInfo.setOriginUrl(param.getOriginUrl()); mediaInfo.setAliveSecond(param.getAliveSecond()); mediaInfo.setBytesSpeed(param.getBytesSpeed()); List tracks = param.getTracks(); @@ -177,140 +187,4 @@ public class MediaInfo { } return mediaInfo; } - - public Integer getReaderCount() { - return readerCount; - } - - public void setReaderCount(Integer readerCount) { - this.readerCount = readerCount; - } - - public String getVideoCodec() { - return videoCodec; - } - - public void setVideoCodec(String videoCodec) { - this.videoCodec = videoCodec; - } - - public Integer getWidth() { - return width; - } - - public void setWidth(Integer width) { - this.width = width; - } - - public Integer getHeight() { - return height; - } - - public void setHeight(Integer height) { - this.height = height; - } - - public String getAudioCodec() { - return audioCodec; - } - - public void setAudioCodec(String audioCodec) { - this.audioCodec = audioCodec; - } - - public Integer getAudioChannels() { - return audioChannels; - } - - public void setAudioChannels(Integer audioChannels) { - this.audioChannels = audioChannels; - } - - public Integer getAudioSampleRate() { - return audioSampleRate; - } - - public void setAudioSampleRate(Integer audioSampleRate) { - this.audioSampleRate = audioSampleRate; - } - - public Long getDuration() { - return duration; - } - - public void setDuration(Long duration) { - this.duration = duration; - } - - public Boolean getOnline() { - return online; - } - - public void setOnline(Boolean online) { - this.online = online; - } - - public Integer getOriginType() { - return originType; - } - - public void setOriginType(Integer originType) { - this.originType = originType; - } - - public Long getAliveSecond() { - return aliveSecond; - } - - public void setAliveSecond(Long aliveSecond) { - this.aliveSecond = aliveSecond; - } - - public Long getBytesSpeed() { - return bytesSpeed; - } - - public void setBytesSpeed(Long bytesSpeed) { - this.bytesSpeed = bytesSpeed; - } - - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public MediaServer getMediaServer() { - return mediaServer; - } - - public void setMediaServer(MediaServer mediaServer) { - this.mediaServer = mediaServer; - } - - public String getSchema() { - return schema; - } - - public void setSchema(String schema) { - this.schema = schema; - } - - public String getCallId() { - return callId; - } - - public void setCallId(String callId) { - this.callId = callId; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java index 20c3af37..2bb3beef 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.media.event.mediaServer; import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -22,9 +21,6 @@ public class MediaServerStatusEventListener { private final static Logger logger = LoggerFactory.getLogger(MediaServerStatusEventListener.class); - @Autowired - private IStreamProxyService streamProxyService; - @Autowired private IPlayService playService; @@ -32,7 +28,6 @@ public class MediaServerStatusEventListener { @EventListener public void onApplicationEvent(MediaServerOnlineEvent event) { logger.info("[媒体节点] 上线 ID:" + event.getMediaServerId()); - streamProxyService.zlmServerOnline(event.getMediaServerId()); playService.zlmServerOnline(event.getMediaServerId()); } @@ -42,7 +37,6 @@ public class MediaServerStatusEventListener { logger.info("[媒体节点] 离线,ID:" + event.getMediaServerId()); // 处理ZLM离线 - streamProxyService.zlmServerOffline(event.getMediaServerId()); playService.zlmServerOffline(event.getMediaServerId()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java index c07797d9..cdd01d92 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java @@ -50,7 +50,7 @@ public interface IMediaNodeServerService { WVPResult addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey); - WVPResult addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType); + WVPResult addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout); Boolean delFFmpegSource(MediaServer mediaServer, String streamKey); @@ -65,4 +65,6 @@ public interface IMediaNodeServerService { Long updateDownloadProcess(MediaServer mediaServer, String app, String stream); StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy); + + void stopProxy(MediaServer mediaServer, String streamKey); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 8f5615ba..92b2946a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -40,6 +40,7 @@ public interface IMediaServerService { void closeRTPServer(MediaServer mediaServerItem, String streamId); void closeRTPServer(MediaServer mediaServerItem, String streamId, CommonCallback callback); + Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc); void closeRTPServer(String mediaServerId, String streamId); @@ -98,7 +99,7 @@ public interface IMediaServerService { WVPResult addFFmpegSource(MediaServer mediaServerItem, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey); - WVPResult addStreamProxy(MediaServer mediaServerItem, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType); + WVPResult addStreamProxy(MediaServer mediaServerItem, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout); Boolean delFFmpegSource(MediaServer mediaServerItem, String streamKey); @@ -155,4 +156,6 @@ public interface IMediaServerService { Long updateDownloadProcess(MediaServer mediaServerItem, String app, String stream); StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy); + + void stopProxy(MediaServer mediaServer, String streamKey); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index fe505b19..cdd64d91 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -704,13 +704,14 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public WVPResult addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType) { + public WVPResult addStreamProxy(MediaServer mediaServer, String app, String stream, String url, + boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { logger.info("[addStreamProxy] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return WVPResult.fail(ErrorCode.ERROR400); } - return mediaNodeServerService.addStreamProxy(mediaServer, app, stream, url, enableAudio, enableMp4, rtpType); + return mediaNodeServerService.addStreamProxy(mediaServer, app, stream, url, enableAudio, enableMp4, rtpType, timeout); } @Override @@ -930,4 +931,14 @@ public class MediaServerServiceImpl implements IMediaServerService { } return mediaNodeServerService.startProxy(mediaServer, streamProxy); } + + @Override + public void stopProxy(MediaServer mediaServer, String streamKey) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + logger.info("[stopProxy] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类"); + } + mediaNodeServerService.stopProxy(mediaServer, streamKey); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index a88f64f7..62acfa4e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -14,8 +14,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; @@ -25,10 +24,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +@Slf4j @Service("zlm") public class ZLMMediaNodeServerService implements IMediaNodeServerService { - private final static Logger logger = LoggerFactory.getLogger(ZLMMediaNodeServerService.class); @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -134,7 +133,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param); if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { - logger.error("停止发流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); + log.error("停止发流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); } return true; @@ -152,7 +151,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param); if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { - logger.error("停止发流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); + log.error("停止发流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); return false; } return true; @@ -160,13 +159,13 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { @Override public boolean deleteRecordDirectory(MediaServer mediaServer, String app, String stream, String date, String fileName) { - logger.info("[zlm-deleteRecordDirectory] 删除磁盘文件, server: {} {}:{}->{}/{}", mediaServer.getId(), app, stream, date, fileName); + log.info("[zlm-deleteRecordDirectory] 删除磁盘文件, server: {} {}:{}->{}/{}", mediaServer.getId(), app, stream, date, fileName); JSONObject jsonObject = zlmresTfulUtils.deleteRecordDirectory(mediaServer, app, stream, date, fileName); if (jsonObject.getInteger("code") == 0) { return true; }else { - logger.info("[zlm-deleteRecordDirectory] 删除磁盘文件错误, server: {} {}:{}->{}/{}, 结果: {}", mediaServer.getId(), app, stream, date, fileName, jsonObject); + log.info("[zlm-deleteRecordDirectory] 删除磁盘文件错误, server: {} {}:{}->{}/{}, 结果: {}", mediaServer.getId(), app, stream, date, fileName, jsonObject); return false; } } @@ -218,7 +217,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { @Override public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) { JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServer, address, port, stream); - logger.info("[TCP主动连接对方] 结果: {}", jsonObject); + log.info("[TCP主动连接对方] 结果: {}", jsonObject); return jsonObject.getInteger("code") == 0; } @@ -252,7 +251,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { public String getFfmpegCmd(MediaServer mediaServer, String cmdKey) { JSONObject jsonObject = zlmresTfulUtils.getMediaServerConfig(mediaServer); if (jsonObject.getInteger("code") != 0) { - logger.warn("[getFfmpegCmd] 获取流媒体配置失败"); + log.warn("[getFfmpegCmd] 获取流媒体配置失败"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取流媒体配置失败"); } JSONArray dataArray = jsonObject.getJSONArray("data"); @@ -267,7 +266,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { public WVPResult addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey) { JSONObject jsonObject = zlmresTfulUtils.addFFmpegSource(mediaServer, srcUrl, dstUrl, timeoutMs, enableAudio, enableMp4, ffmpegCmdKey); if (jsonObject.getInteger("code") != 0) { - logger.warn("[getFfmpegCmd] 添加FFMPEG代理失败"); + log.warn("[getFfmpegCmd] 添加FFMPEG代理失败"); return WVPResult.fail(ErrorCode.ERROR100.getCode(), "添加FFMPEG代理失败"); }else { JSONObject data = jsonObject.getJSONObject("data"); @@ -280,8 +279,9 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } @Override - public WVPResult addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType) { - JSONObject jsonObject = zlmresTfulUtils.addStreamProxy(mediaServer, app, stream, url, enableAudio, enableMp4, rtpType); + public WVPResult addStreamProxy(MediaServer mediaServer, String app, String stream, String url, + boolean enableAudio, boolean enableMp4, String rtpType, Integer timeout) { + JSONObject jsonObject = zlmresTfulUtils.addStreamProxy(mediaServer, app, stream, url, enableAudio, enableMp4, rtpType, timeout); if (jsonObject.getInteger("code") != 0) { return WVPResult.fail(ErrorCode.ERROR100.getCode(), "添加代理失败"); }else { @@ -350,11 +350,11 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { JSONObject jsonObject = zlmServerFactory.startSendRtpPassive(mediaServer, param, null); if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { - logger.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); + log.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); } - logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject); - logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(), + log.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject); + log.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); } @@ -386,7 +386,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { public Long updateDownloadProcess(MediaServer mediaServer, String app, String stream) { MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream); if (mediaInfo == null) { - logger.warn("[获取下载进度] 查询进度失败, 节点Id: {}, {}/{}", mediaServer.getId(), app, stream); + log.warn("[获取下载进度] 查询进度失败, 节点Id: {}, {}/{}", mediaServer.getId(), app, stream); return null; } return mediaInfo.getDuration(); @@ -427,12 +427,38 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } MediaInfo mediaInfo = getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream()); if (mediaInfo != null) { - mediaServerService.closeStreams(mediaServer, param.getApp(), param.getStream()); + if (mediaInfo.getOriginUrl().equals(streamProxy.getSrcUrl())) { + log.info("[启动拉流代理] 已存在, 直接返回, app: {}, stream: {}", mediaInfo.getApp(), streamProxy.getStream()); + return getStreamInfoByAppAndStream(mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null, true); + } + closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream()); } - if (isStreamReady(mediaServer, param.getApp(), param.getStream())) { - mediaServerService.closeStreams(mediaServer, param.getApp(), param.getStream()); + + JSONObject jsonObject = null; + if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())){ + if (streamProxy.getTimeout() == 0) { + streamProxy.setTimeout(15); + } + jsonObject = zlmresTfulUtils.addFFmpegSource(mediaServer, streamProxy.getSrcUrl().trim(), dstUrl, + streamProxy.getTimeout(), streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), + streamProxy.getFfmpegCmdKey()); + }else { + jsonObject = zlmresTfulUtils.addStreamProxy(mediaServer, streamProxy.getApp(), streamProxy.getStream(), streamProxy.getSrcUrl().trim(), + streamProxy.isEnableAudio(), streamProxy.isEnableMp4(), streamProxy.getRtspType(), streamProxy.getTimeout()); + } + if (jsonObject == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "请求失败"); + }else if (jsonObject.getInteger("code") != 0) { + throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); + }else { + JSONObject data = jsonObject.getJSONObject("data"); + if (data == null) { + throw new ControllerException(jsonObject.getInteger("code"), "代理结果异常: " + jsonObject); + }else { + streamProxy.setStreamKey(jsonObject.getString("key")); + return getStreamInfoByAppAndStream(mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null, true); + } } - return null; } private String getSchemaFromFFmpegCmd(String ffmpegCmd) { @@ -453,4 +479,14 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } return null; } + + @Override + public void stopProxy(MediaServer mediaServer, String streamKey) { + JSONObject jsonObject = zlmresTfulUtils.delStreamProxy(mediaServer, streamKey); + if (jsonObject == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "请求失败"); + }else if (jsonObject.getInteger("code") != 0) { + throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 5d5f1c31..5bc48eb5 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -335,7 +335,7 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "restartServer",null, null); } - public JSONObject addStreamProxy(MediaServer mediaServerItem, String app, String stream, String url, boolean enable_audio, boolean enable_mp4, String rtp_type) { + public JSONObject addStreamProxy(MediaServer mediaServerItem, String app, String stream, String url, boolean enable_audio, boolean enable_mp4, String rtp_type, Integer timeOut) { Map param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); param.put("app", app); @@ -344,6 +344,9 @@ public class ZLMRESTfulUtils { param.put("enable_mp4", enable_mp4?1:0); param.put("enable_audio", enable_audio?1:0); param.put("rtp_type", rtp_type); + param.put("timeout_sec", timeOut); + // 拉流重试次数,默认为3 + param.put("retry_count", 3); return sendPost(mediaServerItem, "addStreamProxy",param, null, 20); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index ee5e9262..1b90d7a1 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -278,15 +278,14 @@ public class MediaServiceImpl implements IMediaService { } else { // 非国标流 推流/拉流代理 // 拉流代理 - StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream); - if (streamProxyItem != null) { - if (streamProxyItem.isEnableRemoveNoneReader()) { + StreamProxy streamProxy = streamProxyService.getStreamProxyByAppAndStream(app, stream); + if (streamProxy != null) { + if (streamProxy.isEnableRemoveNoneReader()) { // 无人观看自动移除 result = true; streamProxyService.del(app, stream); - String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getSrcUrl(); - logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, url); - } else if (streamProxyItem.isEnableDisableNoneReader()) { + logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, streamProxy.getSrcUrl()); + } else if (streamProxy.isEnableDisableNoneReader()) { // 无人观看停用 result = true; // 修改数据 diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java index adddde6c..0d7c97de 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/bean/StreamProxy.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.EqualsAndHashCode; +import org.springframework.util.ObjectUtils; /** * @author lin @@ -60,4 +61,21 @@ public class StreamProxy extends CommonGBChannel { @Schema(description = "拉流代理时zlm返回的key,用于停止拉流代理") private String streamKey; + + @Schema(description = "更新时间") + private String updateTime; + + @Schema(description = "创建时间") + private String createTime; + + public CommonGBChannel getCommonGBChannel() { + if (ObjectUtils.isEmpty(this.getGbDeviceId())) { + return null; + } + if (ObjectUtils.isEmpty(this.getGbName())) { + this.setGbName( app+ "-" +stream); + } + this.setStreamProxyId(this.getId()); + return this; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java index c7ad8f6d..d135b622 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java @@ -6,14 +6,12 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -25,10 +23,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.async.DeferredResult; import java.util.Map; -import java.util.UUID; @SuppressWarnings("rawtypes") /** @@ -85,7 +81,7 @@ public class StreamProxyController { }) @PostMapping(value = "/save") @ResponseBody - public DeferredResult save(@RequestBody StreamProxy param){ + public StreamContent save(@RequestBody StreamProxy param){ logger.info("添加代理: " + JSONObject.toJSONString(param)); if (ObjectUtils.isEmpty(param.getMediaServerId())) { param.setMediaServerId("auto"); @@ -93,43 +89,25 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getType())) { param.setType("default"); } - if (ObjectUtils.isEmpty(param.getRtpType())) { - param.setRtpType("1"); - } if (ObjectUtils.isEmpty(param.getGbId())) { - param.setGbId(null); + param.setGbDeviceId(null); } StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyItem != null) { streamProxyService.del(param.getApp(), param.getStream()); } - RequestMessage requestMessage = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream(); - requestMessage.setKey(key); - String uuid = UUID.randomUUID().toString(); - requestMessage.setId(uuid); - DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - // 录像查询以channelId作为deviceId查询 - resultHolder.put(key, uuid, result); - result.onTimeout(()->{ - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(ErrorCode.ERROR100.getCode()); - wvpResult.setMsg("超时"); - requestMessage.setData(wvpResult); - resultHolder.invokeAllResult(requestMessage); - }); - - streamProxyService.save(param, (code, msg, streamInfo) -> { - logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); - if (code == ErrorCode.SUCCESS.getCode()) { - requestMessage.setData(new StreamContent(streamInfo)); + StreamInfo streamInfo = streamProxyService.save(param); + if (param.isEnable()) { + if (streamInfo == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); }else { - requestMessage.setData(WVPResult.fail(code, msg)); + return new StreamContent(streamInfo); } - resultHolder.invokeAllResult(requestMessage); - }); - return result; + }else { + return null; + } + } @GetMapping(value = "/ffmpeg_cmd/list") @@ -180,10 +158,6 @@ public class StreamProxyController { @Parameter(name = "stream", description = "流id", required = true) public void stop(String app, String stream){ logger.info("停用代理: " + app + "/" + stream); - boolean result = streamProxyService.stop(app, stream); - if (!result) { - logger.info("停用代理失败: " + app + "/" + stream); - throw new ControllerException(ErrorCode.ERROR100); - } + streamProxyService.stop(app, stream); } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java index 9044e7c9..2561b7f3 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java @@ -87,4 +87,9 @@ public interface StreamProxyMapper { @Select("select count(1) from wvp_stream_proxy where status = true") int getOnline(); + + /** + * 查询设置了自动移除并且没有国标编号的代理 + */ + List selectWithAutoRemoveAndWithoutGbDeviceIdByMediaServerId(String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java index 751c01b9..ac5fc646 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.streamProxy.service; -import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; @@ -16,7 +15,7 @@ public interface IStreamProxyService { * 保存视频代理 * @param param */ - void save(StreamProxy param, GeneralCallback callback); + StreamInfo save(StreamProxy param); /** * 添加视频代理到zlm @@ -73,7 +72,7 @@ public interface IStreamProxyService { * @param stream * @return */ - boolean stop(String app, String stream); + void stop(String app, String stream); /** * 获取ffmpeg.cmd模板 diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 347d547f..be97ea0a 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; @@ -16,6 +17,8 @@ import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.service.IGbStreamService; @@ -42,6 +45,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; @@ -73,7 +77,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { private UserSetting userSetting; @Autowired - private GbStreamMapper gbStreamMapper; + private IGbChannelService gbChannelService; @Autowired private PlatformGbStreamMapper platformGbStreamMapper; @@ -119,7 +123,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } /** - * 流离开的处理 + * 流未找到的处理 */ @Async("taskExecutor") @EventListener @@ -134,9 +138,29 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } + /** + * 流媒体节点上线 + */ + @Async("taskExecutor") + @EventListener + @Transactional + public void onApplicationEvent(MediaServerOnlineEvent event) { + zlmServerOnline(event.getMediaServerId()); + } + + /** + * 流媒体节点离线 + */ + @Async("taskExecutor") + @EventListener + @Transactional + public void onApplicationEvent(MediaServerOfflineEvent event) { + zlmServerOffline(event.getMediaServerId()); + } + @Override - public void save(StreamProxy streamProxy, GeneralCallback callback) { + public StreamInfo save(StreamProxy streamProxy) { MediaServer mediaServer; if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){ mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); @@ -157,136 +181,47 @@ public class StreamProxyServiceImpl implements IStreamProxyService { saveResult = addStreamProxy(streamProxy); } if (!saveResult) { - callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); - return; + throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存失败"); } if (streamProxy.isEnable()) { - StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); - if (streamInfo != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }else { - callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); - } - -// -// -// Hook hook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId()); -// hookSubscribe.addSubscribe(hook, (hookData) -> { -// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( -// mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); -// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); -// }); -// String talkKey = UUID.randomUUID().toString(); -// String delayTalkKey = UUID.randomUUID().toString(); -// dynamicTask.startDelay(delayTalkKey, ()->{ -// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId(), false); -// if (streamInfo != null) { -// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); -// }else { -// dynamicTask.stop(talkKey); -// callback.run(ErrorCode.ERROR100.getCode(), "超时", null); -// } -// }, 7000); -// WVPResult result = addStreamProxyToZlm(streamProxy); -// if (result != null && result.getCode() == 0) { -// hookSubscribe.removeSubscribe(hook); -// dynamicTask.stop(talkKey); -// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( -// mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); -// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); -// }else { -// streamProxy.setEnable(false); -// // 直接移除 -// if (streamProxy.isEnableRemoveNoneReader()) { -// del(streamProxy.getApp(), streamProxy.getStream()); -// }else { -// updateStreamProxy(streamProxy); -// } -// if (result == null){ -// callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); -// }else { -// callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null); -// } -// } - }else{ - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( - mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + return mediaServerService.startProxy(mediaServer, streamProxy); } + return null; } /** * 新增代理流 - * @param streamProxyItem - * @return */ - private boolean addStreamProxy(StreamProxy streamProxyItem) { - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - boolean result = false; - streamProxyItem.setStreamType("proxy"); - streamProxyItem.setStatus(true); + @Transactional + public boolean addStreamProxy(StreamProxy streamProxy) { String now = DateUtil.getNow(); - streamProxyItem.setCreateTime(now); - try { - if (streamProxyMapper.add(streamProxyItem) > 0) { - if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) { - if (gbStreamMapper.add(streamProxyItem) < 0) { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - } - }else { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - result = true; - dataSourceTransactionManager.commit(transactionStatus); //手动提交 - }catch (Exception e) { - log.error("向数据库添加流代理失败:", e); - dataSourceTransactionManager.rollback(transactionStatus); + streamProxy.setCreateTime(now); + streamProxy.setUpdateTime(now); + + if (streamProxyMapper.add(streamProxy) > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) { + gbChannelService.add(streamProxy.getCommonGBChannel()); } - - - return result; + return true; } /** * 更新代理流 - * @param streamProxyItem - * @return */ @Override - public boolean updateStreamProxy(StreamProxy streamProxyItem) { - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - boolean result = false; - streamProxyItem.setStreamType("proxy"); - try { - if (streamProxyMapper.update(streamProxyItem) > 0) { - if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) { - if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - } - } else { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } + public boolean updateStreamProxy(StreamProxy streamProxy) { + streamProxy.setUpdateTime(DateUtil.getNow()); - dataSourceTransactionManager.commit(transactionStatus); //手动提交 - result = true; - }catch (Exception e) { - log.error("未处理的异常 ", e); - dataSourceTransactionManager.rollback(transactionStatus); + if (streamProxyMapper.update(streamProxy) > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) { + if (streamProxy.getGbId() > 0) { + gbChannelService.update(streamProxy.getCommonGBChannel()); + }else { + gbChannelService.add(streamProxy.getCommonGBChannel()); + } } - return result; + return true; } @Override @@ -314,8 +249,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService { param.getTimeoutMs(), param.isEnableAudio(), param.isEnableMp4(), param.getFfmpegCmdKey()); }else { - result = mediaServerService.addStreamProxy(mediaServer, param.getApp(), param.getStream(), param.getUrl().trim(), - param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); + result = mediaServerService.addStreamProxy(mediaServer, param.getApp(), param.getStream(), param.getSrcUrl().trim(), + param.isEnableAudio(), param.isEnableMp4(), param.getRtspType(), param.getTimeout()); } if (result != null && result.getCode() == 0) { String key = result.getData(); @@ -381,39 +316,42 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public boolean start(String app, String stream) { - boolean result = false; StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); - if (streamProxy != null && !streamProxy.isEnable() ) { - WVPResult wvpResult = addStreamProxyToZlm(streamProxy); - if (wvpResult == null) { - return false; - } - if (wvpResult.getCode() == 0) { - result = true; - streamProxy.setEnable(true); - updateStreamProxy(streamProxy); - }else { - log.info("启用代理失败: {}/{}->{}({})", app, stream, wvpResult.getMsg(), - streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl()); - } - } else if (streamProxy != null && streamProxy.isEnable()) { - return true ; + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } - return result; + MediaServer mediaServer; + if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){ + mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + }else { + mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); + } + if (mediaServer == null) { + log.warn("[启用代理] 未找到可用的媒体节点"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); + } + StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); + if (streamInfo == null) { + log.warn("[启用代理] 失败"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "失败"); + } + if (!streamProxy.isEnable()) { + updateStreamProxy(streamProxy); + } + return true; } @Override - public boolean stop(String app, String stream) { - boolean result = false; - StreamProxy streamProxyDto = streamProxyMapper.selectOne(app, stream); - if (streamProxyDto != null && streamProxyDto.isEnable()) { - Boolean removed = removeStreamProxyFromZlm(streamProxyDto); - if (removed != null && removed) { - streamProxyDto.setEnable(false); - result = updateStreamProxy(streamProxyDto); - } + public void stop(String app, String stream) { + StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); + if (streamProxy == null) { + throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } - return result; + MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); + if (mediaServer == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到启用时使用的媒体节点"); + } + mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey()); } @Override @@ -430,10 +368,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public void zlmServerOnline(String mediaServerId) { // 移除开启了无人观看自动移除的流 - List streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId); - if (streamProxyItemList.size() > 0) { - gbStreamMapper.batchDel(streamProxyItemList); - } + List streamProxyItemList = streamProxyMapper.selectWithAutoRemoveAndWithoutGbDeviceIdByMediaServerId(mediaServerId); streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); // 移除拉流代理生成的流信息 @@ -540,40 +475,40 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } - @Scheduled(cron = "* 0/10 * * * ?") - public void asyncCheckStreamProxyStatus() { - - List all = mediaServerService.getAllOnline(); - - if (CollectionUtils.isEmpty(all)){ - return; - } - - Map serverItemMap = all.stream().collect(Collectors.toMap(MediaServer::getId, Function.identity(), (m1, m2) -> m1)); - - List list = streamProxyMapper.selectForEnable(true); - - if (CollectionUtils.isEmpty(list)){ - return; - } - - for (StreamProxy streamProxyItem : list) { - - MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId()); - - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream()); - - if (mediaInfo == null){ - streamProxyItem.setStatus(false); - } else { - if (mediaInfo.getOnline() != null && mediaInfo.getOnline()) { - streamProxyItem.setStatus(true); - } else { - streamProxyItem.setStatus(false); - } - } - - updateStreamProxy(streamProxyItem); - } - } +// @Scheduled(cron = "* 0/10 * * * ?") +// public void asyncCheckStreamProxyStatus() { +// +// List all = mediaServerService.getAllOnline(); +// +// if (CollectionUtils.isEmpty(all)){ +// return; +// } +// +// Map serverItemMap = all.stream().collect(Collectors.toMap(MediaServer::getId, Function.identity(), (m1, m2) -> m1)); +// +// List list = streamProxyMapper.selectForEnable(true); +// +// if (CollectionUtils.isEmpty(list)){ +// return; +// } +// +// for (StreamProxy streamProxyItem : list) { +// +// MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId()); +// +// MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream()); +// +// if (mediaInfo == null){ +// streamProxyItem.setStatus(false); +// } else { +// if (mediaInfo.getOnline() != null && mediaInfo.getOnline()) { +// streamProxyItem.setStatus(true); +// } else { +// streamProxyItem.setStatus(false); +// } +// } +// +// updateStreamProxy(streamProxyItem); +// } +// } }