diff --git a/sql/2.6.8升级2.6.9.sql b/sql/2.6.8升级2.6.9.sql index e633d63a..a398cb8d 100644 --- a/sql/2.6.8升级2.6.9.sql +++ b/sql/2.6.8升级2.6.9.sql @@ -204,6 +204,12 @@ alter table log alter table media_server change hookIp hook_ip varchar(50) not null; +alter table media_server + add send_rtp_port_range varchar(50) not null; + +alter table media_server + add column send_rtp_port_range varchar(50) default null; + alter table media_server change sdpIp sdp_ip varchar(50) not null; diff --git a/sql/初始化.sql b/sql/初始化.sql index 3eed0dd4..421760fe 100644 --- a/sql/初始化.sql +++ b/sql/初始化.sql @@ -153,6 +153,7 @@ create table wvp_media_server ( secret character varying(50), rtp_enable bool default false, rtp_port_range character varying(50), + send_rtp_port_range character varying(50), record_assist_port integer, default_server bool default false, create_time character varying(50), diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java index 62baa71b..fca6d63a 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java @@ -75,6 +75,9 @@ public class MediaConfig{ @Value("${media.rtp.port-range}") private String rtpPortRange; + @Value("${media.rtp.send-port-range}") + private String rtpSendPortRange; + @Value("${media.record-assist-port:0}") private Integer recordAssistPort = 0; @@ -206,6 +209,7 @@ public class MediaConfig{ mediaServerItem.setSecret(secret); mediaServerItem.setRtpEnable(rtpEnable); mediaServerItem.setRtpPortRange(rtpPortRange); + mediaServerItem.setSendRtpPortRange(rtpSendPortRange); mediaServerItem.setRecordAssistPort(recordAssistPort); mediaServerItem.setHookAliveInterval(30.00f); @@ -215,6 +219,14 @@ public class MediaConfig{ return mediaServerItem; } + public String getRtpSendPortRange() { + return rtpSendPortRange; + } + + public void setRtpSendPortRange(String rtpSendPortRange) { + this.rtpSendPortRange = rtpSendPortRange; + } + private boolean isValidIPAddress(String ipAddress) { if ((ipAddress != null) && (!ipAddress.isEmpty())) { return Pattern.matches("^([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}$", ipAddress); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index c50498b3..cc1f0c0c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -109,6 +109,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In param.put("app",sendRtpItem.getApp()); param.put("stream",sendRtpItem.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); param.put("src_port", sendRtpItem.getLocalPort()); param.put("pt", sendRtpItem.getPt()); param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); @@ -131,16 +133,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In // 如果是非严格模式,需要关闭端口占用 JSONObject startSendRtpStreamResult = null; if (sendRtpItem.getLocalPort() != 0) { - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); - if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); } }else { if (sendRtpItem.isTcpActive()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 8a94d681..93e5540c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -375,9 +375,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); @@ -588,9 +586,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (streamReady != null && streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); @@ -630,9 +626,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (streamReady != null && streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); @@ -748,9 +742,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { - return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; - }); + app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); if (sendRtpItem == null) { logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 907cd666..281d780e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -63,6 +64,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private CivilCodeFileConf civilCodeFileConf; + @Autowired + private SipConfig sipConfig; + private final static String talkKey = "notify-request-for-catalog-task"; public void process(RequestEvent evt) { @@ -104,7 +108,13 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf); - + if (channel == null) { + logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } channel.setDeviceId(device.getDeviceId()); logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); switch (event) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index bc34bbd3..d44ff5a8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -413,6 +413,13 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf); + if (channel == null) { + logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } channel.setDeviceId(device.getDeviceId()); logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); switch (event) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 20cb59cc..2283fa2e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; import com.genersoft.iot.vmp.conf.CivilCodeFileConf; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -58,6 +59,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Autowired private CivilCodeFileConf civilCodeFileConf; + @Autowired + private SipConfig sipConfig; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -113,11 +117,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp if (channelDeviceElement == null) { continue; } - DeviceChannel deviceChannel = XmlUtil.channelContentHandler(itemDevice, device, null, civilCodeFileConf); - deviceChannel = SipUtils.updateGps(deviceChannel, device.getGeoCoordSys()); - deviceChannel.setDeviceId(take.getDevice().getDeviceId()); + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, null, civilCodeFileConf); + if (channel == null) { + logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + SipUtils.updateGps(channel, device.getGeoCoordSys()); + channel.setDeviceId(take.getDevice().getDeviceId()); - channelList.add(deviceChannel); + channelList.add(channel); } int sn = Integer.parseInt(snElement.getText()); catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java index 7914ffac..a83e7c9f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.utils.DateUtil; +import org.apache.commons.lang3.math.NumberUtils; import org.dom4j.Attribute; import org.dom4j.Document; import org.dom4j.DocumentException; @@ -317,7 +318,6 @@ public class XmlUtil { deviceChannel.setBusinessGroupId(businessGroupID); } - if (!ObjectUtils.isEmpty(parentID)) { if (parentID.contains("/")) { String[] parentIdArray = parentID.split("/"); @@ -341,7 +341,11 @@ public class XmlUtil { if (!ObjectUtils.isEmpty(owner)) { deviceChannel.setOwner(owner); } - if (!ObjectUtils.isEmpty(civilCode)) { + if (!ObjectUtils.isEmpty(civilCode) + && civilCode.length() <= 8 + && NumberUtils.isParsable(civilCode) + && Integer.parseInt(civilCode)%2 == 0 + ) { deviceChannel.setCivilCode(civilCode); } if (!ObjectUtils.isEmpty(businessGroupID)) { @@ -387,8 +391,8 @@ public class XmlUtil { if (!ObjectUtils.isEmpty(businessGroupID)) { deviceChannel.setParentId(businessGroupID); }else { - if (!ObjectUtils.isEmpty(civilCode)) { - deviceChannel.setParentId(civilCode); + if (!ObjectUtils.isEmpty(deviceChannel.getCivilCode())) { + deviceChannel.setParentId(deviceChannel.getCivilCode()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java new file mode 100644 index 00000000..8366a4a2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java @@ -0,0 +1,55 @@ +package com.genersoft.iot.vmp.media.zlm; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.media.zlm.dto.MediaSendRtpPortInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; + +@Component +public class SendRtpPortManager { + + private final static Logger logger = LoggerFactory.getLogger(SendRtpPortManager.class); + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisTemplate redisTemplate; + + private final String KEY = "VM_MEDIA_SEND_RTP_PORT_RANGE_"; + + + public void initServerPort(String mediaServerId, int startPort, int endPort){ + String key = KEY + userSetting.getServerId() + "_" + mediaServerId; + MediaSendRtpPortInfo mediaSendRtpPortInfo = new MediaSendRtpPortInfo(startPort, endPort, mediaServerId); + redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo); + } + + public int getNextPort(String mediaServerId) { + String key = KEY + userSetting.getServerId() + "_" + mediaServerId; + MediaSendRtpPortInfo mediaSendRtpPortInfo = (MediaSendRtpPortInfo)redisTemplate.opsForValue().get(key); + if (mediaSendRtpPortInfo == null) { + logger.warn("[发送端口管理] 获取{}的发送端口时未找到端口信息", mediaSendRtpPortInfo); + return 0; + } + int port; + if (mediaSendRtpPortInfo.getCurrent() %2 != 0) { + port = mediaSendRtpPortInfo.getCurrent() + 1; + }else { + port = mediaSendRtpPortInfo.getCurrent() + 2; + } + if (port > mediaSendRtpPortInfo.getEnd()) { + if (mediaSendRtpPortInfo.getStart() %2 != 0) { + port = mediaSendRtpPortInfo.getStart() + 1; + }else { + port = mediaSendRtpPortInfo.getStart(); + } + } + mediaSendRtpPortInfo.setCurrent(port); + redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo); + return port; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index e39474f4..52e7953e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -1,22 +1,18 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.*; +import java.util.HashMap; +import java.util.Map; @Component public class ZLMRTPServerFactory { @@ -32,68 +28,9 @@ public class ZLMRTPServerFactory { @Autowired private ZlmHttpHookSubscribe hookSubscribe; - private int[] portRangeArray = new int[2]; + @Autowired + private SendRtpPortManager sendRtpPortManager; - public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List usedFreelist) { - if (endPort <= startPort) { - return -1; - } - if (usedFreelist == null) { - usedFreelist = new ArrayList<>(); - } - JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem); - if (listRtpServerJsonResult != null) { - JSONArray data = listRtpServerJsonResult.getJSONArray("data"); - if (data != null) { - for (int i = 0; i < data.size(); i++) { - JSONObject dataItem = data.getJSONObject(i); - usedFreelist.add(dataItem.getInteger("port")); - } - } - } - - Map param = new HashMap<>(); - int result = -1; - // 设置推流端口 - if (startPort%2 == 1) { - startPort ++; - } - boolean checkPort = false; - for (int i = startPort; i < endPort + 1; i+=2) { - if (!usedFreelist.contains(i)){ - checkPort = true; - startPort = i; - break; - } - } - if (!checkPort) { - logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort); - return -1; - } - param.put("port", startPort); - String stream = UUID.randomUUID().toString(); - param.put("enable_tcp", 1); - param.put("stream_id", stream); -// param.put("port", 0); - JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param); - - if (openRtpServerResultJson != null) { - if (openRtpServerResultJson.getInteger("code") == 0) { - result= openRtpServerResultJson.getInteger("port"); - Map closeRtpServerParam = new HashMap<>(); - closeRtpServerParam.put("stream_id", stream); - zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam); - }else { - usedFreelist.add(startPort); - startPort +=2; - result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist); - } - }else { - // 检查ZLM状态 - logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port")); - } - return result; - } /** * 开启rtpServer @@ -226,16 +163,14 @@ public class ZLMRTPServerFactory { * @return SendRtpItem */ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, - String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ + String deviceId, String channelId, boolean tcp, boolean rtcp){ // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { - if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc, localPort, callback); - if (localPort == 0) { - return null; - } + localPort = sendRtpPortManager.getNextPort(serverItem.getId()); + if (localPort == 0) { + return null; } } SendRtpItem sendRtpItem = new SendRtpItem(); @@ -265,11 +200,11 @@ public class ZLMRTPServerFactory { * @return SendRtpItem */ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, - String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ + String app, String stream, String channelId, boolean tcp, boolean rtcp){ // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc, localPort, callback); + localPort = sendRtpPortManager.getNextPort(serverItem.getId()); if (localPort == 0) { return null; } @@ -290,58 +225,6 @@ public class ZLMRTPServerFactory { return sendRtpItem; } - public interface KeepPortCallback{ - Boolean keep(String ssrc); - } - - /** - * 保持端口,直到需要需要发流时再释放 - */ - public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) { - Map param = new HashMap<>(3); - param.put("port", localPort); - param.put("enable_tcp", 1); - param.put("stream_id", ssrc); - JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); - if (jsonObject.getInteger("code") == 0) { - localPort = jsonObject.getInteger("port"); - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); - // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - int finalLocalPort = localPort; - hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, - (MediaServerItem mediaServerItem, HookParam hookParam)->{ - logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort); - OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam; - if (ssrc.equals(rtpServerTimeoutHookParam.getStream_id())) { - if (keepPortCallback.keep(ssrc)) { - logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); - keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback); - }else { - logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc); - releasePort(serverItem, ssrc); - } - } - }); - logger.info("[上级点播] {}->: {}", ssrc, localPort); - return localPort; - }else { - logger.info("[上级点播] 监听端口失败: {}->{}", ssrc, localPort); - return 0; - } - } - - /** - * 释放保持的端口 - */ - public boolean releasePort(MediaServerItem serverItem, String ssrc) { - logger.info("[保持端口] {}->释放监听端口", ssrc); - boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc); - HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); - // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); - return closeRTPServerResult; - } - /** * 调用zlm RESTFUL API —— startSendRtp */ diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaSendRtpPortInfo.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaSendRtpPortInfo.java new file mode 100644 index 00000000..2e9f6317 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaSendRtpPortInfo.java @@ -0,0 +1,50 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +public class MediaSendRtpPortInfo { + + private int start; + private int end; + private String mediaServerId; + + private int current; + + + public MediaSendRtpPortInfo(int start, int end, String mediaServerId) { + this.start = start; + this.current = start; + this.end = end; + this.mediaServerId = mediaServerId; + } + + public int getStart() { + return start; + } + + public void setStart(int start) { + this.start = start; + } + + public int getEnd() { + return end; + } + + public void setEnd(int end) { + this.end = end; + } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } + + public int getCurrent() { + return current; + } + + public void setCurrent(int current) { + this.current = current; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java index e6bbb5fa..066a6776 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java @@ -62,6 +62,9 @@ public class MediaServerItem{ @Schema(description = "多端口RTP收流端口范围") private String rtpPortRange; + @Schema(description = "RTP发流端口范围") + private String sendRtpPortRange; + @Schema(description = "assist服务端口") private int recordAssistPort; @@ -297,4 +300,12 @@ public class MediaServerItem{ public void setHookAliveInterval(Float hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } + + public String getSendRtpPortRange() { + return sendRtpPortRange; + } + + public void setSendRtpPortRange(String sendRtpPortRange) { + this.sendRtpPortRange = sendRtpPortRange; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 83c19044..2fd13ecc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -11,10 +11,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; -import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; +import com.genersoft.iot.vmp.media.zlm.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.service.IInviteStreamService; @@ -70,6 +67,9 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private UserSetting userSetting; + @Autowired + private SendRtpPortManager sendRtpPortManager; + @Autowired private AssistRESTfulUtils assistRESTfulUtils; @@ -121,13 +121,40 @@ public class MediaServerServiceImpl implements IMediaServerService { if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) { ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null); } + if (userSetting.getGbSendStreamStrict()) { + int startPort = 50000; + int endPort = 60000; + String sendRtpPortRange = mediaServerItem.getSendRtpPortRange(); + if (sendRtpPortRange == null) { + logger.warn("[zlm] ] 未配置发流端口范围,默认使用50000到60000"); + }else { + String[] sendRtpPortRangeArray = sendRtpPortRange.trim().split(","); + if (sendRtpPortRangeArray.length != 2) { + logger.warn("[zlm] ] 发流端口范围错误,默认使用50000到60000"); + }else { + try { + startPort = Integer.parseInt(sendRtpPortRangeArray[0]); + endPort = Integer.parseInt(sendRtpPortRangeArray[1]); + if (endPort <= startPort) { + logger.warn("[zlm] ] 发流端口范围错误,结束端口应大于开始端口,使用默认端口"); + startPort = 50000; + endPort = 60000; + } + + }catch (NumberFormatException e) { + logger.warn("[zlm] ] 发流端口范围错误,默认使用50000到60000"); + } + } + } + logger.info("[[zlm] ] 配置发流端口范围,{}-{}", startPort, endPort); + sendRtpPortManager.initServerPort(mediaServerItem.getId(), startPort, endPort); + } // 查询redis是否存在此mediaServer String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); Boolean hasKey = redisTemplate.hasKey(key); if (hasKey != null && ! hasKey) { redisTemplate.opsForValue().set(key, mediaServerItem); } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index a40bb3b1..d46f9097 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -314,9 +314,7 @@ public class RedisGbPlayMsgListener implements MessageListener { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), - content.getTcp(), content.getRtcp(), ssrcFromCallback -> { - return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null; - }); + content.getTcp(), content.getRtcp()); WVPResult result = new WVPResult<>(); result.setCode(0); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java index ca0d98d0..cdc303dd 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java @@ -28,6 +28,7 @@ public interface MediaServerMapper { "secret,"+ "rtp_enable,"+ "rtp_port_range,"+ + "send_rtp_port_range,"+ "record_assist_port,"+ "default_server,"+ "create_time,"+ @@ -51,6 +52,7 @@ public interface MediaServerMapper { "#{secret}, " + "#{rtpEnable}, " + "#{rtpPortRange}, " + + "#{sendRtpPortRange}, " + "#{recordAssistPort}, " + "#{defaultServer}, " + "#{createTime}, " + @@ -75,6 +77,7 @@ public interface MediaServerMapper { ", auto_config=#{autoConfig}" + ", rtp_enable=#{rtpEnable}" + ", rtp_port_range=#{rtpPortRange}" + + ", send_rtp_port_range=#{sendRtpPortRange}" + ", secret=#{secret}" + ", record_assist_port=#{recordAssistPort}" + ", hook_alive_interval=#{hookAliveInterval}" + @@ -98,6 +101,7 @@ public interface MediaServerMapper { ", auto_config=#{autoConfig}" + ", rtp_enable=#{rtpEnable}" + ", rtp_port_range=#{rtpPortRange}" + + ", send_rtp_port_range=#{sendRtpPortRange}" + ", secret=#{secret}" + ", record_assist_port=#{recordAssistPort}" + ", hook_alive_interval=#{hookAliveInterval}" + diff --git a/web_src/src/components/dialog/MediaServerEdit.vue b/web_src/src/components/dialog/MediaServerEdit.vue index 15923c1f..9808a1c2 100644 --- a/web_src/src/components/dialog/MediaServerEdit.vue +++ b/web_src/src/components/dialog/MediaServerEdit.vue @@ -81,6 +81,7 @@ + @@ -89,6 +90,11 @@ - + + + - + + @@ -172,6 +178,7 @@ export default { rtmpSSlPort: "", rtpEnable: false, rtpPortRange: "", + sendRtpPortRange: "", rtpProxyPort: "", rtspPort: "", rtspSSLPort: "", @@ -179,6 +186,9 @@ export default { rtpPortRange1:30000, rtpPortRange2:30500, + sendRtpPortRange1:50000, + sendRtpPortRange2:60000, + rules: { ip: [{ required: true, validator: isValidIp, message: '请输入有效的IP地址', trigger: 'blur' }], httpPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], @@ -214,10 +224,15 @@ export default { this.currentStep = 3; if (param.rtpPortRange) { let rtpPortRange = this.mediaServerForm.rtpPortRange.split(","); + let sendRtpPortRange = this.mediaServerForm.sendRtpPortRange.split(","); if (rtpPortRange.length > 0) { this.rtpPortRange1 = rtpPortRange[0] this.rtpPortRange2 = rtpPortRange[1] } + if (sendRtpPortRange.length > 0) { + this.sendRtpPortRange1 = sendRtpPortRange[0] + this.sendRtpPortRange2 = sendRtpPortRange[1] + } } } }, @@ -240,6 +255,8 @@ export default { that.mediaServerForm.autoConfig = true; that.rtpPortRange1 = 30000 that.rtpPortRange2 = 30500 + that.sendRtpPortRange1 = 50000 + that.sendRtpPortRange2 = 60000 that.serverCheck = 1; }else { that.serverCheck = -1; @@ -321,12 +338,15 @@ export default { rtmpSSlPort: "", rtpEnable: false, rtpPortRange: "", + sendRtpPortRange: "", rtpProxyPort: "", rtspPort: "", rtspSSLPort: "", }; this.rtpPortRange1 = 30500; this.rtpPortRange2 = 30500; + this.sendRtpPortRange1 = 50000; + this.sendRtpPortRange2 = 60000; this.listChangeCallback = null this.currentStep = 1; }, @@ -351,7 +371,7 @@ export default { portRangeChange: function() { if (this.mediaServerForm.rtpEnable) { this.mediaServerForm.rtpPortRange = this.rtpPortRange1 + "," + this.rtpPortRange2 - console.log(this.mediaServerForm.rtpPortRange) + this.mediaServerForm.sendRtpPortRange = this.sendRtpPortRange1 + "," + this.sendRtpPortRange2 } } }, diff --git a/web_src/src/components/dialog/StreamProxyEdit.vue b/web_src/src/components/dialog/StreamProxyEdit.vue index 588f1142..f56134e1 100644 --- a/web_src/src/components/dialog/StreamProxyEdit.vue +++ b/web_src/src/components/dialog/StreamProxyEdit.vue @@ -82,17 +82,21 @@ + - - - - - + 不做处理 + 停用 + 移除 + + + + + + + + + +