[集群-自动切换过国标级联] 添加数据库脚本

dev/数据库统合
648540858 2025-01-02 14:29:13 +08:00
parent 3331a7d931
commit 884fcb827f
10 changed files with 78 additions and 17 deletions

View File

@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.common;
import lombok.Data;
@Data
public class ServerInfo {
private String ip;
private int port;
/**
* 使线
*/
private int threadNumber;
public static ServerInfo create(String ip, int port, int threadNumber) {
ServerInfo serverInfo = new ServerInfo();
serverInfo.setIp(ip);
serverInfo.setPort(port);
serverInfo.setThreadNumber(threadNumber);
return serverInfo;
}
}

View File

@ -1,12 +1,14 @@
package com.genersoft.iot.vmp.conf; package com.genersoft.iot.vmp.conf;
import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.ServerInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component @Component
public class WVPTimerTask { public class WVPTimerTask {
@ -19,11 +21,8 @@ public class WVPTimerTask {
@Autowired @Autowired
private SipConfig sipConfig; private SipConfig sipConfig;
@Scheduled(fixedDelay = 2 * 1000) //每3秒执行一次 @Scheduled(fixedDelay = 2, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
public void execute(){ public void execute(){
JSONObject jsonObject = new JSONObject(); redisCatchStorage.updateWVPInfo(ServerInfo.create(sipConfig.getShowIp(), serverPort), 3);
jsonObject.put("ip", sipConfig.getShowIp());
jsonObject.put("port", serverPort);
redisCatchStorage.updateWVPInfo(jsonObject, 3);
} }
} }

View File

@ -127,4 +127,7 @@ public class Platform {
@Schema(description = "保密属性必选缺省为00-不涉密1-涉密") @Schema(description = "保密属性必选缺省为00-不涉密1-涉密")
private int secrecy = 0; private int secrecy = 0;
@Schema(description = "执行注册的服务ID")
private String serverId;
} }

View File

