diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index 4df6f48b..ddd05172 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -25,4 +25,6 @@ public interface IGbChannelService { void closeSend(CommonGBChannel commonGBChannel); void batchAdd(List commonGBChannels); + + void updateStatus(List channelList); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java index 7be3d821..004690a2 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java @@ -95,4 +95,5 @@ public interface StreamProxyMapper { int delete(int id); + void deleteByList(List streamProxiesForRemove); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 4c29585f..b3a382a6 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -2,59 +2,51 @@ package com.genersoft.iot.vmp.streamProxy.service.impl; import com.alibaba.fastjson2.JSONObject; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; import com.genersoft.iot.vmp.service.IGbStreamService; -import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; +import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper; +import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; -import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; /** * 视频代理业务 @@ -301,74 +293,122 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override + @Transactional public void zlmServerOnline(String mediaServerId) { MediaServer mediaServer = mediaServerService.getOne(mediaServerId); if (mediaServer == null) { return; } // 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本 + redisCatchStorage.removeStream(mediaServerId, "pull"); + List streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); if (streamProxies.isEmpty()){ return; } + Map streamProxyMapForDb = new HashMap<>(); + for (StreamProxy streamProxy : streamProxies) { + streamProxyMapForDb.put(streamProxy.getApp() + "_" + streamProxy.getStream(), streamProxy); + } + + List streamInfoList = mediaServerService.getMediaList(mediaServer, null, null, null); + + List channelListForOnline = new ArrayList<>(); + for (StreamInfo streamInfo : streamInfoList) { + String key = streamInfo.getApp() + streamInfo.getStream(); + StreamProxy streamProxy = streamProxyMapForDb.get(key); + if (streamProxy == null) { + // 流媒体存在,数据库中不存在 + continue; + } + if (streamInfo.getOriginType() == OriginType.PULL.ordinal() + || streamInfo.getOriginType() == OriginType.FFMPEG_PULL.ordinal() ) { + if (streamProxyMapForDb.get(key) != null) { + redisCatchStorage.addStream(mediaServer, "pull", streamInfo.getApp(), streamInfo.getStream(), streamInfo.getMediaInfo()); + if (!streamProxy.getGbStatus() && streamProxy.getGbId() > 0) { + streamProxy.setGbStatus(true); + channelListForOnline.add(streamProxy.getCommonGBChannel()); + } + streamProxyMapForDb.remove(key); + } + } + } + + if (!channelListForOnline.isEmpty()) { + gbChannelService.online(channelListForOnline); + } + List channelListForOffline = new ArrayList<>(); + List streamProxiesForRemove = new ArrayList<>(); + if (!streamProxyMapForDb.isEmpty()) { + for (StreamProxy streamProxy : streamProxyMapForDb.values()) { + if (streamProxy.getGbStatus() && streamProxy.getGbId() > 0) { + streamProxy.setGbStatus(false); + channelListForOffline.add(streamProxy.getCommonGBChannel()); + } + // 移除开启了无人观看自动移除的流 + if (streamProxy.getGbDeviceId() == null && streamProxy.isEnableRemoveNoneReader()) { + streamProxiesForRemove.add(streamProxy); + streamProxyMapForDb.remove(streamProxy.getApp() + streamProxy.getStream()); + } + } + } + if (!channelListForOffline.isEmpty()) { + gbChannelService.offline(channelListForOffline); + } + if (!streamProxiesForRemove.isEmpty()) { + streamProxyMapper.deleteByList(streamProxiesForRemove); + } - // 移除开启了无人观看自动移除的流 -// List streamProxyItemList = streamProxyMapper.selectWithAutoRemoveAndWithoutGbDeviceIdByMediaServerId(mediaServerId); - streamProxyMapper.deleteWithAutoRemoveAndWithoutGbDeviceIdByMediaServerId(mediaServerId); - - // 移除拉流代理生成的流信息 - syncPullStream(mediaServerId); - - // 恢复流代理, 只查找这个这个流媒体 - List streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer( - mediaServerId, true); - for (StreamProxy streamProxyDto : streamProxyListForEnable) { - log.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); - StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); - WVPResult wvpResult = addStreamProxyToZlm(streamProxyDto); - if (wvpResult == null) { - // 设置为离线 - log.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); - updateStatusByAppAndStream(streamProxyDto.getApp(), streamProxyDto.getStream(), false); - }else { - updateStatusByAppAndStream(streamProxyDto.getApp(), streamProxyDto.getStream(), true); + if (!streamProxyMapForDb.isEmpty()) { + for (StreamProxy streamProxy : streamProxyMapForDb.values()) { + log.info("恢复流代理," + streamProxy.getApp() + "/" + streamProxy.getStream()); + mediaServerService.startProxy(mediaServer, streamProxy); } } } @Override public void zlmServerOffline(String mediaServerId) { - // 移除开启了无人观看自动移除的流 - List streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId); - if (streamProxyItemList.size() > 0) { - gbStreamMapper.batchDel(streamProxyItemList); - } - streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); - // 其他的流设置离线 - streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); - String type = "PULL"; + List streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); - // 发送redis消息 - List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type); - if (mediaInfoList.size() > 0) { - for (MediaInfo mediaInfo : mediaInfoList) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", mediaInfo.getApp()); - jsonObject.put("stream", mediaInfo.getStream()); - jsonObject.put("register", false); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - // 移除redis内流的信息 - redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream()); + // 清理redis相关的缓存 + redisCatchStorage.removeStream(mediaServerId, "pull"); + + if (streamProxies.isEmpty()){ + return; + } + List streamProxiesForRemove = new ArrayList<>(); + List streamProxiesForSendMessage = new ArrayList<>(); + List channelListForOffline = new ArrayList<>(); + + for (StreamProxy streamProxy : streamProxies) { + if (streamProxy.getGbId() > 0 && streamProxy.getGbStatus()) { + channelListForOffline.add(streamProxy.getCommonGBChannel()); + } + if (streamProxy.getGbId() == 0 && streamProxy.isEnableRemoveNoneReader()) { + streamProxiesForRemove.add(streamProxy); + } + if (streamProxy.getGbStatus()) { + streamProxiesForSendMessage.add(streamProxy); } } - } - - @Override - public void clean() { + // 移除开启了无人观看自动移除的流 + streamProxyMapper.deleteByList(streamProxiesForRemove); + // 修改国标关联的国标通道的状态 + gbChannelService.offline(channelListForOffline); + if (!streamProxiesForSendMessage.isEmpty()) { + for (StreamProxy streamProxy : streamProxiesForSendMessage) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", streamProxy.getApp()); + jsonObject.put("stream", streamProxy.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg("pull", jsonObject); + } + } } @Override @@ -385,33 +425,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return result; } - private void syncPullStream(String mediaServerId){ - MediaServer mediaServer = mediaServerService.getOne(mediaServerId); - if (mediaServer != null) { - List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PULL"); - if (!mediaInfoList.isEmpty()) { - List mediaList = mediaServerService.getMediaList(mediaServer, null, null, null); - Map stringStreamInfoMap = new HashMap<>(); - if (mediaList != null && !mediaList.isEmpty()) { - for (StreamInfo streamInfo : mediaList) { - stringStreamInfoMap.put(streamInfo.getApp() + streamInfo.getStream(), streamInfo); - } - } - if (stringStreamInfoMap.isEmpty()) { - redisCatchStorage.removeStream(mediaServerId, "PULL"); - }else { - for (String key : stringStreamInfoMap.keySet()) { - StreamInfo streamInfo = stringStreamInfoMap.get(key); - if (stringStreamInfoMap.get(streamInfo.getApp() + streamInfo.getStream()) == null) { - redisCatchStorage.removeStream(mediaServerId, "PULL", streamInfo.getApp(), - streamInfo.getStream()); - } - } - } - } - } - } - @Override public ResourceBaseInfo getOverview() {