From bf3e181bdb4991599d6434f66e6db4eaa8b23efc Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 24 Oct 2024 16:41:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=8A=82=E7=82=B9=E4=B8=8A?= =?UTF-8?q?=E7=BA=BF=E5=92=8C=E4=B8=8B=E7=BA=BF=E6=97=B6=E6=9C=AA=E6=B8=85?= =?UTF-8?q?=E7=90=86redis=E6=B5=81=E4=BF=A1=E6=81=AF=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/MediaServerServiceImpl.java | 23 +++++++++++++++++++ .../zlm/ZLMMediaServerStatusManager.java | 9 +++++--- .../storager/impl/RedisCatchStorageImpl.java | 8 +++---- .../service/impl/StreamProxyServiceImpl.java | 4 ++-- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 56d5399a..3134c182 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -16,6 +16,8 @@ import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import com.genersoft.iot.vmp.media.service.IMediaNodeServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; @@ -40,6 +42,7 @@ import org.springframework.context.event.EventListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.time.LocalDateTime; @@ -111,6 +114,26 @@ public class MediaServerServiceImpl implements IMediaServerService { String type = OriginType.values()[mediaInfo.getOriginType()].getType(); redisCatchStorage.removeStream(mediaInfo.getMediaServer().getId(), type, event.getApp(), event.getStream()); } + } + + /** + * 流媒体节点上线 + */ + @Async("taskExecutor") + @EventListener + @Transactional + public void onApplicationEvent(MediaServerOnlineEvent event) { + // 查看是否有未处理的RTP流 + + } + + /** + * 流媒体节点离线 + */ + @Async("taskExecutor") + @EventListener + @Transactional + public void onApplicationEvent(MediaServerOfflineEvent event) { } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java index 0d7ee045..9d0ddca5 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent; @@ -57,7 +57,7 @@ public class ZLMMediaServerStatusManager { private String serverServletContextPath; @Autowired - private UserSetting userSetting; + private EventPublisher eventPublisher; private final String type = "zlm"; @@ -187,6 +187,8 @@ public class ZLMMediaServerStatusManager { mediaServerItem.setStatus(true); mediaServerItem.setHookAliveInterval(10F); mediaServerService.update(mediaServerItem); + // 发送上线通知 + eventPublisher.mediaServerOnlineEventPublish(mediaServerItem.getId()); if(mediaServerItem.isAutoConfig()) { if (config == null) { JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); @@ -210,7 +212,8 @@ public class ZLMMediaServerStatusManager { mediaServerItem.setStatus(false); offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem); offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); - // TODO 发送离线通知 + // 发送离线通知 + eventPublisher.mediaServerOfflineEventPublish(mediaServerItem.getId()); mediaServerService.update(mediaServerItem); }, (int)(mediaServerItem.getHookAliveInterval() * 2 * 1000)); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index d51e1174..3418525d 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -137,7 +137,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) { // 查找是否使用了callID StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId); - String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); if (streamAuthorityInfo != null) { mediaInfo.setCallId(streamAuthorityInfo.getCallId()); } @@ -146,13 +146,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void removeStream(String mediaServerId, String type, String app, String streamId) { - String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerId; + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_" + app + "_" + streamId + "_" + mediaServerId; redisTemplate.delete(key); } @Override public void removeStream(String mediaServerId, String type) { - String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId; + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_*_*_" + mediaServerId; List streams = RedisUtil.scan(redisTemplate, key); for (Object stream : streams) { redisTemplate.delete(stream); @@ -162,7 +162,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public List getStreams(String mediaServerId, String type) { List result = new ArrayList<>(); - String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId; + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_*_*_" + mediaServerId; List streams = RedisUtil.scan(redisTemplate, key); for (Object stream : streams) { MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream); diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index c5909c0b..45e08a76 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -290,7 +290,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return; } // 这里主要是控制数据库/redis缓存/以及zlm中存在的代理流 三者状态一致。以数据库中数据为根本 - redisCatchStorage.removeStream(mediaServerId, "pull"); + redisCatchStorage.removeStream(mediaServerId, "PULL"); List streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); if (streamProxies.isEmpty()) { @@ -363,7 +363,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { List streamProxies = streamProxyMapper.selectForEnableInMediaServer(mediaServerId, true); // 清理redis相关的缓存 - redisCatchStorage.removeStream(mediaServerId, "pull"); + redisCatchStorage.removeStream(mediaServerId, "PULL"); if (streamProxies.isEmpty()) { return;