From 2466a248609b2a9b8eeee08c1378b2a440db475d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 10 Nov 2022 16:48:17 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=8F=91=E9=80=81?= =?UTF-8?q?=E6=B5=81=E7=AB=AF=E5=8F=A3=E8=8C=83=E5=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../genersoft/iot/vmp/conf/DynamicTask.java | 11 +- .../genersoft/iot/vmp/conf/MediaConfig.java | 13 +- .../request/impl/InviteRequestProcessor.java | 9 +- .../impl/RegisterRequestProcessor.java | 2 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 27 ++-- .../vmp/media/zlm/ZLMRTPServerFactory.java | 4 +- .../iot/vmp/media/zlm/ZLMServerConfig.java | 6 +- .../vmp/media/zlm/dto/MediaServerItem.java | 7 +- .../iot/vmp/service/IStreamPushService.java | 3 + .../service/impl/MediaServerServiceImpl.java | 60 ++++---- .../iot/vmp/service/impl/PlayServiceImpl.java | 139 +++++++++--------- .../storager/dao/PlatformGbStreamMapper.java | 4 +- 13 files changed, 134 insertions(+), 153 deletions(-) diff --git a/pom.xml b/pom.xml index 64d414d7..4c26f332 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.genersoft wvp-pro - 2.3.2 + 2.6.6 web video platform 国标28181视频平台 diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index b2c23580..9ca936c6 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -1,10 +1,7 @@ package com.genersoft.iot.vmp.conf; -import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; @@ -101,12 +98,14 @@ public class DynamicTask { } } - public void stop(String key) { - if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { - futureMap.get(key).cancel(false); + public boolean stop(String key) { + boolean result = false; + if (futureMap.get(key) != null && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) { + result = futureMap.get(key).cancel(false); futureMap.remove(key); runnableMap.remove(key); } + return result; } public boolean contains(String key) { diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java index 708f72be..29b2d8db 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java @@ -2,13 +2,11 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.vmanager.gb28181.device.DeviceQuery; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import java.net.InetAddress; import java.net.UnknownHostException; @@ -75,10 +73,6 @@ public class MediaConfig{ @Value("${media.rtp.port-range}") private String rtpPortRange; - - @Value("${media.rtp.send-port-range}") - private String sendRtpPortRange; - @Value("${media.record-assist-port:0}") private Integer recordAssistPort = 0; @@ -191,10 +185,6 @@ public class MediaConfig{ return sipDomain; } - public String getSendRtpPortRange() { - return sendRtpPortRange; - } - public MediaServerItem getMediaSerItem(){ MediaServerItem mediaServerItem = new MediaServerItem(); mediaServerItem.setId(id); @@ -214,9 +204,8 @@ public class MediaConfig{ mediaServerItem.setSecret(secret); mediaServerItem.setRtpEnable(rtpEnable); mediaServerItem.setRtpPortRange(rtpPortRange); - mediaServerItem.setSendRtpPortRange(sendRtpPortRange); mediaServerItem.setRecordAssistPort(recordAssistPort); - mediaServerItem.setHookAliveInterval(120); + mediaServerItem.setHookAliveInterval(30.00f); mediaServerItem.setCreateTime(DateUtil.getNow()); mediaServerItem.setUpdateTime(DateUtil.getNow()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 6634322e..598c0876 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -8,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; @@ -39,9 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.sdp.*; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; import javax.sip.header.CallIdHeader; -import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; @@ -656,6 +656,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (!platform.isStartOfflinePush()) { // 平台设置中关闭了拉起离线的推流则直接回复 try { + logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 6ca99e4f..0b657b56 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -85,7 +85,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword(); AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); if (authHead == null && !ObjectUtils.isEmpty(password)) { - logger.info("[注册请求] 未携带授权头 回复401: {}", requestAddress); + logger.info("[注册请求] 回复401: {}", requestAddress); response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain()); sipSender.transmitRequest(response); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index d925e4b7..e2b5a289 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -1,19 +1,19 @@ package com.genersoft.iot.vmp.media.zlm; -import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -24,18 +24,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.RestController; - -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import javax.sip.InvalidArgumentException; import javax.sip.SipException; +import java.text.ParseException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * @description:针对 ZLMediaServer的hook事件监听 @@ -571,6 +568,8 @@ public class ZLMHttpHookListener { public JSONObject onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){ jsonObject.put("ip", request.getRemoteAddr()); + System.out.println(jsonObject.toJSONString() + ); ZLMServerConfig zlmServerConfig = JSON.to(ZLMServerConfig.class, jsonObject); zlmServerConfig.setIp(request.getRemoteAddr()); logger.info("[ZLM HOOK] zlm 启动 " + zlmServerConfig.getGeneralMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 49eb7b5b..9819343f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -10,7 +10,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import java.util.*; @@ -294,7 +293,8 @@ public class ZLMRTPServerFactory { */ public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId); - return (mediaInfo.getInteger("code") == 0 + return mediaInfo != null && (mediaInfo.getInteger("code") == 0 + && mediaInfo.getJSONArray("data") != null && mediaInfo.getJSONArray("data").size() > 0); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java index f5f772e0..fcf24012 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java @@ -66,7 +66,7 @@ public class ZLMServerConfig { private String hookAdminParams; @JSONField(name = "hook.alive_interval") - private int hookAliveInterval; + private Float hookAliveInterval; @JSONField(name = "hook.enable") private String hookEnable; @@ -798,11 +798,11 @@ public class ZLMServerConfig { this.shellPhell = shellPhell; } - public int getHookAliveInterval() { + public Float getHookAliveInterval() { return hookAliveInterval; } - public void setHookAliveInterval(int hookAliveInterval) { + public void setHookAliveInterval(Float hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java index bf58187e..620b167c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java @@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import io.swagger.v3.oas.annotations.media.Schema; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import java.util.HashMap; @@ -55,7 +54,7 @@ public class MediaServerItem{ private String secret; @Schema(description = "keepalive hook触发间隔,单位秒") - private int hookAliveInterval; + private Float hookAliveInterval; @Schema(description = "是否使用多端口模式") private boolean rtpEnable; @@ -332,11 +331,11 @@ public class MediaServerItem{ this.sendRtpPortRange = sendRtpPortRange; } - public int getHookAliveInterval() { + public Float getHookAliveInterval() { return hookAliveInterval; } - public void setHookAliveInterval(int hookAliveInterval) { + public void setHookAliveInterval(Float hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 8885ed51..cf6f0ed2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -11,6 +11,9 @@ import com.github.pagehelper.PageInfo; import java.util.List; import java.util.Map; +/** + * @author lin + */ public interface IStreamPushService { List handleJSON(String json, MediaServerItem mediaServerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 1f53c7f4..702f8a22 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -1,19 +1,32 @@ package com.genersoft.iot.vmp.service.impl; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -24,28 +37,8 @@ import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.util.ObjectUtils; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; -import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; - -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; +import java.time.LocalDateTime; +import java.util.*; /** * 媒体服务器节点管理 @@ -129,6 +122,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port) { if (mediaServerItem == null || mediaServerItem.getId() == null) { + logger.info("[openRTPServer] 失败, mediaServerItem == null || mediaServerItem.getId() == null"); return null; } // 获取mediaServer可用的ssrc @@ -306,7 +300,7 @@ public class MediaServerServiceImpl implements IMediaServerService { public void add(MediaServerItem mediaServerItem) { mediaServerItem.setCreateTime(DateUtil.getNow()); mediaServerItem.setUpdateTime(DateUtil.getNow()); - mediaServerItem.setHookAliveInterval(120); + mediaServerItem.setHookAliveInterval(30f); JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); if (responseJSON != null) { JSONArray data = responseJSON.getJSONArray("data"); @@ -413,7 +407,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId(); dynamicTask.stop(zlmKeepaliveKey); - dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval() + 5) * 1000); + dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (Math.getExponent(serverItem.getHookAliveInterval()) + 5) * 1000); publisher.zlmOnlineEventPublish(serverItem.getId()); logger.info("[ZLM] 连接成功 {} - {}:{} ", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); @@ -666,7 +660,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerItem.getId(); dynamicTask.stop(zlmKeepaliveKey); - dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval() + 5) * 1000); + dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval().intValue() + 5) * 1000); } private MediaServerItem getOneFromDatabase(String mediaServerId) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 97682900..2dd4e617 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1,49 +1,28 @@ package com.genersoft.iot.vmp.service.impl; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.text.ParseException; -import java.util.*; - -import javax.sip.InvalidArgumentException; -import javax.sip.ResponseEvent; -import javax.sip.SipException; - -import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.conf.exception.ServiceException; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Service; -import org.springframework.util.ObjectUtils; -import org.springframework.web.context.request.async.DeferredResult; - import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.exception.ServiceException; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; @@ -53,8 +32,29 @@ import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import org.springframework.web.context.request.async.DeferredResult; + +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.text.ParseException; +import java.util.List; +import java.util.Objects; +import java.util.UUID; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -212,6 +212,15 @@ public class PlayServiceImpl implements IPlayService { } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); + if (ssrcInfo == null) { + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("开启收流失败"); + msg.setData(wvpResult); + + resultHolder.invokeAllResult(msg); + return playResult; + } play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> { if (hookEvent != null) { hookEvent.response(mediaServerItem, response); @@ -249,45 +258,33 @@ public class PlayServiceImpl implements IPlayService { ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback, String uuid) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - } - if (ssrcInfo == null) { - ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); - } logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); - SSRCInfo finalSsrcInfo = ssrcInfo; dynamicTask.startDelay(timeOutTaskKey, () -> { - - logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc()); - timeoutCallback.run(1, "收流超时"); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - try { - cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); - } catch (SsrcTransactionNotFoundException e) { - timeoutCallback.run(0, "点播超时"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 + if (redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId) == null) { + logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + try { + cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); + } finally { + timeoutCallback.run(1, "收流超时"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + } } }, userSetting.getPlayTimeout()); - final String ssrc = ssrcInfo.getSsrc(); - final String stream = ssrcInfo.getStream(); - //端口获取失败的ssrcInfo 没有必要发送点播指令 - if (ssrcInfo.getPort() <= 0) { - logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); - return; - } + try { cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); System.out.println("停止超时任务: " + timeOutTaskKey); - dynamicTask.stop(timeOutTaskKey); + // hook响应 onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid); hookEvent.response(mediaServerItemInuse, response); @@ -303,18 +300,18 @@ public class PlayServiceImpl implements IPlayService { //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 - if (ssrc.equals(ssrcInResponse)) { + if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { return; } logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse); + logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { // ssrc 不可用 // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); event.msg = "下级自定义了ssrc,但是此ssrc不可用"; event.statusCode = 400; errorEvent.response(event); @@ -324,7 +321,7 @@ public class PlayServiceImpl implements IPlayService { // 单端口模式streamId也有变化,需要重新设置监听 if (!mediaServerItem.isRtpEnable()) { // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { @@ -336,30 +333,30 @@ public class PlayServiceImpl implements IPlayService { }); } // 关闭rtp server - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 重新开启ssrc server - mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort()); + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort()); } } }, (event) -> { dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); errorEvent.response(event); }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[命令发送失败] 点播消息: {}", e.getMessage()); dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); eventResult.msg = "命令发送失败"; errorEvent.response(eventResult); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java index 7e9c8121..98afd6b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java @@ -23,10 +23,10 @@ public interface PlatformGbStreamMapper { @Insert("") int batchAdd(List streamPushItems); From 694076dc8c447754b02dff24a891249f54427ffb Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 16 Nov 2022 09:39:27 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=9B=BD=E6=A0=87?= =?UTF-8?q?=E7=BA=A7=E8=81=94=E5=8F=91=E6=B5=81=E5=B9=B6=E5=8F=91=E8=83=BD?= =?UTF-8?q?=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/mysql.sql | 1 - sql/update.sql | 3 + .../genersoft/iot/vmp/conf/UserSetting.java | 10 ++ .../iot/vmp/gb28181/event/SipSubscribe.java | 7 +- .../event/record/RecordEndEventListener.java | 8 +- .../transmit/cmd/impl/SIPCommander.java | 33 ++--- .../cmd/impl/SIPCommanderFroPlatform.java | 6 +- .../request/impl/AckRequestProcessor.java | 22 +++- .../request/impl/InviteRequestProcessor.java | 15 ++- .../vmp/media/zlm/ZLMHttpHookListener.java | 26 ++++ .../vmp/media/zlm/ZLMRTPServerFactory.java | 119 +++++++++--------- .../iot/vmp/media/zlm/ZLMRunner.java | 12 +- .../media/zlm/dto/HookSubscribeFactory.java | 11 ++ .../dto/HookSubscribeForRtpServerTimeout.java | 44 +++++++ .../zlm/dto/HookSubscribeForStreamChange.java | 1 + .../iot/vmp/media/zlm/dto/HookType.java | 2 + .../vmp/media/zlm/dto/MediaServerItem.java | 12 -- .../dto/hook/OnRtpServerTimeoutHookParam.java | 53 ++++++++ .../vmp/service/impl/DeviceServiceImpl.java | 13 +- .../service/impl/MediaServerServiceImpl.java | 6 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 2 +- .../vmp/storager/dao/MediaServerMapper.java | 4 - .../gb28181/media/MediaController.java | 4 +- .../vmp/vmanager/server/ServerController.java | 6 - src/main/resources/all-application.yml | 3 + .../src/components/dialog/MediaServerEdit.vue | 20 --- 26 files changed, 277 insertions(+), 166 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForRtpServerTimeout.java create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnRtpServerTimeoutHookParam.java diff --git a/sql/mysql.sql b/sql/mysql.sql index e3e9e234..b5ca0161 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -281,7 +281,6 @@ CREATE TABLE `media_server` ( `secret` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `rtpEnable` int NOT NULL, `rtpPortRange` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, - `sendRtpPortRange` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `recordAssistPort` int NOT NULL, `defaultServer` int NOT NULL, `createTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, diff --git a/sql/update.sql b/sql/update.sql index 1c2f855f..1a72b808 100644 --- a/sql/update.sql +++ b/sql/update.sql @@ -1,6 +1,9 @@ alter table media_server drop column streamNoneReaderDelayMS; +alter table media_server + drop column sendRtpPortRange; + alter table stream_proxy add enable_disable_none_reader bit(1) default null; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index 48dd85b1..9f5fbae6 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -37,6 +37,8 @@ public class UserSetting { private Boolean pushAuthority = Boolean.TRUE; + private Boolean gbSendStreamStrict = Boolean.FALSE; + private String serverId = "000000"; private String thirdPartyGBIdReg = "[\\s\\S]*"; @@ -166,4 +168,12 @@ public class UserSetting { public void setPushAuthority(Boolean pushAuthority) { this.pushAuthority = pushAuthority; } + + public Boolean getGbSendStreamStrict() { + return gbSendStreamStrict; + } + + public void setGbSendStreamStrict(Boolean gbSendStreamStrict) { + this.gbSendStreamStrict = gbSendStreamStrict; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index e5b995e4..efa4d424 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -7,10 +7,12 @@ import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import javax.sip.*; +import javax.sip.DialogTerminatedEvent; +import javax.sip.ResponseEvent; +import javax.sip.TimeoutEvent; +import javax.sip.TransactionTerminatedEvent; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; -import java.text.ParseException; import java.time.Instant; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -29,6 +31,7 @@ public class SipSubscribe { private Map okSubscribes = new ConcurrentHashMap<>(); private Map okTimeSubscribes = new ConcurrentHashMap<>(); + private Map errorTimeSubscribes = new ConcurrentHashMap<>(); // @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java index 4965026d..aeca07ab 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java @@ -1,18 +1,18 @@ package com.genersoft.iot.vmp.gb28181.event.record; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; -import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import java.io.IOException; -import java.util.*; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; /** - * @description: 录像查询结束时间 + * @description: 录像查询结束事件 * @author: pan * @data: 2022-02-23 */ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index b36242e2..91f09461 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; @@ -12,45 +11,31 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.utils.GitUtil; -import gov.nist.javax.sip.SIPConstants; -import gov.nist.javax.sip.SipProviderImpl; -import gov.nist.javax.sip.SipStackImpl; +import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import gov.nist.javax.sip.stack.SIPClientTransaction; -import gov.nist.javax.sip.stack.SIPClientTransactionImpl; -import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.DependsOn; -import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import javax.sip.*; -import javax.sip.address.Address; -import javax.sip.address.SipURI; -import javax.sip.header.*; -import javax.sip.message.Message; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; +import javax.sip.SipFactory; +import javax.sip.header.CallIdHeader; import javax.sip.message.Request; -import javax.sip.message.Response; -import java.lang.reflect.Field; import java.text.ParseException; -import java.util.HashSet; /** * @description:设备能力接口,用于定义设备的控制、查询能力 @@ -183,7 +168,7 @@ public class SIPCommander implements ISIPCommander { public void ptzCmd(Device device, String channelId, int leftRight, int upDown, int inOut, int moveSpeed, int zoomSpeed) throws InvalidArgumentException, SipException, ParseException { String cmdStr = SipUtils.cmdString(leftRight, upDown, inOut, moveSpeed, zoomSpeed); - StringBuffer ptzXml = new StringBuffer(200); + StringBuilder ptzXml = new StringBuilder(200); String charset = device.getCharset(); ptzXml.append("\r\n"); ptzXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index c0ccdce2..87b5fdb0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; @@ -15,15 +14,12 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.DependsOn; -import org.springframework.context.annotation.Lazy; import org.springframework.lang.Nullable; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -638,7 +634,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - zlmrtpServerFactory.closeRTPServer(mediaServerItem, sendRtpItem.getStreamId()); + zlmrtpServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStreamId()); } SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem); if (byeRequest == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 22f48f79..45e42b2d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -10,8 +10,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; @@ -33,7 +33,8 @@ import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; import java.text.ParseException; -import java.util.*; +import java.util.HashMap; +import java.util.Map; /** * SIP命令类型: ACK请求 @@ -62,6 +63,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + @Autowired private IMediaServerService mediaServerService; @@ -130,8 +134,18 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); }); }else { - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); + // 如果是非严格模式,需要关闭端口占用 + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + if (zlmrtpServerFactory.releasePort(mediaInfo, sendRtpItem.getSsrc())) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + }else { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + if (startSendRtpStreamResult != null) { + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); + } } } private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 598c0876..f221880c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -45,6 +45,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; +import java.util.Random; import java.util.Vector; /** @@ -157,11 +158,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements StreamProxyItem proxyByAppAndStream =null; // 不是通道可能是直播流 if (channel != null && gbStream == null) { -// if (channel.getStatus() == 0) { -// logger.info("通道离线,返回400"); -// responseAck(request, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); -// return; -// } // 通道存在,发100,TRYING try { responseAck(request, Response.TRYING); @@ -385,7 +381,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } else { content.append("t=0 0\r\n"); } - content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); + int localPort = sendRtpItem.getLocalPort(); + if (localPort == 0) { + // 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口 + localPort = new Random().nextInt(65535) + 1; + } + content.append("m=video " + localPort + " RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); @@ -476,9 +477,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); + MediaServerItem finalMediaServerItem = mediaServerItem; playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + zlmrtpServerFactory.releasePort(finalMediaServerItem, sendRtpItem.getSsrc()); }, null); } else { sendRtpItem.setStreamId(playTransaction.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index e2b5a289..f4a2672e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -626,6 +626,32 @@ public class ZLMHttpHookListener { return ret; } + /** + * rtpServer收流超时 + */ + @ResponseBody + @PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8") + public JSONObject onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam param){ + System.out.println(param); + logger.info("[ZLM HOOK] rtpServer收流超时:{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc()); + + JSONObject ret = new JSONObject(); + ret.put("code", 0); + ret.put("msg", "success"); + + taskExecutor.execute(()->{ + JSONObject json = (JSONObject) JSON.toJSON(param); + List subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout); + if (subscribes != null && subscribes.size() > 0) { + for (ZlmHttpHookSubscribe.Event subscribe : subscribes) { + subscribe.response(null, json); + } + } + }); + + return ret; + } + private Map urlParamToMap(String params) { HashMap map = new HashMap<>(); if (ObjectUtils.isEmpty(params)) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 9819343f..03dd48c0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -1,15 +1,15 @@ package com.genersoft.iot.vmp.media.zlm; +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; import java.util.*; @@ -24,6 +24,9 @@ public class ZLMRTPServerFactory { @Autowired private UserSetting userSetting; + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + private int[] portRangeArray = new int[2]; public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List usedFreelist) { @@ -141,7 +144,7 @@ public class ZLMRTPServerFactory { return result; } - public boolean closeRTPServer(MediaServerItem serverItem, String streamId) { + public boolean closeRtpServer(MediaServerItem serverItem, String streamId) { boolean result = false; if (serverItem !=null){ Map param = new HashMap<>(); @@ -161,32 +164,6 @@ public class ZLMRTPServerFactory { return result; } -// private int getPortFromportRange(MediaServerItem mediaServerItem) { -// int currentPort = mediaServerItem.getCurrentPort(); -// if (currentPort == 0) { -// String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(","); -// if (portRangeStrArray.length != 2) { -// portRangeArray[0] = 30000; -// portRangeArray[1] = 30500; -// }else { -// portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]); -// portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]); -// } -// } -// -// if (currentPort == 0 || currentPort++ > portRangeArray[1]) { -// currentPort = portRangeArray[0]; -// mediaServerItem.setCurrentPort(currentPort); -// return portRangeArray[0]; -// } else { -// if (currentPort % 2 == 1) { -// currentPort++; -// } -// currentPort++; -// mediaServerItem.setCurrentPort(currentPort); -// return currentPort; -// } -// } /** * 创建一个国标推流 @@ -200,21 +177,15 @@ public class ZLMRTPServerFactory { */ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){ - // 使用RTPServer 功能找一个可用的端口 - String sendRtpPortRange = serverItem.getSendRtpPortRange(); - if (ObjectUtils.isEmpty(sendRtpPortRange)) { - return null; - } - String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(","); - int localPort = -1; - if (portRangeStrArray.length != 2) { - localPort = getFreePort(serverItem, 30000, 30500, null); - }else { - localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null); - } - if (localPort == -1) { - logger.error("没有可用的端口"); - return null; + // 默认为随机端口 + int localPort = 0; + if (userSetting.getGbSendStreamStrict()) { + if (userSetting.getGbSendStreamStrict()) { + localPort = keepPort(serverItem, ssrc); + if (localPort == 0) { + return null; + } + } } SendRtpItem sendRtpItem = new SendRtpItem(); sendRtpItem.setIp(ip); @@ -242,21 +213,13 @@ public class ZLMRTPServerFactory { * @return SendRtpItem */ public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){ - // 使用RTPServer 功能找一个可用的端口 - String sendRtpPortRange = serverItem.getSendRtpPortRange(); - if (ObjectUtils.isEmpty(sendRtpPortRange)) { - return null; - } - String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(","); - int localPort = -1; - if (portRangeStrArray.length != 2) { - localPort = getFreePort(serverItem, 30000, 30500, null); - }else { - localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null); - } - if (localPort == -1) { - logger.error("没有可用的端口"); - return null; + // 默认为随机端口 + int localPort = 0; + if (userSetting.getGbSendStreamStrict()) { + localPort = keepPort(serverItem, ssrc); + if (localPort == 0) { + return null; + } } SendRtpItem sendRtpItem = new SendRtpItem(); sendRtpItem.setIp(ip); @@ -273,6 +236,42 @@ public class ZLMRTPServerFactory { return sendRtpItem; } + /** + * 保持端口,直到需要需要发流时再释放 + */ + public int keepPort(MediaServerItem serverItem, String ssrc) { + int localPort = 0; + Map param = new HashMap<>(3); + param.put("port", 0); + param.put("enable_tcp", 1); + param.put("stream_id", ssrc); + JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); + if (jsonObject.getInteger("code") == 0) { + localPort = jsonObject.getInteger("port"); + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); + // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 + hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, + (MediaServerItem mediaServerItem, JSONObject response)->{ + logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); + keepPort(serverItem, ssrc); + }); + } + logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort); + return localPort; + } + + /** + * 释放保持的端口 + */ + public boolean releasePort(MediaServerItem serverItem, String ssrc) { + logger.info("[上级点播] {}->释放监听端口,等待推流", ssrc); + boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc); + HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); + // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 + hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout); + return closeRTPServerResult; + } + /** * 调用zlm RESTFUL API —— startSendRtp */ @@ -333,7 +332,7 @@ public class ZLMRTPServerFactory { result= true; logger.info("[停止RTP推流] 成功"); } else { - logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"),jsonObject.toJSONString(param)); + logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject); } return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index b97d1355..04fde171 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -18,7 +18,11 @@ import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; @Component @Order(value=1) @@ -73,8 +77,6 @@ public class ZLMRunner implements CommandLineRunner { } }); - - // 获取zlm信息 logger.info("[zlm] 等待默认zlm中..."); @@ -87,7 +89,7 @@ public class ZLMRunner implements CommandLineRunner { } for (MediaServerItem mediaServerItem : all) { if (startGetMedia == null) { - startGetMedia = new HashMap<>(); + startGetMedia = new ConcurrentHashMap<>(); } startGetMedia.put(mediaServerItem.getId(), true); connectZlmServer(mediaServerItem); @@ -95,7 +97,7 @@ public class ZLMRunner implements CommandLineRunner { } String taskKey = "zlm-connect-timeout"; dynamicTask.startDelay(taskKey, ()->{ - if (startGetMedia != null) { + if (startGetMedia != null && startGetMedia.size() > 0) { Set allZlmId = startGetMedia.keySet(); for (String id : allZlmId) { logger.error("[ {} ]]主动连接失败,不再尝试连接", id); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java index aa0f2a61..f3f389a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java @@ -24,6 +24,17 @@ public class HookSubscribeFactory { return hookSubscribe; } + public static HookSubscribeForRtpServerTimeout on_rtp_server_timeout(String stream, String ssrc, String mediaServerId) { + HookSubscribeForRtpServerTimeout hookSubscribe = new HookSubscribeForRtpServerTimeout(); + JSONObject subscribeKey = new com.alibaba.fastjson2.JSONObject(); + subscribeKey.put("stream_id", stream); + subscribeKey.put("ssrc", ssrc); + subscribeKey.put("mediaServerId", mediaServerId); + hookSubscribe.setContent(subscribeKey); + + return hookSubscribe; + } + public static HookSubscribeForServerStarted on_server_started() { HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted(); hookSubscribe.setContent(new JSONObject()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForRtpServerTimeout.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForRtpServerTimeout.java new file mode 100644 index 00000000..d633560d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForRtpServerTimeout.java @@ -0,0 +1,44 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.annotation.JSONField; + +import java.time.Instant; + +/** + * hook订阅-收流超时 + * @author lin + */ +public class HookSubscribeForRtpServerTimeout implements IHookSubscribe{ + + private HookType hookType = HookType.on_rtp_server_timeout; + + private JSONObject content; + + @JSONField(format="yyyy-MM-dd HH:mm:ss") + private Instant expires; + + @Override + public HookType getHookType() { + return hookType; + } + + @Override + public JSONObject getContent() { + return content; + } + + public void setContent(JSONObject content) { + this.content = content; + } + + @Override + public Instant getExpires() { + return expires; + } + + @Override + public void setExpires(Instant expires) { + this.expires = expires; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java index dda29766..b73d74c1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java @@ -15,6 +15,7 @@ public class HookSubscribeForStreamChange implements IHookSubscribe{ private JSONObject content; + @JSONField(format="yyyy-MM-dd HH:mm:ss") private Instant expires; @Override diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java index 797ab81b..a4557e9a 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java @@ -19,5 +19,7 @@ public enum HookType { on_stream_none_reader, on_stream_not_found, on_server_started, + + on_rtp_server_timeout, on_server_keepalive } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java index 620b167c..f3eb3d67 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java @@ -65,9 +65,6 @@ public class MediaServerItem{ @Schema(description = "多端口RTP收流端口范围") private String rtpPortRange; - @Schema(description = "RTP发流端口范围") - private String sendRtpPortRange; - @Schema(description = "assist服务端口") private int recordAssistPort; @@ -118,7 +115,6 @@ public class MediaServerItem{ hookAliveInterval = zlmServerConfig.getHookAliveInterval(); rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口 rtpPortRange = zlmServerConfig.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号 - sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 recordAssistPort = 0; // 默认关闭 } @@ -323,14 +319,6 @@ public class MediaServerItem{ this.lastKeepaliveTime = lastKeepaliveTime; } - public String getSendRtpPortRange() { - return sendRtpPortRange; - } - - public void setSendRtpPortRange(String sendRtpPortRange) { - this.sendRtpPortRange = sendRtpPortRange; - } - public Float getHookAliveInterval() { return hookAliveInterval; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnRtpServerTimeoutHookParam.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnRtpServerTimeoutHookParam.java new file mode 100644 index 00000000..e1912bbf --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/OnRtpServerTimeoutHookParam.java @@ -0,0 +1,53 @@ +package com.genersoft.iot.vmp.media.zlm.dto.hook; + +/** + * zlm hook事件中的on_rtp_server_timeout事件的参数 + * @author lin + */ +public class OnRtpServerTimeoutHookParam extends HookParam{ + private int local_port; + private String stream_id; + private int tcpMode; + private boolean re_use_port; + private String ssrc; + + public int getLocal_port() { + return local_port; + } + + public void setLocal_port(int local_port) { + this.local_port = local_port; + } + + public String getStream_id() { + return stream_id; + } + + public void setStream_id(String stream_id) { + this.stream_id = stream_id; + } + + public int getTcpMode() { + return tcpMode; + } + + public void setTcpMode(int tcpMode) { + this.tcpMode = tcpMode; + } + + public boolean isRe_use_port() { + return re_use_port; + } + + public void setRe_use_port(boolean re_use_port) { + this.re_use_port = re_use_port; + } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 98530797..ac48e4df 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -4,13 +4,12 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; -import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; +import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -24,12 +23,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -171,7 +168,7 @@ public class DeviceServiceImpl implements IDeviceService { redisCatchStorage.updateDevice(device); deviceMapper.update(device); //进行通道离线 - deviceChannelMapper.offlineByDeviceId(deviceId); +// deviceChannelMapper.offlineByDeviceId(deviceId); // 离线释放所有ssrc List ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null); if (ssrcTransactions != null && ssrcTransactions.size() > 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 702f8a22..b30b2ccc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -168,7 +168,7 @@ public class MediaServerServiceImpl implements IMediaServerService { if (mediaServerItem == null) { return; } - zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); + zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId); releaseSsrc(mediaServerItem.getId(), streamId); } @@ -535,6 +535,7 @@ public class MediaServerServiceImpl implements IMediaServerService { param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex)); param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex)); param.put("hook.on_send_rtp_stopped",String.format("%s/on_send_rtp_stopped", hookPrex)); + param.put("hook.on_rtp_server_timeout",String.format("%s/on_rtp_server_timeout", hookPrex)); if (mediaServerItem.getRecordAssistPort() > 0) { param.put("hook.on_record_mp4",String.format("http://127.0.0.1:%s/api/record/on_record_mp4", mediaServerItem.getRecordAssistPort())); }else { @@ -545,8 +546,7 @@ public class MediaServerServiceImpl implements IMediaServerService { // 置0关闭此特性(推流断开会导致立即断开播放器) // 此参数不应大于播放器超时时间 // 优化此消息以更快的收到流注销事件 - param.put("general.continue_push_ms", "3000" ); - param.put("general.publishToHls", "0" ); + param.put("protocol.continue_push_ms", "3000" ); // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流, // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 // param.put("general.wait_track_ready_ms", "3000" ); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 2dd4e617..f4e32278 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -283,7 +283,7 @@ public class PlayServiceImpl implements IPlayService { try { cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); - System.out.println("停止超时任务: " + timeOutTaskKey); + dynamicTask.stop(timeOutTaskKey); // hook响应 onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java index 95b3d150..e9254a56 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java @@ -28,7 +28,6 @@ public interface MediaServerMapper { "secret, " + "rtpEnable, " + "rtpPortRange, " + - "sendRtpPortRange, " + "recordAssistPort, " + "defaultServer, " + "createTime, " + @@ -52,7 +51,6 @@ public interface MediaServerMapper { "'${secret}', " + "${rtpEnable}, " + "'${rtpPortRange}', " + - "'${sendRtpPortRange}', " + "${recordAssistPort}, " + "${defaultServer}, " + "'${createTime}', " + @@ -77,7 +75,6 @@ public interface MediaServerMapper { ", autoConfig=${autoConfig}" + ", rtpEnable=${rtpEnable}" + ", rtpPortRange='${rtpPortRange}'" + - ", sendRtpPortRange='${sendRtpPortRange}'" + ", secret='${secret}'" + ", recordAssistPort=${recordAssistPort}" + ", hookAliveInterval=${hookAliveInterval}" + @@ -101,7 +98,6 @@ public interface MediaServerMapper { ", autoConfig=${autoConfig}" + ", rtpEnable=${rtpEnable}" + ", rtpPortRange='${rtpPortRange}'" + - ", sendRtpPortRange='${sendRtpPortRange}'" + ", secret='${secret}'" + ", recordAssistPort=${recordAssistPort}" + ", hookAliveInterval=${hookAliveInterval}" + diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java index 6f7132e7..7e73e058 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java @@ -62,7 +62,9 @@ public class MediaController { if (callId != null) { // 权限校验 StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); - if (streamAuthorityInfo.getCallId().equals(callId)) { + if (streamAuthorityInfo != null + && streamAuthorityInfo.getCallId() != null + && streamAuthorityInfo.getCallId().equals(callId)) { authority = true; }else { throw new ControllerException(ErrorCode.ERROR400); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java index a6713a40..d09e5657 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java @@ -135,14 +135,8 @@ public class ServerController { MediaServerItem mediaServerItemInDatabase = mediaServerService.getOne(mediaServerItem.getId()); if (mediaServerItemInDatabase != null) { - if (ObjectUtils.isEmpty(mediaServerItemInDatabase.getSendRtpPortRange()) && ObjectUtils.isEmpty(mediaServerItem.getSendRtpPortRange())) { - mediaServerItem.setSendRtpPortRange("30000,30500"); - } mediaServerService.update(mediaServerItem); } else { - if (ObjectUtils.isEmpty(mediaServerItem.getSendRtpPortRange())) { - mediaServerItem.setSendRtpPortRange("30000,30500"); - } mediaServerService.add(mediaServerItem); } } diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index e4006bf6..3294e747 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -192,6 +192,9 @@ user-settings: stream-on-demand: true # 推流鉴权, 默认开启 push-authority: true + # 国标级联发流严格模式,严格模式会使用与sdp信息中一致的端口发流,端口共享media.rtp.port-range,这会损失一些性能, + # 非严格模式使用随机端口发流,性能更好, 默认关闭 + gb-send-stream-strict: false # 关闭在线文档(生产环境建议关闭) springdoc: diff --git a/web_src/src/components/dialog/MediaServerEdit.vue b/web_src/src/components/dialog/MediaServerEdit.vue index 24f8c85f..9353a811 100644 --- a/web_src/src/components/dialog/MediaServerEdit.vue +++ b/web_src/src/components/dialog/MediaServerEdit.vue @@ -89,11 +89,6 @@ - - - - - - - @@ -177,15 +172,12 @@ export default { rtmpSSlPort: "", rtpEnable: false, rtpPortRange: "", - sendRtpPortRange: "", rtpProxyPort: "", rtspPort: "", rtspSSLPort: "", }, rtpPortRange1:30000, rtpPortRange2:30500, - sendRtpPortRange1:30000, - sendRtpPortRange2:30500, rules: { ip: [{ required: true, validator: isValidIp, message: '请输入有效的IP地址', trigger: 'blur' }], @@ -196,8 +188,6 @@ export default { rtmpSSlPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], rtpPortRange1: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], rtpPortRange2: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], - sendRtpPortRange1: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], - sendRtpPortRange2: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], rtpProxyPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], rtspPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], rtspSSLPort: [{ required: true, validator: isValidPort, message: '请输入有效的端口号', trigger: 'blur' }], @@ -229,9 +219,6 @@ export default { this.rtpPortRange2 = rtpPortRange[1] } } - let sendRtpPortRange = this.mediaServerForm.sendRtpPortRange.split(","); - this.sendRtpPortRange1 = sendRtpPortRange[0] - this.sendRtpPortRange2 = sendRtpPortRange[1] } }, checkServer: function() { @@ -251,8 +238,6 @@ export default { that.mediaServerForm = data.data; that.mediaServerForm.httpPort = httpPort; that.mediaServerForm.autoConfig = true; - that.sendRtpPortRange1 = 30000 - that.sendRtpPortRange2 = 30500 that.rtpPortRange1 = 30000 that.rtpPortRange2 = 30500 that.serverCheck = 1; @@ -336,13 +321,10 @@ export default { rtmpSSlPort: "", rtpEnable: false, rtpPortRange: "", - sendRtpPortRange: "", rtpProxyPort: "", rtspPort: "", rtspSSLPort: "", }; - this.sendRtpPortRange1 = 30000; - this.sendRtpPortRange2 = 30500; this.rtpPortRange1 = 30500; this.rtpPortRange2 = 30500; this.listChangeCallback = null @@ -367,9 +349,7 @@ export default { } }, portRangeChange: function() { - this.mediaServerForm.sendRtpPortRange = this.sendRtpPortRange1 + "," + this.sendRtpPortRange2 this.mediaServerForm.rtpPortRange = this.rtpPortRange1 + "," + this.rtpPortRange2 - console.log(this.mediaServerForm.sendRtpPortRange) console.log(this.mediaServerForm.rtpPortRange) } }, From c04de4cd70d1698741547998b1b6d67f46c64735 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 17 Nov 2022 10:04:05 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=B3=A8=E9=87=8A=E4=B8=8E=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index f4a2672e..a6320df1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -183,7 +183,7 @@ public class ZLMHttpHookListener { if (!"rtp".equals(param.getApp())) { if (userSetting.getPushAuthority()) { -// 推流鉴权 + // 推流鉴权 if (param.getParams() == null) { logger.info("推流鉴权失败: 缺少不要参数:sign=md5(user表的pushKey)"); ret.put("code", 401); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 03dd48c0..dd054f85 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -264,7 +264,7 @@ public class ZLMRTPServerFactory { * 释放保持的端口 */ public boolean releasePort(MediaServerItem serverItem, String ssrc) { - logger.info("[上级点播] {}->释放监听端口,等待推流", ssrc); + logger.info("[上级点播] {}->释放监听端口", ssrc); boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc); HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统