diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java index 64227d40..93a0d162 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -1,6 +1,11 @@ package com.genersoft.iot.vmp.gb28181.bean; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.stack.SIPDialog; + +import javax.sip.Dialog; + /** * 缓存语音广播的状态 * @author lin @@ -32,6 +37,16 @@ public class AudioBroadcastCatch { */ private AudioBroadcastCatchStatus status; + /** + * 请求信息 + */ + private SIPRequest request; + + /** + * 会话信息 + */ + private SIPDialog dialog; + public String getDeviceId() { return deviceId; @@ -56,4 +71,20 @@ public class AudioBroadcastCatch { public void setStatus(AudioBroadcastCatchStatus status) { this.status = status; } + + public void setDialog(SIPDialog dialog) { + this.dialog = dialog; + } + + public SIPDialog getDialog() { + return dialog; + } + + public SIPRequest getRequest() { + return request; + } + + public void setRequest(SIPRequest request) { + this.request = request; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java index dec96c00..d3ddf527 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/AudioBroadcastManager.java @@ -1,13 +1,14 @@ package com.genersoft.iot.vmp.gb28181.session; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * 语音广播消息管理类 @@ -16,6 +17,9 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class AudioBroadcastManager { + @Autowired + private SipConfig config; + public static Map data = new ConcurrentHashMap<>(); public void add(AudioBroadcastCatch audioBroadcastCatch) { @@ -54,6 +58,16 @@ public class AudioBroadcastManager { } public AudioBroadcastCatch get(String deviceId, String channelId) { - return data.get(deviceId + channelId); + AudioBroadcastCatch audioBroadcastCatch = data.get(deviceId + channelId); + if (audioBroadcastCatch == null) { + Stream allAudioBroadcastCatchStreamForDevice = data.values().stream().filter( + audioBroadcastCatchItem -> Objects.equals(audioBroadcastCatchItem.getDeviceId(), deviceId)); + List audioBroadcastCatchList = allAudioBroadcastCatchStreamForDevice.collect(Collectors.toList()); + if (audioBroadcastCatchList.size() == 1 && Objects.equals(config.getId(), channelId)) { + audioBroadcastCatch = audioBroadcastCatchList.get(0); + } + } + + return audioBroadcastCatch; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index af61ed3f..6cc19a7f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -18,6 +18,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.stack.SIPDialog; import org.ehcache.shadow.org.terracotta.offheapstore.storage.IntegerStorageEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,15 +30,18 @@ import org.springframework.stereotype.Component; import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.RequestEvent; +import javax.sip.SipException; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; +import java.text.ParseException; import java.util.*; /** * SIP命令类型: ACK请求 + * @author lin */ @Component public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { @@ -96,8 +101,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformGbId); // 取消设置的超时任务 dynamicTask.stop(callIdHeader.getCallId()); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); +// String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("收到ACK,开始向上级推流 rtp/{}", sendRtpItem.getStreamId()); @@ -121,7 +126,14 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In } else { logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); if (sendRtpItem.isOnlyAudio()) { - // TODO 可能是语音对讲 + // 语音对讲 + try { + cmder.streamByeCmd((SIPDialog) evt.getDialog(), (SIPRequest)evt.getRequest(), null); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } }else { // 向上级平台 commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 75318093..c6116e3f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -65,6 +66,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private VideoStreamSessionManager streamSession; + @Autowired + private IPlayService playService; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -106,6 +110,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); } + if (sendRtpItem.isOnlyAudio()) { + playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), channelId); + } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); messageForPushChannel.setType(0); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 12bfd7fd..dc77afca 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -114,6 +114,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private SipConfig config; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -492,7 +493,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP); - if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); @@ -562,25 +562,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException { + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId1) throws InvalidArgumentException, ParseException, SipException, SdpException { - // 兼容奇葩的海康这里使用的不是通道编号而是本平台编号 -// if (channelId.equals(config.getId())) { -// List all = audioBroadcastManager.getAll(); -// for (AudioBroadcastCatch audioBroadcastCatch : all) { -// if (audioBroadcastCatch.getDeviceId().equals(requesterId)) { -// channelId = audioBroadcastCatch.getChannelId(); -// } -// } -// } -// // 兼容失败 -// if (channelId.equals(config.getId())) { -// responseAck(evt, Response.BAD_REQUEST); -// return; -// } // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = redisCatchStorage.getDevice(requesterId); - + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId1); + if (audioBroadcastCatch == null) { + logger.warn("来自设备的Invite请求非语音广播,已忽略"); + responseAck(evt, Response.FORBIDDEN); + return; + } Request request = evt.getRequest(); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); @@ -606,7 +597,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 查看是否支持PS 负载96 int port = -1; - //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (int i = 0; i < mediaDescriptions.size(); i++) { @@ -638,7 +628,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 return; } - String sessionName = sdp.getSessionName().getValue(); String addressStr = sdp.getOrigin().getAddress(); logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", requesterId, addressStr, port, ssrc); @@ -649,20 +638,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, + device.getDeviceId(), audioBroadcastCatch.getChannelId(), mediaTransmissionTCP); - sendRtpItem.setTcp(mediaTransmissionTCP); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); responseAck(evt, Response.BUSY_HERE); return; } - + sendRtpItem.setTcp(mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } String app = "broadcast"; - String stream = device.getDeviceId() + "_" + channelId; + String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId(); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); sendRtpItem.setPlayType(InviteStreamType.PLAY); @@ -685,12 +673,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); String finalSsrc = ssrc; - String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + channelId; - // 流已经存在时直接推流 if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { logger.info("发现已经在推流"); - dynamicTask.stop(waiteStreamTimeoutTaskKey); sendRtpItem.setStatus(2); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); @@ -711,6 +696,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements parentPlatform.setServerGBId(device.getDeviceId()); try { responseSdpAck(evt, content.toString(), parentPlatform); + Dialog dialog = evt.getDialog(); + audioBroadcastCatch.setDialog((SIPDialog) dialog); + audioBroadcastCatch.setRequest((SIPRequest) request); + audioBroadcastManager.update(audioBroadcastCatch); } catch (SipException e) { throw new RuntimeException(e); } catch (InvalidArgumentException e) { @@ -721,20 +710,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }else { // 流不存在时监听流上线 // 设置等待推流的超时; 默认20s + String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId(); dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{ logger.info("等待推流超时: {}/{}", app, stream); - if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { - audioBroadcastManager.del(device.getDeviceId(), channelId); - }else { - // 兼容海康使用了错误的通道ID的情况 - audioBroadcastManager.delByDeviceId(device.getDeviceId()); - } - + playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); // 发送bye try { - cmder.streamByeCmd((SIPDialog)evt.getServerTransaction().getDialog(), (SIPRequest) evt.getRequest(), null); + responseAck(evt, Response.BUSY_HERE); } catch (SipException e) { throw new RuntimeException(e); + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); } catch (ParseException e) { throw new RuntimeException(e); } @@ -743,10 +729,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ sendRtpItem.setStatus(2); + dynamicTask.stop(waiteStreamTimeoutTaskKey); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); - content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("o="+ audioBroadcastCatch.getChannelId() +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("s=Play\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("t=0 0\r\n"); @@ -771,8 +758,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } }); } - String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; - dynamicTask.stop(timeOutTaskKey); String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId(); WVPResult wvpResult = new WVPResult<>(); wvpResult.setCode(0); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 63005516..239d0c68 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -43,4 +43,5 @@ public interface IPlayService { StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream); void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event); + void stopAudioBroadcast(String deviceId, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index a647c84c..934745e5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; @@ -44,9 +45,13 @@ import org.springframework.util.ResourceUtils; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.ResponseEvent; +import javax.sip.SipException; import java.io.FileNotFoundException; import java.math.BigDecimal; +import java.text.ParseException; import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -93,6 +98,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private UserSetting userSetting; + @Autowired + private SipConfig sipConfig; + @Autowired private DynamicTask dynamicTask; @@ -641,16 +649,13 @@ public class PlayServiceImpl implements IPlayService { } // 查询通道使用状态 if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { - logger.warn("语音广播已经开启: {}", channelId); - event.call("语音广播已经开启"); - return; + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { + logger.warn("语音广播已经开启: {}", channelId); + event.call("语音广播已经开启"); + return; + } } - String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId; - dynamicTask.startDelay(timeOutTaskKey, ()->{ - logger.error("语音广播发送超时: {}:{}", device.getDeviceId(), channelId); - event.call("语音广播发送超时"); - audioBroadcastManager.del(device.getDeviceId(), channelId); - }, timeout * 1000); // 发送通知 cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { @@ -658,11 +663,38 @@ public class PlayServiceImpl implements IPlayService { AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); audioBroadcastManager.add(audioBroadcastCatch); }, eventResultForError -> { - dynamicTask.stop(timeOutTaskKey); // 发送失败 logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg); event.call("语音广播发送失败"); - audioBroadcastManager.del(device.getDeviceId(), channelId); + stopAudioBroadcast(device.getDeviceId(), channelId); }); } + + @Override + public void stopAudioBroadcast(String deviceId, String channelId){ + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId); + if (audioBroadcastCatch != null) { + audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); + } + try { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, channelId, null, null); + if (sendRtpItem != null) { + redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStreamId()); + zlmresTfulUtils.stopSendRtp(mediaInfo, param); + } + if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) { + cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null); + } + } catch (SipException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index d587e0d0..935fc425 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -319,6 +319,22 @@ public class PlayController { return result; } + + @ApiOperation("停止语音广播") + @ApiImplicitParams({ + @ApiImplicitParam(name = "deviceId", value = "设备Id", dataTypeClass = String.class), + @ApiImplicitParam(name = "channelId", value = "通道Id", dataTypeClass = String.class), + }) + @GetMapping("/broadcast/stop/{deviceId}/{channelId}") + @PostMapping("/broadcast/stop/{deviceId}/{channelId}") + public WVPResult stopBroadcastA(@PathVariable String deviceId, @PathVariable String channelId) { + if (logger.isDebugEnabled()) { + logger.debug("停止语音广播API调用"); + } + playService.stopAudioBroadcast(deviceId, channelId); + return new WVPResult<>(0, "success", null); + } + @ApiOperation("获取所有的ssrc") @GetMapping("/ssrc") public WVPResult getSsrc() {