@ -16,11 +16,11 @@ public interface PlatformMapper {
@Insert("INSERT INTO wvp_platform (enable, name, server_gb_id, server_gb_domain, server_ip, server_port,device_gb_id,device_ip,"+ @Insert("INSERT INTO wvp_platform (enable, name, server_gb_id, server_gb_domain, server_ip, server_port,device_gb_id,device_ip,"+
" device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,status,catalog_group, update_time," + " device_port,username,password,expires,keep_timeout,transport,character_set,ptz,rtcp,status,catalog_group, update_time," +
" create_time, as_message_channel, send_stream_ip, auto_push_channel, catalog_with_platform,catalog_with_group,catalog_with_region, "+ " create_time, as_message_channel, send_stream_ip, auto_push_channel, catalog_with_platform,catalog_with_group,catalog_with_region, "+
" civil_code,manufacturer,model,address,register_way,secrecy) " + " civil_code,manufacturer,model,address,register_way,secrecy,server_id) " +
" VALUES (#{enable}, #{name}, #{serverGBId}, #{serverGBDomain}, #{serverIp}, #{serverPort}, #{deviceGBId}, #{deviceIp}, " + " VALUES (#{enable}, #{name}, #{serverGBId}, #{serverGBDomain}, #{serverIp}, #{serverPort}, #{deviceGBId}, #{deviceIp}, " +
" #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{status}, #{catalogGroup},#{updateTime}," + " #{devicePort}, #{username}, #{password}, #{expires}, #{keepTimeout}, #{transport}, #{characterSet}, #{ptz}, #{rtcp}, #{status}, #{catalogGroup},#{updateTime}," +
" #{createTime}, #{asMessageChannel}, #{sendStreamIp}, #{autoPushChannel}, #{catalogWithPlatform}, #{catalogWithGroup},#{catalogWithRegion}, " + " #{createTime}, #{asMessageChannel}, #{sendStreamIp}, #{autoPushChannel}, #{catalogWithPlatform}, #{catalogWithGroup},#{catalogWithRegion}, " +
" #{civilCode}, #{manufacturer}, #{model}, #{address}, #{registerWay}, #{secrecy})") " #{civilCode}, #{manufacturer}, #{model}, #{address}, #{registerWay}, #{secrecy}, #{serverId})")
int add(Platform parentPlatform); int add(Platform parentPlatform);
@Update("UPDATE wvp_platform " + @Update("UPDATE wvp_platform " +
@ -55,6 +55,7 @@ public interface PlatformMapper {
" model=#{model}, " + " model=#{model}, " +
" address=#{address}, " + " address=#{address}, " +
" register_way=#{registerWay}, " + " register_way=#{registerWay}, " +
" server_id=#{serverId}, " +
" secrecy=#{secrecy} " + " secrecy=#{secrecy} " +
"WHERE id=#{id}") "WHERE id=#{id}")
int update(Platform parentPlatform); int update(Platform parentPlatform);
@ -77,7 +78,7 @@ public interface PlatformMapper {
List<Platform> queryList(@Param("query") String query); List<Platform> queryList(@Param("query") String query);
@Select("SELECT * FROM wvp_platform WHERE enable=#{enable} ") @Select("SELECT * FROM wvp_platform WHERE enable=#{enable} ")
List<Platform> getEnableParentPlatformList(boolean enable); List<Platform> queryEnableParentPlatformList(boolean enable);
@Select("SELECT * FROM wvp_platform WHERE enable=true and as_message_channel=true") @Select("SELECT * FROM wvp_platform WHERE enable=true and as_message_channel=true")
List<Platform> queryEnablePlatformListWithAsMessageChannel(); List<Platform> queryEnablePlatformListWithAsMessageChannel();
@ -91,7 +92,6 @@ public interface PlatformMapper {
@Update("UPDATE wvp_platform SET status=#{online} WHERE server_gb_id=#{platformGbID}" ) @Update("UPDATE wvp_platform SET status=#{online} WHERE server_gb_id=#{platformGbID}" )
int updateStatus(@Param("platformGbID") String platformGbID, @Param("online") boolean online); int updateStatus(@Param("platformGbID") String platformGbID, @Param("online") boolean online);
@Select("SELECT * FROM wvp_platform WHERE enable=true") @Select("SELECT server_id FROM wvp_platform WHERE enable=true and server_id != #{serverId} group by server_id")
List<Platform> queryEnablePlatformList(); List<String> queryServerIdsWithEnable(@Param("serverId") String serverId);
} }

View File

@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -46,6 +47,7 @@ import java.text.ParseException;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.TimeUnit;
/** /**
* @author lin * @author lin
@ -98,6 +100,28 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired @Autowired
private ISendRtpServerService sendRtpServerService; private ISendRtpServerService sendRtpServerService;
// 定时监听国标级联所进行的WVP服务是否正常 如果异常则选择新的wvp执行
@Scheduled(fixedDelay = 2, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
public void execute(){
if (!userSetting.isAutoRegisterPlatform()) {
return;
}
// 查找非平台的国标级联执行服务Id
List<String> serverIds = platformMapper.queryServerIdsWithEnable(userSetting.getServerId());
if (serverIds == null || serverIds.isEmpty()) {
return;
}
serverIds.forEach(serverId -> {
// 检查每个是否存活
ServerInfo serverInfo = redisCatchStorage.queryServerInfo(serverId);
if (serverInfo == null) {
// 此平台需要选择新平台处理
}
});
}
/** /**
* *
*/ */
@ -780,7 +804,7 @@ public class PlatformServiceImpl implements IPlatformService {
@Override @Override
public List<Platform> queryEnablePlatformList() { public List<Platform> queryEnablePlatformList() {
return platformMapper.queryEnablePlatformList(); return platformMapper.queryEnableParentPlatformList(true);
} }
@Override @Override

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.storager; package com.genersoft.iot.vmp.storager;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.ServerInfo;
import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaInfo;
@ -37,7 +38,7 @@ public interface IRedisCatchStorage {
/** /**
* rediswvp * rediswvp
*/ */
void updateWVPInfo(JSONObject jsonObject, int time); void updateWVPInfo(ServerInfo serverInfo, int time);
/** /**
* *
@ -187,4 +188,6 @@ public interface IRedisCatchStorage {
void sendStartSendRtp(SendRtpInfo sendRtpItem); void sendStartSendRtp(SendRtpInfo sendRtpItem);
void sendPushStreamOnline(SendRtpInfo sendRtpItem); void sendPushStreamOnline(SendRtpInfo sendRtpItem);
ServerInfo queryServerInfo(String serverId);
} }

View File

@ -2,9 +2,9 @@ package com.genersoft.iot.vmp.storager.impl;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.ServerInfo;
import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
@ -110,10 +110,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void updateWVPInfo(JSONObject jsonObject, int time) { public void updateWVPInfo(ServerInfo serverInfo, int time) {
String key = VideoManagerConstants.WVP_SERVER_PREFIX + userSetting.getServerId(); String key = VideoManagerConstants.WVP_SERVER_PREFIX + userSetting.getServerId();
Duration duration = Duration.ofSeconds(time); Duration duration = Duration.ofSeconds(time);
redisTemplate.opsForValue().set(key, jsonObject, duration); redisTemplate.opsForValue().set(key, serverInfo, duration);
//
} }
@Override @Override
@ -533,4 +534,10 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
log.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getTargetId()); log.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getTargetId());
redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem)); redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
} }
@Override
public ServerInfo queryServerInfo(String serverId) {
String key = VideoManagerConstants.WVP_SERVER_PREFIX + serverId;
return (ServerInfo)redisTemplate.opsForValue().get(key);
}
} }

View File

@ -225,6 +225,7 @@ create table wvp_platform
catalog_with_region integer default 1, catalog_with_region integer default 1,
auto_push_channel bool default true, auto_push_channel bool default true,
send_stream_ip character varying(50), send_stream_ip character varying(50),
server_id character varying(50),
constraint uk_platform_unique_server_gb_id unique (server_gb_id) constraint uk_platform_unique_server_gb_id unique (server_gb_id)
); );

View File

@ -242,6 +242,7 @@ create table wvp_platform
catalog_with_region integer default 1, catalog_with_region integer default 1,
auto_push_channel bool default true, auto_push_channel bool default true,
send_stream_ip character varying(50), send_stream_ip character varying(50),
server_id character varying(50),
constraint uk_platform_unique_server_gb_id unique (server_gb_id) constraint uk_platform_unique_server_gb_id unique (server_gb_id)
); );

View File

@ -5,6 +5,7 @@ alter table wvp_device add server_id character varying(50);
alter table wvp_media_server add server_id character varying(50); alter table wvp_media_server add server_id character varying(50);
alter table wvp_stream_proxy add server_id character varying(50); alter table wvp_stream_proxy add server_id character varying(50);
alter table wvp_cloud_record add server_id character varying(50); alter table wvp_cloud_record add server_id character varying(50);
alter table wvp_platform add server_id character varying(50);
update wvp_device set server_id = "你服务的ID"; update wvp_device set server_id = "你服务的ID";
update wvp_media_server set server_id = "你服务的ID"; update wvp_media_server set server_id = "你服务的ID";