diff --git a/README.md b/README.md index 572746df..10cd12a2 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ wvp使用文档 [https://doc.wvp-pro.cn](https://doc.wvp-pro.cn) ZLM使用文档 [https://github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit) > wvp文档由gitee提供服务,如果遇到打不开请多刷新几次。 -# 社群地址 +# 付费社群 [![社群](doc/_media/shequ.png "shequ")](https://t.zsxq.com/0d8VAD3Dm) > 收费是为了提供更好的服务,也是对作者更大的激励。加入星球的用户三天后可以私信我留下微信号,我会拉大家入群。加入三天内不满意可以直接退款,大家不需要有顾虑,来白嫖三天也不是不可以。 @@ -105,6 +105,7 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git - [X] 支持打包可执行jar和war - [X] 支持跨域请求,支持前后端分离部署 - [X] 支持Mysql,Postgresql,金仓等数据库 +- [X] 支持Onvif(目前在onvif分支,需要安装onvif服务,服务请在知识星球获取) # 授权协议 本项目自有代码使用宽松的MIT协议,在保留版权信息的情况下可以自由应用于各自商用、非商业的项目。 但是本项目也零碎的使用了一些其他的开源代码,在商用的情况下请自行替代或剔除; 由于使用本项目而产生的商业纠纷或侵权行为一概与本项目及开发者无关,请自行承担法律风险。 在使用本项目代码时,也应该在授权协议中同时表明本项目依赖的第三方库的协议 diff --git a/doc/README.md b/doc/README.md index 3d2f09f7..c2cbfba4 100644 --- a/doc/README.md +++ b/doc/README.md @@ -14,7 +14,7 @@ - 完全开源,且使用MIT许可协议。保留版权的情况下可以用于商业项目。 - 支持多流媒体节点负载均衡。 -# 社群 +# 付费社群 [![社群](_media/shequ.png "shequ")](https://t.zsxq.com/0d8VAD3Dm) > 收费是为了提供更好的服务,也是对作者更大的激励。加入星球的用户三天后可以私信我留下微信号,我会拉大家入群。加入三天内不满意可以直接退款,大家不需要有顾虑,来白嫖三天也不是不可以。 @@ -62,16 +62,16 @@ - [X] 注册 - [X] 注销 - [X] 实时视音频点播 -- [ ] 设备控制 - - [ ] 云台控制 +- [X] 设备控制 + - [X] 云台控制 - [ ] 远程启动 - - [ ] 录像控制 - - [ ] 报警布防/撤防 - - [ ] 报警复位 - - [ ] 强制关键帧 - - [ ] 拉框放大 - - [ ] 拉框缩小 - - [ ] 看守位控制 + - [X] 录像控制 + - [X] 报警布防/撤防 + - [X] 报警复位 + - [X] 强制关键帧 + - [X] 拉框放大 + - [X] 拉框缩小 + - [X] 看守位控制 - [ ] 设备配置 - [ ] 报警事件通知和分发 - [X] 设备目录订阅 @@ -79,7 +79,7 @@ - [X] 设备目录查询 - [X] 设备状态查询 - [ ] 设备配置查询 - - [ ] 设备预置位查询 + - [X] 设备预置位查询 - [X] 状态信息报送 - [X] 设备视音频文件检索 - [X] 历史视音频的回放 @@ -87,7 +87,7 @@ - [x] 暂停 - [x] 进/退 - [x] 停止 -- [ ] 视音频文件下载 +- [X] 视音频文件下载 - [ ] ~~校时~~ - [X] 订阅和通知 - [X] 事件订阅 diff --git a/doc/_media/1372762149.jpg b/doc/_media/1372762149.jpg new file mode 100644 index 00000000..471ec072 Binary files /dev/null and b/doc/_media/1372762149.jpg differ diff --git a/doc/_media/903207146.jpg b/doc/_media/903207146.jpg new file mode 100644 index 00000000..0bbc7e67 Binary files /dev/null and b/doc/_media/903207146.jpg differ diff --git a/pom.xml b/pom.xml index bb0de72f..2ecfad30 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.genersoft wvp-pro - 2.6.8 + 2.6.9 web video platform 国标28181视频平台 ${project.packaging} diff --git a/sql/2.6.8升级2.6.9.sql b/sql/2.6.8升级2.6.9.sql index d6d3a6b2..a6b9f12c 100644 --- a/sql/2.6.8升级2.6.9.sql +++ b/sql/2.6.8升级2.6.9.sql @@ -179,10 +179,6 @@ alter table device_mobile_position alter table device_mobile_position change createTime create_time varchar(50) null; -alter table gb_stream - add constraint gb_stream_pk - primary key (gbStreamId); - alter table gb_stream change gbStreamId gb_stream_id int auto_increment; diff --git a/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java b/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java new file mode 100644 index 00000000..df07fac5 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/GeneralCallback.java @@ -0,0 +1,5 @@ +package com.genersoft.iot.vmp.common; + +public interface GeneralCallback{ + void run(int code, String msg, T data); +} diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 51c2ab10..a06c6d08 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -106,6 +106,11 @@ public class VideoManagerConstants { */ public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE"; + /** + * redis 通知平台关闭推流 + */ + public static final String VM_MSG_STREAM_PUSH_CLOSE = "VM_MSG_STREAM_PUSH_CLOSE"; + /** * redis 消息请求所有的在线通道 */ diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index 9f484266..7e1cc1d6 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -43,6 +43,9 @@ public class RedisMsgListenConfig { @Autowired private RedisPushStreamResponseListener redisPushStreamResponseListener; + @Autowired + private RedisCloseStreamMsgListener redisCloseStreamMsgListener; + /** * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 @@ -63,6 +66,7 @@ public class RedisMsgListenConfig { container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); + container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java new file mode 100644 index 00000000..4b9e26a0 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java @@ -0,0 +1,46 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import javax.sdp.SessionDescription; + +/** + * 28181 的SDP解析器 + */ +public class Gb28181Sdp { + private SessionDescription baseSdb; + private String ssrc; + + private String mediaDescription; + + public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescription) { + Gb28181Sdp gb28181Sdp = new Gb28181Sdp(); + gb28181Sdp.setBaseSdb(baseSdb); + gb28181Sdp.setSsrc(ssrc); + gb28181Sdp.setMediaDescription(mediaDescription); + return gb28181Sdp; + } + + + public SessionDescription getBaseSdb() { + return baseSdb; + } + + public void setBaseSdb(SessionDescription baseSdb) { + this.baseSdb = baseSdb; + } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } + + public String getMediaDescription() { + return mediaDescription; + } + + public void setMediaDescription(String mediaDescription) { + this.mediaDescription = mediaDescription; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java index ec8e0ba6..657bb2fa 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.session; import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetting; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @@ -31,10 +32,13 @@ public class SSRCFactory { @Autowired private SipConfig sipConfig; + @Autowired + private UserSetting userSetting; + public void initMediaServerSSRC(String mediaServerId, Set usedSet) { String ssrcPrefix = sipConfig.getDomain().substring(3, 8); - String redisKey = SSRC_INFO_KEY + mediaServerId; + String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; List ssrcList = new ArrayList<>(); for (int i = 1; i < MAX_STREAM_COUNT; i++) { String ssrc = String.format("%s%04d", ssrcPrefix, i); @@ -77,7 +81,7 @@ public class SSRCFactory { return; } String sn = ssrc.substring(1); - String redisKey = SSRC_INFO_KEY + mediaServerId; + String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; redisTemplate.opsForSet().add(redisKey, sn); } @@ -86,7 +90,7 @@ public class SSRCFactory { */ private String getSN(String mediaServerId) { String sn = null; - String redisKey = SSRC_INFO_KEY + mediaServerId; + String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; Long size = redisTemplate.opsForSet().size(redisKey); if (size == null || size == 0) { throw new RuntimeException("ssrc已经用完"); @@ -113,20 +117,8 @@ public class SSRCFactory { * @param mediaServerId 流媒体服务ID */ public boolean hasMediaServerSSRC(String mediaServerId) { - String redisKey = SSRC_INFO_KEY + mediaServerId; + String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; return redisTemplate.opsForSet().members(redisKey) != null; } - /** - * 查询ssrc是否可用 - * - * @param mediaServerId - * @param ssrc - * @return - */ - public boolean checkSsrc(String mediaServerId, String ssrc) { - String sn = ssrc.substring(1); - String redisKey = SSRC_INFO_KEY + mediaServerId; - return redisTemplate.opsForSet().isMember(redisKey, sn) != null; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index 9b57fb07..5677c958 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -39,11 +39,13 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_DOWNLOAD = "CALLBACK_DOWNLOAD"; + public static final String CALLBACK_CMD_PROXY = "CALLBACK_PROXY"; + public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP"; public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL"; - public static final String CALLBACK_CMD_MOBILEPOSITION = "CALLBACK_MOBILEPOSITION"; + public static final String CALLBACK_CMD_MOBILE_POSITION = "CALLBACK_CMD_MOBILE_POSITION"; public static final String CALLBACK_CMD_PRESETQUERY = "CALLBACK_PRESETQUERY"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index 831897a7..22017df8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -54,8 +54,8 @@ public class SIPRequestHeaderPlarformProvider { parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); //via ArrayList viaHeaders = new ArrayList(); - ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(parentPlatform.getServerIP(), - parentPlatform.getServerPort(), parentPlatform.getTransport(), SipUtils.getNewViaTag()); + ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), + Integer.parseInt(parentPlatform.getDevicePort()), parentPlatform.getTransport(), SipUtils.getNewViaTag()); viaHeader.setRPort(); viaHeaders.add(viaHeader); //from diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index f2300e82..ccf8151a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -472,7 +472,7 @@ public class SIPCommander implements ISIPCommander { } subscribe.removeSubscribe(hookSubscribe); }); - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { ResponseEvent responseEvent = (ResponseEvent) event.event; @@ -588,17 +588,13 @@ public class SIPCommander implements ISIPCommander { }); }); - Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); + Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { ResponseEvent responseEvent = (ResponseEvent) event.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); String contentString =new String(response.getRawContent()); - int ssrcIndex = contentString.indexOf("y="); - String ssrc=ssrcInfo.getSsrc(); - if (ssrcIndex >= 0) { - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - } + String ssrc = SipUtils.getSsrcFromSdp(contentString); streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD); okEvent.response(event); }); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f9a51af0..a1591890 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -241,18 +241,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 解析sdp消息, 使用jainsip 自带的sdp解析方式 String contentString = new String(request.getRawContent()); - // jainSip不支持y=字段, 移除以解析。 - // 检查是否有y字段 - int ssrcIndex = contentString.indexOf("y="); - - SessionDescription sdp; - if (ssrcIndex >= 0) { - //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段 - String substring = contentString.substring(0, ssrcIndex); - sdp = SdpFactory.getInstance().createSessionDescription(substring); - } else { - sdp = SdpFactory.getInstance().createSessionDescription(contentString); - } + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); String sessionName = sdp.getSessionName().getValue(); Long startTime = null; @@ -340,11 +330,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || ssrcIndex < 0) { + if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); }else { - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + ssrc = gb28181Sdp.getSsrc(); } String streamTypeStr = null; if (mediaTransmissionTCP) { @@ -513,11 +503,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } else if (gbStream != null) { String ssrc; - if (userSetting.getUseCustomSsrcForParentInvite() || ssrcIndex < 0) { + if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); }else { - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + ssrc = gb28181Sdp.getSsrc(); } if("push".equals(gbStream.getStreamType())) { @@ -891,20 +881,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 - String substring = contentString; String ssrc = "0000000404"; - int ssrcIndex = contentString.indexOf("y="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - } - ssrcIndex = substring.indexOf("f="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - } - SessionDescription sdp = null; + try { - sdp = SdpFactory.getInstance().createSessionDescription(substring); + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index e614bf97..c0b1be4c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -175,6 +175,11 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } }else { addChannelMap.put(channel.getChannelId(), channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + if (addChannelMap.keySet().size() > 300) { executeSaveForAdd(); } @@ -185,6 +190,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 删除 logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); deleteChannelList.add(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); + } if (deleteChannelList.size() > 300) { executeSaveForDelete(); } @@ -205,6 +214,10 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent if (addChannelMap.keySet().size() > 300) { executeSaveForAdd(); } + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } } break; default: diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index cb780e76..e97b720b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -192,7 +192,12 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(channelId); String time = XmlUtil.getText(rootElement, "Time"); - mobilePosition.setTime(time); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTime(DateUtil.getNow()); + }else { + mobilePosition.setTime(SipUtils.parseTime(time)); + } + mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { @@ -237,7 +242,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", time); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceId); jsonObject.put("code", channelId); jsonObject.put("longitude", mobilePosition.getLongitude()); @@ -339,7 +344,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements storager.updateChannelPosition(deviceChannel); // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index 9268e9b5..6928def2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -164,7 +164,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index 1f0bdf14..9a82b8ab 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -95,7 +96,12 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen } mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); mobilePosition.setChannelId(getText(rootElementAfterCharset, "DeviceID")); - mobilePosition.setTime(getText(rootElementAfterCharset, "Time")); + String time = getText(rootElementAfterCharset, "Time"); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTime(DateUtil.getNow()); + }else { + mobilePosition.setTime(SipUtils.parseTime(time)); + } mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) { @@ -138,7 +144,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java index 332f3635..36a72bcd 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java @@ -2,17 +2,21 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.utils.GpsUtil; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -56,6 +60,9 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private DeferredResultHolder resultHolder; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -83,7 +90,13 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar } mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(getText(rootElement, "DeviceID")); - mobilePosition.setTime(getText(rootElement, "Time")); + //兼容ISO 8601格式时间 + String time = getText(rootElement, "Time"); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTime(DateUtil.getNow()); + }else { + mobilePosition.setTime(SipUtils.parseTime(time)); + } mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); if (NumericUtil.isDouble(getText(rootElement, "Speed"))) { @@ -121,11 +134,18 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar if (userSetting.getSavePositionHistory()) { storager.insertMobilePosition(mobilePosition); } + storager.updateChannelPosition(deviceChannel); + String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId(); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(mobilePosition); + resultHolder.invokeAllResult(msg); + // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); jsonObject.put("serial", deviceChannel.getDeviceId()); jsonObject.put("code", deviceChannel.getChannelId()); jsonObject.put("longitude", mobilePosition.getLongitude()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index f647b96b..436d2a43 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -1,10 +1,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; import com.genersoft.iot.vmp.gb28181.SipLayer; +import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import gov.nist.javax.sip.ResponseEventExt; import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; @@ -12,7 +14,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.sdp.SdpFactory; import javax.sdp.SdpParseException; import javax.sdp.SessionDescription; import javax.sip.InvalidArgumentException; @@ -79,18 +80,8 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { ResponseEventExt event = (ResponseEventExt)evt; String contentString = new String(response.getRawContent()); - // jainSip不支持y=字段, 移除以解析。 - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - SessionDescription sdp; - if (ssrcIndex >= 0) { - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 - String substring = contentString.substring(0, contentString.indexOf("y=")); - sdp = SdpFactory.getInstance().createSessionDescription(substring); - } else { - sdp = SdpFactory.getInstance().createSessionDescription(contentString); - } - + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); SipURI requestUri = SipFactory.getInstance().createAddressFactory().createSipURI(sdp.getOrigin().getUsername(), event.getRemoteIpAddress() + ":" + event.getRemotePort()); Request reqAck = headerProvider.createAckRequest(response.getLocalAddress().getHostAddress(), requestUri, response); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index a7ce8c0b..2cfe16ed 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -1,14 +1,22 @@ package com.genersoft.iot.vmp.gb28181.utils; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp; import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.header.Subject; import gov.nist.javax.sip.message.SIPRequest; +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.ObjectUtils; +import javax.sdp.SdpFactory; +import javax.sdp.SdpParseException; +import javax.sdp.SessionDescription; import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; import javax.sip.header.FromHeader; @@ -16,6 +24,8 @@ import javax.sip.header.Header; import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import java.text.ParseException; +import java.time.LocalDateTime; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -28,6 +38,8 @@ import java.util.UUID; */ public class SipUtils { + private final static Logger logger = LoggerFactory.getLogger(SipUtils.class); + public static String getUserIdFromFromHeader(Request request) { FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); return getUserIdFromFromHeader(fromHeader); @@ -51,7 +63,7 @@ public class SipUtils { } public static String getNewViaTag() { - return "z9hG4bK" + System.currentTimeMillis(); + return "z9hG4bK" + RandomStringUtils.randomNumeric(10); } public static UserAgentHeader createUserAgentHeader(GitUtil gitUtil) throws PeerUnavailableException, ParseException { @@ -189,4 +201,67 @@ public class SipUtils { } return deviceChannel; } + + public static Gb28181Sdp parseSDP(String sdpStr) throws SdpParseException { + + // jainSip不支持y= f=字段, 移除以解析。 + int ssrcIndex = sdpStr.indexOf("y="); + int mediaDescriptionIndex = sdpStr.indexOf("f="); + // 检查是否有y字段 + SessionDescription sdp; + String ssrc = null; + String mediaDescription = null; + if (mediaDescriptionIndex == 0 && ssrcIndex == 0) { + sdp = SdpFactory.getInstance().createSessionDescription(sdpStr); + }else { + String lines[] = sdpStr.split("\\r?\\n"); + StringBuilder sdpBuffer = new StringBuilder(); + for (String line : lines) { + if (line.trim().startsWith("y=")) { + ssrc = line.substring(2); + }else if (line.trim().startsWith("f=")) { + mediaDescription = line.substring(2); + }else { + sdpBuffer.append(line.trim()).append("\r\n"); + } + } + sdp = SdpFactory.getInstance().createSessionDescription(sdpBuffer.toString()); + } + return Gb28181Sdp.getInstance(sdp, ssrc, mediaDescription); + } + + public static String getSsrcFromSdp(String sdpStr) { + + // jainSip不支持y= f=字段, 移除以解析。 + int ssrcIndex = sdpStr.indexOf("y="); + if (ssrcIndex == 0) { + return null; + } + String lines[] = sdpStr.split("\\r?\\n"); + for (String line : lines) { + if (line.trim().startsWith("y=")) { + return line.substring(2); + } + } + return null; + } + + public static String parseTime(String timeStr) { + if (ObjectUtils.isEmpty(timeStr)){ + return null; + } + System.out.println(timeStr); + LocalDateTime localDateTime; + try { + localDateTime = LocalDateTime.parse(timeStr); + }catch (DateTimeParseException e) { + try { + localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601); + }catch (DateTimeParseException e2) { + logger.error("[格式化时间] 无法格式化时间: {}", timeStr); + return null; + } + } + return localDateTime.format(DateUtil.formatterISO8601); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 9a642d81..081d9191 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -6,7 +6,9 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java index b0e74e8f..dd517e3d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java @@ -20,28 +20,26 @@ public class StreamProxyItem extends GbStream { @Schema(description = "拉流地址") private String url; @Schema(description = "拉流地址") - private String src_url; + private String srcUrl; @Schema(description = "目标地址") - private String dst_url; + private String dstUrl; @Schema(description = "超时时间") - private int timeout_ms; + private int timeoutMs; @Schema(description = "ffmpeg模板KEY") - private String ffmpeg_cmd_key; + private String ffmpegCmdKey; @Schema(description = "rtsp拉流时,拉流方式,0:tcp,1:udp,2:组播") - private String rtp_type; + private String rtpType; @Schema(description = "是否启用") private boolean enable; @Schema(description = "是否启用音频") - private boolean enable_audio; + private boolean enableAudio; @Schema(description = "是否启用MP4") - private boolean enable_mp4; + private boolean enableMp4; @Schema(description = "是否 无人观看时删除") - private boolean enable_remove_none_reader; + private boolean enableRemoveNoneReader; @Schema(description = "是否 无人观看时自动停用") - private boolean enable_disable_none_reader; - @Schema(description = "创建时间") - private String createTime; + private boolean enableDisableNoneReader; public String getType() { return type; @@ -89,44 +87,44 @@ public class StreamProxyItem extends GbStream { this.url = url; } - public String getSrc_url() { - return src_url; + public String getSrcUrl() { + return srcUrl; } - public void setSrc_url(String src_url) { - this.src_url = src_url; + public void setSrcUrl(String src_url) { + this.srcUrl = src_url; } - public String getDst_url() { - return dst_url; + public String getDstUrl() { + return dstUrl; } - public void setDst_url(String dst_url) { - this.dst_url = dst_url; + public void setDstUrl(String dst_url) { + this.dstUrl = dst_url; } - public int getTimeout_ms() { - return timeout_ms; + public int getTimeoutMs() { + return timeoutMs; } - public void setTimeout_ms(int timeout_ms) { - this.timeout_ms = timeout_ms; + public void setTimeoutMs(int timeout_ms) { + this.timeoutMs = timeout_ms; } - public String getFfmpeg_cmd_key() { - return ffmpeg_cmd_key; + public String getFfmpegCmdKey() { + return ffmpegCmdKey; } - public void setFfmpeg_cmd_key(String ffmpeg_cmd_key) { - this.ffmpeg_cmd_key = ffmpeg_cmd_key; + public void setFfmpegCmdKey(String ffmpeg_cmd_key) { + this.ffmpegCmdKey = ffmpeg_cmd_key; } - public String getRtp_type() { - return rtp_type; + public String getRtpType() { + return rtpType; } - public void setRtp_type(String rtp_type) { - this.rtp_type = rtp_type; + public void setRtpType(String rtp_type) { + this.rtpType = rtp_type; } public boolean isEnable() { @@ -137,45 +135,37 @@ public class StreamProxyItem extends GbStream { this.enable = enable; } - public boolean isEnable_mp4() { - return enable_mp4; + public boolean isEnableMp4() { + return enableMp4; } - public void setEnable_mp4(boolean enable_mp4) { - this.enable_mp4 = enable_mp4; + public void setEnableMp4(boolean enable_mp4) { + this.enableMp4 = enable_mp4; } - @Override - public String getCreateTime() { - return createTime; + public boolean isEnableRemoveNoneReader() { + return enableRemoveNoneReader; } - @Override - public void setCreateTime(String createTime) { - this.createTime = createTime; + public void setEnableRemoveNoneReader(boolean enable_remove_none_reader) { + this.enableRemoveNoneReader = enable_remove_none_reader; } - public boolean isEnable_remove_none_reader() { - return enable_remove_none_reader; + public boolean isEnableDisableNoneReader() { + return enableDisableNoneReader; } - public void setEnable_remove_none_reader(boolean enable_remove_none_reader) { - this.enable_remove_none_reader = enable_remove_none_reader; + public void setEnableDisableNoneReader(boolean enable_disable_none_reader) { + this.enableDisableNoneReader = enable_disable_none_reader; } - public boolean isEnable_disable_none_reader() { - return enable_disable_none_reader; + public boolean isEnableAudio() { + return enableAudio; } - public void setEnable_disable_none_reader(boolean enable_disable_none_reader) { - this.enable_disable_none_reader = enable_disable_none_reader; + public void setEnableAudio(boolean enable_audio) { + this.enableAudio = enable_audio; } - public boolean isEnable_audio() { - return enable_audio; - } - public void setEnable_audio(boolean enable_audio) { - this.enable_audio = enable_audio; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 0e1c97bf..c4968a74 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; @@ -13,7 +14,7 @@ public interface IStreamProxyService { * 保存视频代理 * @param param */ - StreamInfo save(StreamProxyItem param); + void save(StreamProxyItem param, GeneralCallback callback); /** * 添加视频代理到zlm diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 195b1e6d..5f779aad 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -187,7 +187,7 @@ public class DeviceServiceImpl implements IDeviceService { @Override public void offline(String deviceId, String reason) { - logger.error("[设备离线],{}, device:{}", reason, deviceId); + logger.warn("[设备离线],{}, device:{}", reason, deviceId); Device device = deviceMapper.getDeviceByDeviceId(deviceId); if (device == null) { return; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index d997a43b..7f8af3f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -418,7 +418,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId(); dynamicTask.stop(zlmKeepaliveKey); - dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (Math.getExponent(serverItem.getHookAliveInterval()) + 5) * 1000); + dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval().intValue() + 5) * 1000); publisher.zlmOnlineEventPublish(serverItem.getId()); logger.info("[ZLM] 连接成功 {} - {}:{} ", diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 3037e3f1..9ac6ab93 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -2,12 +2,16 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; @@ -85,6 +89,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private IMediaServerService mediaServerService; + @Autowired + private ZlmHttpHookSubscribe hookSubscribe; + @Autowired DataSourceTransactionManager dataSourceTransactionManager; @@ -93,7 +100,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override - public StreamInfo save(StreamProxyItem param) { + public void save(StreamProxyItem param, GeneralCallback callback) { MediaServerItem mediaInfo; if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); @@ -104,10 +111,43 @@ public class StreamProxyServiceImpl implements IStreamProxyService { logger.warn("保存代理未找到在线的ZLM..."); throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM"); } - String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), - param.getStream() ); - param.setDst_url(dstUrl); - StringBuffer resultMsg = new StringBuffer(); + String dstUrl; + if ("ffmpeg".equalsIgnoreCase(param.getType())) { + JSONObject jsonObject = zlmresTfulUtils.getMediaServerConfig(mediaInfo); + if (jsonObject.getInteger("code") != 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取流媒体配置失败"); + } + JSONArray dataArray = jsonObject.getJSONArray("data"); + JSONObject mediaServerConfig = dataArray.getJSONObject(0); + String ffmpegCmd = mediaServerConfig.getString(param.getFfmpegCmdKey()); + String schema = getSchemaFromFFmpegCmd(ffmpegCmd); + if (schema == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式"); + } + int port; + String schemaForUri; + if (schema.equalsIgnoreCase("rtsp")) { + port = mediaInfo.getRtspPort(); + schemaForUri = schema; + }else if (schema.equalsIgnoreCase("flv")) { + port = mediaInfo.getHttpPort(); + schemaForUri = "http"; + }else if (schema.equalsIgnoreCase("rtmp")) { + port = mediaInfo.getRtmpPort(); + schemaForUri = schema; + }else { + port = mediaInfo.getRtmpPort(); + schemaForUri = schema; + } + + dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(), + param.getStream()); + }else { + dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), + param.getStream()); + } + param.setDstUrl(dstUrl); + logger.info("[拉流代理] 输出地址为:{}", dstUrl); param.setMediaServerId(mediaInfo.getId()); boolean saveResult; // 更新 @@ -117,29 +157,60 @@ public class StreamProxyServiceImpl implements IStreamProxyService { saveResult = addStreamProxy(param); } if (!saveResult) { - throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败"); + callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); + return; } - StreamInfo resultForStreamInfo = null; - resultMsg.append("保存成功"); + + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }); + if (param.isEnable()) { JSONObject jsonObject = addStreamProxyToZlm(param); - if (jsonObject == null || jsonObject.getInteger("code") != 0) { - resultMsg.append(", 但是启用失败,请检查流地址是否可用"); + if (jsonObject != null && jsonObject.getInteger("code") == 0) { + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }else { param.setEnable(false); // 直接移除 - if (param.isEnable_remove_none_reader()) { + if (param.isEnableRemoveNoneReader()) { del(param.getApp(), param.getStream()); }else { updateStreamProxy(param); } + if (jsonObject == null){ + callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null); + return; + }else { + callback.run(ErrorCode.ERROR100.getCode(), jsonObject.getString("msg"), null); + return; + } + } + } + } - }else { - resultForStreamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); + private String getSchemaFromFFmpegCmd(String ffmpegCmd) { + ffmpegCmd = ffmpegCmd.replaceAll(" + ", " "); + String[] paramArray = ffmpegCmd.split(" "); + if (paramArray.length == 0) { + return null; + } + for (int i = 0; i < paramArray.length; i++) { + if (paramArray[i].equalsIgnoreCase("-f")) { + if (i + 1 < paramArray.length - 1) { + return paramArray[i+1]; + }else { + return null; + } } } - return resultForStreamInfo; + return null; } /** @@ -228,11 +299,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if ("default".equals(param.getType())){ result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(), - param.isEnable_audio(), param.isEnable_mp4(), param.getRtp_type()); + param.isEnableAudio(), param.isEnableMp4(), param.getRtpType()); }else if ("ffmpeg".equals(param.getType())) { - result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(), - param.getTimeout_ms() + "", param.isEnable_audio(), param.isEnable_mp4(), - param.getFfmpeg_cmd_key()); + result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl(), param.getDstUrl(), + param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(), + param.getFfmpegCmdKey()); } return result; } @@ -286,7 +357,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { updateStreamProxy(streamProxy); }else { logger.info("启用代理失败: {}/{}->{}({})", app, stream, jsonObject.getString("msg"), - streamProxy.getSrc_url() == null? streamProxy.getUrl():streamProxy.getSrc_url()); + streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl()); } } return result; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 0a03c660..dcaab9e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -183,6 +183,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public boolean stop(String app, String streamId) { + logger.info("[推流 ] 停止流: {}/{}", app, streamId); StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); if (streamPushItem != null) { gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java new file mode 100644 index 00000000..d0104756 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java @@ -0,0 +1,59 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.service.IStreamPushService; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 接收来自redis的关闭流更新通知 + * @author lin + */ +@Component +public class RedisCloseStreamMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisCloseStreamMsgListener.class); + + + @Autowired + private IStreamPushService pushService; + + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(@NotNull Message message, byte[] bytes) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + JSONObject jsonObject = JSON.parseObject(msg.getBody()); + String app = jsonObject.getString("app"); + String stream = jsonObject.getString("stream"); + pushService.stop(app, stream); + + }catch (Exception e) { + logger.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); + logger.error("[REDIS的关闭推流通知] 异常内容: ", e); + } + } + }); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 5e6c16b3..469f6c8c 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -202,4 +202,5 @@ public interface IRedisCatchStorage { void removeAllDevice(); void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online); + void sendChannelAddOrDelete(String deviceId, String channelId, boolean add); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index 056b409c..2c6852af 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -96,6 +96,6 @@ public interface ParentPlatformMapper { @Select("select 'channel' as name, count(pgc.platform_id) count from wvp_platform_gb_channel pgc left join wvp_device_channel dc on dc.id = pgc.device_channel_id where pgc.platform_id=#{platform_id} and dc.channel_id =#{gbId} " + "union " + - "select 'stream' as name, count(pgs.platform_id) count from wvp_platform_gb_stream pgs left join wvp_gb_stream gs on pgs.gb_stream_id = gs.gb_stream_id where pgs.platform_id=#{platform_id} and gs.gb_id #{gbId}") + "select 'stream' as name, count(pgs.platform_id) count from wvp_platform_gb_stream pgs left join wvp_gb_stream gs on pgs.gb_stream_id = gs.gb_stream_id where pgs.platform_id=#{platform_id} and gs.gb_id =#{gbId}") List getChannelSource(String platform_id, String gbId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java index 19e7d1ab..15fed7b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java @@ -12,7 +12,7 @@ import java.util.List; @Repository public interface PlatformCatalogMapper { - @Insert("INSERT INTO platform_catalog (id, name, platform_id, parent_id, civil_code, business_group_id) VALUES" + + @Insert("INSERT INTO wvp_platform_catalog (id, name, platform_id, parent_id, civil_code, business_group_id) VALUES" + "(#{id}, #{name}, #{platformId}, #{parentId}, #{civilCode}, #{businessGroupId})") int add(PlatformCatalog platformCatalog); @@ -32,7 +32,7 @@ public interface PlatformCatalogMapper { PlatformCatalog select(String id); @Update(value = {" "}) @@ -41,11 +41,11 @@ public interface PlatformCatalogMapper { @Select("SELECT *, (SELECT COUNT(1) from wvp_platform_catalog where parent_id = pc.id) as children_count from wvp_platform_catalog pc WHERE pc.platform_id=#{platformId}") List selectByPlatForm(String platformId); - @Select("SELECT pc.* FROM platform_catalog pc WHERE pc.id = (SELECT pp.catalog_id from wvp_platform pp WHERE pp.server_gb_id=#{platformId})") + @Select("SELECT pc.* FROM wvp_platform_catalog pc WHERE pc.id = (SELECT pp.catalog_id from wvp_platform pp WHERE pp.server_gb_id=#{platformId})") PlatformCatalog selectDefaultByPlatFormId(String platformId); - @Select("SELECT pc.* FROM platform_catalog pc WHERE pc.id = #{id}") + @Select("SELECT pc.* FROM wvp_platform_catalog pc WHERE pc.id = #{id}") PlatformCatalog selectParentCatalog(String id); @Select("SELECT pc.id as channel_id, pc.name, pc.civil_code, pc.business_group_id,'1' as parental, pc.parent_id " + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 3a1a69ea..a5a80c47 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -13,9 +13,9 @@ public interface StreamProxyMapper { @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" + - "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{src_url}, #{dst_url}, " + - "#{timeout_ms}, #{ffmpeg_cmd_key}, #{rtp_type}, #{enable_audio}, #{enable_mp4}, #{enable}, #{status}, " + - "#{enable_remove_none_reader}, #{enable_disable_none_reader}, #{createTime} )") + "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " + + "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, " + + "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )") int add(StreamProxyItem streamProxyDto); @Update("UPDATE wvp_stream_proxy " + @@ -25,17 +25,17 @@ public interface StreamProxyMapper { "stream=#{stream}," + "url=#{url}, " + "media_server_id=#{mediaServerId}, " + - "src_url=#{src_url}," + - "dst_url=#{dst_url}, " + - "timeout_ms=#{timeout_ms}, " + - "ffmpeg_cmd_key=#{ffmpeg_cmd_key}, " + - "rtp_type=#{rtp_type}, " + - "enable_audio=#{enable_audio}, " + + "src_url=#{srcUrl}," + + "dst_url=#{dstUrl}, " + + "timeout_ms=#{timeoutMs}, " + + "ffmpeg_cmd_key=#{ffmpegCmdKey}, " + + "rtp_type=#{rtpType}, " + + "enable_audio=#{enableAudio}, " + "enable=#{enable}, " + "status=#{status}, " + - "enable_remove_none_reader=#{enable_remove_none_reader}, " + - "enable_disable_none_reader=#{enable_disable_none_reader}, " + - "enable_mp4=#{enable_mp4} " + + "enable_remove_none_reader=#{enableRemoveNoneReader}, " + + "enable_disable_none_reader=#{enableDisableNoneReader}, " + + "enable_mp4=#{enableMp4} " + "WHERE app=#{app} AND stream=#{stream}") int update(StreamProxyItem streamProxyDto); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 75e91982..5360294d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -596,18 +596,29 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS; - if (channelId == null) { - logger.info("[redis通知] 推送设备状态, {}-{}", deviceId, online); - }else { - logger.info("[redis通知] 推送通道状态, {}/{}-{}", deviceId, channelId, online); - } - StringBuilder msg = new StringBuilder(); msg.append(deviceId); if (channelId != null) { msg.append(":").append(channelId); } msg.append(" ").append(online? "ON":"OFF"); + logger.info("[redis通知] 推送状态-> {} ", msg); + // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 + stringRedisTemplate.convertAndSend(key, msg.toString()); + } + + @Override + public void sendChannelAddOrDelete(String deviceId, String channelId, boolean add) { + String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS; + + + StringBuilder msg = new StringBuilder(); + msg.append(deviceId); + if (channelId != null) { + msg.append(":").append(channelId); + } + msg.append(" ").append(add? "ADD":"DELETE"); + logger.info("[redis通知] 推送通道-> {}", msg); // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 stringRedisTemplate.convertAndSend(key, msg.toString()); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/ErrorCode.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/ErrorCode.java index 7e2b5120..e2e3879b 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/ErrorCode.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/ErrorCode.java @@ -6,7 +6,7 @@ package com.genersoft.iot.vmp.vmanager.bean; public enum ErrorCode { SUCCESS(0, "成功"), ERROR100(100, "失败"), - ERROR400(400, "参数不全或者错误"), + ERROR400(400, "参数或方法错误"), ERROR404(404, "资源未找到"), ERROR403(403, "无权限操作"), ERROR401(401, "请登录后重新请求"), diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/bean/SnapPath.java b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/SnapPath.java new file mode 100644 index 00000000..ce34d72c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/bean/SnapPath.java @@ -0,0 +1,50 @@ +package com.genersoft.iot.vmp.vmanager.bean; + +import io.swagger.v3.oas.annotations.media.Schema; + +@Schema(description = "截图地址信息") +public class SnapPath { + + @Schema(description = "相对地址") + private String path; + + @Schema(description = "绝对地址") + private String absoluteFilePath; + + @Schema(description = "请求地址") + private String url; + + + public static SnapPath getInstance(String path, String absoluteFilePath, String url) { + SnapPath snapPath = new SnapPath(); + snapPath.setPath(path); + snapPath.setAbsoluteFilePath(absoluteFilePath); + snapPath.setUrl(url); + return snapPath; + } + + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public String getAbsoluteFilePath() { + return absoluteFilePath; + } + + public void setAbsoluteFilePath(String absoluteFilePath) { + this.absoluteFilePath = absoluteFilePath; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java index 5046faa0..684708cd 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/MobilePosition/MobilePositionController.java @@ -102,7 +102,7 @@ public class MobilePositionController { public DeferredResult realTimePosition(@PathVariable String deviceId) { Device device = storager.queryVideoDevice(deviceId); String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_MOBILEPOSITION + deviceId; + String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + deviceId; try { cmder.mobilePostitionQuery(device, event -> { RequestMessage msg = new RequestMessage(); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 0ff8ba11..06dfb006 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -466,10 +466,12 @@ public class DeviceQuery { @Operation(summary = "请求截图") @Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "channelId", description = "通道国标编号", required = true) - public void getSnap(HttpServletResponse resp, @PathVariable String deviceId, @PathVariable String channelId) { + @Parameter(name = "mark", description = "标识", required = false) + public void getSnap(HttpServletResponse resp, @PathVariable String deviceId, @PathVariable String channelId, @RequestParam(required = false) String mark) { try { - final InputStream in = Files.newInputStream(new File("snap" + File.separator + deviceId + "_" + channelId + ".jpg").toPath()); + + final InputStream in = Files.newInputStream(new File("snap" + File.separator + deviceId + "_" + channelId + (mark == null? ".jpg": ("_" + mark + ".jpg"))).toPath()); resp.setContentType(MediaType.IMAGE_PNG_VALUE); IOUtils.copy(in, resp.getOutputStream()); } catch (IOException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index 4a8522b6..0689f423 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -1,13 +1,18 @@ package com.genersoft.iot.vmp.vmanager.streamProxy; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -18,6 +23,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.UUID; @SuppressWarnings("rawtypes") /** @@ -37,6 +45,12 @@ public class StreamProxyController { @Autowired private IStreamProxyService streamProxyService; + @Autowired + private DeferredResultHolder resultHolder; + + @Autowired + private UserSetting userSetting; + @Operation(summary = "分页查询流代理") @Parameter(name = "page", description = "当前页") @@ -58,7 +72,7 @@ public class StreamProxyController { }) @PostMapping(value = "/save") @ResponseBody - public StreamContent save(@RequestBody StreamProxyItem param){ + public DeferredResult save(@RequestBody StreamProxyItem param){ logger.info("添加代理: " + JSONObject.toJSONString(param)); if (ObjectUtils.isEmpty(param.getMediaServerId())) { param.setMediaServerId("auto"); @@ -69,7 +83,33 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbId(null); } - return new StreamContent(streamProxyService.save(param)); + + RequestMessage requestMessage = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream(); + requestMessage.setKey(key); + String uuid = UUID.randomUUID().toString(); + requestMessage.setId(uuid); + DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + // 录像查询以channelId作为deviceId查询 + resultHolder.put(key, uuid, result); + result.onTimeout(()->{ + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("超时"); + requestMessage.setData(wvpResult); + resultHolder.invokeAllResult(requestMessage); + }); + + streamProxyService.save(param, (code, msg, streamInfo) -> { + logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); + if (code == ErrorCode.SUCCESS.getCode()) { + requestMessage.setData(new StreamContent(streamInfo)); + }else { + requestMessage.setData(WVPResult.fail(code, msg)); + } + resultHolder.invokeAllResult(requestMessage); + }); + return result; } @GetMapping(value = "/ffmpeg_cmd/list") diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index def2a3c9..d67eac5a 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -43,10 +43,6 @@ spring: idle-timeout: 300000 # 允许连接在连接池中空闲的最长时间(以毫秒为单位) max-lifetime: 1200000 # 是池中连接关闭后的最长生命周期(以毫秒为单位) -# 修改为数据库字段下划线分隔直接对应java驼峰命名 -mybatis: - configuration: - map-underscore-to-camel-case: true # 修改分页插件为 postgresql, 数据库类型为mysql不需要 #pagehelper: diff --git a/web_src/src/components/StreamProxyList.vue b/web_src/src/components/StreamProxyList.vue index 47ccde8b..ff079cb8 100644 --- a/web_src/src/components/StreamProxyList.vue +++ b/web_src/src/components/StreamProxyList.vue @@ -22,8 +22,8 @@ {{scope.row.url}} - - {{scope.row.src_url}} + + {{scope.row.srcUrl}} @@ -58,25 +58,25 @@ @@ -197,7 +197,7 @@ this.$refs.onvifEdit.openDialog(res.data.data, (url)=>{ if (url != null) { this.$refs.onvifEdit.close(); - this.$refs.streamProxyEdit.openDialog({type: "default", url: url, src_url: url}, this.initData()) + this.$refs.streamProxyEdit.openDialog({type: "default", url: url, srcUrl: url}, this.initData()) } }) }else { @@ -245,18 +245,25 @@ }, deleteStreamProxy: function(row){ let that = this; - that.$axios({ - method:"delete", - url:"/api/proxy/del", - params:{ - app: row.app, - stream: row.stream - } - }).then((res)=>{ - that.initData() - }).catch(function (error) { - console.log(error); - }); + this.$confirm('确定删除此代理吗?', '提示', { + confirmButtonText: '确定', + cancelButtonText: '取消', + type: 'warning' + }).then(() => { + that.$axios({ + method:"delete", + url:"/api/proxy/del", + params:{ + app: row.app, + stream: row.stream + } + }).then((res)=>{ + that.initData() + }).catch(function (error) { + console.log(error); + }); + }).catch(() => { + }); }, start: function(row){ this.stopUpdateList() diff --git a/web_src/src/components/dialog/StreamProxyEdit.vue b/web_src/src/components/dialog/StreamProxyEdit.vue index 76011fac..588f1142 100644 --- a/web_src/src/components/dialog/StreamProxyEdit.vue +++ b/web_src/src/components/dialog/StreamProxyEdit.vue @@ -33,13 +33,13 @@ - - + + - - + + - + - - + @@ -72,9 +71,9 @@ - + @@ -83,10 +82,10 @@ - + @@ -98,8 +97,8 @@
- - + +
@@ -155,17 +154,17 @@ export default { app: null, stream: null, url: "", - src_url: null, - timeout_ms: null, - ffmpeg_cmd_key: null, + srcUrl: null, + timeoutMs: null, + ffmpegCmdKey: null, gbId: null, - rtp_type: null, + rtpType: null, enable: true, - enable_audio: true, - enable_mp4: false, - none_reader: null, - enable_remove_none_reader: false, - enable_disable_none_reader: false, + enableAudio: true, + enableMp4: false, + noneReader: null, + enableRemoveNoneReader: false, + enableDisableNoneReader: false, platformGbId: null, mediaServerId: null, }, @@ -177,9 +176,9 @@ export default { app: [{ required: true, message: "请输入应用名", trigger: "blur" }], stream: [{ required: true, message: "请输入流ID", trigger: "blur" }], url: [{ required: true, message: "请输入要代理的流", trigger: "blur" }], - src_url: [{ required: true, message: "请输入要代理的流", trigger: "blur" }], - timeout_ms: [{ required: true, message: "请输入FFmpeg推流成功超时时间", trigger: "blur" }], - ffmpeg_cmd_key: [{ required: false, message: "请输入FFmpeg命令参数模板(可选)", trigger: "blur" }], + srcUrl: [{ required: true, message: "请输入要代理的流", trigger: "blur" }], + timeoutMs: [{ required: true, message: "请输入FFmpeg推流成功超时时间", trigger: "blur" }], + ffmpegCmdKey: [{ required: false, message: "请输入FFmpeg命令参数模板(可选)", trigger: "blur" }], }, }; }, @@ -189,7 +188,7 @@ export default { this.listChangeCallback = callback; if (proxyParam != null) { this.proxyParam = proxyParam; - this.proxyParam.none_reader = null; + this.proxyParam.noneReader = null; } let that = this; @@ -218,7 +217,7 @@ export default { } }).then(function (res) { that.ffmpegCmdList = res.data.data; - that.proxyParam.ffmpeg_cmd_key = Object.keys(res.data.data)[0]; + that.proxyParam.ffmpegCmdKey = Object.keys(res.data.data)[0]; }).catch(function (error) { console.log(error); }); @@ -275,15 +274,15 @@ export default { } }, noneReaderHandler: function() { - if (this.proxyParam.none_reader === null || this.proxyParam.none_reader === "0") { - this.proxyParam.enable_disable_none_reader = false; - this.proxyParam.enable_remove_none_reader = false; - }else if (this.proxyParam.none_reader === "1"){ - this.proxyParam.enable_disable_none_reader = true; - this.proxyParam.enable_remove_none_reader = false; - }else if (this.proxyParam.none_reader ==="2"){ - this.proxyParam.enable_disable_none_reader = false; - this.proxyParam.enable_remove_none_reader = true; + if (this.proxyParam.noneReader === null || this.proxyParam.noneReader === "0") { + this.proxyParam.enableDisableNoneReader = false; + this.proxyParam.enableRemoveNoneReader = false; + }else if (this.proxyParam.noneReader === "1"){ + this.proxyParam.enableDisableNoneReader = true; + this.proxyParam.enableRemoveNoneReader = false; + }else if (this.proxyParam.noneReader ==="2"){ + this.proxyParam.enableDisableNoneReader = false; + this.proxyParam.enableRemoveNoneReader = true; } }, }, diff --git a/web_src/src/components/dialog/devicePlayer.vue b/web_src/src/components/dialog/devicePlayer.vue index e83a29c5..9e4eea00 100644 --- a/web_src/src/components/dialog/devicePlayer.vue +++ b/web_src/src/components/dialog/devicePlayer.vue @@ -14,7 +14,6 @@ h265web敬请期待 - wsPlayer 敬请期待 @@ -451,7 +450,15 @@ export default { playFromStreamInfo: function (realHasAudio, streamInfo) { this.showVideoDialog = true; this.hasaudio = realHasAudio && this.hasaudio; - this.$refs[this.activePlayer].play(this.getUrlByStreamInfo(streamInfo)) + if (this.$refs[this.activePlayer]) { + this.$refs[this.activePlayer].play(this.getUrlByStreamInfo(streamInfo)) + }else { + this.$nextTick(() => { + this.$refs[this.activePlayer].play(this.getUrlByStreamInfo(streamInfo)) + }); + } + + }, close: function () { console.log('关闭视频');