优化启动后清理过期信息的逻辑
parent
010b09674c
commit
faac93613a
|
@ -30,7 +30,7 @@ public class ZLMKeepliveTimeoutListener extends KeyExpirationEventMessageListene
|
|||
public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) {
|
||||
super(listenerContainer);
|
||||
// 配置springboot默认Config为空,即不让应用去修改redis的默认配置,因为Redis服务出于安全会禁用CONFIG命令给远程用户使用
|
||||
setKeyspaceNotificationsConfigParameter("");
|
||||
// setKeyspaceNotificationsConfigParameter("");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -31,10 +31,8 @@ public class ZLMOfflineEventListener implements ApplicationListener<ZLMOfflineEv
|
|||
|
||||
@Override
|
||||
public void onApplicationEvent(ZLMOfflineEvent event) {
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("ZLM离线事件触发,ID:" + event.getMediaServerId());
|
||||
}
|
||||
|
||||
logger.info("ZLM离线事件触发,ID:" + event.getMediaServerId());
|
||||
// 处理ZLM离线
|
||||
mediaServerService.zlmServerOffline(event.getMediaServerId());
|
||||
streamProxyService.zlmServerOffline(event.getMediaServerId());
|
||||
|
|
|
@ -37,10 +37,8 @@ public class ZLMOnlineEventListener implements ApplicationListener<ZLMOnlineEven
|
|||
|
||||
@Override
|
||||
public void onApplicationEvent(ZLMOnlineEvent event) {
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("ZLM上线事件触发,ID:" + event.getMediaServerId());
|
||||
}
|
||||
|
||||
logger.info("ZLM上线事件触发,ID:" + event.getMediaServerId());
|
||||
streamPushService.zlmServerOnline(event.getMediaServerId());
|
||||
streamProxyService.zlmServerOnline(event.getMediaServerId());
|
||||
|
||||
|
|
|
@ -565,7 +565,6 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
|
|||
redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(), id);
|
||||
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + id;
|
||||
redisUtil.del(key);
|
||||
mediaServerMapper.delOne(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -154,47 +154,61 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
|||
if (mediaServerItem == null) {
|
||||
return;
|
||||
}
|
||||
// 数据库记录
|
||||
List<StreamPushItem> pushList = getPushList(mediaServerId);
|
||||
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
|
||||
// redis记录
|
||||
List<StreamInfo> streamInfoPushList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
|
||||
Map<String, StreamInfo> streamInfoPushItemMap = new HashMap<>();
|
||||
if (pushList.size() > 0) {
|
||||
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
|
||||
for (StreamPushItem streamPushItem : pushList) {
|
||||
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
|
||||
}
|
||||
zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
|
||||
if (mediaList == null) return;
|
||||
String dataStr = mediaList.getString("data");
|
||||
|
||||
Integer code = mediaList.getInteger("code");
|
||||
List<StreamPushItem> streamPushItems = null;
|
||||
if (code == 0 ) {
|
||||
if (dataStr != null) {
|
||||
streamPushItems = handleJSON(dataStr, mediaServerItem);
|
||||
}
|
||||
}
|
||||
|
||||
if (streamPushItems != null) {
|
||||
for (StreamPushItem streamPushItem : streamPushItems) {
|
||||
pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
||||
}
|
||||
}
|
||||
Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
|
||||
if (offlinePushItems.size() > 0) {
|
||||
String type = "PUSH";
|
||||
streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
|
||||
for (StreamPushItem offlinePushItem : offlinePushItems) {
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("serverId", userSetup.getServerId());
|
||||
jsonObject.put("app", offlinePushItem.getApp());
|
||||
jsonObject.put("stream", offlinePushItem.getStream());
|
||||
jsonObject.put("register", false);
|
||||
jsonObject.put("mediaServerId", mediaServerId);
|
||||
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
||||
// 移除redis内流的信息
|
||||
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream());
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
if (streamInfoPushList.size() > 0) {
|
||||
for (StreamInfo streamInfo : streamInfoPushList) {
|
||||
streamInfoPushItemMap.put(streamInfo.getApp() + streamInfo.getStreamId(), streamInfo);
|
||||
}
|
||||
}
|
||||
zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
|
||||
if (mediaList == null) return;
|
||||
String dataStr = mediaList.getString("data");
|
||||
|
||||
Integer code = mediaList.getInteger("code");
|
||||
List<StreamPushItem> streamPushItems = null;
|
||||
if (code == 0 ) {
|
||||
if (dataStr != null) {
|
||||
streamPushItems = handleJSON(dataStr, mediaServerItem);
|
||||
}
|
||||
}
|
||||
|
||||
if (streamPushItems != null) {
|
||||
for (StreamPushItem streamPushItem : streamPushItems) {
|
||||
pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
||||
streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
||||
}
|
||||
}
|
||||
Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
|
||||
if (offlinePushItems.size() > 0) {
|
||||
String type = "PUSH";
|
||||
streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
|
||||
}
|
||||
Collection<StreamInfo> offlineStreamInfoItems = streamInfoPushItemMap.values();
|
||||
if (offlineStreamInfoItems.size() > 0) {
|
||||
String type = "PUSH";
|
||||
for (StreamInfo offlineStreamInfoItem : offlineStreamInfoItems) {
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("serverId", userSetup.getServerId());
|
||||
jsonObject.put("app", offlineStreamInfoItem.getApp());
|
||||
jsonObject.put("stream", offlineStreamInfoItem.getStreamId());
|
||||
jsonObject.put("register", false);
|
||||
jsonObject.put("mediaServerId", mediaServerId);
|
||||
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
||||
// 移除redis内流的信息
|
||||
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineStreamInfoItem.getApp(), offlineStreamInfoItem.getStreamId());
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -211,6 +225,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
|||
List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
|
||||
if (streamInfoList.size() > 0) {
|
||||
for (StreamInfo streamInfo : streamInfoList) {
|
||||
// 移除redis内流的信息
|
||||
redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("serverId", userSetup.getServerId());
|
||||
jsonObject.put("app", streamInfo.getApp());
|
||||
|
@ -218,8 +234,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
|||
jsonObject.put("register", false);
|
||||
jsonObject.put("mediaServerId", mediaServerId);
|
||||
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
||||
// 移除redis内流的信息
|
||||
redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue