From 8870f5f5a182f4af527dc2b89ad75063019df14f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 10 Nov 2022 09:40:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BD=BF=E7=94=A8=E6=9D=A5?= =?UTF-8?q?=E6=BA=90ip=E4=BD=9C=E4=B8=BA=E6=B5=81ip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/common/StreamInfo.java | 92 +++++++++- .../genersoft/iot/vmp/common/StreamURL.java | 4 +- .../com/genersoft/iot/vmp/conf/SipConfig.java | 9 - .../genersoft/iot/vmp/gb28181/SipLayer.java | 7 + .../iot/vmp/gb28181/transmit/SIPSender.java | 9 +- .../callback/DeferredResultHolder.java | 65 +++++-- .../request/impl/InviteRequestProcessor.java | 10 +- .../iot/vmp/service/IPlayService.java | 7 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 166 ++++++++++-------- .../utils/redis/FastJsonRedisSerializer.java | 10 +- .../vmp/vmanager/bean/DeferredResultEx.java | 31 ++++ .../vmanager/bean/DeferredResultFilter.java | 6 + .../vmanager/gb28181/play/PlayController.java | 74 +++++--- .../vmp/web/gb28181/ApiStreamController.java | 2 +- src/main/resources/all-application.yml | 2 - 15 files changed, 343 insertions(+), 151 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java index 20a286cd..46b0e00d 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java @@ -2,10 +2,10 @@ package com.genersoft.iot.vmp.common; import io.swagger.v3.oas.annotations.media.Schema; -import java.net.URL; +import java.io.Serializable; @Schema(description = "流信息") -public class StreamInfo { +public class StreamInfo implements Serializable { @Schema(description = "应用名") private String app; @@ -79,6 +79,94 @@ public class StreamInfo { @Schema(description = "是否暂停(录像回放使用)") private boolean pause; + public void setFlv(StreamURL flv) { + this.flv = flv; + } + + public void setHttps_flv(StreamURL https_flv) { + this.https_flv = https_flv; + } + + public void setWs_flv(StreamURL ws_flv) { + this.ws_flv = ws_flv; + } + + public void setWss_flv(StreamURL wss_flv) { + this.wss_flv = wss_flv; + } + + public void setFmp4(StreamURL fmp4) { + this.fmp4 = fmp4; + } + + public void setHttps_fmp4(StreamURL https_fmp4) { + this.https_fmp4 = https_fmp4; + } + + public void setWs_fmp4(StreamURL ws_fmp4) { + this.ws_fmp4 = ws_fmp4; + } + + public void setWss_fmp4(StreamURL wss_fmp4) { + this.wss_fmp4 = wss_fmp4; + } + + public void setHls(StreamURL hls) { + this.hls = hls; + } + + public void setHttps_hls(StreamURL https_hls) { + this.https_hls = https_hls; + } + + public void setWs_hls(StreamURL ws_hls) { + this.ws_hls = ws_hls; + } + + public void setWss_hls(StreamURL wss_hls) { + this.wss_hls = wss_hls; + } + + public void setTs(StreamURL ts) { + this.ts = ts; + } + + public void setHttps_ts(StreamURL https_ts) { + this.https_ts = https_ts; + } + + public void setWs_ts(StreamURL ws_ts) { + this.ws_ts = ws_ts; + } + + public void setWss_ts(StreamURL wss_ts) { + this.wss_ts = wss_ts; + } + + public void setRtmp(StreamURL rtmp) { + this.rtmp = rtmp; + } + + public void setRtmps(StreamURL rtmps) { + this.rtmps = rtmps; + } + + public void setRtsp(StreamURL rtsp) { + this.rtsp = rtsp; + } + + public void setRtsps(StreamURL rtsps) { + this.rtsps = rtsps; + } + + public void setRtc(StreamURL rtc) { + this.rtc = rtc; + } + + public void setRtcs(StreamURL rtcs) { + this.rtcs = rtcs; + } + public void setRtmp(String host, int port, int sslPort, String app, String stream, String callIdParam) { String file = String.format("%s/%s/%s", app, stream, callIdParam); this.rtmp = new StreamURL("rtmp", host, port, file); diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamURL.java b/src/main/java/com/genersoft/iot/vmp/common/StreamURL.java index bb67dee1..eecf469f 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamURL.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamURL.java @@ -2,9 +2,11 @@ package com.genersoft.iot.vmp.common; import io.swagger.v3.oas.annotations.media.Schema; +import java.io.Serializable; + @Schema(description = "流地址信息") -public class StreamURL { +public class StreamURL implements Serializable { @Schema(description = "协议") private String protocol; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java index 67b89d67..ff9008ef 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java @@ -20,8 +20,6 @@ public class SipConfig { Integer ptzSpeed = 50; - Integer keepaliveTimeOut = 255; - Integer registerTimeInterval = 120; private boolean alarm; @@ -50,9 +48,6 @@ public class SipConfig { this.ptzSpeed = ptzSpeed; } - public void setKeepaliveTimeOut(Integer keepaliveTimeOut) { - this.keepaliveTimeOut = keepaliveTimeOut; - } public void setRegisterTimeInterval(Integer registerTimeInterval) { this.registerTimeInterval = registerTimeInterval; @@ -86,10 +81,6 @@ public class SipConfig { return ptzSpeed; } - public Integer getKeepaliveTimeOut() { - return keepaliveTimeOut; - } - public Integer getRegisterTimeInterval() { return registerTimeInterval; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index f3c8bedf..152c7fdd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import javax.sip.*; import java.util.*; @@ -107,6 +108,9 @@ public class SipLayer implements CommandLineRunner { } public SipProviderImpl getUdpSipProvider(String ip) { + if (ObjectUtils.isEmpty(ip)) { + return null; + } return udpSipProviderMap.get(ip); } @@ -125,6 +129,9 @@ public class SipLayer implements CommandLineRunner { } public SipProviderImpl getTcpSipProvider(String ip) { + if (ObjectUtils.isEmpty(ip)) { + return null; + } return tcpSipProviderMap.get(ip); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java index a240ce4b..f723a6f9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java @@ -5,22 +5,19 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.SipProviderImpl; -import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import javax.sip.SipException; -import javax.sip.SipFactory; import javax.sip.header.CallIdHeader; import javax.sip.header.UserAgentHeader; import javax.sip.header.ViaHeader; import javax.sip.message.Message; import javax.sip.message.Request; import javax.sip.message.Response; -import java.net.InetAddress; import java.text.ParseException; /** @@ -109,6 +106,10 @@ public class SIPSender { } public CallIdHeader getNewCallIdHeader(String ip, String transport){ + if (ObjectUtils.isEmpty(ip) || ObjectUtils.isEmpty(transport)) { + return transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider().getNewCallId() + : sipLayer.getUdpSipProvider().getNewCallId(); + } return transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider(ip).getNewCallId() : sipLayer.getUdpSipProvider(ip).getNewCallId(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index ba65581a..3d5c2947 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -1,15 +1,16 @@ package com.genersoft.iot.vmp.gb28181.transmit.callback; -import java.util.HashMap; +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Component; -import org.springframework.web.context.request.async.DeferredResult; - /** * @description: 异步请求处理 * @author: swwheihei @@ -51,31 +52,48 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST"; - private Map> map = new ConcurrentHashMap<>(); + private Map> map = new ConcurrentHashMap<>(); - public void put(String key, String id, DeferredResult result) { - Map deferredResultMap = map.get(key); + public void put(String key, String id, DeferredResultEx result) { + Map deferredResultMap = map.get(key); if (deferredResultMap == null) { deferredResultMap = new ConcurrentHashMap<>(); map.put(key, deferredResultMap); } deferredResultMap.put(id, result); } - - public DeferredResult get(String key, String id) { - Map deferredResultMap = map.get(key); + + public void put(String key, String id, DeferredResult result) { + Map deferredResultMap = map.get(key); if (deferredResultMap == null) { + deferredResultMap = new ConcurrentHashMap<>(); + map.put(key, deferredResultMap); + } + deferredResultMap.put(id, new DeferredResultEx(result)); + } + + public DeferredResultEx get(String key, String id) { + Map deferredResultMap = map.get(key); + if (deferredResultMap == null || ObjectUtils.isEmpty(id)) { return null; } return deferredResultMap.get(id); } + public Collection getAllByKey(String key) { + Map deferredResultMap = map.get(key); + if (deferredResultMap == null) { + return null; + } + return deferredResultMap.values(); + } + public boolean exist(String key, String id){ if (key == null) { return false; } - Map deferredResultMap = map.get(key); + Map deferredResultMap = map.get(key); if (id == null) { return deferredResultMap != null; }else { @@ -88,15 +106,15 @@ public class DeferredResultHolder { * @param msg */ public void invokeResult(RequestMessage msg) { - Map deferredResultMap = map.get(msg.getKey()); + Map deferredResultMap = map.get(msg.getKey()); if (deferredResultMap == null) { return; } - DeferredResult result = deferredResultMap.get(msg.getId()); + DeferredResultEx result = deferredResultMap.get(msg.getId()); if (result == null) { return; } - result.setResult(msg.getData()); + result.getDeferredResult().setResult(msg.getData()); deferredResultMap.remove(msg.getId()); if (deferredResultMap.size() == 0) { map.remove(msg.getKey()); @@ -108,18 +126,27 @@ public class DeferredResultHolder { * @param msg */ public void invokeAllResult(RequestMessage msg) { - Map deferredResultMap = map.get(msg.getKey()); + Map deferredResultMap = map.get(msg.getKey()); if (deferredResultMap == null) { return; } Set ids = deferredResultMap.keySet(); for (String id : ids) { - DeferredResult result = deferredResultMap.get(id); + DeferredResultEx result = deferredResultMap.get(id); if (result == null) { return; } - result.setResult(msg.getData()); + if (result.getFilter() != null) { + Object handler = result.getFilter().handler(msg.getData()); + System.out.println(JSON.toJSONString(handler)); + result.getDeferredResult().setResult(handler); + }else { + result.getDeferredResult().setResult(msg.getData()); + } + } map.remove(msg.getKey()); } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index ea8275fe..8817122c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -8,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; @@ -39,9 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.sdp.*; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; import javax.sip.header.CallIdHeader; -import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.time.Instant; @@ -479,7 +479,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - }, null); + }); } else { sendRtpItem.setStreamId(playTransaction.getStream()); // 写入redis, 超时时回复 diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 19913923..8b3984fc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -13,7 +13,6 @@ import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; @@ -25,12 +24,12 @@ import java.text.ParseException; */ public interface IPlayService { - void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); + void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId); void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - InviteTimeOutCallback timeoutCallback, String uuid); - PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); + InviteTimeOutCallback timeoutCallback); + void play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); MediaServerItem getNewMediaServerItem(Device device); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index f899db3e..3bbe88bf 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1,49 +1,28 @@ package com.genersoft.iot.vmp.service.impl; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.text.ParseException; -import java.util.*; - -import javax.sip.InvalidArgumentException; -import javax.sip.ResponseEvent; -import javax.sip.SipException; - -import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.conf.exception.ServiceException; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; -import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Service; -import org.springframework.util.ObjectUtils; -import org.springframework.web.context.request.async.DeferredResult; - import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.conf.exception.ServiceException; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; @@ -53,8 +32,27 @@ import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import org.springframework.web.context.request.async.DeferredResult; + +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.text.ParseException; +import java.util.List; +import java.util.UUID; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -111,46 +109,19 @@ public class PlayServiceImpl implements IPlayService { private ThreadPoolTaskExecutor taskExecutor; @Override - public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, - ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - Runnable timeoutCallback) { + public void play(MediaServerItem mediaServerItem, String deviceId, String channelId, + ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback) { if (mediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } - PlayResult playResult = new PlayResult(); + RequestMessage msg = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; msg.setKey(key); - String uuid = UUID.randomUUID().toString(); - msg.setId(uuid); - playResult.setUuid(uuid); - DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - playResult.setResult(result); - // 录像查询以channelId作为deviceId查询 - resultHolder.put(key, uuid, result); Device device = redisCatchStorage.getDevice(deviceId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - playResult.setDevice(device); - - result.onCompletion(() -> { - // 点播结束时调用截图接口 - taskExecutor.execute(() -> { - // TODO 应该在上流时调用更好,结束也可能是错误结束 - String path = "snap"; - String fileName = deviceId + "_" + channelId + ".jpg"; - WVPResult wvpResult = (WVPResult) result.getResult(); - if (Objects.requireNonNull(wvpResult).getCode() == 0) { - StreamInfo streamInfoForSuccess = (StreamInfo) wvpResult.getData(); - MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId()); - String streamUrl = streamInfoForSuccess.getFmp4().getUrl(); - - // 请求截图 - logger.info("[请求截图]: " + fileName); - zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName); - } - }); - }); if (streamInfo != null) { String streamId = streamInfo.getStream(); @@ -160,7 +131,7 @@ public class PlayServiceImpl implements IPlayService { wvpResult.setMsg("点播失败, redis缓存streamId等于null"); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); - return playResult; + return; } String mediaServerId = streamInfo.getMediaServerId(); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); @@ -178,14 +149,13 @@ public class PlayServiceImpl implements IPlayService { msg.setData(wvpResult); resultHolder.invokeAllResult(msg); - return playResult; + return; } else { WVPResult wvpResult = new WVPResult(); wvpResult.setCode(ErrorCode.SUCCESS.getCode()); wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); wvpResult.setData(streamInfo); msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); if (hookEvent != null) { hookEvent.response(mediaServerItem, JSON.parseObject(JSON.toJSONString(streamInfo))); @@ -211,7 +181,6 @@ public class PlayServiceImpl implements IPlayService { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false); - logger.info(JSONObject.toJSONString(ssrcInfo)); play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response) -> { if (hookEvent != null) { hookEvent.response(mediaServerItem, response); @@ -238,16 +207,15 @@ public class PlayServiceImpl implements IPlayService { msg.setData(wvpResult); // 回复之前所有的点播请求 resultHolder.invokeAllResult(msg); - }, uuid); + }); } - return playResult; } @Override public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - InviteTimeOutCallback timeoutCallback, String uuid) { + InviteTimeOutCallback timeoutCallback) { String streamId = null; if (mediaServerItem.isRtpEnable()) { @@ -281,6 +249,16 @@ public class PlayServiceImpl implements IPlayService { //端口获取失败的ssrcInfo 没有必要发送点播指令 if (ssrcInfo.getPort() <= 0) { logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); + dynamicTask.stop(timeOutTaskKey); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + + RequestMessage msg = new RequestMessage(); + msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + device.getDeviceId() + channelId); + msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "点播端口分配异常")); + resultHolder.invokeAllResult(msg); return; } try { @@ -289,9 +267,15 @@ public class PlayServiceImpl implements IPlayService { System.out.println("停止超时任务: " + timeOutTaskKey); dynamicTask.stop(timeOutTaskKey); // hook响应 - onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid); + onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId); hookEvent.response(mediaServerItemInuse, response); logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId); + String streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", stream); + String path = "snap"; + String fileName = device.getDeviceId() + "_" + channelId + ".jpg"; + // 请求截图 + logger.info("[请求截图]: " + fileName); + zlmresTfulUtils.getSnap(mediaServerItemInuse, streamUrl, 15, 1, path, fileName); }, (event) -> { ResponseEvent responseEvent = (ResponseEvent) event.event; @@ -331,7 +315,7 @@ public class PlayServiceImpl implements IPlayService { logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); dynamicTask.stop(timeOutTaskKey); // hook响应 - onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); + onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId); hookEvent.response(mediaServerItemInUse, response); }); } @@ -367,13 +351,41 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { + public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId) { + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); RequestMessage msg = new RequestMessage(); - if (uuid != null) { + msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); + if (streamInfo != null) { + DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); + if (deviceChannel != null) { + deviceChannel.setStreamId(streamInfo.getStream()); + storager.startPlay(deviceId, channelId, streamInfo.getStream()); + } + redisCatchStorage.startPlay(streamInfo); + + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(ErrorCode.SUCCESS.getCode()); + wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); + wvpResult.setData(streamInfo); + + msg.setData(wvpResult); + resultHolder.invokeAllResult(msg); + + } else { + logger.warn("设备预览API调用失败!"); + msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!")); + resultHolder.invokeAllResult(msg); + } + } + + private void onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { + RequestMessage msg = new RequestMessage(); + msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); + if (!ObjectUtils.isEmpty(uuid)) { msg.setId(uuid); } - msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -390,8 +402,8 @@ public class PlayServiceImpl implements IPlayService { resultHolder.invokeAllResult(msg); } else { - logger.warn("设备预览API调用失败!"); - msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败!")); + logger.warn("录像回放调用失败!"); + msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "录像回放调用失败!")); resultHolder.invokeAllResult(msg); } } @@ -545,7 +557,7 @@ public class PlayServiceImpl implements IPlayService { logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); dynamicTask.stop(playBackTimeOutTaskKey); // hook响应 - onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); + onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); }); } @@ -568,6 +580,8 @@ public class PlayServiceImpl implements IPlayService { return result; } + + @Override public DeferredResult> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) { Device device = storager.queryVideoDevice(deviceId); diff --git a/src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java b/src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java index 81e62499..466a5035 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/redis/FastJsonRedisSerializer.java @@ -1,14 +1,12 @@ package com.genersoft.iot.vmp.utils.redis; -import java.nio.charset.Charset; - +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONReader; import com.alibaba.fastjson2.JSONWriter; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.SerializationException; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONWriter.Feature; +import java.nio.charset.Charset; /** * @description:使用fastjson实现redis的序列化 @@ -31,7 +29,7 @@ public class FastJsonRedisSerializer implements RedisSerializer { if (t == null) { return new byte[0]; } - return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName).getBytes(DEFAULT_CHARSET); + return JSON.toJSONString(t, JSONWriter.Feature.WriteClassName, JSONWriter.Feature.WritePairAsJavaBean).getBytes(DEFAULT_CHARSET); } @Override @@ -42,4 +40,6 @@ public class FastJsonRedisSerializer implements RedisSerializer { String str = new String(bytes, DEFAULT_CHARSET); return JSON.parseObject(str, clazz, JSONReader.Feature.SupportAutoType); } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java new file mode 100644 index 00000000..0b9d3d9a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultEx.java @@ -0,0 +1,31 @@ +package com.genersoft.iot.vmp.vmanager.bean; + +import org.springframework.web.context.request.async.DeferredResult; + +public class DeferredResultEx { + + private DeferredResult deferredResult; + + private DeferredResultFilter filter; + + public DeferredResultEx(DeferredResult result) { + this.deferredResult = result; + } + + + public DeferredResult getDeferredResult() { + return deferredResult; + } + + public void setDeferredResult(DeferredResult deferredResult) { + this.deferredResult = deferredResult; + } + + public DeferredResultFilter getFilter() { + return filter; + } + + public void setFilter(DeferredResultFilter filter) { + this.filter = filter; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java new file mode 100644 index 00000000..18c22407 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/DeferredResultFilter.java @@ -0,0 +1,6 @@ +package com.genersoft.iot.vmp.vmanager.bean; + +public interface DeferredResultFilter { + + Object handler(Object o); +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index ae76b95f..87e203a9 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -1,44 +1,36 @@ package com.genersoft.iot.vmp.vmanager.gb28181.play; import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; -import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IPlayService; - +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.CrossOrigin; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; -import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.sip.InvalidArgumentException; import javax.sip.SipException; @@ -91,16 +83,52 @@ public class PlayController { public DeferredResult> play(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId) { + String localAddr = request.getLocalAddr(); + String localName = request.getLocalName(); + String remoteHost = request.getRemoteHost(); + String remoteAddr = request.getRemoteAddr(); + String remoteUser = request.getRemoteUser(); + String requestURI = request.getRequestURI(); + System.out.println(3333333); + System.out.println(localAddr); + System.out.println(localName); + System.out.println(remoteHost); + System.out.println(remoteAddr); + System.out.println(remoteUser); + System.out.println(requestURI); + System.out.println(4444444); // 获取可用的zlm Device device = storager.queryVideoDevice(deviceId); MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); - PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null); - playResult.getResult().onCompletion(()->{ - WVPResult result = (WVPResult)playResult.getResult().getResult(); - result.getData().channgeStreamIp(request.getLocalAddr()); - playResult.getResult().setResult(result); + + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; + boolean exist = resultHolder.exist(key, null); + msg.setKey(key); + String uuid = UUID.randomUUID().toString(); + msg.setId(uuid); + DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + DeferredResultEx> deferredResultEx = new DeferredResultEx<>(result); + + deferredResultEx.setFilter(result1 -> { + System.out.println(1111); + System.out.println(request.getLocalName()); + WVPResult wvpResult = (WVPResult)result1; + if (wvpResult.getCode() == ErrorCode.SUCCESS.getCode()) { + StreamInfo data = wvpResult.getData(); + data.channgeStreamIp(request.getLocalName()); + ((WVPResult)result1).setData(data); + } + return result1; }); - return playResult.getResult(); + + // 录像查询以channelId作为deviceId查询 + resultHolder.put(key, uuid, deferredResultEx); + + if (!exist) { + playService.play(newMediaServerItem, deviceId, channelId, null, null, null); + } + return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index 0f003c76..ab769f5b 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -112,7 +112,7 @@ public class ApiStreamController { return resultDeferredResult; } MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); - PlayResult play = playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{ + playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{ StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code); JSONObject result = new JSONObject(); result.put("StreamID", streamInfo.getStream()); diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index a6208595..759b744f 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -105,8 +105,6 @@ sip: id: 44010200492000000001 # [可选] 默认设备认证密码,后续扩展使用设备单独密码, 移除密码将不进行校验 password: admin123 - # [可选] 心跳超时时间, 建议设置为心跳周期的三倍 - keepalive-timeout: 255 # [可选] 国标级联注册失败,再次发起注册的时间间隔。 默认60秒 register-time-interval: 60 # [可选] 云台控制速度