diff --git a/pom.xml b/pom.xml index 64d414d7..4c26f332 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.genersoft wvp-pro - 2.3.2 + 2.6.6 web video platform 国标28181视频平台 diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index b2c23580..9ca936c6 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -1,10 +1,7 @@ package com.genersoft.iot.vmp.conf; -import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; @@ -101,12 +98,14 @@ public class DynamicTask { } } - public void stop(String key) { - if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { - futureMap.get(key).cancel(false); + public boolean stop(String key) { + boolean result = false; + if (futureMap.get(key) != null && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) { + result = futureMap.get(key).cancel(false); futureMap.remove(key); runnableMap.remove(key); } + return result; } public boolean contains(String key) { diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java index 708f72be..29b2d8db 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java @@ -2,13 +2,11 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.vmanager.gb28181.device.DeviceQuery; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import java.net.InetAddress; import java.net.UnknownHostException; @@ -75,10 +73,6 @@ public class MediaConfig{ @Value("${media.rtp.port-range}") private String rtpPortRange; - - @Value("${media.rtp.send-port-range}") - private String sendRtpPortRange; - @Value("${media.record-assist-port:0}") private Integer recordAssistPort = 0; @@ -191,10 +185,6 @@ public class MediaConfig{ return sipDomain; } - public String getSendRtpPortRange() { - return sendRtpPortRange; - } - public MediaServerItem getMediaSerItem(){ MediaServerItem mediaServerItem = new MediaServerItem(); mediaServerItem.setId(id); @@ -214,9 +204,8 @@ public class MediaConfig{ mediaServerItem.setSecret(secret); mediaServerItem.setRtpEnable(rtpEnable); mediaServerItem.setRtpPortRange(rtpPortRange); - mediaServerItem.setSendRtpPortRange(sendRtpPortRange); mediaServerItem.setRecordAssistPort(recordAssistPort); - mediaServerItem.setHookAliveInterval(120); + mediaServerItem.setHookAliveInterval(30.00f); mediaServerItem.setCreateTime(DateUtil.getNow()); mediaServerItem.setUpdateTime(DateUtil.getNow()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 6634322e..598c0876 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -8,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; @@ -39,9 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.sdp.*; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; import javax.sip.header.CallIdHeader; -import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; @@ -656,6 +656,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (!platform.isStartOfflinePush()) { // 平台设置中关闭了拉起离线的推流则直接回复 try { + logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream()); responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 6ca99e4f..0b657b56 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -85,7 +85,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword(); AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); if (authHead == null && !ObjectUtils.isEmpty(password)) { - logger.info("[注册请求] 未携带授权头 回复401: {}", requestAddress); + logger.info("[注册请求] 回复401: {}", requestAddress); response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain()); sipSender.transmitRequest(response); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index d925e4b7..e2b5a289 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -1,19 +1,19 @@ package com.genersoft.iot.vmp.media.zlm; -import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -24,18 +24,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.RestController; - -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import javax.sip.InvalidArgumentException; import javax.sip.SipException; +import java.text.ParseException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * @description:针对 ZLMediaServer的hook事件监听 @@ -571,6 +568,8 @@ public class ZLMHttpHookListener { public JSONObject onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){ jsonObject.put("ip", request.getRemoteAddr()); + System.out.println(jsonObject.toJSONString() + ); ZLMServerConfig zlmServerConfig = JSON.to(ZLMServerConfig.class, jsonObject); zlmServerConfig.setIp(request.getRemoteAddr()); logger.info("[ZLM HOOK] zlm 启动 " + zlmServerConfig.getGeneralMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 49eb7b5b..9819343f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -10,7 +10,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import java.util.*; @@ -294,7 +293,8 @@ public class ZLMRTPServerFactory { */ public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId); - return (mediaInfo.getInteger("code") == 0 + return mediaInfo != null && (mediaInfo.getInteger("code") == 0 + && mediaInfo.getJSONArray("data") != null && mediaInfo.getJSONArray("data").size() > 0); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java index f5f772e0..fcf24012 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java @@ -66,7 +66,7 @@ public class ZLMServerConfig { private String hookAdminParams; @JSONField(name = "hook.alive_interval") - private int hookAliveInterval; + private Float hookAliveInterval; @JSONField(name = "hook.enable") private String hookEnable; @@ -798,11 +798,11 @@ public class ZLMServerConfig { this.shellPhell = shellPhell; } - public int getHookAliveInterval() { + public Float getHookAliveInterval() { return hookAliveInterval; } - public void setHookAliveInterval(int hookAliveInterval) { + public void setHookAliveInterval(Float hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java index bf58187e..620b167c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java @@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import io.swagger.v3.oas.annotations.media.Schema; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import java.util.HashMap; @@ -55,7 +54,7 @@ public class MediaServerItem{ private String secret; @Schema(description = "keepalive hook触发间隔,单位秒") - private int hookAliveInterval; + private Float hookAliveInterval; @Schema(description = "是否使用多端口模式") private boolean rtpEnable; @@ -332,11 +331,11 @@ public class MediaServerItem{ this.sendRtpPortRange = sendRtpPortRange; } - public int getHookAliveInterval() { + public Float getHookAliveInterval() { return hookAliveInterval; } - public void setHookAliveInterval(int hookAliveInterval) { + public void setHookAliveInterval(Float hookAliveInterval) { this.hookAliveInterval = hookAliveInterval; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 8885ed51..cf6f0ed2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -11,6 +11,9 @@ import com.github.pagehelper.PageInfo; import java.util.List; import java.util.Map; +/** + * @author lin + */ public interface IStreamPushService { List handleJSON(String json, MediaServerItem mediaServerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 1f53c7f4..702f8a22 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -1,19 +1,32 @@ package com.genersoft.iot.vmp.service.impl; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.MediaServerLoad; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -24,28 +37,8 @@ import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.util.ObjectUtils; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.session.SsrcConfig; -import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; -import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; - -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; +import java.time.LocalDateTime; +import java.util.*; /** * 媒体服务器节点管理 @@ -129,6 +122,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port) { if (mediaServerItem == null || mediaServerItem.getId() == null) { + logger.info("[openRTPServer] 失败, mediaServerItem == null || mediaServerItem.getId() == null"); return null; } // 获取mediaServer可用的ssrc @@ -306,7 +300,7 @@ public class MediaServerServiceImpl implements IMediaServerService { public void add(MediaServerItem mediaServerItem) { mediaServerItem.setCreateTime(DateUtil.getNow()); mediaServerItem.setUpdateTime(DateUtil.getNow()); - mediaServerItem.setHookAliveInterval(120); + mediaServerItem.setHookAliveInterval(30f); JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); if (responseJSON != null) { JSONArray data = responseJSON.getJSONArray("data"); @@ -413,7 +407,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId(); dynamicTask.stop(zlmKeepaliveKey); - dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval() + 5) * 1000); + dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (Math.getExponent(serverItem.getHookAliveInterval()) + 5) * 1000); publisher.zlmOnlineEventPublish(serverItem.getId()); logger.info("[ZLM] 连接成功 {} - {}:{} ", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); @@ -666,7 +660,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerItem.getId(); dynamicTask.stop(zlmKeepaliveKey); - dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval() + 5) * 1000); + dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), (mediaServerItem.getHookAliveInterval().intValue() + 5) * 1000); } private MediaServerItem getOneFromDatabase(String mediaServerId) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 97682900..2dd4e617 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1,49 +1,28 @@ package com.genersoft.iot.vmp.service.impl; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.text.ParseException; -import java.util.*; - -import javax.sip.InvalidArgumentException; -import javax.sip.ResponseEvent; -import javax.sip.SipException; - -import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.conf.exception.ServiceException; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Service; -import org.springframework.util.ObjectUtils; -import org.springframework.web.context.request.async.DeferredResult; - import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.exception.ServiceException; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; @@ -53,8 +32,29 @@ import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import org.springframework.web.context.request.async.DeferredResult; + +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.text.ParseException; +import java.util.List; +import java.util.Objects; +import java.util.UUID; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -212,6 +212,15 @@ public class PlayServiceImpl implements IPlayService { } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); logger.info(JSONObject.toJSONString(ssrcInfo)); + if (ssrcInfo == null) { + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("开启收流失败"); + msg.setData(wvpResult); + + resultHolder.invokeAllResult(msg); + return playResult; + } play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> { if (hookEvent != null) { hookEvent.response(mediaServerItem, response); @@ -249,45 +258,33 @@ public class PlayServiceImpl implements IPlayService { ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback, String uuid) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - } - if (ssrcInfo == null) { - ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); - } logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); - SSRCInfo finalSsrcInfo = ssrcInfo; dynamicTask.startDelay(timeOutTaskKey, () -> { - - logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, finalSsrcInfo.getPort(), finalSsrcInfo.getSsrc()); - timeoutCallback.run(1, "收流超时"); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - try { - cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); - } catch (SsrcTransactionNotFoundException e) { - timeoutCallback.run(0, "点播超时"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 + if (redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId) == null) { + logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + try { + cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); + } finally { + timeoutCallback.run(1, "收流超时"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + } } }, userSetting.getPlayTimeout()); - final String ssrc = ssrcInfo.getSsrc(); - final String stream = ssrcInfo.getStream(); - //端口获取失败的ssrcInfo 没有必要发送点播指令 - if (ssrcInfo.getPort() <= 0) { - logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); - return; - } + try { cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); System.out.println("停止超时任务: " + timeOutTaskKey); - dynamicTask.stop(timeOutTaskKey); + // hook响应 onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid); hookEvent.response(mediaServerItemInuse, response); @@ -303,18 +300,18 @@ public class PlayServiceImpl implements IPlayService { //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 - if (ssrc.equals(ssrcInResponse)) { + if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { return; } logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse); + logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { // ssrc 不可用 // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); event.msg = "下级自定义了ssrc,但是此ssrc不可用"; event.statusCode = 400; errorEvent.response(event); @@ -324,7 +321,7 @@ public class PlayServiceImpl implements IPlayService { // 单端口模式streamId也有变化,需要重新设置监听 if (!mediaServerItem.isRtpEnable()) { // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); subscribe.removeSubscribe(hookSubscribe); hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { @@ -336,30 +333,30 @@ public class PlayServiceImpl implements IPlayService { }); } // 关闭rtp server - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 重新开启ssrc server - mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort()); + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort()); } } }, (event) -> { dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); errorEvent.response(event); }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[命令发送失败] 点播消息: {}", e.getMessage()); dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); eventResult.msg = "命令发送失败"; errorEvent.response(eventResult); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java index 7e9c8121..98afd6b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java @@ -23,10 +23,10 @@ public interface PlatformGbStreamMapper { @Insert("") int batchAdd(List streamPushItems);