From 76d3a5d8e492da47c2439680eaead77530739340 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 25 Sep 2024 15:42:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/GbChannelPlayServiceImpl.java | 1 + .../request/impl/AckRequestProcessor.java | 16 ++++++++---- .../request/impl/InviteRequestProcessor.java | 2 +- .../cmd/DeviceStatusQueryMessageHandler.java | 2 -- .../service/redisMsg/IRedisRpcService.java | 8 +++--- .../redisMsg/control/RedisRpcController.java | 26 +++++++------------ .../redisMsg/service/RedisRpcServiceImpl.java | 24 ++++++++--------- web_src/src/components/StreamPushList.vue | 2 ++ 8 files changed, 41 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java index 2d25f2d0..01f6ff8c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java @@ -137,6 +137,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { }catch (PlayException e) { callback.run(e.getCode(), e.getMsg(), null); }catch (Exception e) { + log.error("[点播推流通道失败] 通道: {}({})", channel.getGbName(), channel.getGbDeviceId(), e); callback.run(Response.BUSY_HERE, "busy here", null); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 5425b510..220d9ee5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -115,17 +115,23 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (parentPlatform != null) { DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId()); if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { - WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getChannelId(), sendRtpItem); + WVPResult wvpResult = redisRpcService.startSendRtp(callIdHeader.getCallId(), sendRtpItem); if (wvpResult.getCode() == 0) { redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, parentPlatform); } } else { try { - if (sendRtpItem.isTcpActive()) { - mediaServerService.startSendRtpPassive(mediaInfo,sendRtpItem, null); - } else { - mediaServerService.startSendRtp(mediaInfo, sendRtpItem); + if (mediaInfo != null) { + if (sendRtpItem.isTcpActive()) { + mediaServerService.startSendRtpPassive(mediaInfo,sendRtpItem, null); + } else { + mediaServerService.startSendRtp(mediaInfo, sendRtpItem); + } + }else { + // mediaInfo 在集群的其他wvp里 + } + redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, parentPlatform); }catch (ControllerException e) { log.error("RTP推流失败: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 7a766da1..bf1e17eb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -189,7 +189,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setStatus(1); sendRtpItem.setCallId(inviteInfo.getCallId()); sendRtpItem.setPlayType("Play".equalsIgnoreCase(inviteInfo.getSessionName()) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); - + sendRtpItem.setServerId(streamInfo.getServerId()); sendRtpServerService.update(sendRtpItem); String sdpIp = streamInfo.getMediaServer().getSdpIp(); if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceStatusQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceStatusQueryMessageHandler.java index f4027f30..2b3b8fcc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceStatusQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceStatusQueryMessageHandler.java @@ -2,14 +2,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query. import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.QueryMessageHandler; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java index 31e16e5a..7e23e276 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java @@ -7,17 +7,17 @@ import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface IRedisRpcService { - SendRtpInfo getSendRtpItem(Integer sendRtpChannelId); + SendRtpInfo getSendRtpItem(String callId); - WVPResult startSendRtp(Integer sendRtpChannelId, SendRtpInfo sendRtpItem); + WVPResult startSendRtp(String callId, SendRtpInfo sendRtpItem); - WVPResult stopSendRtp(Integer sendRtpChannelId); + WVPResult stopSendRtp(String callId); long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback callback); void stopWaitePushStreamOnline(SendRtpInfo sendRtpItem); - void rtpSendStopped(Integer sendRtpChannelId); + void rtpSendStopped(String callId); void removeCallback(long key); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index e14c19e5..3e6b824f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.bean.MediaInfo; @@ -20,7 +19,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.service.ISendRtpServerService; -import com.genersoft.iot.vmp.service.impl.SendRtpServerServiceImpl; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -61,9 +59,6 @@ public class RedisRpcController { @Autowired private ISIPCommanderForPlatform commanderFroPlatform; - - @Autowired - private IPlatformService platformService; @Autowired private ISendRtpServerService sendRtpServerService; @@ -72,10 +67,10 @@ public class RedisRpcController { * 获取发流的信息 */ public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { - String sendRtpItemKey = request.getParam().toString(); - SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey); + String callId = request.getParam().toString(); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); if (sendRtpItem == null) { - log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息, callId:{}", callId); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); return response; @@ -104,10 +99,9 @@ public class RedisRpcController { sendRtpItem.setSsrc(ssrc); } sendRtpServerService.update(sendRtpItem); - redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); - response.setBody(sendRtpItemKey); + response.setBody(callId); return response; } @@ -228,12 +222,12 @@ public class RedisRpcController { * 开始发流 */ public RedisRpcResponse startSendRtp(RedisRpcRequest request) { - String sendRtpItemKey = request.getParam().toString(); - SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey); + String callId = request.getParam().toString(); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (sendRtpItem == null) { - log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息, callId:{}", callId); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); response.setBody(wvpResult); return response; @@ -271,12 +265,12 @@ public class RedisRpcController { * 停止发流 */ public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { - String sendRtpItemKey = request.getParam().toString(); - SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey); + String callId = request.getParam().toString(); + SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId); RedisRpcResponse response = request.getResponse(); response.setStatusCode(200); if (sendRtpItem == null) { - log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", callId); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); response.setBody(wvpResult); return response; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 6131eadb..86182135 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -57,8 +57,8 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public SendRtpInfo getSendRtpItem(Integer sendRtpItemKey) { - RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey); + public SendRtpInfo getSendRtpItem(String callId) { + RedisRpcRequest request = buildRequest("getSendRtpItem", callId); RedisRpcResponse response = redisRpcConfig.request(request, 10); if (response.getBody() == null) { return null; @@ -67,23 +67,23 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public WVPResult startSendRtp(Integer sendRtpItemKey, SendRtpInfo sendRtpItem) { + public WVPResult startSendRtp(String callId, SendRtpInfo sendRtpItem) { log.info("[请求其他WVP] 开始推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); - RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey); + RedisRpcRequest request = buildRequest("startSendRtp", callId); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); } @Override - public WVPResult stopSendRtp(Integer sendRtpItemKey) { - SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey); + public WVPResult stopSendRtp(String callId) { + SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(callId); if (sendRtpItem == null) { - log.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息, key:{}", sendRtpItemKey); + log.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息, key:{}", callId); return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息"); } log.info("[请求其他WVP] 停止推流,wvp:{}, {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); - RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey); + RedisRpcRequest request = buildRequest("stopSendRtp", callId); request.setToId(sendRtpItem.getServerId()); RedisRpcResponse response = redisRpcConfig.request(request, 10); return JSON.parseObject(response.getBody().toString(), WVPResult.class); @@ -141,13 +141,13 @@ public class RedisRpcServiceImpl implements IRedisRpcService { } @Override - public void rtpSendStopped(Integer sendRtpItemKey) { - SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey); + public void rtpSendStopped(String callId) { + SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(callId); if (sendRtpItem == null) { - log.info("[停止WVP监听流上线] 未找到redis中的发流信息, key:{}", sendRtpItemKey); + log.info("[停止WVP监听流上线] 未找到redis中的发流信息, key:{}", callId); return; } - RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey); + RedisRpcRequest request = buildRequest("rtpSendStopped", callId); request.setToId(sendRtpItem.getServerId()); redisRpcConfig.request(request, 10); } diff --git a/web_src/src/components/StreamPushList.vue b/web_src/src/components/StreamPushList.vue index 8e89c6bf..24b466e1 100755 --- a/web_src/src/components/StreamPushList.vue +++ b/web_src/src/components/StreamPushList.vue @@ -58,6 +58,8 @@ + +