diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java index 43f1a8a6..e4090977 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.github.pagehelper.PageInfo; import java.util.List; +import java.util.Map; /** * 级联国标平台关联流业务接口 @@ -71,4 +72,7 @@ public interface IGbStreamService { void delAllPlatformInfo(String platformId, String catalogId); List getGbChannelWithGbid(String gbId); + + Map getAllGBId(); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 333b7b31..10b1eff1 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -115,4 +115,7 @@ public interface IStreamPushService { */ ResourceBaseInfo getOverview(); + Map getAllAppAndStreamMap(); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java index 9fcbb407..c2c9d725 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java @@ -19,11 +19,11 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.List; +import java.util.Map; @Service @DS("master") @@ -268,4 +268,9 @@ public class GbStreamServiceImpl implements IGbStreamService { public List getGbChannelWithGbid(String gbId) { return gbStreamMapper.selectByGBId(gbId); } + + @Override + public Map getAllGBId() { + return gbStreamMapper.getAllGBId(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 13c452c1..e2d7e68c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -548,4 +548,9 @@ public class StreamPushServiceImpl implements IStreamPushService { return new ResourceBaseInfo(total, online); } + + @Override + public Map getAllAppAndStreamMap() { + return streamPushMapper.getAllAppAndStreamMap(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java index cb34ff59..24a19f3e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -19,6 +20,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -57,7 +59,8 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { try { List streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); //查询全部的app+stream 用于判断是添加还是修改 - List allAppAndStream = streamPushService.getAllAppAndStream(); + Map allAppAndStream = streamPushService.getAllAppAndStreamMap(); + Map allGBId = gbStreamService.getAllGBId(); /** * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 @@ -67,9 +70,15 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { for (StreamPushItem streamPushItem : streamPushItems) { String app = streamPushItem.getApp(); String stream = streamPushItem.getStream(); - boolean contains = allAppAndStream.contains(app + stream); + boolean contains = allAppAndStream.containsKey(app + stream); //不存在就添加 if (!contains) { + if (allGBId.containsKey(streamPushItem.getGbId())) { + GbStream gbStream = allGBId.get(streamPushItem.getGbId()); + logger.warn("[REDIS消息-推流设备列表更新] 国标编号重复: {}, 已分配给{}/{}", + streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream()); + continue; + } streamPushItem.setStreamType("push"); streamPushItem.setCreateTime(DateUtil.getNow()); streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); @@ -77,25 +86,25 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { streamPushItem.setOriginTypeStr("rtsp_push"); streamPushItem.setTotalReaderCount("0"); streamPushItemForSave.add(streamPushItem); + allGBId.put(streamPushItem.getGbId(), streamPushItem); } else { //存在就只修改 name和gbId streamPushItemForUpdate.add(streamPushItem); } } - if (streamPushItemForSave.size() > 0) { - + if (!streamPushItemForSave.isEmpty()) { logger.info("添加{}条",streamPushItemForSave.size()); logger.info(JSONObject.toJSONString(streamPushItemForSave)); streamPushService.batchAdd(streamPushItemForSave); } - if(streamPushItemForUpdate.size()>0){ + if(!streamPushItemForUpdate.isEmpty()){ logger.info("修改{}条",streamPushItemForUpdate.size()); logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); gbStreamService.updateGbIdOrName(streamPushItemForUpdate); } }catch (Exception e) { - logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); + logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody())); logger.error("[REDIS消息-推流设备列表更新] 异常内容: ", e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 6591e3f9..3790bdab 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -10,6 +10,7 @@ import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; @Mapper @Repository @@ -170,4 +171,7 @@ public interface GbStreamMapper { @Select("SELECT status FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}") Boolean selectStatusForPush(@Param("app") String app, @Param("stream") String stream); + @MapKey("gbId") + @Select("SELECT * from wvp_gb_stream") + Map getAllGBId(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index 682f07c6..daf21eff 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -7,6 +7,7 @@ import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; @Mapper @Repository @@ -195,4 +196,12 @@ public interface StreamPushMapper { "" + "") List getListIn(List streamPushItems); + + @MapKey("vhost") + @Select("SELECT CONCAT(wsp.app, wsp.stream) as vhost, wsp.app, wsp.stream, wgs.gb_id, wgs.name " + + " from wvp_stream_push wsp " + + " left join wvp_gb_stream wgs on wgs.app = wsp.app and wgs.stream = wsp.stream") + Map getAllAppAndStreamMap(); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java index d4791801..e65a5795 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java @@ -3,10 +3,9 @@ package com.genersoft.iot.vmp.vmanager.gb28181.gbStream; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IPlatformService; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.gb28181.gbStream.bean.GbStreamParam; import com.github.pagehelper.PageInfo; @@ -20,7 +19,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; -import java.util.ArrayList; import java.util.List; @Tag(name = "视频流关联到级联平台") @@ -34,6 +32,9 @@ public class GbStreamController { @Autowired private IGbStreamService gbStreamService; + @Autowired + private IStreamPushService service; + @Autowired private IPlatformService platformService;