Merge branch 'wvp-28181-2.0' into main-dev

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
结构优化
648540858 2023-07-01 18:28:28 +08:00
commit 975ace051b
18 changed files with 270 additions and 177 deletions

View File

@ -204,6 +204,12 @@ alter table log
alter table media_server alter table media_server
change hookIp hook_ip varchar(50) not null; change hookIp hook_ip varchar(50) not null;
alter table media_server
add send_rtp_port_range varchar(50) not null;
alter table media_server
add column send_rtp_port_range varchar(50) default null;
alter table media_server alter table media_server
change sdpIp sdp_ip varchar(50) not null; change sdpIp sdp_ip varchar(50) not null;

View File

@ -153,6 +153,7 @@ create table wvp_media_server (
secret character varying(50), secret character varying(50),
rtp_enable bool default false, rtp_enable bool default false,
rtp_port_range character varying(50), rtp_port_range character varying(50),
send_rtp_port_range character varying(50),
record_assist_port integer, record_assist_port integer,
default_server bool default false, default_server bool default false,
create_time character varying(50), create_time character varying(50),

View File

@ -75,6 +75,9 @@ public class MediaConfig{
@Value("${media.rtp.port-range}") @Value("${media.rtp.port-range}")
private String rtpPortRange; private String rtpPortRange;
@Value("${media.rtp.send-port-range}")
private String rtpSendPortRange;
@Value("${media.record-assist-port:0}") @Value("${media.record-assist-port:0}")
private Integer recordAssistPort = 0; private Integer recordAssistPort = 0;
@ -206,6 +209,7 @@ public class MediaConfig{
mediaServerItem.setSecret(secret); mediaServerItem.setSecret(secret);
mediaServerItem.setRtpEnable(rtpEnable); mediaServerItem.setRtpEnable(rtpEnable);
mediaServerItem.setRtpPortRange(rtpPortRange); mediaServerItem.setRtpPortRange(rtpPortRange);
mediaServerItem.setSendRtpPortRange(rtpSendPortRange);
mediaServerItem.setRecordAssistPort(recordAssistPort); mediaServerItem.setRecordAssistPort(recordAssistPort);
mediaServerItem.setHookAliveInterval(30.00f); mediaServerItem.setHookAliveInterval(30.00f);
@ -215,6 +219,14 @@ public class MediaConfig{
return mediaServerItem; return mediaServerItem;
} }
public String getRtpSendPortRange() {
return rtpSendPortRange;
}
public void setRtpSendPortRange(String rtpSendPortRange) {
this.rtpSendPortRange = rtpSendPortRange;
}
private boolean isValidIPAddress(String ipAddress) { private boolean isValidIPAddress(String ipAddress) {
if ((ipAddress != null) && (!ipAddress.isEmpty())) { if ((ipAddress != null) && (!ipAddress.isEmpty())) {
return Pattern.matches("^([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}$", ipAddress); return Pattern.matches("^([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}$", ipAddress);

View File

@ -109,6 +109,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
param.put("app",sendRtpItem.getApp()); param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStream()); param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc()); param.put("ssrc", sendRtpItem.getSsrc());
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
param.put("src_port", sendRtpItem.getLocalPort()); param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", sendRtpItem.getPt()); param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
@ -131,16 +133,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
// 如果是非严格模式,需要关闭端口占用 // 如果是非严格模式,需要关闭端口占用
JSONObject startSendRtpStreamResult = null; JSONObject startSendRtpStreamResult = null;
if (sendRtpItem.getLocalPort() != 0) { if (sendRtpItem.getLocalPort() != 0) {
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(sendRtpItem.getSsrc(), null, mediaInfo.getId()); if (sendRtpItem.isTcpActive()) {
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param);
if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { }else {
if (sendRtpItem.isTcpActive()) { param.put("dst_url", sendRtpItem.getIp());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); param.put("dst_port", sendRtpItem.getPort());
}else { startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
}
} }
}else { }else {
if (sendRtpItem.isTcpActive()) { if (sendRtpItem.isTcpActive()) {

View File

@ -375,9 +375,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (tcpActive != null) { if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive); sendRtpItem.setTcpActive(tcpActive);
@ -588,9 +586,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (streamReady != null && streamReady) { if (streamReady != null && streamReady) {
// 自平台内容 // 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("服务器端口资源不足"); logger.warn("服务器端口资源不足");
@ -630,9 +626,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (streamReady != null && streamReady) { if (streamReady != null && streamReady) {
// 自平台内容 // 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("服务器端口资源不足"); logger.warn("服务器端口资源不足");
@ -748,9 +742,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
dynamicTask.stop(callIdHeader.getCallId()); dynamicTask.stop(callIdHeader.getCallId());
if (serverId.equals(userSetting.getServerId())) { if (serverId.equals(userSetting.getServerId())) {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null;
});
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("上级点时创建sendRTPItem失败可能是服务器端口资源不足"); logger.warn("上级点时创建sendRTPItem失败可能是服务器端口资源不足");

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@ -63,6 +64,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
@Autowired @Autowired
private CivilCodeFileConf civilCodeFileConf; private CivilCodeFileConf civilCodeFileConf;
@Autowired
private SipConfig sipConfig;
private final static String talkKey = "notify-request-for-catalog-task"; private final static String talkKey = "notify-request-for-catalog-task";
public void process(RequestEvent evt) { public void process(RequestEvent evt) {
@ -104,7 +108,13 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
event = eventElement.getText().toUpperCase(); event = eventElement.getText().toUpperCase();
} }
DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf); DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf);
if (channel == null) {
logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
continue;
}
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
channel.setParentId(null);
}
channel.setDeviceId(device.getDeviceId()); channel.setDeviceId(device.getDeviceId());
logger.info("[收到目录订阅]{}/{}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到目录订阅]{}/{}", device.getDeviceId(), channel.getChannelId());
switch (event) { switch (event) {

View File

@ -413,6 +413,13 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
event = eventElement.getText().toUpperCase(); event = eventElement.getText().toUpperCase();
} }
DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf); DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf);
if (channel == null) {
logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
continue;
}
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
channel.setParentId(null);
}
channel.setDeviceId(device.getDeviceId()); channel.setDeviceId(device.getDeviceId());
logger.info("[收到目录订阅]{}/{}", device.getDeviceId(), channel.getChannelId()); logger.info("[收到目录订阅]{}/{}", device.getDeviceId(), channel.getChannelId());
switch (event) { switch (event) {

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.conf.CivilCodeFileConf; import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch; import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@ -58,6 +59,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
@Autowired @Autowired
private CivilCodeFileConf civilCodeFileConf; private CivilCodeFileConf civilCodeFileConf;
@Autowired
private SipConfig sipConfig;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this); responseMessageHandler.addHandler(cmdType, this);
@ -113,11 +117,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
if (channelDeviceElement == null) { if (channelDeviceElement == null) {
continue; continue;
} }
DeviceChannel deviceChannel = XmlUtil.channelContentHandler(itemDevice, device, null, civilCodeFileConf); DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, null, civilCodeFileConf);
deviceChannel = SipUtils.updateGps(deviceChannel, device.getGeoCoordSys()); if (channel == null) {
deviceChannel.setDeviceId(take.getDevice().getDeviceId()); logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
continue;
}
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
channel.setParentId(null);
}
SipUtils.updateGps(channel, device.getGeoCoordSys());
channel.setDeviceId(take.getDevice().getDeviceId());
channelList.add(deviceChannel); channelList.add(channel);
} }
int sn = Integer.parseInt(snElement.getText()); int sn = Integer.parseInt(snElement.getText());
catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import org.apache.commons.lang3.math.NumberUtils;
import org.dom4j.Attribute; import org.dom4j.Attribute;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
@ -317,7 +318,6 @@ public class XmlUtil {
deviceChannel.setBusinessGroupId(businessGroupID); deviceChannel.setBusinessGroupId(businessGroupID);
} }
if (!ObjectUtils.isEmpty(parentID)) { if (!ObjectUtils.isEmpty(parentID)) {
if (parentID.contains("/")) { if (parentID.contains("/")) {
String[] parentIdArray = parentID.split("/"); String[] parentIdArray = parentID.split("/");
@ -341,7 +341,11 @@ public class XmlUtil {
if (!ObjectUtils.isEmpty(owner)) { if (!ObjectUtils.isEmpty(owner)) {
deviceChannel.setOwner(owner); deviceChannel.setOwner(owner);
} }
if (!ObjectUtils.isEmpty(civilCode)) { if (!ObjectUtils.isEmpty(civilCode)
&& civilCode.length() <= 8
&& NumberUtils.isParsable(civilCode)
&& Integer.parseInt(civilCode)%2 == 0
) {
deviceChannel.setCivilCode(civilCode); deviceChannel.setCivilCode(civilCode);
} }
if (!ObjectUtils.isEmpty(businessGroupID)) { if (!ObjectUtils.isEmpty(businessGroupID)) {
@ -387,8 +391,8 @@ public class XmlUtil {
if (!ObjectUtils.isEmpty(businessGroupID)) { if (!ObjectUtils.isEmpty(businessGroupID)) {
deviceChannel.setParentId(businessGroupID); deviceChannel.setParentId(businessGroupID);
}else { }else {
if (!ObjectUtils.isEmpty(civilCode)) { if (!ObjectUtils.isEmpty(deviceChannel.getCivilCode())) {
deviceChannel.setParentId(civilCode); deviceChannel.setParentId(deviceChannel.getCivilCode());
} }
} }
} }

View File

@ -0,0 +1,55 @@
package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.dto.MediaSendRtpPortInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class SendRtpPortManager {
private final static Logger logger = LoggerFactory.getLogger(SendRtpPortManager.class);
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private final String KEY = "VM_MEDIA_SEND_RTP_PORT_RANGE_";
public void initServerPort(String mediaServerId, int startPort, int endPort){
String key = KEY + userSetting.getServerId() + "_" + mediaServerId;
MediaSendRtpPortInfo mediaSendRtpPortInfo = new MediaSendRtpPortInfo(startPort, endPort, mediaServerId);
redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo);
}
public int getNextPort(String mediaServerId) {
String key = KEY + userSetting.getServerId() + "_" + mediaServerId;
MediaSendRtpPortInfo mediaSendRtpPortInfo = (MediaSendRtpPortInfo)redisTemplate.opsForValue().get(key);
if (mediaSendRtpPortInfo == null) {
logger.warn("[发送端口管理] 获取{}的发送端口时未找到端口信息", mediaSendRtpPortInfo);
return 0;
}
int port;
if (mediaSendRtpPortInfo.getCurrent() %2 != 0) {
port = mediaSendRtpPortInfo.getCurrent() + 1;
}else {
port = mediaSendRtpPortInfo.getCurrent() + 2;
}
if (port > mediaSendRtpPortInfo.getEnd()) {
if (mediaSendRtpPortInfo.getStart() %2 != 0) {
port = mediaSendRtpPortInfo.getStart() + 1;
}else {
port = mediaSendRtpPortInfo.getStart();
}
}
mediaSendRtpPortInfo.setCurrent(port);
redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo);
return port;
}
}

View File

@ -1,22 +1,18 @@
package com.genersoft.iot.vmp.media.zlm; package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.HashMap;
import java.util.Map;
@Component @Component
public class ZLMRTPServerFactory { public class ZLMRTPServerFactory {
@ -32,68 +28,9 @@ public class ZLMRTPServerFactory {
@Autowired @Autowired
private ZlmHttpHookSubscribe hookSubscribe; private ZlmHttpHookSubscribe hookSubscribe;
private int[] portRangeArray = new int[2]; @Autowired
private SendRtpPortManager sendRtpPortManager;
public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
if (endPort <= startPort) {
return -1;
}
if (usedFreelist == null) {
usedFreelist = new ArrayList<>();
}
JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem);
if (listRtpServerJsonResult != null) {
JSONArray data = listRtpServerJsonResult.getJSONArray("data");
if (data != null) {
for (int i = 0; i < data.size(); i++) {
JSONObject dataItem = data.getJSONObject(i);
usedFreelist.add(dataItem.getInteger("port"));
}
}
}
Map<String, Object> param = new HashMap<>();
int result = -1;
// 设置推流端口
if (startPort%2 == 1) {
startPort ++;
}
boolean checkPort = false;
for (int i = startPort; i < endPort + 1; i+=2) {
if (!usedFreelist.contains(i)){
checkPort = true;
startPort = i;
break;
}
}
if (!checkPort) {
logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort);
return -1;
}
param.put("port", startPort);
String stream = UUID.randomUUID().toString();
param.put("enable_tcp", 1);
param.put("stream_id", stream);
// param.put("port", 0);
JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
if (openRtpServerResultJson != null) {
if (openRtpServerResultJson.getInteger("code") == 0) {
result= openRtpServerResultJson.getInteger("port");
Map<String, Object> closeRtpServerParam = new HashMap<>();
closeRtpServerParam.put("stream_id", stream);
zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
}else {
usedFreelist.add(startPort);
startPort +=2;
result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist);
}
}else {
// 检查ZLM状态
logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port"));
}
return result;
}
/** /**
* rtpServer * rtpServer
@ -226,16 +163,14 @@ public class ZLMRTPServerFactory {
* @return SendRtpItem * @return SendRtpItem
*/ */
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ String deviceId, String channelId, boolean tcp, boolean rtcp){
// 默认为随机端口 // 默认为随机端口
int localPort = 0; int localPort = 0;
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
if (userSetting.getGbSendStreamStrict()) { localPort = sendRtpPortManager.getNextPort(serverItem.getId());
localPort = keepPort(serverItem, ssrc, localPort, callback); if (localPort == 0) {
if (localPort == 0) { return null;
return null;
}
} }
} }
SendRtpItem sendRtpItem = new SendRtpItem(); SendRtpItem sendRtpItem = new SendRtpItem();
@ -265,11 +200,11 @@ public class ZLMRTPServerFactory {
* @return SendRtpItem * @return SendRtpItem
*/ */
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ String app, String stream, String channelId, boolean tcp, boolean rtcp){
// 默认为随机端口 // 默认为随机端口
int localPort = 0; int localPort = 0;
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc, localPort, callback); localPort = sendRtpPortManager.getNextPort(serverItem.getId());
if (localPort == 0) { if (localPort == 0) {
return null; return null;
} }
@ -290,58 +225,6 @@ public class ZLMRTPServerFactory {
return sendRtpItem; return sendRtpItem;
} }
public interface KeepPortCallback{
Boolean keep(String ssrc);
}
/**
*
*/
public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) {
Map<String, Object> param = new HashMap<>(3);
param.put("port", localPort);
param.put("enable_tcp", 1);
param.put("stream_id", ssrc);
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
int finalLocalPort = localPort;
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, HookParam hookParam)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
if (ssrc.equals(rtpServerTimeoutHookParam.getStream_id())) {
if (keepPortCallback.keep(ssrc)) {
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback);
}else {
logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc);
releasePort(serverItem, ssrc);
}
}
});
logger.info("[上级点播] {}->: {}", ssrc, localPort);
return localPort;
}else {
logger.info("[上级点播] 监听端口失败: {}->{}", ssrc, localPort);
return 0;
}
}
/**
*
*/
public boolean releasePort(MediaServerItem serverItem, String ssrc) {
logger.info("[保持端口] {}->释放监听端口", ssrc);
boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc);
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
return closeRTPServerResult;
}
/** /**
* zlm RESTFUL API startSendRtp * zlm RESTFUL API startSendRtp
*/ */

View File

@ -0,0 +1,50 @@
package com.genersoft.iot.vmp.media.zlm.dto;
public class MediaSendRtpPortInfo {
private int start;
private int end;
private String mediaServerId;
private int current;
public MediaSendRtpPortInfo(int start, int end, String mediaServerId) {
this.start = start;
this.current = start;
this.end = end;
this.mediaServerId = mediaServerId;
}
public int getStart() {
return start;
}
public void setStart(int start) {
this.start = start;
}
public int getEnd() {
return end;
}
public void setEnd(int end) {
this.end = end;
}
public String getMediaServerId() {
return mediaServerId;
}
public void setMediaServerId(String mediaServerId) {
this.mediaServerId = mediaServerId;
}
public int getCurrent() {
return current;
}
public void setCurrent(int current) {
this.current = current;
}
}

View File

@ -62,6 +62,9 @@ public class MediaServerItem{
@Schema(description = "多端口RTP收流端口范围") @Schema(description = "多端口RTP收流端口范围")
private String rtpPortRange; private String rtpPortRange;
@Schema(description = "RTP发流端口范围")
private String sendRtpPortRange;
@Schema(description = "assist服务端口") @Schema(description = "assist服务端口")
private int recordAssistPort; private int recordAssistPort;
@ -297,4 +300,12 @@ public class MediaServerItem{
public void setHookAliveInterval(Float hookAliveInterval) { public void setHookAliveInterval(Float hookAliveInterval) {
this.hookAliveInterval = hookAliveInterval; this.hookAliveInterval = hookAliveInterval;
} }
public String getSendRtpPortRange() {
return sendRtpPortRange;
}
public void setSendRtpPortRange(String sendRtpPortRange) {
this.sendRtpPortRange = sendRtpPortRange;
}
} }

View File

@ -11,10 +11,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.*;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IInviteStreamService;
@ -70,6 +67,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
@Autowired
private SendRtpPortManager sendRtpPortManager;
@Autowired @Autowired
private AssistRESTfulUtils assistRESTfulUtils; private AssistRESTfulUtils assistRESTfulUtils;
@ -121,13 +121,40 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) { if (ssrcFactory.hasMediaServerSSRC(mediaServerItem.getId())) {
ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null); ssrcFactory.initMediaServerSSRC(mediaServerItem.getId(), null);
} }
if (userSetting.getGbSendStreamStrict()) {
int startPort = 50000;
int endPort = 60000;
String sendRtpPortRange = mediaServerItem.getSendRtpPortRange();
if (sendRtpPortRange == null) {
logger.warn("[zlm] ] 未配置发流端口范围默认使用50000到60000");
}else {
String[] sendRtpPortRangeArray = sendRtpPortRange.trim().split(",");
if (sendRtpPortRangeArray.length != 2) {
logger.warn("[zlm] ] 发流端口范围错误默认使用50000到60000");
}else {
try {
startPort = Integer.parseInt(sendRtpPortRangeArray[0]);
endPort = Integer.parseInt(sendRtpPortRangeArray[1]);
if (endPort <= startPort) {
logger.warn("[zlm] ] 发流端口范围错误,结束端口应大于开始端口,使用默认端口");
startPort = 50000;
endPort = 60000;
}
}catch (NumberFormatException e) {
logger.warn("[zlm] ] 发流端口范围错误默认使用50000到60000");
}
}
}
logger.info("[[zlm] ] 配置发流端口范围,{}-{}", startPort, endPort);
sendRtpPortManager.initServerPort(mediaServerItem.getId(), startPort, endPort);
}
// 查询redis是否存在此mediaServer // 查询redis是否存在此mediaServer
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
Boolean hasKey = redisTemplate.hasKey(key); Boolean hasKey = redisTemplate.hasKey(key);
if (hasKey != null && ! hasKey) { if (hasKey != null && ! hasKey) {
redisTemplate.opsForValue().set(key, mediaServerItem); redisTemplate.opsForValue().set(key, mediaServerItem);
} }
} }
} }

View File

@ -314,9 +314,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
content.getPort(), content.getSsrc(), content.getPlatformId(), content.getPort(), content.getSsrc(), content.getPlatformId(),
content.getApp(), content.getStream(), content.getChannelId(), content.getApp(), content.getStream(), content.getChannelId(),
content.getTcp(), content.getRtcp(), ssrcFromCallback -> { content.getTcp(), content.getRtcp());
return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null;
});
WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); WVPResult<ResponseSendItemMsg> result = new WVPResult<>();
result.setCode(0); result.setCode(0);

View File

@ -28,6 +28,7 @@ public interface MediaServerMapper {
"secret,"+ "secret,"+
"rtp_enable,"+ "rtp_enable,"+
"rtp_port_range,"+ "rtp_port_range,"+
"send_rtp_port_range,"+
"record_assist_port,"+ "record_assist_port,"+
"default_server,"+ "default_server,"+
"create_time,"+ "create_time,"+
@ -51,6 +52,7 @@ public interface MediaServerMapper {
"#{secret}, " + "#{secret}, " +
"#{rtpEnable}, " + "#{rtpEnable}, " +
"#{rtpPortRange}, " + "#{rtpPortRange}, " +
"#{sendRtpPortRange}, " +
"#{recordAssistPort}, " + "#{recordAssistPort}, " +
"#{defaultServer}, " + "#{defaultServer}, " +
"#{createTime}, " + "#{createTime}, " +
@ -75,6 +77,7 @@ public interface MediaServerMapper {
"<if test=\"autoConfig != null\">, auto_config=#{autoConfig}</if>" + "<if test=\"autoConfig != null\">, auto_config=#{autoConfig}</if>" +
"<if test=\"rtpEnable != null\">, rtp_enable=#{rtpEnable}</if>" + "<if test=\"rtpEnable != null\">, rtp_enable=#{rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtp_port_range=#{rtpPortRange}</if>" + "<if test=\"rtpPortRange != null\">, rtp_port_range=#{rtpPortRange}</if>" +
"<if test=\"sendRtpPortRange != null\">, send_rtp_port_range=#{sendRtpPortRange}</if>" +
"<if test=\"secret != null\">, secret=#{secret}</if>" + "<if test=\"secret != null\">, secret=#{secret}</if>" +
"<if test=\"recordAssistPort != null\">, record_assist_port=#{recordAssistPort}</if>" + "<if test=\"recordAssistPort != null\">, record_assist_port=#{recordAssistPort}</if>" +
"<if test=\"hookAliveInterval != null\">, hook_alive_interval=#{hookAliveInterval}</if>" + "<if test=\"hookAliveInterval != null\">, hook_alive_interval=#{hookAliveInterval}</if>" +
@ -98,6 +101,7 @@ public interface MediaServerMapper {
"<if test=\"autoConfig != null\">, auto_config=#{autoConfig}</if>" + "<if test=\"autoConfig != null\">, auto_config=#{autoConfig}</if>" +
"<if test=\"rtpEnable != null\">, rtp_enable=#{rtpEnable}</if>" + "<if test=\"rtpEnable != null\">, rtp_enable=#{rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtp_port_range=#{rtpPortRange}</if>" + "<if test=\"rtpPortRange != null\">, rtp_port_range=#{rtpPortRange}</if>" +
"<if test=\"sendRtpPortRange != null\">, send_rtp_port_range=#{sendRtpPortRange}</if>" +
"<if test=\"secret != null\">, secret=#{secret}</if>" + "<if test=\"secret != null\">, secret=#{secret}</if>" +
"<if test=\"recordAssistPort != null\">, record_assist_port=#{recordAssistPort}</if>" + "<if test=\"recordAssistPort != null\">, record_assist_port=#{recordAssistPort}</if>" +
"<if test=\"hookAliveInterval != null\">, hook_alive_interval=#{hookAliveInterval}</if>" + "<if test=\"hookAliveInterval != null\">, hook_alive_interval=#{hookAliveInterval}</if>" +

View File

@ -81,6 +81,7 @@
<el-switch active-text="" inactive-text="" @change="portRangeChange" v-model="mediaServerForm.rtpEnable" :disabled="mediaServerForm.defaultServer"></el-switch> <el-switch active-text="" inactive-text="" @change="portRangeChange" v-model="mediaServerForm.rtpEnable" :disabled="mediaServerForm.defaultServer"></el-switch>
</el-form-item> </el-form-item>
<el-form-item v-if="!mediaServerForm.rtpEnable" label="收流端口" prop="rtpProxyPort"> <el-form-item v-if="!mediaServerForm.rtpEnable" label="收流端口" prop="rtpProxyPort">
<el-input v-model.number="mediaServerForm.rtpProxyPort" clearable :disabled="mediaServerForm.defaultServer"></el-input> <el-input v-model.number="mediaServerForm.rtpProxyPort" clearable :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item> </el-form-item>
@ -89,6 +90,11 @@
- -
<el-input v-model="rtpPortRange2" placeholder="终止" @change="portRangeChange" clearable style="width: 100px" prop="rtpPortRange2" :disabled="mediaServerForm.defaultServer"></el-input> <el-input v-model="rtpPortRange2" placeholder="终止" @change="portRangeChange" clearable style="width: 100px" prop="rtpPortRange2" :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item> </el-form-item>
<el-form-item v-if="mediaServerForm.sendRtpEnable" label="发流端口" >
<el-input v-model="sendRtpPortRange1" placeholder="起始" @change="portRangeChange" clearable style="width: 100px" prop="rtpPortRange1" :disabled="mediaServerForm.defaultServer"></el-input>
-
<el-input v-model="sendRtpPortRange2" placeholder="终止" @change="portRangeChange" clearable style="width: 100px" prop="rtpPortRange2" :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
<el-form-item label="录像管理服务端口" prop="recordAssistPort"> <el-form-item label="录像管理服务端口" prop="recordAssistPort">
<el-input v-model.number="mediaServerForm.recordAssistPort" :disabled="mediaServerForm.defaultServer"> <el-input v-model.number="mediaServerForm.recordAssistPort" :disabled="mediaServerForm.defaultServer">
<!-- <el-button v-if="mediaServerForm.recordAssistPort > 0" slot="append" type="primary" @click="checkRecordServer"></el-button>--> <!-- <el-button v-if="mediaServerForm.recordAssistPort > 0" slot="append" type="primary" @click="checkRecordServer"></el-button>-->
@ -172,6 +178,7 @@ export default {
rtmpSSlPort: "", rtmpSSlPort: "",
rtpEnable: false, rtpEnable: false,
rtpPortRange: "", rtpPortRange: "",
sendRtpPortRange: "",
rtpProxyPort: "", rtpProxyPort: "",
rtspPort: "", rtspPort: "",
rtspSSLPort: "", rtspSSLPort: "",
@ -179,6 +186,9 @@ export default {
rtpPortRange1:30000, rtpPortRange1:30000,
rtpPortRange2:30500, rtpPortRange2:30500,
sendRtpPortRange1:50000,
sendRtpPortRange2:60000,
rules: { rules: {
ip: [{ required: true, validator: isValidIp, message: '请输入有效的IP地址', trigger: 'blur' }], ip: [{ required: true, validator: isValidIp, message: '请输入有效的IP地址', trigger: 'blur' }],
httpPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], httpPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }],
@ -214,10 +224,15 @@ export default {
this.currentStep = 3; this.currentStep = 3;
if (param.rtpPortRange) { if (param.rtpPortRange) {
let rtpPortRange = this.mediaServerForm.rtpPortRange.split(","); let rtpPortRange = this.mediaServerForm.rtpPortRange.split(",");
let sendRtpPortRange = this.mediaServerForm.sendRtpPortRange.split(",");
if (rtpPortRange.length > 0) { if (rtpPortRange.length > 0) {
this.rtpPortRange1 = rtpPortRange[0] this.rtpPortRange1 = rtpPortRange[0]
this.rtpPortRange2 = rtpPortRange[1] this.rtpPortRange2 = rtpPortRange[1]
} }
if (sendRtpPortRange.length > 0) {
this.sendRtpPortRange1 = sendRtpPortRange[0]
this.sendRtpPortRange2 = sendRtpPortRange[1]
}
} }
} }
}, },
@ -240,6 +255,8 @@ export default {
that.mediaServerForm.autoConfig = true; that.mediaServerForm.autoConfig = true;
that.rtpPortRange1 = 30000 that.rtpPortRange1 = 30000
that.rtpPortRange2 = 30500 that.rtpPortRange2 = 30500
that.sendRtpPortRange1 = 50000
that.sendRtpPortRange2 = 60000
that.serverCheck = 1; that.serverCheck = 1;
}else { }else {
that.serverCheck = -1; that.serverCheck = -1;
@ -321,12 +338,15 @@ export default {
rtmpSSlPort: "", rtmpSSlPort: "",
rtpEnable: false, rtpEnable: false,
rtpPortRange: "", rtpPortRange: "",
sendRtpPortRange: "",
rtpProxyPort: "", rtpProxyPort: "",
rtspPort: "", rtspPort: "",
rtspSSLPort: "", rtspSSLPort: "",
}; };
this.rtpPortRange1 = 30500; this.rtpPortRange1 = 30500;
this.rtpPortRange2 = 30500; this.rtpPortRange2 = 30500;
this.sendRtpPortRange1 = 50000;
this.sendRtpPortRange2 = 60000;
this.listChangeCallback = null this.listChangeCallback = null
this.currentStep = 1; this.currentStep = 1;
}, },
@ -351,7 +371,7 @@ export default {
portRangeChange: function() { portRangeChange: function() {
if (this.mediaServerForm.rtpEnable) { if (this.mediaServerForm.rtpEnable) {
this.mediaServerForm.rtpPortRange = this.rtpPortRange1 + "," + this.rtpPortRange2 this.mediaServerForm.rtpPortRange = this.rtpPortRange1 + "," + this.rtpPortRange2
console.log(this.mediaServerForm.rtpPortRange) this.mediaServerForm.sendRtpPortRange = this.sendRtpPortRange1 + "," + this.sendRtpPortRange2
} }
} }
}, },

