修复节点上线和下线时未清理redis流信息的BUG

pull/1669/head
648540858 2024-10-24 16:41:41 +08:00
parent 0ddc374c34
commit bf3e181bdb
4 changed files with 35 additions and 9 deletions

View File

@ -16,6 +16,8 @@ import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
@ -40,6 +42,7 @@ import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -111,6 +114,26 @@ public class MediaServerServiceImpl implements IMediaServerService {
String type = OriginType.values()[mediaInfo.getOriginType()].getType(); String type = OriginType.values()[mediaInfo.getOriginType()].getType();
redisCatchStorage.removeStream(mediaInfo.getMediaServer().getId(), type, event.getApp(), event.getStream()); redisCatchStorage.removeStream(mediaInfo.getMediaServer().getId(), type, event.getApp(), event.getStream());
} }
}
/**
* 线
*/
@Async("taskExecutor")
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOnlineEvent event) {
// 查看是否有未处理的RTP流
}
/**
* 线
*/
@Async("taskExecutor")
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOfflineEvent event) {
} }

View File

@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
@ -57,7 +57,7 @@ public class ZLMMediaServerStatusManager {
private String serverServletContextPath; private String serverServletContextPath;
@Autowired @Autowired
private UserSetting userSetting; private EventPublisher eventPublisher;
private final String type = "zlm"; private final String type = "zlm";
@ -187,6 +187,8 @@ public class ZLMMediaServerStatusManager {
mediaServerItem.setStatus(true); mediaServerItem.setStatus(true);
mediaServerItem.setHookAliveInterval(10F); mediaServerItem.setHookAliveInterval(10F);
mediaServerService.update(mediaServerItem); mediaServerService.update(mediaServerItem);
// 发送上线通知
eventPublisher.mediaServerOnlineEventPublish(mediaServerItem.getId());
if(mediaServerItem.isAutoConfig()) { if(mediaServerItem.isAutoConfig()) {
if (config == null) { if (config == null) {
JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
@ -210,7 +212,8 @@ public class ZLMMediaServerStatusManager {
mediaServerItem.setStatus(false); mediaServerItem.setStatus(false);
offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem); offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem);
offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis());
// TODO 发送离线通知 // 发送离线通知
eventPublisher.mediaServerOfflineEventPublish(mediaServerItem.getId());
mediaServerService.update(mediaServerItem); mediaServerService.update(mediaServerItem);
}, (int)(mediaServerItem.getHookAliveInterval() * 2 * 1000)); }, (int)(mediaServerItem.getHookAliveInterval() * 2 * 1000));
} }

View File

@ -137,7 +137,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) { public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) {
// 查找是否使用了callID // 查找是否使用了callID
StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId); StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId);
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
if (streamAuthorityInfo != null) { if (streamAuthorityInfo != null) {
mediaInfo.setCallId(streamAuthorityInfo.getCallId()); mediaInfo.setCallId(streamAuthorityInfo.getCallId());
} }
@ -146,13 +146,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void removeStream(String mediaServerId, String type, String app, String streamId) { public void removeStream(String mediaServerId, String type, String app, String streamId) {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerId; String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_" + app + "_" + streamId + "_" + mediaServerId;
redisTemplate.delete(key); redisTemplate.delete(key);
} }
@Override @Override
public void removeStream(String mediaServerId, String type) { public void removeStream(String mediaServerId, String type) {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId; String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_*_*_" + mediaServerId;
List<Object> streams = RedisUtil.scan(redisTemplate, key); List<Object> streams = RedisUtil.scan(redisTemplate, key);
for (Object stream : streams) { for (Object stream : streams) {
redisTemplate.delete(stream); redisTemplate.delete(stream);
@ -162,7 +162,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public List<MediaInfo> getStreams(String mediaServerId, String type) { public List<MediaInfo> getStreams(String mediaServerId, String type) {
List<MediaInfo> result = new ArrayList<>(); List<MediaInfo> result = new ArrayList<>();
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId; String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_*_*_" + mediaServerId;
List<Object> streams = RedisUtil.scan(redisTemplate, key); List<Object> streams = RedisUtil.scan(redisTemplate, key);
for (Object stream : streams) { for (Object stream : streams) {
MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream); MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream);

View File

@ -290,7 +290,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return; return;
} }
// 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本 // 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本
redisCatchStorage.removeStream(mediaServerId, "pull"); redisCatchStorage.removeStream(mediaServerId, "PULL");
List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true);
if (streamProxies.isEmpty()) { if (streamProxies.isEmpty()) {
@ -363,7 +363,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); List<StreamProxy> streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true);
// 清理redis相关的缓存 // 清理redis相关的缓存
redisCatchStorage.removeStream(mediaServerId, "pull"); redisCatchStorage.removeStream(mediaServerId, "PULL");
if (streamProxies.isEmpty()) { if (streamProxies.isEmpty()) {
return; return;