From 7cb4c5c1df4c287730c8cf67ce18ea64f7da5702 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 22 Jan 2024 00:02:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8B=89=E6=B5=81=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E7=9A=84=E6=92=AD=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/media/zlm/ZLMHttpHookListener.java | 60 ++++++++++--------- .../vmp/media/zlm/ZLMMediaListManager.java | 16 ----- .../iot/vmp/service/IMediaService.java | 2 + .../iot/vmp/service/IStreamProxyService.java | 2 +- .../iot/vmp/service/IStreamPushService.java | 2 +- .../vmp/service/impl/MediaServiceImpl.java | 24 +++++++- .../impl/StreamProxyPlayServiceImpl.java | 14 +++-- .../service/impl/StreamProxyServiceImpl.java | 22 +------ .../service/impl/StreamPushServiceImpl.java | 22 ++++--- .../vmp/storager/dao/StreamProxyMapper.java | 4 +- .../storager/impl/RedisCatchStorageImpl.java | 6 +- 11 files changed, 90 insertions(+), 84 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 74edf5fa..ea6a1a8d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -330,9 +330,11 @@ public class ZLMHttpHookListener { public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) { if (param.isRegist()) { - logger.info("[ZLM HOOK] 流注册, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + logger.info("[ZLM HOOK] 流注册, {}->{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), + OriginType.values()[param.getOriginType()].getType(), param.getApp(), param.getStream()); } else { - logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); + logger.info("[ZLM HOOK] 流注销, {}->{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), + OriginType.values()[param.getOriginType()].getType(), param.getApp(), param.getStream()); } @@ -373,18 +375,25 @@ public class ZLMHttpHookListener { } else { mediaServerService.removeCount(param.getMediaServerId()); } - // 设置拉流代理上线/离线 - streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); - if ("rtp".equals(param.getApp()) && !param.isRegist()) { - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); - if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { - inviteStreamService.removeInviteInfo(inviteInfo); - storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); + if ("rtp".equals(param.getApp())) { + if (!param.isRegist()) { + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); + if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { + inviteStreamService.removeInviteInfo(inviteInfo); + storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); + } } + } else { - if (!"rtp".equals(param.getApp())) { - String type = OriginType.values()[param.getOriginType()].getType(); + String type; + if (param.getOriginType() == 0) { + // 源类型为unknown,则主动查询类型 + type = mediaService.getStreamType(param.getApp(), param.getStream()); + }else { + type = OriginType.values()[param.getOriginType()].getType(); + } + if (type != null) { if (param.isRegist()) { StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo( param.getApp(), param.getStream()); @@ -415,21 +424,20 @@ public class ZLMHttpHookListener { if ("PUSH".equalsIgnoreCase(type)) { // 冗余数据,自己系统中自用 redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId()); - zlmMediaListManager.removePush(param); + streamPushService.offline(param.getApp(), param.getStream()); } } - zlmMediaListManager.streamOffline(param.getApp(), param.getStream()); - } - if (type != null) { - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", param.getApp()); - jsonObject.put("stream", param.getStream()); - jsonObject.put("register", param.isRegist()); - jsonObject.put("mediaServerId", param.getMediaServerId()); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } + // 设置拉流代理拉流状态 + streamProxyService.updatePullingStatus(param.isRegist(), param.getApp(), param.getStream()); + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", param.getApp()); + jsonObject.put("stream", param.getStream()); + jsonObject.put("register", param.isRegist()); + jsonObject.put("mediaServerId", param.getMediaServerId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } if (!param.isRegist()) { @@ -556,12 +564,6 @@ public class ZLMHttpHookListener { } return ret; } - // TODO 推流具有主动性,暂时不做处理 -// StreamPushItem streamPushItem = streamPushService.getPush(app, streamId); -// if (streamPushItem != null) { -// // TODO 发送停止 -// -// } } return ret; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index ba846e50..a166e34d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -77,22 +77,6 @@ public class ZLMMediaListManager { return transform; } - public void removePush(OnStreamChangedHookParam param) { - StreamPush pushInDb = streamPushService.getPush(param.getApp(), param.getStream()); - if (pushInDb == null) { - return; - } - if (ObjectUtils.isEmpty(pushInDb.getGbId())) { - streamPushService.remove(pushInDb.getId()); - }else { - List onlinePushers = new ArrayList<>(); - onlinePushers.add(pushInDb.getCommonGbChannelId()); - commonGbChannelService.offlineForList(onlinePushers); - streamPushService.offline(pushInDb.getId()); - } - - } - public void sendStreamEvent(String app, String stream, String mediaServerId) { MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); // 查看推流状态 diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java index a4a14593..eb76d30a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java @@ -51,4 +51,6 @@ public interface IMediaService { * 关闭zlm的流 */ boolean closeStream(MediaServerItem mediaInfo, String app, String stream); + + String getStreamType(String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index b59d4465..69ac7da1 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -32,7 +32,7 @@ public interface IStreamProxyService { /** * 更新状态 */ - void updateStatus(boolean status, String app, String stream); + void updatePullingStatus(boolean status, String app, String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index ad8827c5..a296070e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -116,5 +116,5 @@ public interface IStreamPushService { /** * 设置推流离线 */ - void offline(Integer id); + void offline(String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 46400215..4f2f81ae 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -9,10 +9,14 @@ import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.service.IMediaService; +import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; +import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; @@ -26,7 +30,10 @@ public class MediaServiceImpl implements IMediaService { private IRedisCatchStorage redisCatchStorage; @Autowired - private IVideoManagerStorage storager; + private StreamProxyMapper streamProxyMapper; + + @Autowired + private StreamPushMapper streamPushMapper; @Autowired private IMediaServerService mediaServerService; @@ -121,4 +128,19 @@ public class MediaServiceImpl implements IMediaService { JSONObject jsonObject = zlmresTfulUtils.closeStreams(mediaInfo, app, stream); return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getInteger("count_hit") > 0; } + + @Override + public String getStreamType(String app, String stream) { + String result = null; + StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); + if (streamProxy != null) { + result = "PULL"; + }else { + StreamPush streamPush = streamPushMapper.selectOneByAppAndStream(app, stream); + if (streamPush != null) { + result = "PUSH"; + } + } + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyPlayServiceImpl.java index da842b30..5fae26fd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyPlayServiceImpl.java @@ -87,6 +87,9 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { } return; }else { + if (streamProxy.getStreamKey() != null) { + zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey()); + } redisCatchStorage.removeStream(streamChangedHookParam.getMediaServerId(), "PULL", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); } @@ -94,10 +97,12 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { redisCatchStorage.removeStream(streamChangedHookParam.getMediaServerId(), "PULL", streamChangedHookParam.getApp(), streamChangedHookParam.getStream()); } - } - - if (streamProxy.getStreamKey() != null) { - zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey()); + } else { + // 查询zlm是否含有这个流,存在则停止 + boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); + if (ready) { + mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream()); + } } String delayTalkKey = UUID.randomUUID().toString(); @@ -140,6 +145,7 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { return; } if (result.getInteger("code") != 0) { + logger.info("[开始拉流代理] 失败: {}", result.getString("msg") ); hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); dynamicTask.stop(delayTalkKey); if (callback != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 8317f8b3..53d94e59 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -518,20 +518,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { syncPullStream(mediaServerId); } - @Transactional - public void updateStatusById(StreamProxy streamProxy, boolean status) { - streamProxyMapper.updatePullingById(streamProxy.getId(), status); - if (streamProxy.getCommonGbChannelId() > 0) { - List ids = new ArrayList<>(); - ids.add(streamProxy.getCommonGbChannelId()); - if (status) { - commonGbChannelService.onlineForList(ids); - }else { - commonGbChannelService.offlineForList(ids); - } - } - } - @Override public void zlmServerOffline(String mediaServerId) { // 移除开启了无人观看自动移除的流 @@ -571,12 +557,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override @Transactional - public void updateStatus(boolean status, String app, String stream) { - StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); - if (streamProxy == null) { - return; - } - updateStatusById(streamProxy, status); + public void updatePullingStatus(boolean status, String app, String stream) { + streamProxyMapper.updatePullingById(app, stream, status); } private void syncPullStream(String mediaServerId){ diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 1207ab42..28b41a00 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -529,21 +529,25 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public void offline(Integer id) { - if (id == null) { + public void offline(String app, String stream) { + if (app == null || stream == null) { return; } - StreamPush streamPush = streamPushMapper.selectOne(id); + StreamPush streamPush = streamPushMapper.selectOneByAppAndStream(app, stream); if (streamPush == null) { return; } - if (userSetting.isUsePushingAsStatus()) { - if (streamPush.getCommonGbChannelId() > 0) { - List pushers = new ArrayList<>(); - pushers.add(streamPush.getCommonGbChannelId()); - commonGbChannelService.offlineForList(pushers); + if (streamPush.getGbId() == null) { + streamPushMapper.del(streamPush.getId()); + }else { + if (userSetting.isUsePushingAsStatus()) { + if (streamPush.getCommonGbChannelId() > 0) { + List pushers = new ArrayList<>(); + pushers.add(streamPush.getCommonGbChannelId()); + commonGbChannelService.offlineForList(pushers); + } + streamPushMapper.offlineById(streamPush.getId()); } - streamPushMapper.offlineById(streamPush.getId()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 561977da..ac8b1122 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -68,8 +68,8 @@ public interface StreamProxyMapper { void updatePullingStatusByMediaServerId(@Param("mediaServerId") String mediaServerId, @Param("pulling") boolean pulling); @Update("UPDATE wvp_stream_proxy " + "SET pulling=#{pulling} " + - "WHERE id=#{id}") - int updatePullingById(@Param("id") int id, @Param("pulling") boolean pulling); + "WHERE app=#{app} and stream=#{stream}") + int updatePullingById(@Param("app") String app, @Param("stream") String stream, @Param("pulling") boolean pulling); @Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}") void deleteAutoRemoveItemByMediaServerId(String mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index aceeb88e..d44bf834 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -479,11 +479,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public OnStreamChangedHookParam getProxyStreamInfo(String app, String streamId, String mediaServerId) { + if (mediaServerId == null) { + mediaServerId = "*"; + } String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_PULL_" + app + "_" + streamId + "_" + mediaServerId; OnStreamChangedHookParam result = null; List keys = RedisUtil.scan(redisTemplate, scanKey); - if (keys.size() > 0) { + if (!keys.isEmpty()) { String key = (String) keys.get(0); result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class); } @@ -679,4 +682,5 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } } + }