diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/PresetDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/PresetDataCatch.java index 886ff778..ab096856 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/PresetDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/PresetDataCatch.java @@ -38,13 +38,13 @@ public class PresetDataCatch { } } - public void put(int sn, int total, Device device, List deviceChannelList) { + public void put(int sn, int total, List presetItemList) { PresetData presetData = data.get(sn); if (presetData == null) { presetData = new PresetData(); presetData.setSn(sn); presetData.setTotal(total); - presetData.setPresetItemList(deviceChannelList); + presetData.setPresetItemList(presetItemList); presetData.setStatus(DataStatus.runIng); presetData.setLastTime(Instant.now()); data.put(sn, presetData); @@ -55,7 +55,7 @@ public class PresetDataCatch { } presetData.setTotal(total); presetData.setStatus(DataStatus.runIng); - presetData.getPresetItemList().addAll(deviceChannelList); + presetData.getPresetItemList().addAll(presetItemList); presetData.setLastTime(Instant.now()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 4e5ba645..5c1368ac 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -569,9 +569,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setToTag(response.getToTag()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - } - } private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java index f4d1c8f5..0917f26f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon import com.genersoft.iot.vmp.gb28181.bean.PresetQuerySipReq; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.session.PresetDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -43,6 +44,9 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent @Autowired private DeferredResultHolder deferredResultHolder; + @Autowired + private PresetDataCatch presetDataCatch; + @Override @@ -68,14 +72,14 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent return; } Element presetListNumElement = rootElement.element("PresetList"); - String sn = getText(rootElement, "SN"); + String snStr = getText(rootElement, "SN"); //该字段可能为通道或则设备的id String channelId = getText(rootElement, "DeviceID"); if (channelId == null) { } String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId + channelId; - if (sn == null || presetListNumElement == null) { + if (snStr == null || presetListNumElement == null) { try { responseAck(request, Response.BAD_REQUEST, "xml error"); } catch (InvalidArgumentException | ParseException | SipException e) { @@ -84,29 +88,34 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent return; } int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num")); - List presetQuerySipReqList = new ArrayList<>(); - if (sumNum > 0) { + int sn = Integer.parseInt(snStr); + List presetItems = new ArrayList<>(); + if (sumNum == 0) { + presetDataCatch.setChannelSyncEnd(sn, null ); + }else { for (Iterator presetIterator = presetListNumElement.elementIterator(); presetIterator.hasNext(); ) { Element itemListElement = presetIterator.next(); - PresetQuerySipReq presetQuerySipReq = new PresetQuerySipReq(); + PresetItem presetItem = new PresetItem(); for (Iterator itemListIterator = itemListElement.elementIterator(); itemListIterator.hasNext(); ) { // 遍历item Element itemOne = itemListIterator.next(); String name = itemOne.getName(); String textTrim = itemOne.getTextTrim(); if ("PresetID".equalsIgnoreCase(name)) { - presetQuerySipReq.setPresetId(textTrim); + presetItem.setPresetID(Integer.parseInt(textTrim)); } else { - presetQuerySipReq.setPresetName(textTrim); + presetItem.setPresetName(textTrim); } } - presetQuerySipReqList.add(presetQuerySipReq); + } + presetDataCatch.put(sn, sumNum, presetItems); + if (sumNum == presetDataCatch.get(sn).size()) { + RequestMessage requestMessage = new RequestMessage(); + requestMessage.setKey(key); + requestMessage.setData(presetDataCatch.get(sn)); + deferredResultHolder.invokeAllResult(requestMessage); } } - RequestMessage requestMessage = new RequestMessage(); - requestMessage.setKey(key); - requestMessage.setData(presetQuerySipReqList); - deferredResultHolder.invokeAllResult(requestMessage); try { responseAck(request, Response.OK); } catch (InvalidArgumentException | ParseException | SipException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 439c2378..c905ea36 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -206,41 +206,41 @@ public class PlayServiceImpl implements IPlayService { // 超时处理 String timeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(timeOutTaskKey, () -> { + // 取消订阅消息监听 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); + // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); - if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) { - logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", + int code; + String msg; + if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStatus().equals(InviteSessionStatus.ready)) { + logger.info("[点播超时] 信令超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getSsrc()); - - callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); - - try { - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { - logger.error("[点播超时], 发送BYE失败 {}", e.getMessage()); - } finally { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 取消订阅消息监听 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); - } + callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); + code = InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(); + msg = InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(); }else { logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getSsrc()); + code = InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(); + msg = InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + try { + cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + logger.error("[点播超时],发送BYE失败 {}", e.getMessage()); + } } + callback.run(code, msg, null); + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, code, msg, null); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + }, userSetting.getPlayTimeout()); try { @@ -592,9 +592,11 @@ public class PlayServiceImpl implements IPlayService { Device device, String channelId, String timeOutTaskKey, ErrorCallback callback, InviteInfo inviteInfo, InviteSessionType inviteSessionType){ inviteInfo.setStatus(InviteSessionStatus.ok); + inviteStreamService.updateInviteInfo(inviteInfo); ResponseEvent responseEvent = (ResponseEvent) eventResult.event; String contentString = new String(responseEvent.getResponse().getRawContent()); String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString); + logger.info("[点播收到回复OK] deviceId: {}, channelId: {}, 发流SSRC: {}", device.getDeviceId(), channelId, ssrcInResponse); // 兼容回复的消息中缺少ssrc(y字段)的情况 if (ssrcInResponse == null) { ssrcInResponse = ssrcInfo.getSsrc(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyResourceServiceImpl.java new file mode 100644 index 00000000..a1a14d79 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyResourceServiceImpl.java @@ -0,0 +1,45 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.genersoft.iot.vmp.common.CommonGbChannel; +import com.genersoft.iot.vmp.service.IResourcePlayCallback; +import com.genersoft.iot.vmp.service.IResourceService; +import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; +import org.springframework.stereotype.Service; + +@Service(CommonGbChannelType.PROXY) +public class StreamProxyResourceServiceImpl implements IResourceService { + @Override + public boolean deleteChannel(CommonGbChannel commonGbChannel) { + return false; + } + + @Override + public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { + + } + + @Override + public void stopPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { + + } + + @Override + public boolean ptzControl(CommonGbChannel commonGbChannel, String command, Integer horizonSpeed, Integer verticalSpeed, Integer zoomSpeed) { + return false; + } + + @Override + public void streamOffline(String app, String streamId) { + + } + + @Override + public void startPlayback(CommonGbChannel channel, Long startTime, Long stopTime, IResourcePlayCallback callback) { + + } + + @Override + public void startDownload(CommonGbChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, IResourcePlayCallback playCallback) { + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushResourceServiceImpl.java new file mode 100644 index 00000000..de918b9d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushResourceServiceImpl.java @@ -0,0 +1,45 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.genersoft.iot.vmp.common.CommonGbChannel; +import com.genersoft.iot.vmp.service.IResourcePlayCallback; +import com.genersoft.iot.vmp.service.IResourceService; +import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; +import org.springframework.stereotype.Service; + +@Service(CommonGbChannelType.PUSH) +public class StreamPushResourceServiceImpl implements IResourceService { + @Override + public boolean deleteChannel(CommonGbChannel commonGbChannel) { + return false; + } + + @Override + public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { + + } + + @Override + public void stopPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { + + } + + @Override + public boolean ptzControl(CommonGbChannel commonGbChannel, String command, Integer horizonSpeed, Integer verticalSpeed, Integer zoomSpeed) { + return false; + } + + @Override + public void streamOffline(String app, String streamId) { + + } + + @Override + public void startPlayback(CommonGbChannel channel, Long startTime, Long stopTime, IResourcePlayCallback callback) { + + } + + @Override + public void startDownload(CommonGbChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, IResourcePlayCallback playCallback) { + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java index aa241ebe..31f81f45 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiDeviceController.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.PresetQuerySipReq; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; @@ -193,15 +194,16 @@ public class ApiDeviceController { } Device device = storager.queryVideoDevice(serial); - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + serial + code; + int sn = SipUtils.getNewSn(); + String msgId = sn + ""; + String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + sn; DeferredResult result = new DeferredResult<> (timeout * 1000L); DeferredResultEx deferredResultEx = new DeferredResultEx<>(result); result.onTimeout(()->{ logger.warn("<模拟接口> 获取设备预置位超时"); // 释放rtpserver RequestMessage msg = new RequestMessage(); - msg.setId(uuid); + msg.setId(msgId); msg.setKey(key); msg.setData("wait for presetquery timeout["+timeout+"s]"); resultHolder.invokeResult(msg); @@ -228,12 +230,12 @@ public class ApiDeviceController { return resultMap; }); - resultHolder.put(key, uuid, deferredResultEx); + resultHolder.put(key, msgId, deferredResultEx); try { - cmder.presetQuery(device, code, event -> { + cmder.presetQuery(device, code, sn, event -> { RequestMessage msg = new RequestMessage(); - msg.setId(uuid); + msg.setId(msgId); msg.setKey(key); msg.setData(String.format("获取设备预置位失败,错误码: %s, %s", event.statusCode, event.msg)); resultHolder.invokeResult(msg);