diff --git a/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java b/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java new file mode 100644 index 00000000..f03f7190 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/ServerInfo.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.common; + +import lombok.Data; + +@Data +public class ServerInfo { + + private String ip; + private int port; + /** + * 现在使用的线程数 + */ + private int threadNumber; + + public static ServerInfo create(String ip, int port, int threadNumber) { + ServerInfo serverInfo = new ServerInfo(); + serverInfo.setIp(ip); + serverInfo.setPort(port); + serverInfo.setThreadNumber(threadNumber); + return serverInfo; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java b/src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java index 6da0caf3..4a2098a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java @@ -1,12 +1,14 @@ package com.genersoft.iot.vmp.conf; -import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.ServerInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + @Component public class WVPTimerTask { @@ -19,11 +21,8 @@ public class WVPTimerTask { @Autowired private SipConfig sipConfig; - @Scheduled(fixedDelay = 2 * 1000) //每3秒执行一次 + @Scheduled(fixedDelay = 2, timeUnit = TimeUnit.SECONDS) //每3秒执行一次 public void execute(){ - JSONObject jsonObject = new JSONObject(); - jsonObject.put("ip", sipConfig.getShowIp()); - jsonObject.put("port", serverPort); - redisCatchStorage.updateWVPInfo(jsonObject, 3); + redisCatchStorage.updateWVPInfo(ServerInfo.create(sipConfig.getShowIp(), serverPort), 3); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Platform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Platform.java index 74afd9ff..13ec2c8b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Platform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Platform.java @@ -127,4 +127,7 @@ public class Platform { @Schema(description = "保密属性(必选)缺省为0;0-不涉密,1-涉密") private int secrecy = 0; + + @Schema(description = "执行注册的服务ID") + private String serverId; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java index b8080bb5..d65075ee 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java @@ -16,11 +16,11 @@ public interface PlatformMapper { @Insert("INSERT INTO wvp_platform (enable, name, server_gb_id, server_gb_domain, server_ip, server_port,device_gb_id,device_ip,"+ " device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,status,catalog_group, update_time," + " create_time, as_message_channel, send_stream_ip, auto_push_channel, catalog_with_platform,catalog_with_group,catalog_with_region, "+ - " civil_code,manufacturer,model,address,register_way,secrecy) " + + " civil_code,manufacturer,model,address,register_way,secrecy,server_id) " + " VALUES (#{enable}, #{name}, #{serverGBId}, #{serverGBDomain}, #{serverIp}, #{serverPort}, #{deviceGBId}, #{deviceIp}, " + " #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{status}, #{catalogGroup},#{updateTime}," + " #{createTime}, #{asMessageChannel}, #{sendStreamIp}, #{autoPushChannel}, #{catalogWithPlatform}, #{catalogWithGroup},#{catalogWithRegion}, " + - " #{civilCode}, #{manufacturer}, #{model}, #{address}, #{registerWay}, #{secrecy})") + " #{civilCode}, #{manufacturer}, #{model}, #{address}, #{registerWay}, #{secrecy}, #{serverId})") int add(Platform parentPlatform); @Update("UPDATE wvp_platform " + @@ -55,6 +55,7 @@ public interface PlatformMapper { " model=#{model}, " + " address=#{address}, " + " register_way=#{registerWay}, " + + " server_id=#{serverId}, " + " secrecy=#{secrecy} " + "WHERE id=#{id}") int update(Platform parentPlatform); @@ -77,7 +78,7 @@ public interface PlatformMapper { List queryList(@Param("query") String query); @Select("SELECT * FROM wvp_platform WHERE enable=#{enable} ") - List getEnableParentPlatformList(boolean enable); + List queryEnableParentPlatformList(boolean enable); @Select("SELECT * FROM wvp_platform WHERE enable=true and as_message_channel=true") List queryEnablePlatformListWithAsMessageChannel(); @@ -91,7 +92,6 @@ public interface PlatformMapper { @Update("UPDATE wvp_platform SET status=#{online} WHERE server_gb_id=#{platformGbID}" ) int updateStatus(@Param("platformGbID") String platformGbID, @Param("online") boolean online); - @Select("SELECT * FROM wvp_platform WHERE enable=true") - List queryEnablePlatformList(); - + @Select("SELECT server_id FROM wvp_platform WHERE enable=true and server_id != #{serverId} group by server_id") + List queryServerIdsWithEnable(@Param("serverId") String serverId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 55c39c36..a35dfbac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; @@ -46,6 +47,7 @@ import java.text.ParseException; import java.util.List; import java.util.UUID; import java.util.Vector; +import java.util.concurrent.TimeUnit; /** * @author lin @@ -98,6 +100,28 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private ISendRtpServerService sendRtpServerService; + // 定时监听国标级联所进行的WVP服务是否正常, 如果异常则选择新的wvp执行 + @Scheduled(fixedDelay = 2, timeUnit = TimeUnit.SECONDS) //每3秒执行一次 + public void execute(){ + if (!userSetting.isAutoRegisterPlatform()) { + return; + } + // 查找非平台的国标级联执行服务Id + List serverIds = platformMapper.queryServerIdsWithEnable(userSetting.getServerId()); + if (serverIds == null || serverIds.isEmpty()) { + return; + } + serverIds.forEach(serverId -> { + // 检查每个是否存活 + ServerInfo serverInfo = redisCatchStorage.queryServerInfo(serverId); + if (serverInfo == null) { + // 此平台需要选择新平台处理 + + } + }); + + } + /** * 流离开的处理 */ @@ -780,7 +804,7 @@ public class PlatformServiceImpl implements IPlatformService { @Override public List queryEnablePlatformList() { - return platformMapper.queryEnablePlatformList(); + return platformMapper.queryEnableParentPlatformList(true); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 238134a9..78f26a18 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.storager; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.ServerInfo; import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.bean.MediaInfo; @@ -37,7 +38,7 @@ public interface IRedisCatchStorage { /** * 在redis添加wvp的信息 */ - void updateWVPInfo(JSONObject jsonObject, int time); + void updateWVPInfo(ServerInfo serverInfo, int time); /** * 发送推流生成与推流消失消息 @@ -187,4 +188,6 @@ public interface IRedisCatchStorage { void sendStartSendRtp(SendRtpInfo sendRtpItem); void sendPushStreamOnline(SendRtpInfo sendRtpItem); + + ServerInfo queryServerInfo(String serverId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index b97c9cf0..ec60df6d 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -2,9 +2,9 @@ package com.genersoft.iot.vmp.storager.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.ServerInfo; import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; @@ -110,10 +110,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override - public void updateWVPInfo(JSONObject jsonObject, int time) { + public void updateWVPInfo(ServerInfo serverInfo, int time) { String key = VideoManagerConstants.WVP_SERVER_PREFIX + userSetting.getServerId(); Duration duration = Duration.ofSeconds(time); - redisTemplate.opsForValue().set(key, jsonObject, duration); + redisTemplate.opsForValue().set(key, serverInfo, duration); + // } @Override @@ -533,4 +534,10 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { log.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getTargetId()); redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); } + + @Override + public ServerInfo queryServerInfo(String serverId) { + String key = VideoManagerConstants.WVP_SERVER_PREFIX + serverId; + return (ServerInfo)redisTemplate.opsForValue().get(key); + } } diff --git a/数据库/2.7.3/初始化-mysql-2.7.3.sql b/数据库/2.7.3/初始化-mysql-2.7.3.sql index 1679f826..27e42738 100644 --- a/数据库/2.7.3/初始化-mysql-2.7.3.sql +++ b/数据库/2.7.3/初始化-mysql-2.7.3.sql @@ -225,6 +225,7 @@ create table wvp_platform catalog_with_region integer default 1, auto_push_channel bool default true, send_stream_ip character varying(50), + server_id character varying(50), constraint uk_platform_unique_server_gb_id unique (server_gb_id) ); diff --git a/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql b/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql index 939e56a5..8b511957 100644 --- a/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql +++ b/数据库/2.7.3/初始化-postgresql-kingbase-2.7.3.sql @@ -242,6 +242,7 @@ create table wvp_platform catalog_with_region integer default 1, auto_push_channel bool default true, send_stream_ip character varying(50), + server_id character varying(50), constraint uk_platform_unique_server_gb_id unique (server_gb_id) ); diff --git a/数据库/2.7.3/数据库统合-更新.sql b/数据库/2.7.3/数据库统合-更新.sql index fdfbe4d5..8d850262 100644 --- a/数据库/2.7.3/数据库统合-更新.sql +++ b/数据库/2.7.3/数据库统合-更新.sql @@ -5,6 +5,7 @@ alter table wvp_device add server_id character varying(50); alter table wvp_media_server add server_id character varying(50); alter table wvp_stream_proxy add server_id character varying(50); alter table wvp_cloud_record add server_id character varying(50); +alter table wvp_platform add server_id character varying(50); update wvp_device set server_id = "你服务的ID"; update wvp_media_server set server_id = "你服务的ID";