diff --git a/README.md b/README.md index 5431363b5..40204401a 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,7 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git - [X] 支持电子地图,支持接入WGS84和GCJ02两种坐标系,并且自动转化为合适的坐标系进行展示和分发 - [X] 接入设备 - [X] 视频预览 + - [X] 支持主码流子码流切换 - [X] 无限制接入路数,能接入多少设备只取决于你的服务器性能 - [X] 云台控制,控制设备转向,拉近,拉远 - [X] 预置位查询,使用与设置 diff --git a/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java b/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java index dabdb4f07..dbe9e090a 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.common; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import io.swagger.v3.oas.annotations.media.Schema; /** * 记录每次发送invite消息的状态 @@ -125,20 +124,4 @@ public class InviteInfo { this.streamMode = streamMode; } - - /*=========================设备主子码流逻辑START====================*/ - @Schema(description = "是否为子码流(true-是,false-主码流)") - private boolean subStream; - - public boolean isSubStream() { - return subStream; - } - - public void setSubStream(boolean subStream) { - this.subStream = subStream; - } - - - - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index e625b1416..e6b769511 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -469,23 +469,11 @@ public class Device { this.sipTransactionInfo = sipTransactionInfo; } - /*======================设备主子码流逻辑START=========================*/ - @Schema(description = "开启主子码流切换的开关(false-不开启,true-开启)") - private boolean switchPrimarySubStream; + public boolean isAutoSyncChannel() { + return autoSyncChannel; + } - public boolean isSwitchPrimarySubStream() { - return switchPrimarySubStream; - } - - public void setSwitchPrimarySubStream(boolean switchPrimarySubStream) { - this.switchPrimarySubStream = switchPrimarySubStream; - } - - public boolean isAutoSyncChannel() { - return autoSyncChannel; - } - - public void setAutoSyncChannel(boolean autoSyncChannel) { - this.autoSyncChannel = autoSyncChannel; - } + public void setAutoSyncChannel(boolean autoSyncChannel) { + this.autoSyncChannel = autoSyncChannel; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java index 32939e6d9..233b424bc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java @@ -258,6 +258,10 @@ public class DeviceChannel { @Schema(description = "国标通用信息ID") private int commonGbChannelId; + @Schema(description = "码流标识,优先级高于设备中码流标识," + + "用于选择码流时组成码流标识。默认为null,不设置。可选值: stream/streamnumber/streamprofile/streamMode") + private String streamIdentification; + public int getId() { return id; } @@ -602,4 +606,12 @@ public class DeviceChannel { public void setCommonGbChannelId(int commonGbChannelId) { this.commonGbChannelId = commonGbChannelId; } + + public String getStreamIdentification() { + return streamIdentification; + } + + public void setStreamIdentification(String streamIdentification) { + this.streamIdentification = streamIdentification; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSteamIdentification.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSteamIdentification.java new file mode 100644 index 000000000..63c17a80c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbSteamIdentification.java @@ -0,0 +1,44 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +/** + * 码流索引标识 + */ +public enum GbSteamIdentification { + /** + * 主码流 stream:0 + * 子码流 stream:1s + */ + streamMain("stream", new String[]{"0","1"}), + /** + * 国标28181-2022定义的方式 + * 主码流 streamnumber:0 + * 子码流 streamnumber:1 + */ + streamnumber("streamnumber", new String[]{"0","1"}), + /** + * 主码流 streamprofile:0 + * 子码流 streamprofile:1 + */ + streamprofile("streamprofile", new String[]{"0","1"}), + /** + * 适用的品牌: TP-LINK + */ + streamMode("streamMode", new String[]{"main","sub"}), + ; + + GbSteamIdentification(String value, String[] indexArray) { + this.value = value; + this.indexArray = indexArray; + } + + private String value; + private String[] indexArray; + + public String getValue() { + return value; + } + + public String[] getIndexArray() { + return indexArray; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 041408ec7..f164b13c2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -52,7 +52,7 @@ public class SubscribeHolder { Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); @@ -87,7 +87,7 @@ public class SubscribeHolder { Runnable runnable = dynamicTask.get(taskOverdueKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index 75751ad2e..7aceec88c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; +import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -11,8 +13,7 @@ import javax.sip.DialogTerminatedEvent; import javax.sip.ResponseEvent; import javax.sip.TimeoutEvent; import javax.sip.TransactionTerminatedEvent; -import javax.sip.header.CallIdHeader; -import javax.sip.message.Response; +import javax.sip.header.WarningHeader; import java.time.Instant; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -95,14 +96,27 @@ public class SipSubscribe { this.event = event; if (event instanceof ResponseEvent) { ResponseEvent responseEvent = (ResponseEvent)event; - Response response = responseEvent.getResponse(); + SIPResponse response = (SIPResponse)responseEvent.getResponse(); this.type = EventResultType.response; if (response != null) { - this.msg = response.getReasonPhrase(); + WarningHeader warningHeader = (WarningHeader)response.getHeader(WarningHeader.NAME); + if (warningHeader != null && !ObjectUtils.isEmpty(warningHeader.getText())) { + this.msg = ""; + if (warningHeader.getCode() > 0) { + this.msg += warningHeader.getCode() + ":"; + } + if (warningHeader.getAgent() != null) { + this.msg += warningHeader.getCode() + ":"; + } + if (warningHeader.getText() != null) { + this.msg += warningHeader.getText(); + } + }else { + this.msg = response.getReasonPhrase(); + } this.statusCode = response.getStatusCode(); + this.callId = response.getCallIdHeader().getCallId(); } - this.callId = ((CallIdHeader)response.getHeader(CallIdHeader.NAME)).getCallId(); - }else if (event instanceof TimeoutEvent) { TimeoutEvent timeoutEvent = (TimeoutEvent)event; this.type = EventResultType.timeout; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CommonSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CommonSessionManager.java new file mode 100755 index 000000000..2d8c7e171 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CommonSessionManager.java @@ -0,0 +1,86 @@ +package com.genersoft.iot.vmp.gb28181.session; + +import com.genersoft.iot.vmp.common.CommonCallback; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Calendar; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 通用回调管理 + */ +@Component +public class CommonSessionManager { + + public static Map callbackMap = new ConcurrentHashMap<>(); + + /** + * 存储回调相关的信息 + */ + class CommonSession{ + public String session; + public long createTime; + public int timeout; + + public CommonCallback callback; + public CommonCallback timeoutCallback; + } + + /** + * 添加回调 + * @param sessionId 唯一标识 + * @param callback 回调 + * @param timeout 超时时间, 单位分钟 + */ + public void add(String sessionId, CommonCallback callback, CommonCallback timeoutCallback, + Integer timeout) { + CommonSession commonSession = new CommonSession(); + commonSession.session = sessionId; + commonSession.callback = callback; + commonSession.createTime = System.currentTimeMillis(); + if (timeoutCallback != null) { + commonSession.timeoutCallback = timeoutCallback; + } + if (timeout != null) { + commonSession.timeout = timeout; + } + callbackMap.put(sessionId, commonSession); + } + + public void add(String sessionId, CommonCallback callback) { + add(sessionId, callback, null, 1); + } + + public CommonCallback get(String sessionId, boolean destroy) { + CommonSession commonSession = callbackMap.get(sessionId); + if (destroy) { + callbackMap.remove(sessionId); + } + return commonSession.callback; + } + + public CommonCallback get(String sessionId) { + return get(sessionId, false); + } + + public void delete(String sessionID) { + callbackMap.remove(sessionID); + } + + @Scheduled(fixedRate= 60) //每分钟执行一次 + public void execute(){ + Calendar cal = Calendar.getInstance(); + cal.add(Calendar.MINUTE, -1); + for (String session : callbackMap.keySet()) { + if (callbackMap.get(session).createTime < cal.getTimeInMillis()) { + // 超时 + if (callbackMap.get(session).timeoutCallback != null) { + callbackMap.get(session).timeoutCallback.run("timeout"); + } + callbackMap.remove(session); + } + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java index a4e711d9e..8d1c7d2ee 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java @@ -1,10 +1,10 @@ package com.genersoft.iot.vmp.gb28181.task; -import javax.sip.DialogState; +import com.genersoft.iot.vmp.common.CommonCallback; /** * @author lin */ public interface ISubscribeTask extends Runnable{ - void stop(); + void stop(CommonCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 2ffbfe40a..d9270bbf0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.task.impl; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; @@ -7,14 +8,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import javax.sip.*; +import javax.sip.DialogState; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; import javax.sip.header.ToHeader; import java.text.ParseException; -import java.util.Timer; -import java.util.TimerTask; /** * 目录订阅任务 @@ -71,7 +71,7 @@ public class CatalogSubscribeTask implements ISubscribeTask { } @Override - public void stop() { + public void stop(CommonCallback callback) { /** * dialog 的各个状态 * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息 @@ -94,6 +94,9 @@ public class CatalogSubscribeTask implements ISubscribeTask { // 成功 logger.info("[取消目录订阅]成功: {}", device.getDeviceId()); } + if (callback != null) { + callback.run(event.getResponse().getRawContent() != null); + } },eventResult -> { // 失败 logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index a7e997897..d46683690 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -1,20 +1,9 @@ package com.genersoft.iot.vmp.gb28181.task.impl; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.IPlatformService; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SpringBeanFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; - -import javax.sip.DialogState; -import java.util.List; /** * 向已经订阅(移动位置)的上级发送MobilePosition消息 @@ -38,7 +27,7 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { } @Override - public void stop() { + public void stop(CommonCallback callback) { } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index 0abd3cae3..9fed0793b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -1,21 +1,19 @@ package com.genersoft.iot.vmp.gb28181.task.impl; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.message.SIPResponse; -import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.scheduling.annotation.Async; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; import javax.sip.header.ToHeader; import java.text.ParseException; -import java.util.Timer; -import java.util.TimerTask; /** * 移动位置订阅的定时更新 @@ -70,7 +68,7 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { } @Override - public void stop() { + public void stop(CommonCallback callback) { /** * dialog 的各个状态 * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息 @@ -92,6 +90,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { // 成功 logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId()); } + if (callback != null) { + callback.run(event.getResponse().getRawContent() != null); + } },eventResult -> { // 失败 logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 27fc7aefe..f5dd9a88c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; import com.genersoft.iot.vmp.gb28181.bean.command.PTZCommand; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -39,9 +40,9 @@ public interface ISIPCommander { /** * 请求预览视频流 * @param device 视频设备 - * @param channelId 预览通道 + * @param channel 预览通道 */ - void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 请求回放视频流 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index cc4d4d4c9..929b5bb33 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.bean.command.PTZCommand; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; @@ -125,12 +126,12 @@ public class SIPCommander implements ISIPCommander { * 请求预览视频流 * * @param device 视频设备 - * @param channelId 预览通道 + * @param channel 预览通道 * @param event hook订阅 * @param errorEvent sip错误订阅 */ @Override - public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { String stream = ssrcInfo.getStream(); @@ -154,7 +155,7 @@ public class SIPCommander implements ISIPCommander { } StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); + content.append("o=" + channel.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 " + sdpIp + "\r\n"); content.append("t=0 0\r\n"); @@ -205,20 +206,8 @@ public class SIPCommander implements ISIPCommander { } } - if( device.isSwitchPrimarySubStream() ){ - if("TP-LINK".equals(device.getManufacturer())){ - if (device.isSwitchPrimarySubStream()){ - content.append("a=streamMode:sub\r\n"); - }else { - content.append("a=streamMode:main\r\n"); - } - }else { - if (device.isSwitchPrimarySubStream()){ - content.append("a=streamprofile:1\r\n"); - }else { - content.append("a=streamprofile:0\r\n"); - } - } + if (!ObjectUtils.isEmpty(channel.getStreamIdentification())) { + content.append("a=" + channel.getStreamIdentification() + "\r\n"); } content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc @@ -227,16 +216,16 @@ public class SIPCommander implements ISIPCommander { - Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); + Request request = headerProvider.createInviteRequest(device, channel.getChannelId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); errorEvent.response(e); }), e -> { ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); String callId = response.getCallIdHeader().getCallId(); - streamSession.put(device.getDeviceId(), channelId, callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, + streamSession.put(device.getDeviceId(), channel.getChannelId(), callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY); okEvent.response(e); }); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 9570be469..1eed71d61 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -39,7 +38,6 @@ import javax.sip.SipException; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -223,7 +221,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); - // 更新device channel 的经纬度 DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setDeviceId(device.getDeviceId()); @@ -243,6 +240,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } storager.updateChannelPosition(deviceChannel); + // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 + // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 22c34a642..034e24f96 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -82,8 +82,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp device.setIp(remoteAddressInfo.getIp()); // 设备地址变化会引起目录订阅任务失效,需要重新添加 if (device.getSubscribeCycleForCatalog() > 0) { - deviceService.removeCatalogSubscribe(device); - deviceService.addCatalogSubscribe(device); + deviceService.removeCatalogSubscribe(device, result->{ + deviceService.addCatalogSubscribe(device); + }); } } if (device.getKeepaliveTime() == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java index cb0d3ddff..d7c91dfbf 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java @@ -2,9 +2,8 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.utils.SSLSocketClientUtil; import okhttp3.*; import okhttp3.logging.HttpLoggingInterceptor; import org.jetbrains.annotations.NotNull; @@ -13,11 +12,10 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; +import javax.net.ssl.X509TrustManager; import java.io.IOException; import java.net.ConnectException; -import java.net.MalformedURLException; import java.net.SocketTimeoutException; -import java.net.URL; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +59,10 @@ public class AssistRESTfulUtils { // OkHttp進行添加攔截器loggingInterceptor httpClientBuilder.addInterceptor(logging); } + X509TrustManager manager = SSLSocketClientUtil.getX509TrustManager(); + // 设置ssl + httpClientBuilder.sslSocketFactory(SSLSocketClientUtil.getSocketFactory(manager), manager); + httpClientBuilder.hostnameVerifier(SSLSocketClientUtil.getHostnameVerifier());//忽略校验 client = httpClientBuilder.build(); } return client; diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index 978525a46..81d16cd40 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -95,4 +95,9 @@ public interface IDeviceChannelService { * 根据通用通道ID查询通道 */ DeviceChannel getChannelByCommonChannelId(int commonGbId); + + /** + * 修改通道的码流类型 + */ + void updateChannelStreamIdentification(DeviceChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java index f3bc25255..3cd54c55f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; @@ -40,7 +41,7 @@ public interface IDeviceService { * @param device 设备信息 * @return 布尔 */ - boolean removeCatalogSubscribe(Device device); + boolean removeCatalogSubscribe(Device device, CommonCallback callback); /** * 添加移动位置订阅 @@ -54,7 +55,7 @@ public interface IDeviceService { * @param device 设备信息 * @return 布尔 */ - boolean removeMobilePositionSubscribe(Device device); + boolean removeMobilePositionSubscribe(Device device, CommonCallback callback); /** * 移除移动位置订阅 diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 24f12beb2..473a898e9 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -16,7 +17,7 @@ import java.text.ParseException; */ public interface IPlayService { - void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channelId, ErrorCallback callback); SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index aa286add5..7236fd74b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -27,6 +27,7 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.interceptor.TransactionAspectSupport; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; +import org.springframework.util.ObjectUtils; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; @@ -763,4 +764,17 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { public DeviceChannel getChannelByCommonChannelId(int commonGbId) { return channelMapper.getChannelByCommonChannelId(commonGbId); } + + @Override + public void updateChannelStreamIdentification(DeviceChannel channel) { + assert !ObjectUtils.isEmpty(channel.getDeviceId()); + assert !ObjectUtils.isEmpty(channel.getStreamIdentification()); + if (ObjectUtils.isEmpty(channel.getStreamIdentification())) { + logger.info("[重置通道码流类型] 设备: {}, 码流: {}", channel.getDeviceId(), channel.getStreamIdentification()); + }else { + logger.info("[更新通道码流类型] 设备: {}, 通道:{}, 码流: {}", channel.getDeviceId(), channel.getChannelId(), + channel.getStreamIdentification()); + } + channelMapper.updateChannelStreamIdentification(channel); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index bd212ad7e..5407eae65 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -1,10 +1,10 @@ package com.genersoft.iot.vmp.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; @@ -139,10 +139,6 @@ public class DeviceServiceImpl implements IDeviceService { } sync(device); }else { - - if (deviceInDb != null) { - device.setSwitchPrimarySubStream(deviceInDb.isSwitchPrimarySubStream()); - } if(!device.isOnLine()){ device.setOnLine(true); device.setCreateTime(now); @@ -240,8 +236,8 @@ public class DeviceServiceImpl implements IDeviceService { } } // 移除订阅 - removeCatalogSubscribe(device); - removeMobilePositionSubscribe(device); + removeCatalogSubscribe(device, null); + removeMobilePositionSubscribe(device, null); } @Override @@ -260,7 +256,7 @@ public class DeviceServiceImpl implements IDeviceService { } @Override - public boolean removeCatalogSubscribe(Device device) { + public boolean removeCatalogSubscribe(Device device, CommonCallback callback) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } @@ -270,7 +266,7 @@ public class DeviceServiceImpl implements IDeviceService { Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(callback); } } dynamicTask.stop(taskKey); @@ -293,7 +289,7 @@ public class DeviceServiceImpl implements IDeviceService { } @Override - public boolean removeMobilePositionSubscribe(Device device) { + public boolean removeMobilePositionSubscribe(Device device, CommonCallback callback) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } @@ -303,7 +299,7 @@ public class DeviceServiceImpl implements IDeviceService { Runnable runnable = dynamicTask.get(taskKey); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(callback); } } dynamicTask.stop(taskKey); @@ -494,21 +490,6 @@ public class DeviceServiceImpl implements IDeviceService { logger.warn("更新设备时未找到设备信息"); return; } - if(deviceInStore.isSwitchPrimarySubStream() != device.isSwitchPrimarySubStream()){ - //当修改设备的主子码流开关时,需要校验是否存在流,如果存在流则直接关闭 - List ssrcTransactionForAll = streamSession.getSsrcTransactionForAll(device.getDeviceId(), null, null, null); - if(ssrcTransactionForAll != null){ - for (SsrcTransaction ssrcTransaction: ssrcTransactionForAll) { - try { - cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), ssrcTransaction.getStream(), null, null); - } catch (InvalidArgumentException | SsrcTransactionNotFoundException | ParseException | SipException e) { - throw new RuntimeException(e); - } - } - } - deviceChannelMapper.clearPlay(device.getDeviceId()); - inviteStreamService.clearInviteInfo(device.getDeviceId()); - } if (!ObjectUtils.isEmpty(device.getName())) { deviceInStore.setName(device.getName()); @@ -531,39 +512,54 @@ public class DeviceServiceImpl implements IDeviceService { if (!ObjectUtils.isEmpty(device.getStreamMode())) { deviceInStore.setStreamMode(device.getStreamMode()); } - - // 目录订阅相关的信息 if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { if (device.getSubscribeCycleForCatalog() > 0) { // 若已开启订阅,但订阅周期不同,则先取消 if (deviceInStore.getSubscribeCycleForCatalog() != 0) { - removeCatalogSubscribe(deviceInStore); + removeCatalogSubscribe(deviceInStore, result->{ + // 开启订阅 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + addCatalogSubscribe(deviceInStore); + // 因为是异步执行,需要在这里更新下数据 + deviceMapper.updateCustom(deviceInStore); + redisCatchStorage.updateDevice(deviceInStore); + }); + }else { + // 开启订阅 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + addCatalogSubscribe(deviceInStore); } - // 开启订阅 - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); - addCatalogSubscribe(deviceInStore); + }else if (device.getSubscribeCycleForCatalog() == 0) { // 取消订阅 - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); - removeCatalogSubscribe(deviceInStore); + deviceInStore.setSubscribeCycleForCatalog(0); + removeCatalogSubscribe(deviceInStore, null); } } - // 移动位置订阅相关的信息 - if (device.getSubscribeCycleForMobilePosition() > 0) { - if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { - deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); - deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); - // 开启订阅 - addMobilePositionSubscribe(deviceInStore); - } - }else if (device.getSubscribeCycleForMobilePosition() == 0) { - if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { - deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); - deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + if (deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) { + if (device.getSubscribeCycleForMobilePosition() > 0) { + // 若已开启订阅,但订阅周期不同,则先取消 + if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { + removeMobilePositionSubscribe(deviceInStore, result->{ + // 开启订阅 + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + addMobilePositionSubscribe(deviceInStore); + // 因为是异步执行,需要在这里更新下数据 + deviceMapper.updateCustom(deviceInStore); + redisCatchStorage.updateDevice(deviceInStore); + }); + }else { + // 开启订阅 + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); + addMobilePositionSubscribe(deviceInStore); + } + + }else if (device.getSubscribeCycleForMobilePosition() == 0) { // 取消订阅 - removeMobilePositionSubscribe(deviceInStore); + deviceInStore.setSubscribeCycleForCatalog(0); + removeCatalogSubscribe(deviceInStore, null); } } if (deviceInStore.getGeoCoordSys() != null) { @@ -583,9 +579,8 @@ public class DeviceServiceImpl implements IDeviceService { //作为消息通道 deviceInStore.setAsMessageChannel(device.isAsMessageChannel()); - // 更新redis deviceMapper.updateCustom(deviceInStore); - redisCatchStorage.removeDevice(deviceInStore.getDeviceId()); + redisCatchStorage.updateDevice(deviceInStore); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index d50c13089..e80f39517 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -24,22 +24,18 @@ import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; -import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.sip.InvalidArgumentException; -import javax.sip.PeerUnavailableException; import javax.sip.SipException; -import javax.sip.SipFactory; -import javax.sip.address.Address; -import javax.sip.address.SipURI; -import javax.sip.header.*; -import javax.sip.message.Request; import java.text.ParseException; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; /** * @author lin @@ -359,7 +355,6 @@ public class PlatformServiceImpl implements IPlatformService { ()-> registerTask(platform, null), userSetting.getRegisterAgainAfterTime() * 1000); } - } } @@ -459,7 +454,8 @@ public class PlatformServiceImpl implements IPlatformService { } // 删除缓存的订阅信息 subscribeHolder.removeAllSubscribe(parentPlatform.getId()); - + parentPlatform.setEnable(false); + update(parentPlatform); // 发送注销的请求 if (parentPlatformCatch != null && parentPlatformCatch.getSipTransactionInfo() != null) { // 发送离线消息,无论是否成功都删除缓存 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index b2bcbf434..64fd2a82f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -15,7 +15,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; @@ -37,6 +37,11 @@ import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; @@ -74,7 +79,7 @@ public class PlayServiceImpl implements IPlayService { private IVideoManagerStorage storager; @Autowired - private SIPCommander cmder; + private ISIPCommander cmder; @Autowired private SIPCommanderFroPlatform sipCommanderFroPlatform; @@ -109,6 +114,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private DeviceMapper deviceMapper; + @Autowired + private IDeviceChannelService channelService; + @Autowired private UserSetting userSetting; @@ -131,6 +139,11 @@ public class PlayServiceImpl implements IPlayService { logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流"); } + DeviceChannel channel = channelService.getOne(deviceId, channelId); + if (channel == null) { + logger.warn("[点播] 未找到通道 deviceId: {},channelId:{}", deviceId, channelId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道"); + } InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfo != null ) { if (inviteInfo.getStreamInfo() == null) { @@ -179,13 +192,13 @@ public class PlayServiceImpl implements IPlayService { null); return null; } - play(mediaServerItem, ssrcInfo, device, channelId, callback); + play(mediaServerItem, ssrcInfo, device, channel, callback); return ssrcInfo; } @Override - public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, ErrorCallback callback) { if (mediaServerItem == null || ssrcInfo == null) { @@ -194,27 +207,26 @@ public class PlayServiceImpl implements IPlayService { null); return; } - logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, STREAM:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", - device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getStream(), + logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", + device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(), ssrcInfo.getPort(), ssrcInfo.getStream(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); //端口获取失败的ssrcInfo 没有必要发送点播指令 if (ssrcInfo.getPort() <= 0) { - logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); + logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getChannelId(), ssrcInfo); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null, InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null); return; } // 初始化redis中的invite消息状态 - InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, + InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY, InviteSessionStatus.ready); - inviteInfo.setSubStream(device.isSwitchPrimarySubStream()); inviteStreamService.updateInviteInfo(inviteInfo); // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); @@ -224,44 +236,48 @@ public class PlayServiceImpl implements IPlayService { subscribe.removeSubscribe(hookSubscribe); // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 - InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); - int code; - String msg; - if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStatus().equals(InviteSessionStatus.ready)) { - logger.info("[点播超时] 信令超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", - device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", + InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId()); + if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) { + logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}", + device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(), ssrcInfo.getPort(), ssrcInfo.getSsrc()); - callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); - code = InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(); - msg = InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(); - }else { - logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", - device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", - ssrcInfo.getPort(), ssrcInfo.getSsrc()); - code = InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(); - msg = InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(); + + callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null, + InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId()); try { - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); + cmder.streamByeCmd(device, channel.getChannelId(), ssrcInfo.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { - logger.error("[点播超时],发送BYE失败 {}", e.getMessage()); + logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); + } finally { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + // 取消订阅消息监听 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); } - } - callback.run(code, msg, null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, code, msg, null); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + }else { + logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}", + device.getDeviceId(), channel.getChannelId(), channel.getStreamIdentification(), + ssrcInfo.getPort(), ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + + mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); + } }, userSetting.getPlayTimeout()); try { - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> { + cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (mediaServerItemInuse, hookParam ) -> { logger.info("收到订阅消息: " + hookParam); dynamicTask.stop(timeOutTaskKey); // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId); + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channel.getChannelId()); if (streamInfo == null){ try { callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -270,7 +286,7 @@ public class PlayServiceImpl implements IPlayService { logger.warn("[invite hook响应] 发送回调失败", e); } try { - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null, InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); }catch (Exception e) { @@ -278,16 +294,16 @@ public class PlayServiceImpl implements IPlayService { } return; } - logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channelId, - device.isSwitchPrimarySubStream() ? "辅码流" : "主码流"); - snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream()); + logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getChannelId(), + channel.getStreamIdentification()); + snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); try { callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); }catch (Exception e) { logger.warn("[invite] 发送回调失败", e); } try { - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null, InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); @@ -296,10 +312,10 @@ public class PlayServiceImpl implements IPlayService { } }, (eventResult) -> { // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 - InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getChannelId(), timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY); }, (event) -> { - logger.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channelId, event.statusCode, event.msg); + logger.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getChannelId(), event.statusCode, event.msg); dynamicTask.stop(timeOutTaskKey); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 释放ssrc @@ -330,9 +346,8 @@ public class PlayServiceImpl implements IPlayService { // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); try { callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); @@ -346,6 +361,7 @@ public class PlayServiceImpl implements IPlayService { }catch (Exception exception) { logger.warn("[invite] 发送回调失败", exception); } + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId()); } } @@ -515,7 +531,7 @@ public class PlayServiceImpl implements IPlayService { @Override public void playBack(String deviceId, String channelId, String startTime, - String endTime, ErrorCallback callback) { + String endTime, ErrorCallback callback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { logger.warn("[录像回放] 未找到设备 deviceId: {}, channelId:{},startTime:{}, endTime:{}", @@ -541,8 +557,8 @@ public class PlayServiceImpl implements IPlayService { @Override public void playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, - String deviceId, String channelId, String startTime, - String endTime, ErrorCallback callback) { + String deviceId, String channelId, String startTime, + String endTime, ErrorCallback callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), @@ -963,7 +979,7 @@ public class PlayServiceImpl implements IPlayService { cmder.streamByeCmd(device, ssrcTransaction.getChannelId(), ssrcTransaction.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { + SsrcTransactionNotFoundException e) { logger.error("[zlm离线]为正在使用此zlm的设备, 发送BYE失败 {}", e.getMessage()); } } @@ -1116,16 +1132,16 @@ public class PlayServiceImpl implements IPlayService { MediaServerItem newMediaServerItem = getNewMediaServerItem(device); play(newMediaServerItem, deviceId, channelId, null, (code, msg, data)->{ - if (code == InviteErrorCode.SUCCESS.getCode()) { - InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); - if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) { - getSnap(deviceId, channelId, fileName, errorCallback); - }else { - errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null); - } - }else { - errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null); - } + if (code == InviteErrorCode.SUCCESS.getCode()) { + InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); + if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) { + getSnap(deviceId, channelId, fileName, errorCallback); + }else { + errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null); + } + }else { + errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null); + } }); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 0994479a2..869e44281 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -20,11 +20,11 @@ public interface DeviceChannelMapper { @Insert("INSERT INTO wvp_device_channel (channel_id, device_id, name, manufacture, model, owner, civil_code, block, " + "address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, secrecy, " + "ip_address, port, password, ptz_type, status, stream_id, longitude, latitude, longitude_gcj02, latitude_gcj02, " + - "longitude_wgs84, latitude_wgs84, has_audio, create_time, update_time, business_group_id, gps_time) " + + "longitude_wgs84, latitude_wgs84, has_audio, create_time, update_time, business_group_id, gps_time, stream_identification) " + "VALUES (#{channelId}, #{deviceId}, #{name}, #{manufacture}, #{model}, #{owner}, #{civilCode}, #{block}," + "#{address}, #{parental}, #{parentId}, #{safetyWay}, #{registerWay}, #{certNum}, #{certifiable}, #{errCode}, #{secrecy}, " + "#{ipAddress}, #{port}, #{password}, #{PTZType}, #{status}, #{streamId}, #{longitude}, #{latitude}, #{longitudeGcj02}, " + - "#{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{hasAudio}, #{createTime}, #{updateTime}, #{businessGroupId}, #{gpsTime})") + "#{latitudeGcj02}, #{longitudeWgs84}, #{latitudeWgs84}, #{hasAudio}, #{createTime}, #{updateTime}, #{businessGroupId}, #{gpsTime}, #{streamIdentification})") int add(DeviceChannel channel); @Update(value = {" "}) int update(DeviceChannel channel); @@ -102,6 +103,7 @@ public interface DeviceChannelMapper { "dc.longitude_wgs84, " + "dc.latitude_wgs84, " + "dc.business_group_id, " + + "dc.stream_identification, " + "dc.gps_time " + "from " + "wvp_device_channel dc " + @@ -215,7 +217,7 @@ public interface DeviceChannelMapper { " address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, secrecy, " + " ip_address,port,password,ptz_type,status,stream_id,longitude,latitude,longitude_gcj02,latitude_gcj02,"+ " longitude_wgs84,latitude_wgs84,has_audio,create_time,update_time,business_group_id,gps_time," + - " common_gb_channel_id)"+ + " common_gb_channel_id, stream_identification)"+ " values " + " " + "(#{item.channelId}, #{item.deviceId}, #{item.name}, #{item.manufacture}, #{item.model}, " + @@ -225,7 +227,7 @@ public interface DeviceChannelMapper { "#{item.ipAddress}, #{item.port}, #{item.password}, #{item.PTZType}, #{item.status}, " + "#{item.streamId}, #{item.longitude}, #{item.latitude},#{item.longitudeGcj02}, " + "#{item.latitudeGcj02},#{item.longitudeWgs84}, #{item.latitudeWgs84}, #{item.hasAudio}, now(), now(), " + - "#{item.businessGroupId}, #{item.gpsTime}, #{item.commonGbChannelId}) " + + "#{item.businessGroupId}, #{item.gpsTime}, #{item.commonGbChannelId}, #{item.streamIdentification}) " + " " + "") int batchAdd(@Param("addChannels") List addChannels); @@ -325,6 +327,7 @@ public interface DeviceChannelMapper { ", business_group_id=#{item.businessGroupId}" + ", gps_time=#{item.gpsTime}" + ", common_gb_channel_id=#{item.commonGbChannelId}" + + ", stream_identification=#{item.streamIdentification}" + "WHERE id=#{item.id}" + "WHERE device_id=#{item.deviceId} AND channel_id=#{item.channelId}" + "" + @@ -540,4 +543,9 @@ public interface DeviceChannelMapper { @Select("select de.* from wvp_device de left join wvp_device_channel dc on de.device_id = dc.device_id where dc.common_gb_channel_id=#{commonGbId}") Device getDeviceByChannelCommonGbId(@Param("commonGbId") int commonGbId); + @Update("") + void updateChannelStreamIdentification(DeviceChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java index 88a2128e7..65c491bbf 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java @@ -44,7 +44,6 @@ public interface DeviceMapper { "geo_coord_sys," + "on_line," + "media_server_id," + - "switch_primary_sub_stream," + "(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+ " FROM wvp_device WHERE device_id = #{deviceId}") Device getDeviceByDeviceId(String deviceId); @@ -161,7 +160,6 @@ public interface DeviceMapper { "geo_coord_sys,"+ "on_line,"+ "media_server_id,"+ - "switch_primary_sub_stream switchPrimarySubStream,"+ "(SELECT count(0) FROM wvp_device_channel WHERE device_id=de.device_id) as channel_count " + "FROM wvp_device de" + " where on_line=${onLine}"+ @@ -251,7 +249,6 @@ public interface DeviceMapper { ", ssrc_check=#{ssrcCheck}" + ", as_message_channel=#{asMessageChannel}" + ", geo_coord_sys=#{geoCoordSys}" + - ", switch_primary_sub_stream=#{switchPrimarySubStream}" + ", media_server_id=#{mediaServerId}" + "WHERE device_id=#{deviceId}"+ " "}) @@ -269,8 +266,7 @@ public interface DeviceMapper { "as_message_channel,"+ "geo_coord_sys,"+ "on_line,"+ - "media_server_id,"+ - "switch_primary_sub_stream"+ + "media_server_id"+ ") VALUES (" + "#{deviceId}," + "#{name}," + @@ -283,8 +279,7 @@ public interface DeviceMapper { "#{asMessageChannel}," + "#{geoCoordSys}," + "#{onLine}," + - "#{mediaServerId}," + - "#{switchPrimarySubStream}" + + "#{mediaServerId}" + ")") void addCustomDevice(Device device); diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index e5f9fe430..f8ba4f2b2 100755 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -30,6 +30,11 @@ public class DateUtil { */ private static final String ISO8601_PATTERN = "yyyy-MM-dd'T'HH:mm:ss"; + /** + * iso8601时间格式带时区,例如:2024-02-21T11:10:36+08:00 + */ + private static final String ISO8601_ZONE_PATTERN = "yyyy-MM-dd'T'HH:mm:ssXXX"; + /** * wvp内部统一时间格式 */ @@ -49,6 +54,7 @@ public class DateUtil { public static final DateTimeFormatter formatterCompatibleISO8601 = DateTimeFormatter.ofPattern(ISO8601_COMPATIBLE_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter formatterISO8601 = DateTimeFormatter.ofPattern(ISO8601_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); + public static final DateTimeFormatter formatterZoneISO8601 = DateTimeFormatter.ofPattern(ISO8601_ZONE_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter DateFormatter = DateTimeFormatter.ofPattern(date_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter urlFormatter = DateTimeFormatter.ofPattern(URL_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); @@ -59,7 +65,13 @@ public class DateUtil { } public static String ISO8601Toyyyy_MM_dd_HH_mm_ss(String formatTime) { - return formatter.format(formatterCompatibleISO8601.parse(formatTime)); + // 三种日期格式都尝试,为了兼容不同厂家的日期格式 + if (verification(formatTime, formatterCompatibleISO8601)) { + return formatter.format(formatterCompatibleISO8601.parse(formatTime)); + } else if (verification(formatTime, formatterZoneISO8601)) { + return formatter.format(formatterZoneISO8601.parse(formatTime)); + } + return formatter.format(formatterISO8601.parse(formatTime)); } public static String urlToyyyy_MM_dd_HH_mm_ss(String formatTime) { diff --git a/src/main/java/com/genersoft/iot/vmp/utils/SSLSocketClientUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/SSLSocketClientUtil.java new file mode 100644 index 000000000..c85b16330 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/utils/SSLSocketClientUtil.java @@ -0,0 +1,53 @@ +package com.genersoft.iot.vmp.utils; + +import javax.net.ssl.*; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +public class SSLSocketClientUtil { + public static SSLSocketFactory getSocketFactory(TrustManager manager) { + SSLSocketFactory socketFactory = null; + try { + SSLContext sslContext = SSLContext.getInstance("SSL"); + sslContext.init(null, new TrustManager[]{manager}, new SecureRandom()); + socketFactory = sslContext.getSocketFactory(); + } catch (NoSuchAlgorithmException e) { + e.printStackTrace(); + } catch (KeyManagementException e) { + e.printStackTrace(); + } + return socketFactory; + } + + public static X509TrustManager getX509TrustManager() { + return new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + }; + } + + public static HostnameVerifier getHostnameVerifier() { + HostnameVerifier hostnameVerifier = new HostnameVerifier() { + @Override + public boolean verify(String s, SSLSession sslSession) { + return true; + } + }; + return hostnameVerifier; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java index 91c992fa0..05501cc44 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java @@ -153,10 +153,7 @@ public class MobilePositionController { Device device = storager.queryVideoDevice(deviceId); device.setSubscribeCycleForMobilePosition(Integer.parseInt(expires)); device.setMobilePositionSubmissionInterval(Integer.parseInt(interval)); - deviceService.updateDevice(device); - if (!deviceService.removeMobilePositionSubscribe(device)) { - throw new ControllerException(ErrorCode.ERROR100); - } + deviceService.updateCustomDevice(device); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index bdf03890b..0c1b278da 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -212,7 +212,7 @@ public class DeviceQuery { Runnable runnable = dynamicTask.get(key); if (runnable instanceof ISubscribeTask) { ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); + subscribeTask.stop(null); } dynamicTask.stop(key); } @@ -283,6 +283,14 @@ public class DeviceQuery { deviceChannelService.updateChannel(deviceId, deviceChannel); } + @Operation(summary = "修改通道的码流类型", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "deviceId", description = "设备国标编号", required = true) + @Parameter(name = "channel", description = "通道信息", required = true) + @PostMapping("/channel/stream/identification/update/") + public void updateChannelStreamIdentification(DeviceChannel channel){ + deviceChannelService.updateChannelStreamIdentification(channel); + } + /** * 修改数据流传输模式 * @param deviceId 设备id diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index d5badf98b..02533c53f 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -141,6 +141,9 @@ public class PlayController { streamInfo.changeStreamIp(host); } wvpResult.setData(new StreamContent(streamInfo)); + }else { + wvpResult.setCode(code); + wvpResult.setMsg(msg); } }else { wvpResult.setCode(code); diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiControlController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiControlController.java index 286c8d57a..06ba33cec 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiControlController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiControlController.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @@ -41,7 +42,7 @@ public class ApiControlController { * @param speed 速度(0~255) 默认值: 129 * @return */ - @RequestMapping(value = "/ptz") + @GetMapping(value = "/ptz") private void list(String serial,String command, @RequestParam(required = false)Integer channel, @RequestParam(required = false)String code, @@ -114,7 +115,7 @@ public class ApiControlController { * @param name 预置位名称, command=set 时有效 * @return */ - @RequestMapping(value = "/preset") + @GetMapping(value = "/preset") private void list(String serial,String command, @RequestParam(required = false)Integer channel, @RequestParam(required = false)String code, diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiController.java index bd5d5ef3f..3457cf993 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiController.java @@ -6,6 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @@ -23,7 +24,7 @@ public class ApiController { private SipConfig sipConfig; - @RequestMapping("/getserverinfo") + @GetMapping("/getserverinfo") private JSONObject getserverinfo(){ JSONObject result = new JSONObject(); result.put("Authorization","ceshi"); @@ -50,7 +51,7 @@ public class ApiController { return result; } - @RequestMapping(value = "/userinfo") + @GetMapping(value = "/userinfo") private JSONObject userinfo(){ // JSONObject result = new JSONObject(); // result.put("ID","ceshi"); @@ -83,7 +84,7 @@ public class ApiController { * @param password 密码(经过md5加密,32位长度,不带中划线,不区分大小写) * @return */ - @RequestMapping(value = "/login") + @GetMapping(value = "/login") @ResponseBody private JSONObject login(String username,String password ){ if (logger.isDebugEnabled()) { diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java index c3245ec53..45df56d63 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.ObjectUtils; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @@ -61,7 +62,7 @@ public class ApiDeviceController { * @param online * @return */ - @RequestMapping(value = "/list") + @GetMapping(value = "/list") public JSONObject list( @RequestParam(required = false)Integer start, @RequestParam(required = false)Integer limit, @RequestParam(required = false)String q, @@ -107,7 +108,7 @@ public class ApiDeviceController { return result; } - @RequestMapping(value = "/channellist") + @GetMapping(value = "/channellist") public JSONObject channellist( String serial, @RequestParam(required = false)String channel_type, @RequestParam(required = false)String code , @@ -188,7 +189,7 @@ public class ApiDeviceController { * @param timeout 超时时间(秒) 默认值: 15 * @return */ - @RequestMapping(value = "/fetchpreset") + @GetMapping(value = "/fetchpreset") private DeferredResult list(String serial, @RequestParam(required = false)Integer channel, @RequestParam(required = false)String code, diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index ef7c6bb3b..63d9f310d 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.web.gb28181; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -13,15 +14,11 @@ import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.InvalidArgumentException; @@ -48,9 +45,6 @@ public class ApiStreamController { @Autowired private UserSetting userSetting; - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Autowired private IDeviceService deviceService; @@ -73,7 +67,7 @@ public class ApiStreamController { * @param timeout 拉流超时(秒), * @return */ - @RequestMapping(value = "/start") + @GetMapping("/start") private DeferredResult start(String serial , @RequestParam(required = false)Integer channel , @RequestParam(required = false)String code, @@ -85,107 +79,111 @@ public class ApiStreamController { @RequestParam(required = false)String timeout ){ - DeferredResult resultDeferredResult = new DeferredResult<>(userSetting.getPlayTimeout().longValue() + 10); + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue() + 10); Device device = storager.queryVideoDevice(serial); if (device == null ) { - JSONObject result = new JSONObject(); - result.put("error","device[ " + serial + " ]未找到"); - resultDeferredResult.setResult(result); - return resultDeferredResult; + JSONObject resultJSON = new JSONObject(); + resultJSON.put("error","device[ " + serial + " ]未找到"); + result.setResult(resultJSON); + return result; }else if (!device.isOnLine()) { - JSONObject result = new JSONObject(); - result.put("error","device[ " + code + " ]offline"); - resultDeferredResult.setResult(result); - return resultDeferredResult; + JSONObject resultJSON = new JSONObject(); + resultJSON.put("error","device[ " + code + " ]offline"); + result.setResult(resultJSON); + return result; } - resultDeferredResult.onTimeout(()->{ + result.onTimeout(()->{ logger.info("播放等待超时"); - JSONObject result = new JSONObject(); - result.put("error","timeout"); - resultDeferredResult.setResult(result); - + JSONObject resultJSON = new JSONObject(); + resultJSON.put("error","timeout"); + result.setResult(resultJSON); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code); + storager.stopPlay(serial, code); // 清理RTP server }); DeviceChannel deviceChannel = storager.queryChannel(serial, code); if (deviceChannel == null) { - JSONObject result = new JSONObject(); - result.put("error","channel[ " + code + " ]未找到"); - resultDeferredResult.setResult(result); - return resultDeferredResult; + JSONObject resultJSON = new JSONObject(); + resultJSON.put("error","channel[ " + code + " ]未找到"); + result.setResult(resultJSON); + return result; }else if (!deviceChannel.isStatus()) { - JSONObject result = new JSONObject(); - result.put("error","channel[ " + code + " ]offline"); - resultDeferredResult.setResult(result); - return resultDeferredResult; + JSONObject resultJSON = new JSONObject(); + resultJSON.put("error","channel[ " + code + " ]offline"); + result.setResult(resultJSON); + return result; } MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); - playService.play(newMediaServerItem, serial, code, null, (errorCode, msg, data) -> { if (errorCode == InviteErrorCode.SUCCESS.getCode()) { - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code); - if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { - JSONObject result = new JSONObject(); - result.put("StreamID", inviteInfo.getStreamInfo().getStream()); - result.put("DeviceID", device.getDeviceId()); - result.put("ChannelID", code); - result.put("ChannelName", deviceChannel.getName()); - result.put("ChannelCustomName", ""); - result.put("FLV", inviteInfo.getStreamInfo().getFlv().getUrl()); - if(inviteInfo.getStreamInfo().getHttps_flv() != null) { - result.put("HTTPS_FLV", inviteInfo.getStreamInfo().getHttps_flv().getUrl()); + if (data != null) { + StreamInfo streamInfo = (StreamInfo)data; + JSONObject resultJjson = new JSONObject(); + resultJjson.put("StreamID", streamInfo.getStream()); + resultJjson.put("DeviceID", serial); + resultJjson.put("ChannelID", code); + resultJjson.put("ChannelName", deviceChannel.getName()); + resultJjson.put("ChannelCustomName", ""); + resultJjson.put("FLV", streamInfo.getFlv().getUrl()); + if(streamInfo.getHttps_flv() != null) { + resultJjson.put("HTTPS_FLV", streamInfo.getHttps_flv().getUrl()); } - result.put("WS_FLV", inviteInfo.getStreamInfo().getWs_flv().getUrl()); - if(inviteInfo.getStreamInfo().getWss_flv() != null) { - result.put("WSS_FLV", inviteInfo.getStreamInfo().getWss_flv().getUrl()); + resultJjson.put("WS_FLV", streamInfo.getWs_flv().getUrl()); + if(streamInfo.getWss_flv() != null) { + resultJjson.put("WSS_FLV", streamInfo.getWss_flv().getUrl()); } - result.put("RTMP", inviteInfo.getStreamInfo().getRtmp().getUrl()); - if (inviteInfo.getStreamInfo().getRtmps() != null) { - result.put("RTMPS", inviteInfo.getStreamInfo().getRtmps().getUrl()); + resultJjson.put("RTMP", streamInfo.getRtmp().getUrl()); + if (streamInfo.getRtmps() != null) { + resultJjson.put("RTMPS", streamInfo.getRtmps().getUrl()); } - result.put("HLS", inviteInfo.getStreamInfo().getHls().getUrl()); - if (inviteInfo.getStreamInfo().getHttps_hls() != null) { - result.put("HTTPS_HLS", inviteInfo.getStreamInfo().getHttps_hls().getUrl()); + resultJjson.put("HLS", streamInfo.getHls().getUrl()); + if (streamInfo.getHttps_hls() != null) { + resultJjson.put("HTTPS_HLS", streamInfo.getHttps_hls().getUrl()); } - result.put("RTSP", inviteInfo.getStreamInfo().getRtsp().getUrl()); - if (inviteInfo.getStreamInfo().getRtsps() != null) { - result.put("RTSPS", inviteInfo.getStreamInfo().getRtsps().getUrl()); + resultJjson.put("RTSP", streamInfo.getRtsp().getUrl()); + if (streamInfo.getRtsps() != null) { + resultJjson.put("RTSPS", streamInfo.getRtsps().getUrl()); } - result.put("WEBRTC", inviteInfo.getStreamInfo().getRtc().getUrl()); - if (inviteInfo.getStreamInfo().getRtcs() != null) { - result.put("HTTPS_WEBRTC", inviteInfo.getStreamInfo().getRtcs().getUrl()); + resultJjson.put("WEBRTC", streamInfo.getRtc().getUrl()); + if (streamInfo.getRtcs() != null) { + resultJjson.put("HTTPS_WEBRTC", streamInfo.getRtcs().getUrl()); } - result.put("CDN", ""); - result.put("SnapURL", ""); - result.put("Transport", device.getTransport()); - result.put("StartAt", ""); - result.put("Duration", ""); - result.put("SourceVideoCodecName", ""); - result.put("SourceVideoWidth", ""); - result.put("SourceVideoHeight", ""); - result.put("SourceVideoFrameRate", ""); - result.put("SourceAudioCodecName", ""); - result.put("SourceAudioSampleRate", ""); - result.put("AudioEnable", ""); - result.put("Ondemand", ""); - result.put("InBytes", ""); - result.put("InBitRate", ""); - result.put("OutBytes", ""); - result.put("NumOutputs", ""); - result.put("CascadeSize", ""); - result.put("RelaySize", ""); - result.put("ChannelPTZType", "0"); - resultDeferredResult.setResult(result); + resultJjson.put("CDN", ""); + resultJjson.put("SnapURL", ""); + resultJjson.put("Transport", device.getTransport()); + resultJjson.put("StartAt", ""); + resultJjson.put("Duration", ""); + resultJjson.put("SourceVideoCodecName", ""); + resultJjson.put("SourceVideoWidth", ""); + resultJjson.put("SourceVideoHeight", ""); + resultJjson.put("SourceVideoFrameRate", ""); + resultJjson.put("SourceAudioCodecName", ""); + resultJjson.put("SourceAudioSampleRate", ""); + resultJjson.put("AudioEnable", ""); + resultJjson.put("Ondemand", ""); + resultJjson.put("InBytes", ""); + resultJjson.put("InBitRate", ""); + resultJjson.put("OutBytes", ""); + resultJjson.put("NumOutputs", ""); + resultJjson.put("CascadeSize", ""); + resultJjson.put("RelaySize", ""); + resultJjson.put("ChannelPTZType", "0"); + result.setResult(resultJjson); + }else { + JSONObject resultJjson = new JSONObject(); + resultJjson.put("error", "channel[ " + code + " ] " + msg); + result.setResult(resultJjson); } }else { - JSONObject result = new JSONObject(); - result.put("error", "channel[ " + code + " ] " + msg); - resultDeferredResult.setResult(result); + JSONObject resultJjson = new JSONObject(); + resultJjson.put("error", "channel[ " + code + " ] " + msg); + result.setResult(resultJjson); } }); - return resultDeferredResult; + return result; } /** @@ -196,7 +194,7 @@ public class ApiStreamController { * @param check_outputs * @return */ - @RequestMapping(value = "/stop") + @GetMapping("/stop") @ResponseBody private JSONObject stop(String serial , @RequestParam(required = false)Integer channel , @@ -236,7 +234,7 @@ public class ApiStreamController { * @param code 通道国标编号 * @return */ - @RequestMapping(value = "/touch") + @GetMapping("/touch") @ResponseBody private JSONObject touch(String serial ,String t, @RequestParam(required = false)Integer channel , diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/AuthController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/AuthController.java index d503f1531..4e0b05690 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/AuthController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/AuthController.java @@ -13,7 +13,7 @@ public class AuthController { @Autowired private IUserService userService; - @RequestMapping("/login") + @GetMapping("/login") public String devices(String name, String passwd){ User user = userService.getUser(name, passwd); if (user != null) { diff --git a/web_src/src/components/channelList.vue b/web_src/src/components/channelList.vue index 5547913bd..8af42692f 100755 --- a/web_src/src/components/channelList.vue +++ b/web_src/src/components/channelList.vue @@ -13,24 +13,30 @@ prefix-icon="el-icon-search" v-model="searchSrt" clearable> 通道类型: - 在线状态: - - 清晰度: - - - + 码流类型重置: + + + + + + + + + @@ -46,11 +52,11 @@ - + - + - + - + - + - + - + - + - + - + + + + - -