diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java index d14ba09c..64f7dad8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java @@ -550,52 +550,4 @@ public interface CommonGBChannelMapper { @Param("channelType") Integer channelType, @Param("online") Boolean online, @Param("hasLink") Boolean hasLink); - @Select("") - List queryForRecordPlan(List planIdList); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IRecordPlanService.java b/src/main/java/com/genersoft/iot/vmp/service/IRecordPlanService.java index abec8e0c..f3b34912 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IRecordPlanService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IRecordPlanService.java @@ -27,5 +27,5 @@ public interface IRecordPlanService { void cleanAll(Integer planId); - boolean recording(String app, String stream); + Integer recording(String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 0959c784..fcb7570f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -209,7 +209,7 @@ public class MediaServiceImpl implements IMediaService { @Override public boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) { boolean result = false; - if (recordPlanService.recording(app, stream)) { + if (recordPlanService.recording(app, stream) != null) { return false; } // 国标类型的流 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java index 4bcc945a..ac146e86 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import com.google.common.base.Joiner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; @@ -27,6 +28,7 @@ import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.*; +import java.util.concurrent.TimeUnit; @Service @Slf4j @@ -53,63 +55,94 @@ public class RecordPlanServiceImpl implements IRecordPlanService { @EventListener public void onApplicationEvent(MediaDepartureEvent event) { // 流断开,检查是否还处于录像状态, 如果是则继续录像 - if (recording(event.getApp(), event.getStream())) { - // 重新拉起 - + Integer channelId = recording(event.getApp(), event.getStream()); + if(channelId == null) { + return; } + // 重新拉起 + CommonGBChannel channel = channelMapper.queryById(channelId); + if (channel == null) { + log.warn("[录制计划] 流离开时拉起需要录像的流时, 发现通道不存在, id: {}", channelId); + return; + } + // 开启点播, + channelPlayService.play(channel, null, ((code, msg, streamInfo) -> { + if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) { + log.info("[录像] 流离开时拉起需要录像的流, 开启成功, 通道ID: {}", channel.getGbId()); + recordStreamMap.put(channel.getGbId(), streamInfo); + } else { + recordStreamMap.remove(channelId); + log.info("[录像] 流离开时拉起需要录像的流, 开启失败, 十分钟后重试, 通道ID: {}", channel.getGbId()); + } + })); } Map recordStreamMap = new HashMap<>(); - @Scheduled(cron = "0 */30 * * * *") +// @Scheduled(cron = "0 */30 * * * *") + @Scheduled(fixedRate = 10, timeUnit = TimeUnit.MINUTES) public void execution() { - // 执行计划 + log.info("[录制计划] 执行"); + // 查询现在需要录像的通道Id + List startChannelIdList = queryCurrentChannelRecord(); - // 获取当前时间在一周内的序号 - LocalDateTime now = LocalDateTime.now(); - int week = now.getDayOfWeek().getValue(); - int index = now.getHour() * 2 + (now.getMinute() > 30?1:0); - // 查询startTime等于现在的, 开始录像 - List startPlanList = recordPlanMapper.queryStart(week, index); - - Map channelMapWithoutRecord = new HashMap<>(); - if (startPlanList.isEmpty()) { - // 停止所有正在录像的 - if(recordStreamMap.isEmpty()) { - // 暂无录像任务 - return; - }else { - channelMapWithoutRecord.putAll(recordStreamMap); + if (startChannelIdList.isEmpty()) { + // 当前没有录像任务, 如果存在旧的正在录像的就移除 + if(!recordStreamMap.isEmpty()) { + stopStreams(recordStreamMap.keySet(), recordStreamMap); recordStreamMap.clear(); } }else { - channelMapWithoutRecord.putAll(recordStreamMap); - // 获取所有的关联的通道 - List channelList = channelMapper.queryForRecordPlan(startPlanList); - if (channelList.isEmpty()) { - recordStreamMap.clear(); - }else { - // 查找是否已经开启录像, 如果没有则开启录像 - for (CommonGBChannel channel : channelList) { - if (recordStreamMap.get(channel.getGbId()) != null) { - channelMapWithoutRecord.remove(channel.getGbId()); - }else { + // 当前存在录像任务, 获取正在录像中存在但是当前录制列表不存在的内容,进行停止; 获取正在录像中没有但是当前需录制的列表中存在的进行开启. + Set recordStreamSet = new HashSet<>(recordStreamMap.keySet()); + startChannelIdList.forEach(recordStreamSet::remove); + if (!recordStreamSet.isEmpty()) { + // 正在录像中存在但是当前录制列表不存在的内容,进行停止; + stopStreams(recordStreamSet, recordStreamMap); + } + + // 移除startChannelIdList中已经在录像的部分, 剩下的都是需要新添加的(正在录像中没有但是当前需录制的列表中存在的进行开启) + recordStreamMap.keySet().forEach(startChannelIdList::remove); + if (!startChannelIdList.isEmpty()) { + // 获取所有的关联的通道 + List channelList = channelMapper.queryByIds(startChannelIdList); + if (!channelList.isEmpty()) { + // 查找是否已经开启录像, 如果没有则开启录像 + for (CommonGBChannel channel : channelList) { // 开启点播, channelPlayService.play(channel, null, ((code, msg, streamInfo) -> { if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) { log.info("[录像] 开启成功, 通道ID: {}", channel.getGbId()); recordStreamMap.put(channel.getGbId(), streamInfo); - channelMapWithoutRecord.remove(channel.getGbId(), streamInfo); + } else { + log.info("[录像] 开启失败, 十分钟后重试, 通道ID: {}", channel.getGbId()); } })); } + } else { + log.error("[录制计划] 数据异常, 这些关联的通道已经不存在了: {}", Joiner.on(",").join(startChannelIdList)); } } } - // 结束录像 - if(!channelMapWithoutRecord.isEmpty()) { - for (Integer channelId : channelMapWithoutRecord.keySet()) { - StreamInfo streamInfo = channelMapWithoutRecord.get(channelId); + } + + /** + * 获取当前时间段应该录像的通道Id列表 + */ + private List queryCurrentChannelRecord(){ + // 获取当前时间在一周内的序号, 数据库存储的从第几个30分钟开始, 0-47, 包括首尾 + LocalDateTime now = LocalDateTime.now(); + int week = now.getDayOfWeek().getValue(); + int index = now.getHour() * 2 + (now.getMinute() > 30?1:0); + + // 查询现在需要录像的通道Id + return recordPlanMapper.queryRecordIng(week, index); + } + + private void stopStreams(Collection channelIds, Map recordStreamMap) { + for (Integer channelId : channelIds) { + try { + StreamInfo streamInfo = recordStreamMap.get(channelId); if (streamInfo == null) { continue; } @@ -117,23 +150,25 @@ public class RecordPlanServiceImpl implements IRecordPlanService { MediaInfo mediaInfo = mediaServerService.getMediaInfo(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream()); if (mediaInfo.getReaderCount() == null || mediaInfo.getReaderCount() == 0) { mediaServerService.closeStreams(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream()); - log.info("[录像] 停止, 通道ID: {}", channelId); + log.info("[录制计划] 停止, 通道ID: {}", channelId); } + }catch (Exception e) { + log.error("[录制计划] 停止时异常", e); + }finally { + recordStreamMap.remove(channelId); } } } - // 系统启动时 - - @Override - public boolean recording(String app, String stream) { - for (StreamInfo streamInfo : recordStreamMap.values()) { - if (streamInfo.getApp().equals(app) && streamInfo.getStream().equals(stream)) { - return true; + public Integer recording(String app, String stream) { + for (Integer channelId : recordStreamMap.keySet()) { + StreamInfo streamInfo = recordStreamMap.get(channelId); + if (streamInfo != null && streamInfo.getApp().equals(app) && streamInfo.getStream().equals(stream)) { + return channelId; } } - return false; + return null; } @Override @@ -226,7 +261,12 @@ public class RecordPlanServiceImpl implements IRecordPlanService { }else { channelMapper.addRecordPlan(channelIds, planId); } - // TODO 更新录像队列 + // 查看当前的待录制列表是否变化,如果变化,则调用录制计划马上开始录制 + List currentChannelRecord = queryCurrentChannelRecord(); + recordStreamMap.keySet().forEach(currentChannelRecord::remove); + if (!currentChannelRecord.isEmpty()) { + execution(); + } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/RecordPlanMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/RecordPlanMapper.java index e9304df0..ae0649aa 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/RecordPlanMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/RecordPlanMapper.java @@ -59,6 +59,9 @@ public interface RecordPlanMapper { @Delete("DELETE FROM wvp_record_plan_item WHERE plan_id = #{planId}") void cleanItems(@Param("planId") Integer planId); - @Select("select plan_id from wvp_record_plan_item where week_day = #{week} and start >= #{index} and stop <= #{index} group by plan_id") - List queryStart(@Param("week") int week, @Param("index") int index); + @Select(" ") + List queryRecordIng(@Param("week") int week, @Param("index") int index); }