From 800d6c926a74bad890ad5e699544e2834042eb45 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sat, 14 Sep 2024 15:19:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E8=AF=AD=E6=B3=95=E5=85=BC=E5=AE=B9=E4=BB=A5=E5=8F=8Aredis?= =?UTF-8?q?=E6=8E=A5=E6=94=B6=E6=8E=A8=E6=B5=81=E4=BF=A1=E6=81=AF=E5=AF=BC?= =?UTF-8?q?=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/conf/redis/RedisMsgListenConfig.java | 4 +- .../iot/vmp/gb28181/bean/CommonGBChannel.java | 4 +- .../iot/vmp/gb28181/bean/Platform.java | 12 +- .../gb28181/dao/CommonGBChannelMapper.java | 8 +- .../vmp/gb28181/dao/DeviceChannelMapper.java | 2 +- .../service/impl/GbChannelServiceImpl.java | 14 +- .../impl/PlatformChannelServiceImpl.java | 6 +- .../service/impl/PlatformServiceImpl.java | 20 +- ...va => RedisPushStreamListMsgListener.java} | 35 ++-- .../iot/vmp/storager/IRedisCatchStorage.java | 2 +- .../iot/vmp/storager/dao/GbStreamMapper.java | 177 ------------------ .../storager/impl/RedisCatchStorageImpl.java | 4 +- .../streamProxy/dao/StreamProxyMapper.java | 2 - .../bean/RedisPushStreamMessage.java | 24 +++ .../iot/vmp/streamPush/bean/StreamPush.java | 2 + .../vmp/streamPush/dao/StreamPushMapper.java | 10 +- .../service/impl/StreamPushServiceImpl.java | 14 +- src/main/resources/application.yml | 2 +- web_src/src/components/PlatformEdit.vue | 6 +- 数据库/2.7.2-重构/初始化-mysql-2.7.2.sql | 18 +- .../初始化-postgresql-kingbase-2.7.2.sql | 28 +-- 21 files changed, 120 insertions(+), 274 deletions(-) rename src/main/java/com/genersoft/iot/vmp/service/redisMsg/{RedisPushStreamStatusListMsgListener.java => RedisPushStreamListMsgListener.java} (67%) delete mode 100755 src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java create mode 100644 src/main/java/com/genersoft/iot/vmp/streamPush/bean/RedisPushStreamMessage.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index dcf2830c..f70a9e31 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -32,7 +32,7 @@ public class RedisMsgListenConfig { private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @Autowired - private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; + private RedisPushStreamListMsgListener pushStreamListMsgListener; @Autowired @@ -61,7 +61,7 @@ public class RedisMsgListenConfig { container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); - container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); + container.addMessageListener(pushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY)); container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java index b3a41392..6f2e147d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java @@ -51,7 +51,7 @@ public class CommonGBChannel { // 2016 @Schema(description = "国标-证书序列号") - private Integer gbCertNum; + private String gbCertNum; // 2016 @Schema(description = "国标-证书有效标识") @@ -63,7 +63,7 @@ public class CommonGBChannel { // 2016 @Schema(description = "国标-证书终止有效期(有证书且证书无效的设备必选)") - private Integer gbEndTime; + private String gbEndTime; // 2022 @Schema(description = "国标-摄像机安全能力等级代码") diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Platform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Platform.java index 790d2426..74afd9ff 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Platform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Platform.java @@ -97,14 +97,14 @@ public class Platform { @Schema(description = "是否自动推送通道变化") private Boolean autoPushChannel; - @Schema(description = "目录信息包含平台信息") - private Boolean catalogWithPlatform; + @Schema(description = "目录信息包含平台信息, 0:关闭,1:打开") + private int catalogWithPlatform; - @Schema(description = "目录信息包含分组信息") - private Boolean catalogWithGroup; + @Schema(description = "目录信息包含分组信息, 0:关闭,1:打开") + private int catalogWithGroup; - @Schema(description = "目录信息包含行政区划") - private Boolean catalogWithRegion; + @Schema(description = "目录信息包含行政区划, 0:关闭,1:打开") + private int catalogWithRegion; @Schema(description = "行政区划") private String civilCode; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java index f2b93d41..c48a1878 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java @@ -154,13 +154,13 @@ public interface CommonGBChannelMapper { @Update("") - int updateStatusForListById(List commonGBChannels, @Param("status") int status); + int updateStatusForListById(List commonGBChannels, @Param("status") String status); @SelectProvider(type = ChannelProvider.class, method = "queryInListByStatus") - List queryInListByStatus(List commonGBChannelList, @Param("status") int status); + List queryInListByStatus(List commonGBChannelList, @Param("status") String status); @Insert(" "}) int batchUpdate(List commonGBChannels); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index 5ee523c4..b0b52d72 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -612,7 +612,7 @@ public interface DeviceChannelMapper { int batchOffline(List channels); - @Select("select count(1) from wvp_device_channel where status = true") + @Select("select count(1) from wvp_device_channel where status = 'ON'") int getOnlineCount(); @Select("select count(1) from wvp_device_channel") diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index ffbe6f64..e0826601 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -153,7 +153,7 @@ public class GbChannelServiceImpl implements IGbChannelService { log.warn("[多个通道离线] 通道数量为0,更新失败"); return 0; } - List onlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, 1); + List onlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, "ON"); if (onlineChannelList.isEmpty()) { log.warn("[多个通道离线] 更新失败, 参数内通道已经离线"); return 0; @@ -166,10 +166,10 @@ public class GbChannelServiceImpl implements IGbChannelService { if (i + limitCount > onlineChannelList.size()) { toIndex = onlineChannelList.size(); } - result += commonGBChannelMapper.updateStatusForListById(onlineChannelList.subList(i, toIndex), 0); + result += commonGBChannelMapper.updateStatusForListById(onlineChannelList.subList(i, toIndex), "OFF"); } } else { - result += commonGBChannelMapper.updateStatusForListById(onlineChannelList, 0); + result += commonGBChannelMapper.updateStatusForListById(onlineChannelList, "OFF"); } if (result > 0) { try { @@ -207,7 +207,7 @@ public class GbChannelServiceImpl implements IGbChannelService { log.warn("[多个通道上线] 通道数量为0,更新失败"); return 0; } - List offlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, 0); + List offlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, "OFF"); if (offlineChannelList.isEmpty()) { log.warn("[多个通道上线] 更新失败, 参数内通道已经上线线"); return 0; @@ -221,10 +221,10 @@ public class GbChannelServiceImpl implements IGbChannelService { if (i + limitCount > offlineChannelList.size()) { toIndex = offlineChannelList.size(); } - result += commonGBChannelMapper.updateStatusForListById(offlineChannelList.subList(i, toIndex), 1); + result += commonGBChannelMapper.updateStatusForListById(offlineChannelList.subList(i, toIndex), "ON"); } } else { - result += commonGBChannelMapper.updateStatusForListById(offlineChannelList, 1); + result += commonGBChannelMapper.updateStatusForListById(offlineChannelList, "ON"); } if (result > 0) { try { @@ -282,7 +282,7 @@ public class GbChannelServiceImpl implements IGbChannelService { } else { result += commonGBChannelMapper.batchUpdate(commonGBChannels); } - log.warn("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); + log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); // 发送通过更新通知 try { // 发送通知 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index 20d8b73a..7a12647c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -376,19 +376,19 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { } List channelList = new ArrayList<>(); // 是否包含平台信息 - if (platform.getCatalogWithPlatform()) { + if (platform.getCatalogWithPlatform() > 0) { CommonGBChannel channel = CommonGBChannel.build(platform); channelList.add(channel); } // 关联的行政区划信息 - if (platform.getCatalogWithRegion()) { + if (platform.getCatalogWithRegion() > 0) { // 查询关联平台的行政区划信息 List regionChannelList = regionMapper.queryByPlatform(platform.getId()); if (!regionChannelList.isEmpty()) { channelList.addAll(regionChannelList); } } - if (platform.getCatalogWithGroup()) { + if (platform.getCatalogWithGroup() > 0) { // 关联的分组信息 List groupChannelList = groupMapper.queryForPlatform(platform.getId()); if (!groupChannelList.isEmpty()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index d530b71f..720b8789 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -27,7 +27,6 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -83,9 +82,6 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private SubscribeHolder subscribeHolder; - @Autowired - private GbStreamMapper gbStreamMapper; - @Autowired private UserSetting userSetting; @@ -457,27 +453,21 @@ public class PlatformServiceImpl implements IPlatformService { SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); if (subscribe != null) { - // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 - List gbStreams = gbStreamMapper.queryGbStreamListInPlatform(platform.getServerGBId(), userSetting.isUsePushingAsStatus()); - if (gbStreams.size() == 0) { + List channelList = platformChannelMapper.queryShare(platform.getId(), null); + if (channelList.isEmpty()) { return; } - for (DeviceChannel deviceChannel : gbStreams) { - String gbId = deviceChannel.getDeviceId(); - GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); + for (CommonGBChannel channel : channelList) { + GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(channel.getGbId()); // 无最新位置不发送 if (gpsMsgInfo != null) { // 经纬度都为0不发送 if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { continue; } - CommonGBChannel commonGBChannel = platformChannelMapper.queryShareChannel(platform.getId(), deviceChannel.getId()); - if (commonGBChannel == null) { - continue; - } // 发送GPS消息 try { - commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel, subscribe); + commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, channel, subscribe); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 移动位置通知: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java similarity index 67% rename from src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java rename to src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java index d853e066..1c2845a4 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.streamPush.bean.RedisPushStreamMessage; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; @@ -24,10 +25,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; * @Auther: JiangFeng * @Date: 2022/8/16 11:32 * @Description: 接收redis发送的推流设备列表更新通知 + * 监听: SUBSCRIBE VM_MSG_PUSH_STREAM_LIST_CHANGE + * 发布 PUBLISH VM_MSG_PUSH_STREAM_LIST_CHANGE '[{"app":1000,"stream":10000000,"gbId":"12345678901234567890","name":"A6","status":false},{"app":1000,"stream":10000021,"gbId":"24212345671381000021","name":"终端9273","status":false},{"app":1000,"stream":10000022,"gbId":"24212345671381000022","name":"终端9434","status":true},{"app":1000,"stream":10000025,"gbId":"24212345671381000025","name":"华为M10","status":false},{"app":1000,"stream":10000051,"gbId":"11111111111381111122","name":"终端9720","status":false}]' */ @Slf4j @Component -public class RedisPushStreamStatusListMsgListener implements MessageListener { +public class RedisPushStreamListMsgListener implements MessageListener { @Resource private IMediaServerService mediaServerService; @@ -51,7 +54,7 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); try { - List streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPush.class); + List streamPushItems = JSON.parseArray(new String(msg.getBody()), RedisPushStreamMessage.class); //查询全部的app+stream 用于判断是添加还是修改 Map allAppAndStream = streamPushService.getAllAppAndStreamMap(); Map allGBId = streamPushService.getAllGBId(); @@ -61,31 +64,39 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { */ List streamPushItemForSave = new ArrayList<>(); List streamPushItemForUpdate = new ArrayList<>(); - for (StreamPush streamPush : streamPushItems) { - String app = streamPush.getApp(); - String stream = streamPush.getStream(); + for (RedisPushStreamMessage pushStreamMessage : streamPushItems) { + String app = pushStreamMessage.getApp(); + String stream = pushStreamMessage.getStream(); boolean contains = allAppAndStream.containsKey(app + stream); //不存在就添加 if (!contains) { - if (allGBId.containsKey(streamPush.getGbDeviceId())) { - StreamPush streamPushInDb = allGBId.get(streamPush.getGbDeviceId()); + if (allGBId.containsKey(pushStreamMessage.getGbId())) { + StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId()); log.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}", streamPushInDb.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream()); continue; } + StreamPush streamPush = pushStreamMessage.buildstreamPush(); streamPush.setCreateTime(DateUtil.getNow()); + streamPush.setUpdateTime(DateUtil.getNow()); streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); streamPushItemForSave.add(streamPush); allGBId.put(streamPush.getGbDeviceId(), streamPush); } else { - if (allGBId.containsKey(streamPush.getGbDeviceId()) - && (!allGBId.get(streamPush.getGbDeviceId()).getApp().equals(streamPush.getApp()) - || !allGBId.get(streamPush.getGbDeviceId()).getStream().equals(streamPush.getStream()))) { - StreamPush streamPushInDb = allGBId.get(streamPush.getGbDeviceId()); + StreamPush streamPushForGbDeviceId = allGBId.get(pushStreamMessage.getGbId()); + if (streamPushForGbDeviceId != null + && (!streamPushForGbDeviceId.getApp().equals(pushStreamMessage.getApp()) + || !streamPushForGbDeviceId.getStream().equals(pushStreamMessage.getStream()))) { + StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId()); log.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}", - streamPush.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream()); + pushStreamMessage.getGbId(), streamPushInDb.getApp(), streamPushInDb.getStream()); continue; } + StreamPush streamPush = allAppAndStream.get(app + stream); + streamPush.setUpdateTime(DateUtil.getNow()); + streamPush.setGbDeviceId(pushStreamMessage.getGbId()); + streamPush.setGbName(pushStreamMessage.getName()); + streamPush.setGbStatus(pushStreamMessage.isStatus()?"ON":"OFF"); //存在就只修改 name和gbId streamPushItemForUpdate.add(streamPush); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 6d705101..82f94a79 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -97,7 +97,7 @@ public interface IRedisCatchStorage { void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo); - GPSMsgInfo getGpsMsgInfo(String gbId); + GPSMsgInfo getGpsMsgInfo(Integer gbId); List getAllGpsMsgInfo(); Long getSN(String method); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java deleted file mode 100755 index 74f3f3ba..00000000 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ /dev/null @@ -1,177 +0,0 @@ -package com.genersoft.iot.vmp.storager.dao; - -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; -import com.genersoft.iot.vmp.streamPush.bean.StreamPush; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import org.apache.ibatis.annotations.*; -import org.apache.ibatis.annotations.Param; -import org.springframework.stereotype.Repository; - -import java.util.List; -import java.util.Map; - -@Mapper -@Repository -public interface GbStreamMapper { - - @Insert("INSERT INTO wvp_gb_stream (app, stream, gb_id, name, " + - "longitude, latitude, stream_type,media_server_id,create_time) VALUES" + - "(#{app}, #{stream}, #{gbId}, #{name}, " + - "#{longitude}, #{latitude}, #{streamType}, " + - "#{mediaServerId}, #{createTime})") - @Options(useGeneratedKeys = true, keyProperty = "gbStreamId", keyColumn = "gb_stream_id") - int add(GbStream gbStream); - - @Update("UPDATE wvp_gb_stream " + - "SET app=#{app}," + - "stream=#{stream}," + - "gb_id=#{gbId}," + - "name=#{name}," + - "stream_type=#{streamType}," + - "longitude=#{longitude}, " + - "latitude=#{latitude}," + - "media_server_id=#{mediaServerId}" + - "WHERE app=#{app} AND stream=#{stream}") - int updateByAppAndStream(GbStream gbStream); - - @Update("UPDATE wvp_gb_stream " + - "SET app=#{app}," + - "stream=#{stream}," + - "gb_id=#{gbId}," + - "name=#{name}," + - "stream_type=#{streamType}," + - "longitude=#{longitude}, " + - "latitude=#{latitude}," + - "media_server_id=#{mediaServerId}" + - "WHERE gb_stream_id=#{gbStreamId}") - int update(GbStream gbStream); - - @Delete("DELETE FROM wvp_gb_stream WHERE app=#{app} AND stream=#{stream}") - int del(@Param("app") String app, @Param("stream") String stream); - - @Select("") - List selectAll(@Param("platformId") String platformId, @Param("catalogId") String catalogId, @Param("query") String query, @Param("mediaServerId") String mediaServerId); - - @Select("SELECT * FROM wvp_gb_stream WHERE app=#{app} AND stream=#{stream}") - GbStream selectOne(@Param("app") String app, @Param("stream") String stream); - - @Select("SELECT * FROM wvp_gb_stream WHERE gb_id=#{gbId}") - List selectByGBId(String gbId); - - @Select("SELECT gs.*, pgs.platform_id as platform_id, pgs.catalog_id as catalog_id FROM wvp_gb_stream gs " + - "LEFT JOIN wvp_platform_gb_stream pgs ON gs.gb_stream_id = pgs.gb_stream_id " + - "WHERE gs.gb_id = #{gbId} AND pgs.platform_id = #{platformId}") - GbStream queryStreamInPlatform(@Param("platformId") String platformId, @Param("gbId") String gbId); - - @Select("") - List queryGbStreamListInPlatform(String platformId, @Param("usPushingAsStatus") boolean usPushingAsStatus); - - - @Select("SELECT gs.* FROM wvp_gb_stream gs left join wvp_platform_gb_stream pgs " + - "ON gs.gb_stream_id = pgs.gb_stream_id WHERE pgs.gb_stream_id is NULL") - List queryStreamNotInPlatform(); - - @Delete("DELETE FROM wvp_gb_stream WHERE stream_type=#{type} AND gb_id=NULL AND media_server_id=#{mediaServerId}") - void deleteWithoutGBId(@Param("type") String type, @Param("mediaServerId") String mediaServerId); - - @Delete("") - void batchDel(List streamProxyItemList); - - @Delete("") - void batchDelForGbStream(List gbStreams); - - @Insert("") - @Options(useGeneratedKeys = true, keyProperty = "gbStreamId", keyColumn = "gb_stream_id") - void batchAdd(@Param("subList") List subList); - - @Update({""}) - int updateStreamGPS(List gpsMsgInfos); - - @Select("") - List selectAllForAppAndStream(List streamPushItems); - - @Update("UPDATE wvp_gb_stream " + - "SET media_server_id=#{mediaServerId}" + - "WHERE app=#{app} AND stream=#{stream}") - void updateMediaServer(String app, String stream, String mediaServerId); - - @Update("") - int updateGbIdOrName(List streamPushItemForUpdate); - - @Select("SELECT status FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}") - Boolean selectStatusForProxy(@Param("app") String app, @Param("stream") String stream); - - @Select("SELECT status FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}") - Boolean selectStatusForPush(@Param("app") String app, @Param("stream") String stream); - - @MapKey("gbId") - @Select("SELECT * from wvp_gb_stream") - Map getAllGBId(); -} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 2425c49c..acf96fab 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -253,8 +253,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public GPSMsgInfo getGpsMsgInfo(String gbId) { - String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_" + gbId; + public GPSMsgInfo getGpsMsgInfo(Integer channelId) { + String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_" + channelId; return JsonUtil.redisJsonToObject(redisTemplate, key, GPSMsgInfo.class); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java index 74fe9dbf..551b6e3c 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java @@ -24,8 +24,6 @@ public interface StreamProxyMapper { "SET type=#{type}, " + "app=#{app}," + "stream=#{stream}," + - "app=#{app}," + - "stream=#{stream}," + "media_server_id=#{mediaServerId}, " + "src_url=#{srcUrl}," + "timeout=#{timeout}, " + diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/bean/RedisPushStreamMessage.java b/src/main/java/com/genersoft/iot/vmp/streamPush/bean/RedisPushStreamMessage.java new file mode 100644 index 00000000..6d12fb48 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/bean/RedisPushStreamMessage.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.streamPush.bean; + +import lombok.Data; + +@Data +public class RedisPushStreamMessage { + + private String gbId; + private String app; + private String stream; + private String name; + private boolean status; + + public StreamPush buildstreamPush() { + StreamPush push = new StreamPush(); + push.setApp(app); + push.setStream(stream); + push.setGbName(name); + push.setGbDeviceId(gbId); + push.setStartOfflinePush(true); + push.setGbStatus(status?"ON":"OFF"); + return push; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java b/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java index 2c2da81e..9a9d5e98 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/bean/StreamPush.java @@ -76,6 +76,8 @@ public class StreamPush extends CommonGBChannel implements Comparable"}) int getAllPushing(Boolean usePushingAsStatus); - @MapKey("vhost") - @Select("SELECT CONCAT(wsp.app, wsp.stream) as vhost, wsp.*, wsp.* , wsp.id as gb_id " + + @MapKey("uniqueKey") + @Select("SELECT CONCAT(wsp.app, wsp.stream) as unique_key, wsp.*, wsp.* , wdc.id as gb_id " + " from wvp_stream_push wsp " + " LEFT join wvp_device_channel wdc on wsp.id = wdc.stream_push_id") Map getAllAppAndStreamMap(); - @MapKey("gb_device_id") - @Select("SELECT wdc.gb_device_id, wsp.id as stream_push_id, wsp.*, wsp.* , wsp.id as gb_id " + + @MapKey("gbDeviceId") + @Select("SELECT wdc.gb_device_id, wsp.id as stream_push_id, wsp.*, wsp.* , wdc.id as gb_id " + " from wvp_stream_push wsp " + " LEFT join wvp_device_channel wdc on wsp.id = wdc.stream_push_id") Map getAllGBId(); @@ -150,7 +150,7 @@ public interface StreamPushMapper { ", push_time=#{item.pushTime}" + ", pushing=#{item.pushing}" + ", start_offline_push=#{item.startOfflinePush}" + - " WHERE id=#{item.item.id}" + + " WHERE id=#{item.id}" + "" + ""}) int batchUpdate(List streamPushItemForUpdate); diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index b5c0727a..46806e93 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -542,16 +542,14 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public void batchUpdate(List streamPushItemForUpdate) { - int result = streamPushMapper.batchUpdate(streamPushItemForUpdate); - if (result > 0) { - List commonGBChannels = new ArrayList<>(); - for (StreamPush streamPush : streamPushItemForUpdate) { - if (!ObjectUtils.isEmpty(streamPush.getGbDeviceId())) { - commonGBChannels.add(streamPush.buildCommonGBChannel()); - } + streamPushMapper.batchUpdate(streamPushItemForUpdate); + List commonGBChannels = new ArrayList<>(); + for (StreamPush streamPush : streamPushItemForUpdate) { + if (!ObjectUtils.isEmpty(streamPush.getGbDeviceId())) { + commonGBChannels.add(streamPush.buildCommonGBChannel()); } - gbChannelService.batchUpdate(commonGBChannels); } + gbChannelService.batchUpdate(commonGBChannels); } @Override diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b91ff2aa..e7c85a82 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ spring: application: name: wvp profiles: - active: 272重构 \ No newline at end of file + active: 272重构-postgre \ No newline at end of file diff --git a/web_src/src/components/PlatformEdit.vue b/web_src/src/components/PlatformEdit.vue index 9d93b619..f815adda 100644 --- a/web_src/src/components/PlatformEdit.vue +++ b/web_src/src/components/PlatformEdit.vue @@ -124,9 +124,9 @@ - - - + + + diff --git a/数据库/2.7.2-重构/初始化-mysql-2.7.2.sql b/数据库/2.7.2-重构/初始化-mysql-2.7.2.sql index b49463b8..b9fd774b 100644 --- a/数据库/2.7.2-重构/初始化-mysql-2.7.2.sql +++ b/数据库/2.7.2-重构/初始化-mysql-2.7.2.sql @@ -158,10 +158,10 @@ create table wvp_device_channel gb_parent_id character varying(255), gb_safety_way integer, gb_register_way integer, - gb_cert_num integer, + gb_cert_num character varying(50), gb_certifiable integer, gb_err_code integer, - gb_end_time integer, + gb_end_time character varying(50), gb_security_level_code character varying(255), gb_secrecy integer, gb_ip_address character varying(50), @@ -274,9 +274,9 @@ create table wvp_platform create_time character varying(50), update_time character varying(50), as_message_channel bool default false, - catalog_with_platform bool default true, - catalog_with_group bool default true, - catalog_with_region bool default true, + catalog_with_platform integer default 1, + catalog_with_group integer default 1, + catalog_with_region integer default 1, auto_push_channel bool default true, send_stream_ip character varying(50), constraint uk_platform_unique_server_gb_id unique (server_gb_id) @@ -331,7 +331,7 @@ create table wvp_platform_group id serial primary key, platform_id integer, group_id integer, - constraint uk_wvp_platform_group_id_catalog_platform_id_group_id unique (platform_id, group_id) + constraint uk_wvp_platform_group_platform_id_group_id unique (platform_id, group_id) ); create table wvp_platform_region @@ -339,7 +339,7 @@ create table wvp_platform_region id serial primary key, platform_id integer, region_id integer, - constraint uk_wvp_platform_group_id_catalog_platform_id_group_id unique (platform_id, region_id) + constraint uk_wvp_platform_region_platform_id_group_id unique (platform_id, region_id) ); create table wvp_stream_proxy @@ -463,7 +463,7 @@ CREATE TABLE wvp_common_group create_time varchar(50) NOT NULL, update_time varchar(50) NOT NULL, civil_code varchar(50) default null, - UNIQUE KEY common_group_device_platform (device_id) + constraint uk_common_group_device_platform unique (device_id) ); CREATE TABLE wvp_common_region @@ -475,6 +475,6 @@ CREATE TABLE wvp_common_region parent_device_id varchar(50) DEFAULT NULL, create_time varchar(50) NOT NULL, update_time varchar(50) NOT NULL, - UNIQUE KEY common_region_device_id (device_id) + constraint uk_common_region_device_id unique (device_id) ); diff --git a/数据库/2.7.2-重构/初始化-postgresql-kingbase-2.7.2.sql b/数据库/2.7.2-重构/初始化-postgresql-kingbase-2.7.2.sql index 4ba572df..7debbf15 100644 --- a/数据库/2.7.2-重构/初始化-postgresql-kingbase-2.7.2.sql +++ b/数据库/2.7.2-重构/初始化-postgresql-kingbase-2.7.2.sql @@ -158,18 +158,18 @@ create table wvp_device_channel gb_parent_id character varying(255), gb_safety_way integer, gb_register_way integer, - gb_cert_num integer, + gb_cert_num character varying(50), gb_certifiable integer, gb_err_code integer, - gb_end_time integer, + gb_end_time character varying(50), gb_security_level_code character varying(255), gb_secrecy integer, gb_ip_address character varying(50), gb_port integer, gb_password character varying(50), gb_status character varying(50), - gb_longitude double, - gb_latitude double, + gb_longitude double precision, + gb_latitude double precision, gb_business_group_id character varying(50), gb_ptz_type integer, gb_position_type integer, @@ -186,9 +186,9 @@ create table wvp_device_channel gb_svc_time_support_mode integer, gb_ssvc_ratio_support_list character varying(255), gb_mobile_device_type integer, - gb_horizontal_field_angle double, - gb_vertical_field_angle double, - gb_max_view_distance double, + gb_horizontal_field_angle double precision, + gb_vertical_field_angle double precision, + gb_max_view_distance double precision, gb_grassroots_code character varying(255), gb_po_type integer, gb_po_common_name character varying(255), @@ -274,9 +274,9 @@ create table wvp_platform create_time character varying(50), update_time character varying(50), as_message_channel bool default false, - catalog_with_platform bool default true, - catalog_with_group bool default true, - catalog_with_region bool default true, + catalog_with_platform integer default 1, + catalog_with_group integer default 1, + catalog_with_region integer default 1, auto_push_channel bool default true, send_stream_ip character varying(50), constraint uk_platform_unique_server_gb_id unique (server_gb_id) @@ -331,7 +331,7 @@ create table wvp_platform_group id serial primary key, platform_id integer, group_id integer, - constraint uk_wvp_platform_group_id_catalog_platform_id_group_id unique (platform_id, group_id) + constraint uk_wvp_platform_group_platform_id_group_id unique (platform_id, group_id) ); create table wvp_platform_region @@ -339,7 +339,7 @@ create table wvp_platform_region id serial primary key, platform_id integer, region_id integer, - constraint uk_wvp_platform_group_id_catalog_platform_id_group_id unique (platform_id, region_id) + constraint uk_wvp_platform_region_platform_id_group_id unique (platform_id, region_id) ); create table wvp_stream_proxy @@ -463,7 +463,7 @@ CREATE TABLE wvp_common_group create_time varchar(50) NOT NULL, update_time varchar(50) NOT NULL, civil_code varchar(50) default null, - UNIQUE KEY common_group_device_platform (device_id) + constraint uk_common_group_device_platform unique (device_id) ); CREATE TABLE wvp_common_region @@ -475,6 +475,6 @@ CREATE TABLE wvp_common_region parent_device_id varchar(50) DEFAULT NULL, create_time varchar(50) NOT NULL, update_time varchar(50) NOT NULL, - UNIQUE KEY common_region_device_id (device_id) + constraint uk_common_region_device_id unique (device_id) );