View File

@ -82,17 +82,21 @@
<el-option label="组播" value="2"></el-option> <el-option label="组播" value="2"></el-option>
</el-select> </el-select>
</el-form-item> </el-form-item>
<el-form-item label="无人观看" prop="rtpType" > <el-form-item label="无人观看" prop="rtpType" >
<el-select <el-radio v-model="proxyParam.noneReader" label="1"></el-radio>
@change="noneReaderHandler" <el-radio v-model="proxyParam.enableDisableNoneReader" label="2"></el-radio>
v-model="proxyParam.noneReader" <el-radio v-model="proxyParam.enableRemoveNoneReader" label="3"></el-radio>
style="width: 100%" <!-- <el-select-->
placeholder="请选择无人观看的处理方式" <!-- @change="noneReaderHandler"-->
> <!-- v-model="proxyParam.noneReader"-->
<el-option label="不做处理" value="0"></el-option> <!-- style="width: 100%"-->
<el-option label="停用" value="1"></el-option> <!-- placeholder="请选择无人观看的处理方式"-->
<el-option label="移除" value="2"></el-option> <!-- >-->
</el-select> <!-- <el-option label="不做处理" value="0"></el-option>-->
<!-- <el-option label="停用" value="1"></el-option>-->
<!-- <el-option label="移除" value="2"></el-option>-->
<!-- </el-select>-->
</el-form-item> </el-form-item>
<el-form-item label="其他选项"> <el-form-item label="其他选项">
<div style="float: left;"> <div style="float: left;">