diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 47e51baa..3db903bb 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -69,6 +69,7 @@ public class VideoManagerConstants { public static final String SYSTEM_INFO_NET_PREFIX = "VMP_SYSTEM_INFO_NET_"; public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_"; + public static final String BROADCAST_WAITE_INVITE = "task_broadcast_waite_invite_"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index 7186fad7..072d0cbc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -23,10 +23,6 @@ public class AudioBroadcastManager { public static Map data = new ConcurrentHashMap<>(); - public void add(AudioBroadcastCatch audioBroadcastCatch) { - this.update(audioBroadcastCatch); - } - public void update(AudioBroadcastCatch audioBroadcastCatch) { if (SipUtils.isFrontEnd(audioBroadcastCatch.getDeviceId())) { data.put(audioBroadcastCatch.getDeviceId(), audioBroadcastCatch); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index a351445e..43d41865 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -49,8 +49,6 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_ALARM = "CALLBACK_ALARM"; - public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST"; - private Map> map = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f33f5df1..8f306413 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; @@ -914,11 +915,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + request.getCallIdHeader().getCallId(); + dynamicTask.stop(key); try { responseAck(request, Response.TRYING); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除移除以解析。 @@ -973,6 +977,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } return; } @@ -987,6 +993,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } return; } @@ -1000,6 +1007,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.BUSY_HERE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); + return; } return; } @@ -1034,11 +1043,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(request, Response.GONE); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 语音通话 回复410失败, {}", e.getMessage()); + return; } playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } } catch (SdpException e) { logger.error("[SDP解析异常]", e); + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); } } else { logger.warn("来自无效设备/平台的请求"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java index 56fb7894..7306ce38 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java @@ -1,17 +1,16 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IPlayService; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.Element; import org.slf4j.Logger; @@ -38,11 +37,14 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i private ResponseMessageHandler responseMessageHandler; @Autowired - private DeferredResultHolder deferredResultHolder; + private DynamicTask dynamicTask; @Autowired private AudioBroadcastManager audioBroadcastManager; + @Autowired + private IPlayService playService; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -50,33 +52,33 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { + + String channelId = getText(rootElement, "DeviceID"); + SIPRequest request = (SIPRequest) evt.getRequest(); try { - String channelId = getText(rootElement, "DeviceID"); - String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId() + channelId; - - // 此处是对本平台发出Broadcast指令的应答 - JSONObject json = new JSONObject(); - XmlUtil.node2Json(rootElement, json); - if (logger.isDebugEnabled()) { - logger.debug(json.toJSONString()); - } - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(json); - deferredResultHolder.invokeAllResult(msg); - - if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) { // 回复410 responseAck((SIPRequest) evt.getRequest(), Response.GONE); return; } - logger.info("收到语音广播的回复:{}/{}", device.getDeviceId(), channelId ); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); - audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); - audioBroadcastManager.update(audioBroadcastCatch); + String result = getText(rootElement, "Result"); + logger.info("[语音广播]回复:{}, {}/{}", result, device.getDeviceId(), channelId ); + // 回复200 OK - responseAck((SIPRequest) evt.getRequest(), Response.OK); + responseAck(request, Response.OK); + if (result.equalsIgnoreCase("OK")) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); + audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); + audioBroadcastManager.update(audioBroadcastCatch); + // 等待invite消息, 超时则结束 + String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + request.getCallIdHeader().getCallId(); + dynamicTask.startDelay(key, ()->{ + logger.info("[语音广播]等待invite消息超时:{}/{}", device.getDeviceId(), channelId); + playService.stopAudioBroadcast(device.getDeviceId(), channelId); + }, 2000); + }else { + playService.stopAudioBroadcast(device.getDeviceId(), channelId); + } } catch (ParseException | SipException | InvalidArgumentException e) { logger.error("[命令发送失败] 国标级联 语音喊话: {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index d4328a7c..31c71e70 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1045,8 +1045,7 @@ public class PlayServiceImpl implements IPlayService { event.call("语音广播已经开启"); return; } else { - audioBroadcastManager.del(deviceChannel.getDeviceId(), channelId); - redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId, sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + stopAudioBroadcast(device.getDeviceId(), channelId); } } } @@ -1055,7 +1054,7 @@ public class PlayServiceImpl implements IPlayService { cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { // 发送成功 AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); - audioBroadcastManager.add(audioBroadcastCatch); + audioBroadcastManager.update(audioBroadcastCatch); }, eventResultForError -> { // 发送失败 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);