diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index eae96b9a2..562c86407 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -47,8 +47,6 @@ public class UserSetting { private Boolean syncChannelOnDeviceOnline = Boolean.FALSE; - private Boolean pushStreamAfterAck = Boolean.FALSE; - private Boolean sipLog = Boolean.FALSE; private Boolean sqlLog = Boolean.FALSE; private Boolean sendToPlatformsWhenIdLost = Boolean.FALSE; @@ -234,14 +232,6 @@ public class UserSetting { this.broadcastForPlatform = broadcastForPlatform; } - public Boolean getPushStreamAfterAck() { - return pushStreamAfterAck; - } - - public void setPushStreamAfterAck(Boolean pushStreamAfterAck) { - this.pushStreamAfterAck = pushStreamAfterAck; - } - public Boolean getSipUseSourceIpAsRemoteAddress() { return sipUseSourceIpAsRemoteAddress; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index 1318c59c5..60f5cf699 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -188,8 +188,8 @@ public class Device { @Schema(description = "设备注册的事务信息") private SipTransactionInfo sipTransactionInfo; - - + @Schema(description = "控制语音对讲流程,释放收到ACK后发流") + private boolean broadcastPushAfterAck; public String getDeviceId() { return deviceId; @@ -465,4 +465,11 @@ public class Device { /*======================设备主子码流逻辑END=========================*/ + public boolean isBroadcastPushAfterAck() { + return broadcastPushAfterAck; + } + + public void setBroadcastPushAfterAck(boolean broadcastPushAfterAck) { + this.broadcastPushAfterAck = broadcastPushAfterAck; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index cc1f0c0cd..7ca52efec 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -10,9 +11,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; @@ -62,6 +62,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IVideoManagerStorage storager; + @Autowired + private IDeviceService deviceService; + @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @@ -87,40 +90,23 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Override public void process(RequestEvent evt) { CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); + String fromUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String toUserId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + logger.info("[收到ACK]: 来自->{}", fromUserId); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); + if (sendRtpItem == null) { + logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId); + return; + } + logger.info("[收到ACK]:rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); + // 取消设置的超时任务 + dynamicTask.stop(callIdHeader.getCallId()); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(fromUserId); - String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); - logger.info("[收到ACK]: platformGbId->{}", platformGbId); - if (userSetting.getPushStreamAfterAck()) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); - // 取消设置的超时任务 - dynamicTask.stop(callIdHeader.getCallId()); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, null, callIdHeader.getCallId()); - if (sendRtpItem == null) { - logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); - return; - } - String isUdp = sendRtpItem.isTcp() ? "0" : "1"; - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始级推流, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), - sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStream()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("dst_url",sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - param.put("is_udp", isUdp); - if (!sendRtpItem.isTcp()) { - // udp模式下开启rtcp保活 - param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); - } - + if (parentPlatform != null) { + Map param = getSendRtpParam(sendRtpItem); if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(), @@ -130,30 +116,75 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader); }); } else { - // 如果是非严格模式,需要关闭端口占用 - JSONObject startSendRtpStreamResult = null; - if (sendRtpItem.getLocalPort() != 0) { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } - }else { - if (sendRtpItem.isTcpActive()) { - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); - }else { - param.put("dst_url", sendRtpItem.getIp()); - param.put("dst_port", sendRtpItem.getPort()); - startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } - } + JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); if (startSendRtpStreamResult != null) { playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); } } + }else { + Device device = deviceService.getDevice(fromUserId); + if (device == null) { + logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId()); + return; + } + // 设置为收到ACK后发送语音的设备已经在发送200OK开始发流了 + if (!device.isBroadcastPushAfterAck()) { + return; + } + if (mediaInfo == null) { + logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId()); + return; + } + Map param = getSendRtpParam(sendRtpItem); + JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param); + if (startSendRtpStreamResult != null) { + playService.startSendRtpStreamHand(sendRtpItem, device, startSendRtpStreamResult, param, callIdHeader); + } } } + private Map getSendRtpParam(SendRtpItem sendRtpItem) { + String isUdp = sendRtpItem.isTcp() ? "0" : "1"; + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("is_udp", isUdp); + if (!sendRtpItem.isTcp()) { + // udp模式下开启rtcp保活 + param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); + } + return param; + } + + private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map param){ + JSONObject startSendRtpStreamResult = null; + if (sendRtpItem.getLocalPort() != 0) { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + }else { + if (sendRtpItem.isTcpActive()) { + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); + }else { + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + startSendRtpStreamResult = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } + } + return startSendRtpStreamResult; + + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 423a3b454..7594b4822 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -427,23 +427,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements try { // 超时未收到Ack应该回复bye,当前等待时间为10秒 - if (userSetting.getPushStreamAfterAck()) { - dynamicTask.startDelay(callIdHeader.getCallId(), () -> { - logger.info("Ack 等待超时"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); - // 回复bye - try { - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - }, 60 * 1000); - } + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("Ack 等待超时"); + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); + // 回复bye + try { + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + }, 60 * 1000); - SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); - if (!userSetting.getPushStreamAfterAck()) { - playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); - } + responseSdpAck(request, content.toString(), platform); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 国标级联 回复SdpAck", e); } @@ -650,7 +645,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (response != null) { sendRtpItem.setToTag(response.getToTag()); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); } else { @@ -888,16 +882,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("f=\r\n"); try { - SIPResponse sipResponse = responseSdpAck(request, content.toString(), platform); - if (!userSetting.getPushStreamAfterAck()) { - playService.startPushStream(sendRtpItem, sipResponse, platform, request.getCallIdHeader()); - } - return sipResponse; - } catch (SipException e) { - logger.error("未处理的异常 ", e); - } catch (InvalidArgumentException e) { - logger.error("未处理的异常 ", e); - } catch (ParseException e) { + return responseSdpAck(request, content.toString(), platform); + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } return null; @@ -1132,7 +1118,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements audioBroadcastManager.update(audioBroadcastCatch); // 开启发流,大华在收到200OK后就会开始建立连接 - if (!userSetting.getPushStreamAfterAck()) { + if (!device.isBroadcastPushAfterAck()) { playService.startPushStream(sendRtpItem, sipResponse, parentPlatform, request.getCallIdHeader()); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 44bf11b11..7725e1bc8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -64,7 +64,7 @@ public interface IPlayService { void startPushStream(SendRtpItem sendRtpItem, SIPResponse sipResponse, ParentPlatform platform, CallIdHeader callIdHeader); - void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo, JSONObject jsonObject, Map param, CallIdHeader callIdHeader); void talkCmd(Device device, String channelId, MediaServerItem mediaServerItem, String stream, AudioBroadcastEvent event); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 71e9b65b7..ea998e9d4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -1481,7 +1481,7 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void startSendRtpStreamHand(SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + public void startSendRtpStreamHand(SendRtpItem sendRtpItem, Object correlationInfo, JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); @@ -1504,10 +1504,13 @@ public class PlayServiceImpl implements IPlayService { } } else { // 向上级平台 - try { - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + if (correlationInfo instanceof ParentPlatform) { + try { + ParentPlatform parentPlatform = (ParentPlatform)correlationInfo; + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java index 96773fe95..e2497a79b 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMapper.java @@ -43,6 +43,7 @@ public interface DeviceMapper { "on_line," + "media_server_id," + "switch_primary_sub_stream," + + "broadcast_push_after_ack," + "(SELECT count(0) FROM wvp_device_channel WHERE device_id=wvp_device.device_id) as channel_count "+ " FROM wvp_device WHERE device_id = #{deviceId}") Device getDeviceByDeviceId(String deviceId); @@ -73,6 +74,7 @@ public interface DeviceMapper { "subscribe_cycle_for_alarm,"+ "ssrc_check,"+ "as_message_channel,"+ + "broadcast_push_after_ack,"+ "geo_coord_sys,"+ "on_line"+ ") VALUES (" + @@ -101,6 +103,7 @@ public interface DeviceMapper { "#{subscribeCycleForAlarm}," + "#{ssrcCheck}," + "#{asMessageChannel}," + + "#{broadcastPushAfterAck}," + "#{geoCoordSys}," + "#{onLine}" + ")") @@ -155,6 +158,7 @@ public interface DeviceMapper { "subscribe_cycle_for_alarm,"+ "ssrc_check,"+ "as_message_channel,"+ + "broadcast_push_after_ack,"+ "geo_coord_sys,"+ "on_line,"+ "media_server_id,"+ @@ -196,6 +200,7 @@ public interface DeviceMapper { "subscribe_cycle_for_alarm,"+ "ssrc_check,"+ "as_message_channel,"+ + "broadcast_push_after_ack,"+ "geo_coord_sys,"+ "on_line"+ " FROM wvp_device WHERE on_line = true") @@ -226,6 +231,7 @@ public interface DeviceMapper { "subscribe_cycle_for_alarm,"+ "ssrc_check,"+ "as_message_channel,"+ + "broadcast_push_after_ack,"+ "geo_coord_sys,"+ "on_line"+ " FROM wvp_device WHERE ip = #{host} AND port=#{port}") @@ -247,6 +253,7 @@ public interface DeviceMapper { ", subscribe_cycle_for_alarm=#{subscribeCycleForAlarm}" + ", ssrc_check=#{ssrcCheck}" + ", as_message_channel=#{asMessageChannel}" + + ", broadcast_push_after_ack=#{broadcastPushAfterAck}" + ", geo_coord_sys=#{geoCoordSys}" + ", switch_primary_sub_stream=#{switchPrimarySubStream}" + ", media_server_id=#{mediaServerId}" + @@ -264,6 +271,7 @@ public interface DeviceMapper { "charset,"+ "ssrc_check,"+ "as_message_channel,"+ + "broadcastPushAfterAck,"+ "geo_coord_sys,"+ "on_line,"+ "media_server_id,"+ @@ -278,6 +286,7 @@ public interface DeviceMapper { "#{charset}," + "#{ssrcCheck}," + "#{asMessageChannel}," + + "#{broadcastPushAfterAck}," + "#{geoCoordSys}," + "#{onLine}," + "#{mediaServerId}," + diff --git a/src/main/resources/local.jks b/src/main/resources/local.jks new file mode 100644 index 000000000..529be6b23 Binary files /dev/null and b/src/main/resources/local.jks differ diff --git a/web_src/src/components/dialog/deviceEdit.vue b/web_src/src/components/dialog/deviceEdit.vue index 499d6ca25..7cd357925 100644 --- a/web_src/src/components/dialog/deviceEdit.vue +++ b/web_src/src/components/dialog/deviceEdit.vue @@ -70,6 +70,7 @@ +