From 885842249fb6b264b0abf78668872d04bdc179ce Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 7 Jul 2023 18:17:24 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=AC=AC=E4=B8=89?= =?UTF-8?q?=E6=96=B9=E5=AF=B9=E6=8E=A5=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 1 + .../vmp/media/zlm/ZLMHttpHookListener.java | 26 ++++++- .../iot/vmp/vmanager/rtp/RtpController.java | 75 ++++++++++++++----- 3 files changed, 81 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 6eec845d..ec25ea2e 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -154,6 +154,7 @@ public class VideoManagerConstants { public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_"; + public static final String WVP_OTHER_RECEIVE_RTP_INFO = "VMP_OTHER_RECEIVE_RTP_INFO_"; /** * Redis Const diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 33c87138..5df3be44 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -22,14 +23,13 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.StreamContent; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; @@ -106,6 +106,9 @@ public class ZLMHttpHookListener { @Autowired private AssistRESTfulUtils assistRESTfulUtils; + @Autowired + private RedisTemplate redisTemplate; + @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @@ -255,6 +258,21 @@ public class ZLMHttpHookListener { result.setEnable_mp4(true); } } + + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "*"; + // 将信息写入redis中,以备后用 + List scan = RedisUtil.scan(redisTemplate, receiveKey); + if (scan.size()>0) { + for (Object o : scan) { + String key = (String) o; + OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); + if (otherRtpSendInfo != null && otherRtpSendInfo.getStream().equalsIgnoreCase(param.getStream())) { + result.setEnable_audio(true); + result.setEnable_mp4(true); + } + } + } + if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) { logger.info("推流时发现尚未设置录像路径,从assist服务中读取"); JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index 311a007a..c06c4af1 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.service.IDeviceService; @@ -34,6 +35,7 @@ import org.springframework.web.bind.annotation.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.UUID; @SuppressWarnings("rawtypes") @Tag(name = "第三方服务对接") @@ -120,12 +122,12 @@ public class RtpController { int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId()); + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, (mediaServerItemInUse, response)->{ if (stream.equals(response.getString("stream_id"))) { - logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId); + logger.info("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId); OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); OkHttpClient client = httpClientBuilder.build(); String url = callBack + "?callId=" + callId; @@ -133,7 +135,7 @@ public class RtpController { try { client.newCall(request).execute(); } catch (IOException e) { - logger.error("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); + logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); } } }); @@ -143,6 +145,9 @@ public class RtpController { otherRtpSendInfo.setReceivePort(localPort); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream; + // 将信息写入redis中,以备后用 + redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); if (isSend != null && isSend) { String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; // 预创建发流信息 @@ -160,7 +165,7 @@ public class RtpController { }, 15000); otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); otherRtpSendInfo.setPort(port); - logger.info("[开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo); + logger.info("[第三方服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo); } return otherRtpSendInfo; } @@ -173,6 +178,9 @@ public class RtpController { logger.info("[第三方服务对接->关闭收流] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream; + // 将信息写入redis中,以备后用 + redisTemplate.delete(receiveKey); } @GetMapping(value = "/send/start") @@ -187,9 +195,10 @@ public class RtpController { @Parameter(name = "onlyAudio", description = "是否只有音频", required = true) @Parameter(name = "isUdp", description = "是否为UDP", required = true) @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType) { - logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}", - ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS"); + @Parameter(name = "pt", description = "rtp的pt", required = true) + public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) { + logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}", + ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt); if (ObjectUtils.isEmpty(streamType)) { streamType = 1; } @@ -197,7 +206,7 @@ public class RtpController { String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo != null) { - zlmServerFactory.releasePort(mediaServerItem, sendInfo.getCallId()); + zlmServerFactory.releasePort(mediaServerItem, callId); }else { sendInfo = new OtherRtpSendInfo(); } @@ -218,20 +227,52 @@ public class RtpController { param.put("src_port", sendInfo.getPort()); param.put("use_ps", streamType==2 ? "1" : "0"); param.put("only_audio", onlyAudio ? "1" : "0"); + param.put("pt", pt); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - if (jsonObject.getInteger("code") == 0) { - logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId); - redisTemplate.opsForValue().set(key, sendInfo); + dynamicTask.stop(key); + Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); + if (streamReady) { + logger.info("[第三方服务对接->发送流] 流存在,开始发流,callId->{}", callId); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg")); + } }else { - redisTemplate.delete(key); - logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg")); + logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId); + String uuid = UUID.randomUUID().toString(); + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(app, stream, true, "rtsp", mediaServerItem.getId()); + dynamicTask.startDelay(uuid, ()->{ + logger.info("[第三方服务对接->发送流] 等待流上线超时 callId->{}", callId); + redisTemplate.delete(key); + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + }, 10000); + + // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 + OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, + (mediaServerItemInUse, response)->{ + dynamicTask.stop(uuid); + logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId); + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); + System.out.println("========发流结果=========="); + System.out.println(jsonObject); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg")); + } + }); } } - - @GetMapping(value = "/send/stop") @ResponseBody @Operation(summary = "关闭发送流") From 04e7f48fde1b1a653d413eb41186ec7354f5ae31 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 10 Jul 2023 14:30:59 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=90=88=E5=B9=B6=E4=B8=BB=E7=BA=BF?= =?UTF-8?q?=E7=9A=84=E5=8F=91=E6=B5=81=E7=AB=AF=E5=8F=A3=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/2.6.8补丁更新.sql | 2 + .../genersoft/iot/vmp/conf/MediaConfig.java | 11 ++ .../request/impl/AckRequestProcessor.java | 10 +- .../request/impl/InviteRequestProcessor.java | 16 +-- .../iot/vmp/media/zlm/SendRtpPortManager.java | 55 ++++++++ .../vmp/media/zlm/ZLMHttpHookListener.java | 30 ++-- .../vmp/media/zlm/ZLMRTPServerFactory.java | 130 ++---------------- .../media/zlm/dto/MediaSendRtpPortInfo.java | 50 +++++++ .../vmp/media/zlm/dto/MediaServerItem.java | 11 ++ .../zlm/dto/hook/HookResultForOnPublish.java | 10 ++ .../service/impl/MediaServerServiceImpl.java | 38 ++++- .../redisMsg/RedisGbPlayMsgListener.java | 4 +- .../vmp/storager/dao/MediaServerMapper.java | 4 + .../vmp/vmanager/bean/OtherRtpSendInfo.java | 2 +- .../iot/vmp/vmanager/rtp/RtpController.java | 55 ++------ .../src/components/dialog/MediaServerEdit.vue | 21 ++- 16 files changed, 234 insertions(+), 215 deletions(-) create mode 100644 sql/2.6.8补丁更新.sql create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaSendRtpPortInfo.java diff --git a/sql/2.6.8补丁更新.sql b/sql/2.6.8补丁更新.sql new file mode 100644 index 00000000..8ce9d543 --- /dev/null +++ b/sql/2.6.8补丁更新.sql @@ -0,0 +1,2 @@ +alter table media_server + add sendRtpPortRange varchar(50) not null; \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java index fa126794..cd228803 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java @@ -75,6 +75,9 @@ public class MediaConfig{ @Value("${media.rtp.port-range}") private String rtpPortRange; + @Value("${media.rtp.send-port-range}") + private String rtpSendPortRange; + @Value("${media.record-assist-port:0}") private Integer recordAssistPort = 0; @@ -206,6 +209,7 @@ public class MediaConfig{ mediaServerItem.setSecret(secret); mediaServerItem.setRtpEnable(rtpEnable); mediaServerItem.setRtpPortRange(rtpPortRange); + mediaServerItem.setSendRtpPortRange(rtpSendPortRange); mediaServerItem.setRecordAssistPort(recordAssistPort); mediaServerItem.setHookAliveInterval(30.00f); @@ -222,4 +226,11 @@ public class MediaConfig{ return false; } + public String getRtpSendPortRange() { + return rtpSendPortRange; + } + + public void setRtpSendPortRange(String rtpSendPortRange) { + this.rtpSendPortRange = rtpSendPortRange; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index f40e0c87..e7091fbc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -140,15 +140,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); }); }else { - // 如果是非严格模式,需要关闭端口占用 - JSONObject startSendRtpStreamResult = null; - if (sendRtpItem.getLocalPort() != 0) { - if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } - }else { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } + JSONObject startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); if (startSendRtpStreamResult != null) { startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index b7f600f7..073720aa 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -349,9 +349,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); @@ -553,9 +551,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); @@ -594,9 +590,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); @@ -713,9 +707,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java new file mode 100644 index 00000000..8366a4a2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java @@ -0,0 +1,55 @@ +package com.genersoft.iot.vmp.media.zlm; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.media.zlm.dto.MediaSendRtpPortInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +@Component +public class SendRtpPortManager { + + private final static Logger logger = LoggerFactory.getLogger(SendRtpPortManager.class); + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisTemplate redisTemplate; + + private final String KEY = "VM_MEDIA_SEND_RTP_PORT_RANGE_"; + + + public void initServerPort(String mediaServerId, int startPort, int endPort){ + String key = KEY + userSetting.getServerId() + "_" + mediaServerId; + MediaSendRtpPortInfo mediaSendRtpPortInfo = new MediaSendRtpPortInfo(startPort, endPort, mediaServerId); + redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo); + } + + public int getNextPort(String mediaServerId) { + String key = KEY + userSetting.getServerId() + "_" + mediaServerId; + MediaSendRtpPortInfo mediaSendRtpPortInfo = (MediaSendRtpPortInfo)redisTemplate.opsForValue().get(key); + if (mediaSendRtpPortInfo == null) { + logger.warn("[发送端口管理] 获取{}的发送端口时未找到端口信息", mediaSendRtpPortInfo); + return 0; + } + int port; + if (mediaSendRtpPortInfo.getCurrent() %2 != 0) { + port = mediaSendRtpPortInfo.getCurrent() + 1; + }else { + port = mediaSendRtpPortInfo.getCurrent() + 2; + } + if (port > mediaSendRtpPortInfo.getEnd()) { + if (mediaSendRtpPortInfo.getStart() %2 != 0) { + port = mediaSendRtpPortInfo.getStart() + 1; + }else { + port = mediaSendRtpPortInfo.getStart(); + } + } + mediaSendRtpPortInfo.setCurrent(port); + redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo); + return port; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 5df3be44..d23c6c7e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -23,7 +23,6 @@ import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -223,9 +222,6 @@ public class ZLMHttpHookListener { HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); - if (!"rtp".equals(param.getApp())) { - result.setEnable_audio(true); - } taskExecutor.execute(() -> { ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); @@ -259,20 +255,6 @@ public class ZLMHttpHookListener { } } - String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "*"; - // 将信息写入redis中,以备后用 - List scan = RedisUtil.scan(redisTemplate, receiveKey); - if (scan.size()>0) { - for (Object o : scan) { - String key = (String) o; - OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); - if (otherRtpSendInfo != null && otherRtpSendInfo.getStream().equalsIgnoreCase(param.getStream())) { - result.setEnable_audio(true); - result.setEnable_mp4(true); - } - } - } - if (mediaInfo.getRecordAssistPort() > 0 && userSetting.getRecordPath() == null) { logger.info("推流时发现尚未设置录像路径,从assist服务中读取"); JSONObject info = assistRESTfulUtils.getInfo(mediaInfo, null); @@ -291,6 +273,18 @@ public class ZLMHttpHookListener { } } } + if (param.getApp().equalsIgnoreCase("rtp")) { + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream(); + System.out.println(receiveKey); + OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(receiveKey); + System.out.println("otherRtpSendInfo != null ====>" + (otherRtpSendInfo != null)); + if (otherRtpSendInfo != null) { + System.out.println("otherRtpSendInfo != null"); + result.setEnable_audio(true); + result.setEnable_mp4(true); + } + } + logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, result); return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 9c5a4728..af4b3918 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -1,20 +1,18 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.*; +import java.util.HashMap; +import java.util.Map; @Component public class ZLMRTPServerFactory { @@ -30,68 +28,8 @@ public class ZLMRTPServerFactory { @Autowired private ZlmHttpHookSubscribe hookSubscribe; - private int[] portRangeArray = new int[2]; - - public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List usedFreelist) { - if (endPort <= startPort) { - return -1; - } - if (usedFreelist == null) { - usedFreelist = new ArrayList<>(); - } - JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem); - if (listRtpServerJsonResult != null) { - JSONArray data = listRtpServerJsonResult.getJSONArray("data"); - if (data != null) { - for (int i = 0; i < data.size(); i++) { - JSONObject dataItem = data.getJSONObject(i); - usedFreelist.add(dataItem.getInteger("port")); - } - } - } - - Map param = new HashMap<>(); - int result = -1; - // 设置推流端口 - if (startPort%2 == 1) { - startPort ++; - } - boolean checkPort = false; - for (int i = startPort; i < endPort + 1; i+=2) { - if (!usedFreelist.contains(i)){ - checkPort = true; - startPort = i; - break; - } - } - if (!checkPort) { - logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort); - return -1; - } - param.put("port", startPort); - String stream = UUID.randomUUID().toString(); - param.put("enable_tcp", 1); - param.put("stream_id", stream); -// param.put("port", 0); - JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); - - if (openRtpServerResultJson != null) { - if (openRtpServerResultJson.getInteger("code") == 0) { - result= openRtpServerResultJson.getInteger("port"); - Map closeRtpServerParam = new HashMap<>(); - closeRtpServerParam.put("stream_id", stream); - zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam); - }else { - usedFreelist.add(startPort); - startPort +=2; - result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist); - } - }else { - // 检查ZLM状态 - logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port")); - } - return result; - } + @Autowired + private SendRtpPortManager sendRtpPortManager; /** * 开启rtpServer @@ -220,13 +158,13 @@ public class ZLMRTPServerFactory { * @return SendRtpItem */ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, - String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ + String deviceId, String channelId, boolean tcp, boolean rtcp){ // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc, localPort, callback); + localPort = sendRtpPortManager.getNextPort(serverItem.getId()); if (localPort == 0) { return null; } @@ -259,11 +197,11 @@ public class ZLMRTPServerFactory { * @return SendRtpItem */ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, - String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ + String app, String stream, String channelId, boolean tcp, boolean rtcp){ // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc, localPort, callback); + localPort = sendRtpPortManager.getNextPort(serverItem.getId()); if (localPort == 0) { return null; } @@ -284,58 +222,6 @@ public class ZLMRTPServerFactory { return sendRtpItem; } - public interface KeepPortCallback{ - Boolean keep(String ssrc); - } - - /** - * 保持端口,直到需要需要发流时再释放 - */ - public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) { - Map param = new HashMap<>(3); - param.put("port", localPort); - param.put("enable_tcp", 1); - param.put("stream_id", ssrc); - JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); - if (jsonObject.getInteger("code") == 0) { - localPort = jsonObject.getInteger("port"); - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); - // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - Integer finalLocalPort = localPort; - hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, - (MediaServerItem mediaServerItem, JSONObject response)->{ - System.out.println("监听端口到期继续保持监听"); - System.out.println(response); - if (ssrc.equals(response.getString("stream_id"))) { - if (keepPortCallback.keep(ssrc)) { - logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); - keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback); - }else { - logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc); - releasePort(serverItem, ssrc); - } - } - - }); - logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort); - }else { - logger.info("[上级点播] 监听端口失败: {}", ssrc); - } - return localPort; - } - - /** - * 释放保持的端口 - */ - public boolean releasePort(MediaServerItem serverItem, String ssrc) { - logger.info("[上级点播] {}->释放监听端口", ssrc); - boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc); - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); - // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); - return closeRTPServerResult; - } - /** * 调用zlm RESTFUL API —— startSendRtp */ diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaSendRtpPortInfo.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaSendRtpPortInfo.java new file mode 100644 index 00000000..2e9f6317 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaSendRtpPortInfo.java @@ -0,0 +1,50 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +public class MediaSendRtpPortInfo { + + private int start; + private int end; + private String mediaServerId; + + private int current; + + + public MediaSendRtpPortInfo(int start, int end, String mediaServerId) { + this.start = start; + this.current = start; + this.end = end; + this.mediaServerId = mediaServerId; + } + + public int getStart() { + return start; + } + + public void setStart(int start) { + this.start = start; + } + + public int getEnd() { + return end; + } + + public void setEnd(int end) { + this.end = end; + } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } + + public int getCurrent() { + return current; + } + + public void setCurrent(int current) { + this.current = current; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java index e6bbb5fa..066a6776 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java @@ -62,6 +62,9 @@ public class MediaServerItem{ @Schema(description = "多端口RTP收流端口范围") private String rtpPortRange; + @Schema(description = "RTP发流端口范围") + private String sendRtpPortRange; + @Schema(description = "assist服务端口") private int recordAssistPort; @@ -297,4 +300,12 @@ public class MediaServerItem{ public void setHookAliveInterval(Float hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } + + public String getSendRtpPortRange() { + return sendRtpPortRange; + } + + public void setSendRtpPortRange(String sendRtpPortRange) { + this.sendRtpPortRange = sendRtpPortRange; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java index cb8e7383..68d969f4 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java @@ -50,4 +50,14 @@ public class HookResultForOnPublish extends HookResult{ public void setMp4_save_path(String mp4_save_path) { this.mp4_save_path = mp4_save_path; } + + @Override + public String toString() { + return "HookResultForOnPublish{" + + "enable_audio=" + enable_audio + + ", enable_mp4=" + enable_mp4 + + ", mp4_max_second=" + mp4_max_second + + ", mp4_save_path='" + mp4_save_path + '\'' + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index eaa6a9fb..789974b8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -11,10 +11,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; -import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; +import com.genersoft.iot.vmp.media.zlm.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -69,6 +66,10 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private UserSetting userSetting; + @Autowired + private SendRtpPortManager sendRtpPortManager; + + @Autowired private AssistRESTfulUtils assistRESTfulUtils; @@ -115,13 +116,40 @@ public class MediaServerServiceImpl implements IMediaServerService { if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) { ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null); } + if (userSetting.getGbSendStreamStrict()) { + int startPort = 50000; + int endPort = 60000; + String sendRtpPortRange = mediaServerItem.getSendRtpPortRange(); + if (sendRtpPortRange == null) { + logger.warn("[zlm] ] 未配置发流端口范围,默认使用50000到60000"); + }else { + String[] sendRtpPortRangeArray = sendRtpPortRange.trim().split(","); + if (sendRtpPortRangeArray.length != 2) { + logger.warn("[zlm] ] 发流端口范围错误,默认使用50000到60000"); + }else { + try { + startPort = Integer.parseInt(sendRtpPortRangeArray[0]); + endPort = Integer.parseInt(sendRtpPortRangeArray[1]); + if (endPort <= startPort) { + logger.warn("[zlm] ] 发流端口范围错误,结束端口应大于开始端口,使用默认端口"); + startPort = 50000; + endPort = 60000; + } + + }catch (NumberFormatException e) { + logger.warn("[zlm] ] 发流端口范围错误,默认使用50000到60000"); + } + } + } + logger.info("[[zlm] ] 配置发流端口范围,{}-{}", startPort, endPort); + sendRtpPortManager.initServerPort(mediaServerItem.getId(), startPort, endPort); + } // 查询redis是否存在此mediaServer String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); Boolean hasKey = redisTemplate.hasKey(key); if (hasKey != null && ! hasKey) { redisTemplate.opsForValue().set(key, mediaServerItem); } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 8231fb33..33eb5c1f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -317,9 +317,7 @@ public class RedisGbPlayMsgListener implements MessageListener { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), - content.getTcp(), content.getRtcp(), ssrcFromCallback -> { - return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null; - }); + content.getTcp(), content.getRtcp()); WVPResult result = new WVPResult<>(); result.setCode(0); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java index 97e74ae8..e222ba86 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java @@ -28,6 +28,7 @@ public interface MediaServerMapper { "secret, " + "rtpEnable, " + "rtpPortRange, " + + "sendRtpPortRange, " + "recordAssistPort, " + "defaultServer, " + "createTime, " + @@ -51,6 +52,7 @@ public interface MediaServerMapper { "#{secret}, " + "#{rtpEnable}, " + "#{rtpPortRange}, " + + "#{sendRtpPortRange}, " + "#{recordAssistPort}, " + "#{defaultServer}, " + "#{createTime}, " + @@ -75,6 +77,7 @@ public interface MediaServerMapper { ", autoConfig=#{autoConfig}" + ", rtpEnable=#{rtpEnable}" + ", rtpPortRange=#{rtpPortRange}" + + ", sendRtpPortRange=#{sendRtpPortRange}" + ", secret=#{secret}" + ", recordAssistPort=#{recordAssistPort}" + ", hookAliveInterval=#{hookAliveInterval}" + @@ -98,6 +101,7 @@ public interface MediaServerMapper { ", autoConfig=#{autoConfig}" + ", rtpEnable=#{rtpEnable}" + ", rtpPortRange=#{rtpPortRange}" + + ", sendRtpPortRange=#{sendRtpPortRange}" + ", secret=#{secret}" + ", recordAssistPort=#{recordAssistPort}" + ", hookAliveInterval=#{hookAliveInterval}" + diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java index 225e40c4..9fea5c54 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java @@ -124,7 +124,7 @@ public class OtherRtpSendInfo { @Override public String toString() { return "OtherRtpSendInfo{" + - "ip='" + ip + '\'' + + " ip='" + ip + '\'' + ", port=" + port + ", receiveIp='" + receiveIp + '\'' + ", receivePort=" + receivePort + diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index c06c4af1..ad021ba4 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -3,20 +3,16 @@ package com.genersoft.iot.vmp.vmanager.rtp; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import io.swagger.v3.oas.annotations.Operation; @@ -27,7 +23,6 @@ import okhttp3.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; @@ -36,6 +31,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; @SuppressWarnings("rawtypes") @Tag(name = "第三方服务对接") @@ -56,20 +52,11 @@ public class RtpController { private IMediaServerService mediaServerService; @Autowired - private VersionInfo versionInfo; - - @Autowired - private SipConfig sipConfig; + private SendRtpPortManager sendRtpPortManager; @Autowired private UserSetting userSetting; - @Autowired - private IDeviceService deviceService; - - @Autowired - private IDeviceChannelService channelService; - @Autowired private DynamicTask dynamicTask; @@ -78,14 +65,6 @@ public class RtpController { private RedisTemplate redisTemplate; - @Value("${server.port}") - private int serverPort; - - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @GetMapping(value = "/receive/open") @ResponseBody @Operation(summary = "开启收流和获取发流信息") @@ -145,24 +124,15 @@ public class RtpController { otherRtpSendInfo.setReceivePort(localPort); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); - String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream; + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream; // 将信息写入redis中,以备后用 redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); if (isSend != null && isSend) { - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; // 预创建发流信息 - int port = zlmServerFactory.keepPort(mediaServerItem, callId, 0, ssrc1 -> { - return redisTemplate.opsForValue().get(key) != null; - }); - + int port = sendRtpPortManager.getNextPort(mediaServerItem.getId()); // 将信息写入redis中,以备后用 - redisTemplate.opsForValue().set(key, otherRtpSendInfo); - // 设置超时任务,超时未使用,则自动移除,并关闭端口保持, 默认五分钟 - dynamicTask.startDelay(key, ()->{ - logger.info("[第三方服务对接->开启收流和获取发流信息] 端口保持超时 callId->{}", callId); - redisTemplate.delete(key); - zlmServerFactory.releasePort(mediaServerItem, callId); - }, 15000); + redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); otherRtpSendInfo.setPort(port); logger.info("[第三方服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo); @@ -178,7 +148,7 @@ public class RtpController { logger.info("[第三方服务对接->关闭收流] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); - String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + stream; + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream; // 将信息写入redis中,以备后用 redisTemplate.delete(receiveKey); } @@ -203,11 +173,9 @@ public class RtpController { streamType = 1; } MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); - if (sendInfo != null) { - zlmServerFactory.releasePort(mediaServerItem, callId); - }else { + if (sendInfo == null) { sendInfo = new OtherRtpSendInfo(); } sendInfo.setPushApp(app); @@ -229,7 +197,6 @@ public class RtpController { param.put("only_audio", onlyAudio ? "1" : "0"); param.put("pt", pt); - dynamicTask.stop(key); Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); if (streamReady) { logger.info("[第三方服务对接->发送流] 流存在,开始发流,callId->{}", callId); @@ -279,7 +246,7 @@ public class RtpController { @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) public void closeSendRTP(String callId) { logger.info("[第三方服务对接->关闭发送流] callId->{}", callId); - String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId; + String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key); if (sendInfo == null){ throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流"); diff --git a/web_src/src/components/dialog/MediaServerEdit.vue b/web_src/src/components/dialog/MediaServerEdit.vue index 15923c1f..bb55048c 100644 --- a/web_src/src/components/dialog/MediaServerEdit.vue +++ b/web_src/src/components/dialog/MediaServerEdit.vue @@ -89,6 +89,11 @@ - + + + - + + @@ -172,6 +177,7 @@ export default { rtmpSSlPort: "", rtpEnable: false, rtpPortRange: "", + sendRtpPortRange: "", rtpProxyPort: "", rtspPort: "", rtspSSLPort: "", @@ -179,6 +185,9 @@ export default { rtpPortRange1:30000, rtpPortRange2:30500, + sendRtpPortRange1:50000, + sendRtpPortRange2:60000, + rules: { ip: [{ required: true, validator: isValidIp, message: '请输入有效的IP地址', trigger: 'blur' }], httpPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], @@ -214,10 +223,15 @@ export default { this.currentStep = 3; if (param.rtpPortRange) { let rtpPortRange = this.mediaServerForm.rtpPortRange.split(","); + let sendRtpPortRange = this.mediaServerForm.sendRtpPortRange.split(","); if (rtpPortRange.length > 0) { this.rtpPortRange1 = rtpPortRange[0] this.rtpPortRange2 = rtpPortRange[1] } + if (sendRtpPortRange.length > 0) { + this.sendRtpPortRange1 = sendRtpPortRange[0] + this.sendRtpPortRange2 = sendRtpPortRange[1] + } } } }, @@ -240,6 +254,8 @@ export default { that.mediaServerForm.autoConfig = true; that.rtpPortRange1 = 30000 that.rtpPortRange2 = 30500 + that.sendRtpPortRange1 = 50000 + that.sendRtpPortRange2 = 60000 that.serverCheck = 1; }else { that.serverCheck = -1; @@ -321,12 +337,15 @@ export default { rtmpSSlPort: "", rtpEnable: false, rtpPortRange: "", + sendRtpPortRange: "", rtpProxyPort: "", rtspPort: "", rtspSSLPort: "", }; this.rtpPortRange1 = 30500; this.rtpPortRange2 = 30500; + this.sendRtpPortRange1 = 50000; + this.sendRtpPortRange2 = 60000; this.listChangeCallback = null this.currentStep = 1; }, @@ -351,7 +370,7 @@ export default { portRangeChange: function() { if (this.mediaServerForm.rtpEnable) { this.mediaServerForm.rtpPortRange = this.rtpPortRange1 + "," + this.rtpPortRange2 - console.log(this.mediaServerForm.rtpPortRange) + this.mediaServerForm.sendRtpPortRange = this.sendRtpPortRange1 + "," + this.sendRtpPortRange2 } } }, From f525b5572988326c4b73da9f68e7ee7e292a2e46 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 12 Jul 2023 15:13:53 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8F=91=E6=B5=81?= =?UTF-8?q?=E5=A4=8D=E7=9B=96=E7=9A=84=E4=B8=BA=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/media/zlm/ZLMRTPServerFactory.java | 5 ++++- .../iot/vmp/vmanager/rtp/RtpController.java | 20 ++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index af4b3918..82a0faf4 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -85,7 +85,10 @@ public class ZLMRTPServerFactory { }else { param.put("port", port); } - param.put("ssrc", ssrc); + if (ssrc != 0) { + param.put("ssrc", ssrc); + } + JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); logger.info(JSONObject.toJSONString(openRtpServerResultJson)); if (openRtpServerResultJson != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index ad021ba4..b7a7e152 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo; import io.swagger.v3.oas.annotations.Operation; @@ -29,6 +30,7 @@ import org.springframework.web.bind.annotation.*; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -98,6 +100,7 @@ public class RtpController { } } + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { @@ -107,6 +110,8 @@ public class RtpController { (mediaServerItemInUse, response)->{ if (stream.equals(response.getString("stream_id"))) { logger.info("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId); + // 将信息写入redis中,以备后用 + redisTemplate.delete(receiveKey); OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); OkHttpClient client = httpClientBuilder.build(); String url = callBack + "?callId=" + callId; @@ -124,7 +129,7 @@ public class RtpController { otherRtpSendInfo.setReceivePort(localPort); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); - String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream; + // 将信息写入redis中,以备后用 redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo); if (isSend != null && isSend) { @@ -148,9 +153,14 @@ public class RtpController { logger.info("[第三方服务对接->关闭收流] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); - String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream; - // 将信息写入redis中,以备后用 - redisTemplate.delete(receiveKey); + String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; + List scan = RedisUtil.scan(redisTemplate, receiveKey); + if (scan.size() > 0) { + for (Object key : scan) { + // 将信息写入redis中,以备后用 + redisTemplate.delete(key); + } + } } @GetMapping(value = "/send/start") @@ -224,7 +234,7 @@ public class RtpController { hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, response)->{ dynamicTask.stop(uuid); - logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId); + logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{},param->{}", callId, JSONObject.toJSONString(param)); JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); System.out.println("========发流结果=========="); System.out.println(jsonObject); From 9fc3db1f5e381378fd54818e0ba017df6be96fa9 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 13 Jul 2023 14:30:41 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8F=91=E6=B5=81?= =?UTF-8?q?=E9=9F=B3=E8=A7=86=E9=A2=91=E5=88=86=E5=BC=80=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/media/zlm/dto/HookType.java | 3 +- .../vmp/vmanager/bean/OtherRtpSendInfo.java | 89 +++++--- .../iot/vmp/vmanager/rtp/RtpController.java | 198 +++++++++++++----- 3 files changed, 211 insertions(+), 79 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java index a4557e9a..235cea7b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java @@ -21,5 +21,6 @@ public enum HookType { on_server_started, on_rtp_server_timeout, - on_server_keepalive + on_server_keepalive, + on_send_rtp_stopped } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java index 9fea5c54..75c05d3b 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/OtherRtpSendInfo.java @@ -5,12 +5,17 @@ public class OtherRtpSendInfo { /** * 发流IP */ - private String ip; + private String sendLocalIp; /** - * 发流端口 + * 音频发流端口 */ - private int port; + private int sendLocalPortForAudio; + + /** + * 视频发流端口 + */ + private int sendLocalPortForVideo; /** * 收流IP @@ -18,9 +23,14 @@ public class OtherRtpSendInfo { private String receiveIp; /** - * 收流端口 + * 音频收流端口 */ - private int receivePort; + private int receivePortForAudio; + + /** + * 视频收流端口 + */ + private int receivePortForVideo; /** * 会话ID @@ -48,23 +58,6 @@ public class OtherRtpSendInfo { private String pushSSRC; - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - public String getReceiveIp() { return receiveIp; } @@ -73,12 +66,20 @@ public class OtherRtpSendInfo { this.receiveIp = receiveIp; } - public int getReceivePort() { - return receivePort; + public int getReceivePortForAudio() { + return receivePortForAudio; } - public void setReceivePort(int receivePort) { - this.receivePort = receivePort; + public void setReceivePortForAudio(int receivePortForAudio) { + this.receivePortForAudio = receivePortForAudio; + } + + public int getReceivePortForVideo() { + return receivePortForVideo; + } + + public void setReceivePortForVideo(int receivePortForVideo) { + this.receivePortForVideo = receivePortForVideo; } public String getCallId() { @@ -121,15 +122,45 @@ public class OtherRtpSendInfo { this.pushSSRC = pushSSRC; } + + public String getSendLocalIp() { + return sendLocalIp; + } + + public void setSendLocalIp(String sendLocalIp) { + this.sendLocalIp = sendLocalIp; + } + + public int getSendLocalPortForAudio() { + return sendLocalPortForAudio; + } + + public void setSendLocalPortForAudio(int sendLocalPortForAudio) { + this.sendLocalPortForAudio = sendLocalPortForAudio; + } + + public int getSendLocalPortForVideo() { + return sendLocalPortForVideo; + } + + public void setSendLocalPortForVideo(int sendLocalPortForVideo) { + this.sendLocalPortForVideo = sendLocalPortForVideo; + } + @Override public String toString() { return "OtherRtpSendInfo{" + - " ip='" + ip + '\'' + - ", port=" + port + + "sendLocalIp='" + sendLocalIp + '\'' + + ", sendLocalPortForAudio=" + sendLocalPortForAudio + + ", sendLocalPortForVideo=" + sendLocalPortForVideo + ", receiveIp='" + receiveIp + '\'' + - ", receivePort=" + receivePort + + ", receivePortForAudio=" + receivePortForAudio + + ", receivePortForVideo=" + receivePortForVideo + ", callId='" + callId + '\'' + ", stream='" + stream + '\'' + + ", pushApp='" + pushApp + '\'' + + ", pushStream='" + pushStream + '\'' + + ", pushSSRC='" + pushSSRC + '\'' + '}'; } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index b7a7e152..a9a66f5c 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -98,10 +98,13 @@ public class RtpController { }catch (NumberFormatException e) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误"); } - } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPortForVideo = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode); + int localPortForAudio = zlmServerFactory.createRTPServer(mediaServerItem, stream + "_a" , ssrcInt, null, false, tcpMode); + if (localPortForVideo == 0 || localPortForAudio == 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); + } // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(stream, String.valueOf(ssrcInt), mediaServerItem.getId()); @@ -121,12 +124,14 @@ public class RtpController { } catch (IOException e) { logger.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); } + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); } }); } OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo(); otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setReceivePort(localPort); + otherRtpSendInfo.setReceivePortForVideo(localPortForVideo); + otherRtpSendInfo.setReceivePortForAudio(localPortForAudio); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); @@ -135,11 +140,13 @@ public class RtpController { if (isSend != null && isSend) { String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; // 预创建发流信息 - int port = sendRtpPortManager.getNextPort(mediaServerItem.getId()); + int portForVideo = sendRtpPortManager.getNextPort(mediaServerItem.getId()); + int portForAudio = sendRtpPortManager.getNextPort(mediaServerItem.getId()); // 将信息写入redis中,以备后用 redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS); - otherRtpSendInfo.setIp(mediaServerItem.getSdpIp()); - otherRtpSendInfo.setPort(port); + otherRtpSendInfo.setSendLocalIp(mediaServerItem.getSdpIp()); + otherRtpSendInfo.setSendLocalPortForVideo(portForVideo); + otherRtpSendInfo.setSendLocalPortForAudio(portForAudio); logger.info("[第三方服务对接->开启收流和获取发流信息] 结果,callId->{}, {}", callId, otherRtpSendInfo); } return otherRtpSendInfo; @@ -153,6 +160,7 @@ public class RtpController { logger.info("[第三方服务对接->关闭收流] stream->{}", stream); MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); zlmServerFactory.closeRtpServer(mediaServerItem,stream); + zlmServerFactory.closeRtpServer(mediaServerItem,stream + "_a"); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; List scan = RedisUtil.scan(redisTemplate, receiveKey); if (scan.size() > 0) { @@ -167,20 +175,51 @@ public class RtpController { @ResponseBody @Operation(summary = "发送流") @Parameter(name = "ssrc", description = "发送流的SSRC", required = true) - @Parameter(name = "ip", description = "目标IP", required = true) - @Parameter(name = "port", description = "目标端口", required = true) + @Parameter(name = "dstIpForAudio", description = "目标音频收流IP", required = false) + @Parameter(name = "dstIpForVideo", description = "目标视频收流IP", required = false) + @Parameter(name = "dstPortForAudio", description = "目标音频收流端口", required = false) + @Parameter(name = "dstPortForVideo", description = "目标视频收流端口", required = false) @Parameter(name = "app", description = "待发送应用名", required = true) @Parameter(name = "stream", description = "待发送流Id", required = true) @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) - @Parameter(name = "onlyAudio", description = "是否只有音频", required = true) @Parameter(name = "isUdp", description = "是否为UDP", required = true) - @Parameter(name = "streamType", description = "流类型,1为es流,2为ps流, 默认es流", required = false) - @Parameter(name = "pt", description = "rtp的pt", required = true) - public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, @RequestParam(required = false)Integer streamType, Integer pt) { - logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}, pt->{}", - ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS", pt); - if (ObjectUtils.isEmpty(streamType)) { - streamType = 1; + @Parameter(name = "ptForAudio", description = "rtp的音频pt", required = false) + @Parameter(name = "ptForVideo", description = "rtp的视频pt", required = false) + public void sendRTP(String ssrc, + @RequestParam(required = false)String dstIpForAudio, + @RequestParam(required = false)String dstIpForVideo, + @RequestParam(required = false)Integer dstPortForAudio, + @RequestParam(required = false)Integer dstPortForVideo, + String app, + String stream, + String callId, + Boolean isUdp, + @RequestParam(required = false)Integer ptForAudio, + @RequestParam(required = false)Integer ptForVideo + ) { + logger.info("[第三方服务对接->发送流] " + + "ssrc->{}, \r\n" + + "dstIpForAudio->{}, \n" + + "dstIpForAudio->{}, \n" + + "dstPortForAudio->{}, \n" + + "dstPortForVideo->{}, \n" + + "app->{}, \n" + + "stream->{}, \n" + + "callId->{}, \n" + + "ptForAudio->{}, \n" + + "ptForVideo->{}", + ssrc, + dstIpForAudio, + dstIpForVideo, + dstPortForAudio, + dstPortForVideo, + app, + stream, + callId, + ptForAudio, + ptForVideo); + if (!((dstPortForAudio > 0 && !ObjectUtils.isEmpty(dstPortForAudio) || (dstPortForVideo > 0 && !ObjectUtils.isEmpty(dstIpForVideo))))) { + throw new ControllerException(ErrorCode.ERROR400.getCode(), "至少应该存在一组音频或视频发送参数"); } MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer(); String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; @@ -192,32 +231,74 @@ public class RtpController { sendInfo.setPushStream(stream); sendInfo.setPushSSRC(ssrc); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",app); - param.put("stream",stream); - param.put("ssrc", ssrc); + Map paramForAudio; + Map paramForVideo; + if (!ObjectUtils.isEmpty(dstIpForAudio) && dstPortForAudio > 0) { + paramForAudio = new HashMap<>(); + paramForAudio.put("vhost","__defaultVhost__"); + paramForAudio.put("app",app); + paramForAudio.put("stream",stream); + paramForAudio.put("ssrc", ssrc); - param.put("dst_url",ip); - param.put("dst_port", port); - String is_Udp = isUdp ? "1" : "0"; - param.put("is_udp", is_Udp); - param.put("src_port", sendInfo.getPort()); - param.put("use_ps", streamType==2 ? "1" : "0"); - param.put("only_audio", onlyAudio ? "1" : "0"); - param.put("pt", pt); + paramForAudio.put("dst_url", dstIpForAudio); + paramForAudio.put("dst_port", dstPortForAudio); + String is_Udp = isUdp ? "1" : "0"; + paramForAudio.put("is_udp", is_Udp); + paramForAudio.put("src_port", sendInfo.getSendLocalPortForAudio()); + paramForAudio.put("use_ps", "0"); + paramForAudio.put("only_audio", "1"); + if (ptForAudio != null) { + paramForAudio.put("pt", ptForAudio); + } + + } else { + paramForAudio = null; + } + if (!ObjectUtils.isEmpty(dstIpForVideo) && dstPortForVideo > 0) { + paramForVideo = new HashMap<>(); + paramForVideo.put("vhost","__defaultVhost__"); + paramForVideo.put("app",app); + paramForVideo.put("stream",stream); + paramForVideo.put("ssrc", ssrc); + + paramForVideo.put("dst_url", dstIpForVideo); + paramForVideo.put("dst_port", dstPortForVideo); + String is_Udp = isUdp ? "1" : "0"; + paramForVideo.put("is_udp", is_Udp); + paramForVideo.put("src_port", sendInfo.getSendLocalPortForVideo()); + paramForVideo.put("use_ps", "0"); + paramForVideo.put("only_audio", "0"); + if (ptForVideo != null) { + paramForVideo.put("pt", ptForVideo); + } + + } else { + paramForVideo = null; + } Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream); if (streamReady) { - logger.info("[第三方服务对接->发送流] 流存在,开始发流,callId->{}", callId); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - if (jsonObject.getInteger("code") == 0) { - logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId); - redisTemplate.opsForValue().set(key, sendInfo); - }else { - redisTemplate.delete(key); - logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg")); + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, sendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 音频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg")); + } } }else { logger.info("[第三方服务对接->发送流] 流不存在,等待流上线,callId->{}", callId); @@ -231,21 +312,39 @@ public class RtpController { // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 OtherRtpSendInfo finalSendInfo = sendInfo; + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItemInUse, response)->{ dynamicTask.stop(uuid); - logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{},param->{}", callId, JSONObject.toJSONString(param)); - JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param); - System.out.println("========发流结果=========="); - System.out.println(jsonObject); - if (jsonObject.getInteger("code") == 0) { - logger.info("[第三方服务对接->发送流] 发流成功,callId->{}", callId); - redisTemplate.opsForValue().set(key, finalSendInfo); - }else { - redisTemplate.delete(key); - logger.info("[第三方服务对接->发送流] 发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg")); + logger.info("[第三方服务对接->发送流] 流上线,开始发流 callId->{}", callId); + try { + Thread.sleep(400); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + if (paramForVideo != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForVideo); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 视频流发流成功,callId->{},param->{}", callId, paramForVideo); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 视频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg")); + } + } + if(paramForAudio != null) { + JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, paramForAudio); + if (jsonObject.getInteger("code") == 0) { + logger.info("[第三方服务对接->发送流] 音频流发流成功,callId->{},param->{}", callId, paramForAudio); + redisTemplate.opsForValue().set(key, finalSendInfo); + }else { + redisTemplate.delete(key); + logger.info("[第三方服务对接->发送流] 音频流发流失败,callId->{}, {}", callId, jsonObject.getString("msg")); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg")); + } + } + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); }); } } @@ -274,6 +373,7 @@ public class RtpController { }else { logger.info("[第三方服务对接->关闭发送流] 成功 callId->{}", callId); } + redisTemplate.delete(key); } }