diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java index 84f0519b..10d822d6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java @@ -347,7 +347,7 @@ public class GB28181ResourceServiceImpl implements IResourceService { DateUtil.urlFormatter.format(startTime) + "_" + DateUtil.urlFormatter.format(stopTime); MediaServerItem mediaServerItem = playService.getNewMediaServerItem(checkResult.device); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, - checkResult.device.isSsrcCheck(), true, 0, false, checkResult.device.getStreamModeForParam()); + checkResult.device.isSsrcCheck(), true, 0, false, false, checkResult.device.getStreamModeForParam()); playService.playBack(mediaServerItem, ssrcInfo, checkResult.channel.getDeviceId(), checkResult.channel.getChannelId(), startTimeStr, endTimeStr, (code, msg, data) -> { if (code == InviteErrorCode.SUCCESS.getCode()) { @@ -378,7 +378,7 @@ public class GB28181ResourceServiceImpl implements IResourceService { DateUtil.urlFormatter.format(startTime) + "_" + DateUtil.urlFormatter.format(stopTime); MediaServerItem mediaServerItem = playService.getNewMediaServerItem(checkResult.device); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, - checkResult.device.isSsrcCheck(), true, 0, false, checkResult.device.getStreamModeForParam()); + checkResult.device.isSsrcCheck(), true, 0, false, false, checkResult.device.getStreamModeForParam()); playService.download(mediaServerItem, ssrcInfo, checkResult.channel.getDeviceId(), checkResult.channel.getChannelId(), startTimeStr, endTimeStr, downloadSpeed, (code, msg, data) -> { if (code == InviteErrorCode.SUCCESS.getCode()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java index 814d9847..b001cef7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java @@ -45,6 +45,12 @@ public class AudioBroadcastCatch { */ private String channelId; + + /** + * callId + */ + private String callId; + /** * 流媒体信息 */ @@ -156,4 +162,12 @@ public class AudioBroadcastCatch { public void setSipTransactionInfoByRequset(SIPResponse sipResponse) { this.sipTransactionInfo = new SipTransactionInfo(sipResponse); } + + public String getCallId() { + return callId; + } + + public void setCallId(String callId) { + this.callId = callId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 55c664c6..e3b9b664 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -7,10 +7,6 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.bean.command.PTZCommand; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; @@ -38,11 +34,13 @@ import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import javax.sip.*; +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; +import javax.sip.SipFactory; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import java.text.ParseException; -import java.util.ArrayList; import java.util.List; /** @@ -85,8 +83,8 @@ public class SIPCommander implements ISIPCommander { @Override - public void ptzZoomCmd(Device device, String channelId, int inOut, int zoomSpeed) throws InvalidArgumentException, ParseException, SipException { - ptzCmd(device, channelId, 0, 0, inOut, 0, zoomSpeed); + public void ptzCmd(Device device, String channelId, PTZCommand ptzCommand) throws InvalidArgumentException, SipException, ParseException { + } /** @@ -120,39 +118,6 @@ public class SIPCommander implements ISIPCommander { return builder.toString(); } - /** - * 云台控制,支持方向与缩放控制 - * - * @param device 控制设备 - * @param channelId 预览通道 - * @param leftRight 镜头左移右移 0:停止 1:左移 2:右移 - * @param upDown 镜头上移下移 0:停止 1:上移 2:下移 - * @param inOut 镜头放大缩小 0:停止 1:缩小 2:放大 - * @param moveSpeed 镜头移动速度 - * @param zoomSpeed 镜头缩放速度 - */ - @Override - public void ptzCmd(Device device, String channelId, int leftRight, int upDown, int inOut, int moveSpeed, - int zoomSpeed) throws InvalidArgumentException, SipException, ParseException { - String cmdStr = SipUtils.cmdString(leftRight, upDown, inOut, moveSpeed, zoomSpeed); - StringBuilder ptzXml = new StringBuilder(200); - String charset = device.getCharset(); - ptzXml.append("\r\n"); - ptzXml.append("\r\n"); - ptzXml.append("DeviceControl\r\n"); - ptzXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); - ptzXml.append("" + channelId + "\r\n"); - ptzXml.append("" + cmdStr + "\r\n"); - ptzXml.append("\r\n"); - ptzXml.append("5\r\n"); - ptzXml.append("\r\n"); - ptzXml.append("\r\n"); - - Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); - - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request); - } - /** * 前端控制,包括PTZ指令、FI指令、预置位指令、巡航指令、扫描指令和辅助开关指令 * @@ -634,28 +599,7 @@ public class SIPCommander implements ISIPCommander { * @param device 视频设备 */ @Override - public void audioBroadcastCmd(Device device) throws InvalidArgumentException, SipException, ParseException { - - StringBuffer broadcastXml = new StringBuffer(200); - String charset = device.getCharset(); - broadcastXml.append("\r\n"); - broadcastXml.append("\r\n"); - broadcastXml.append("Broadcast\r\n"); - broadcastXml.append("" + SipUtils.getNewSn() + "\r\n"); - broadcastXml.append("" + sipConfig.getId() + "\r\n"); - broadcastXml.append("" + device.getDeviceId() + "\r\n"); - broadcastXml.append("\r\n"); - - - - Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request); - - } - - @Override - public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { - + public void audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { StringBuffer broadcastXml = new StringBuffer(200); String charset = device.getCharset(); broadcastXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index ef59494e..fa7e52f8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -93,6 +93,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Autowired private GitUtil gitUtil; + @Autowired + private SipLayer sipLayer; + @Override public void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException { register(parentPlatform, null, null, errorEvent, okEvent, true); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 9ec18afc..daca5c78 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -20,6 +19,7 @@ import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -29,9 +29,6 @@ import org.springframework.stereotype.Component; import javax.sip.RequestEvent; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; -import javax.sip.header.FromHeader; -import javax.sip.header.HeaderAddress; -import javax.sip.header.ToHeader; import java.util.HashMap; import java.util.Map; @@ -92,7 +89,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In public void process(RequestEvent evt) { CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); dynamicTask.stop(callIdHeader.getCallId()); - String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + SIPRequest sipRequest = (SIPRequest) evt.getRequest(); + String toUserId = ((SipURI) sipRequest.getToHeader().getAddress().getURI()).getUser(); + String fromUserId = ((SipURI) sipRequest.getFromHeader().getAddress().getURI()).getUser(); + SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId()); if (sendRtpItem == null) { logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 7da6d466..da3d313c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -123,7 +123,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId()); if (sendRtpItem != null){ logger.info("[收到bye] 来自{}, 停止通道:{}", sendRtpItem.getDestId(), sendRtpItem.getChannelId()); - String streamId = sendRtpItem.getStreamId(); + String streamId = sendRtpItem.getStream(); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); @@ -145,32 +145,32 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getDestId()); if (platform != null) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getDestId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormIndex(platform.getId()); redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); }else { - logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId()); + logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getDestId()); } } - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getSourceId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { // 来自上级平台的停止对讲 - logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getSourceId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getSourceId(), sendRtpItem.getChannelId()); } int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount <= 0) { logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); + Device device = deviceService.getDevice(sendRtpItem.getSourceId()); if (device == null) { logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); } try { - logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + logger.info("[停止点播] {}/{}", sendRtpItem.getSourceId(), sendRtpItem.getChannelId()); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 7a3295ee..615a93d3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -932,7 +932,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sipResponse = responseSdpAck(request, content.toString(), parentPlatform); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), sendRtpItem.getChannelId()); - + audioBroadcastCatch.setCallId(request.getCallIdHeader().getCallId()); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); audioBroadcastManager.update(audioBroadcastCatch); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java index 8a394fca..adaf0030 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java @@ -114,7 +114,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) { SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId()); - String streamId = sendRtpItem.getStreamId(); + String streamId = sendRtpItem.getStream(); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId); if (null == inviteInfo) { responseAck(request, Response.NOT_FOUND, "stream " + streamId + " not found"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java index 275ca7c8..6983fbb7 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.media.zlm; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import java.util.List; @@ -30,5 +29,4 @@ public interface IStreamSendManager { void remove(SendRtpItem sendRtpItem); void remove(List sendRtpItemList); - } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 008560f4..dc30c9ea 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -395,9 +395,7 @@ public class ZLMHttpHookListener { mediaServerService.removeCount(param.getMediaServerId()); } - int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream()); - if (updateStatusResult > 0) { - } + streamProxyService.updatePullingStatus(param.isRegist(), param.getApp(), param.getStream()); if ("rtp".equals(param.getApp())) { if (!param.isRegist()) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java index 5ed2e031..b55a9ce6 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java @@ -1,11 +1,8 @@ package com.genersoft.iot.vmp.media.zlm.impl; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @@ -39,8 +36,8 @@ public class StreamSendManagerImpl implements IStreamSendManager { if (sendRtpItem.getCallId() != null) { redisTemplate.opsForValue().set(getCallIdKey(sendRtpItem.getCallId()), dateId); } - if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) { - redisTemplate.opsForSet().add(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()), dateId); + if (sendRtpItem.getApp() != null && sendRtpItem.getStream() != null) { + redisTemplate.opsForSet().add(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStream()), dateId); } if (sendRtpItem.getMediaServerId() != null) { redisTemplate.opsForSet().add(getMediaServerIdKey(sendRtpItem.getMediaServerId()), dateId); @@ -156,8 +153,8 @@ public class StreamSendManagerImpl implements IStreamSendManager { if (sendRtpItem.getCallId() != null) { redisTemplate.delete(getCallIdKey(sendRtpItem.getCallId())); } - if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) { - redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()), dateId); + if (sendRtpItem.getApp() != null && sendRtpItem.getStream() != null) { + redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStream()), dateId); } if (sendRtpItem.getMediaServerId() != null) { redisTemplate.opsForSet().remove(getMediaServerIdKey(sendRtpItem.getMediaServerId()), dateId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java index d6cfdb4e..3b528e27 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service; -import com.alibaba.fastjson2.JSONArray; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -34,14 +33,6 @@ public interface IMediaService { */ StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks, String callId); - /** - * 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况 - * @param app - * @param stream - * @return - */ - StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr, String callId); - /** * 查看流是否已经注册 */ diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 41bac415..3b16b5aa 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -12,16 +12,11 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; -import com.genersoft.iot.vmp.service.*; -import com.genersoft.iot.vmp.service.bean.Group; +import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.service.IInviteStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; @@ -36,7 +31,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -56,8 +50,6 @@ public class DeviceServiceImpl implements IDeviceService { private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class); - @Autowired - private SIPCommander cmder; @Autowired private DynamicTask dynamicTask; @@ -110,6 +102,9 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private ZLMRESTfulUtils zlmresTfulUtils; + @Autowired + private IStreamSendManager streamSendManager; + @Override public void online(Device device, SipTransactionInfo sipTransactionInfo) { logger.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); @@ -250,13 +245,17 @@ public class DeviceServiceImpl implements IDeviceService { removeCatalogSubscribe(device, null); removeMobilePositionSubscribe(device, null); + // 清理语音对讲 List audioBroadcastCatches = audioBroadcastManager.get(deviceId); - if (audioBroadcastCatches.size() > 0) { + if (!audioBroadcastCatches.isEmpty()) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { - - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); + if (ObjectUtils.isEmpty(audioBroadcastCatch.getCallId())) { + audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); + continue; + } + SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId()); if (sendRtpItem != null) { - redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); + streamSendManager.remove(sendRtpItem); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Map param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); @@ -264,8 +263,8 @@ public class DeviceServiceImpl implements IDeviceService { param.put("stream", sendRtpItem.getStream()); zlmresTfulUtils.stopSendRtp(mediaInfo, param); } - audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId()); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index a2ff2525..8c0c506b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.service.impl; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; @@ -13,8 +12,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; import org.springframework.beans.factory.annotation.Autowired; @@ -142,3 +139,4 @@ public class MediaServiceImpl implements IMediaService { return result; } } + diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index ef49145a..63a9d675 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -1,12 +1,8 @@ package com.genersoft.iot.vmp.service.impl; -import com.genersoft.iot.vmp.common.CommonGbChannel; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.InviteInfo; -import com.genersoft.iot.vmp.common.InviteSessionStatus; -import com.genersoft.iot.vmp.common.InviteSessionType; import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; @@ -19,26 +15,19 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IInviteStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IPlatformChannelService; -import com.genersoft.iot.vmp.service.IPlatformService; -import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.*; +import com.genersoft.iot.vmp.storager.dao.CommonChannelMapper; +import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; -import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,17 +37,8 @@ import org.springframework.stereotype.Service; import javax.sdp.*; import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; -import javax.sip.PeerUnavailableException; import javax.sip.SipException; import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; import java.util.*; /** @@ -86,8 +66,6 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private IStreamSendManager streamSendManager; - @Autowired - private IRedisCatchStorage redisCatchStorage; @Autowired private SSRCFactory ssrcFactory; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 263df61b..10f2e121 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -18,51 +18,25 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.media.zlm.IStreamSendManager; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; -import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; -import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.*; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IInviteStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IMediaService; -import com.genersoft.iot.vmp.service.IPlayService; -import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; -import com.genersoft.iot.vmp.service.bean.DownloadFileInfo; -import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.service.bean.InviteErrorCode; -import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; -import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; import com.genersoft.iot.vmp.utils.CloudRecordUtils; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; @@ -265,8 +239,8 @@ public class PlayServiceImpl implements IPlayService { sendRtpItem.setApp("talk"); sendRtpItem.setStream(stream); sendRtpItem.setSsrc(playSsrc); - sendRtpItem.setDeviceId(device.getDeviceId()); - sendRtpItem.setPlatformId(device.getDeviceId()); + sendRtpItem.setDestId(device.getDeviceId()); + sendRtpItem.setSourceId(sipConfig.getId()); sendRtpItem.setChannelId(channelId); sendRtpItem.setRtcp(false); sendRtpItem.setMediaServerId(mediaServerItem.getId()); @@ -352,7 +326,7 @@ public class PlayServiceImpl implements IPlayService { sendRtpItem.setFromTag(response.getFromTag()); sendRtpItem.setToTag(response.getToTag()); sendRtpItem.setCallId(response.getCallIdHeader().getCallId()); - redisCatchStorage.updateSendRTPSever(sendRtpItem); + streamSendManager.update(sendRtpItem); streamSession.put(device.getDeviceId(), channelId, "talk", sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), @@ -453,8 +427,8 @@ public class PlayServiceImpl implements IPlayService { streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 取消订阅消息监听 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribeForStreamChange); } }else { logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}", @@ -517,8 +491,8 @@ public class PlayServiceImpl implements IPlayService { // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); + streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream()); + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId()); try { callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(), @@ -527,7 +501,7 @@ public class PlayServiceImpl implements IPlayService { logger.warn("[invite] 发送回调失败", e); } try { - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null, InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null); }catch (Exception e) { @@ -551,7 +525,7 @@ public class PlayServiceImpl implements IPlayService { logger.warn("[invite] 发送回调失败", exception); } try { - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, + inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null, InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); }catch (Exception exception) { @@ -1236,9 +1210,10 @@ public class PlayServiceImpl implements IPlayService { event.call("开启语音广播的时候未找到通道"); return false; } + AudioBroadcastCatch audioBroadcastCatchInCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); // 查询通道使用状态 - if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + if (audioBroadcastCatchInCatch != null && audioBroadcastCatchInCatch.getCallId() != null) { + SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatchInCatch.getCallId()); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); @@ -1280,8 +1255,9 @@ public class PlayServiceImpl implements IPlayService { @Override public boolean audioBroadcastInUse(Device device, String channelId) { - if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); + if (audioBroadcastCatch != null) { + SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId()); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 MediaServerItem mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId()); @@ -1300,34 +1276,38 @@ public class PlayServiceImpl implements IPlayService { public void stopAudioBroadcast(String deviceId, String channelId) { logger.info("[停止对讲] 设备:{}, 通道:{}", deviceId, channelId); List audioBroadcastCatchList = new ArrayList<>(); + Device device = deviceService.getDevice(deviceId); + if (device == null) { + return; + } if (channelId == null) { audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId)); } else { audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId)); } - if (audioBroadcastCatchList.size() > 0) { + if (!audioBroadcastCatchList.isEmpty()) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) { - Device device = deviceService.getDevice(deviceId); - if (device == null || audioBroadcastCatch == null) { - return; + if (audioBroadcastCatch == null) { + continue; } - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null); - if (sendRtpItem != null) { - redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map param = new HashMap<>(); - param.put("vhost", "__defaultVhost__"); - param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStream()); - zlmresTfulUtils.stopSendRtp(mediaInfo, param); - try { - cmder.streamByeCmdForDeviceInvite(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - logger.error("[消息发送失败] 发送语音喊话BYE失败"); + if (audioBroadcastCatch.getCallId() != null) { + SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId()); + if (sendRtpItem != null) { + streamSendManager.remove(sendRtpItem); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + zlmresTfulUtils.stopSendRtp(mediaInfo, param); + try { + cmder.streamByeCmdForDeviceInvite(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + logger.error("[消息发送失败] 发送语音喊话BYE失败"); + } } } - audioBroadcastManager.del(deviceId, channelId); } } @@ -1513,8 +1493,8 @@ public class PlayServiceImpl implements IPlayService { } else { logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param)); if (sendRtpItem.isOnlyAudio()) { - Device device = deviceService.getDevice(sendRtpItem.getDeviceId()); - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + Device device = deviceService.getDevice(sendRtpItem.getDestId()); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDestId(), sendRtpItem.getChannelId()); if (audioBroadcastCatch != null) { try { cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); @@ -1551,8 +1531,9 @@ public class PlayServiceImpl implements IPlayService { return; } // 查询通道使用状态 - if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) { - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); + if (audioBroadcastCatch != null && audioBroadcastCatch.getCallId() != null) { + SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId()); if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { // 查询流是否存在,不存在则认为是异常状态 MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); @@ -1567,19 +1548,6 @@ public class PlayServiceImpl implements IPlayService { } } - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null); - if (sendRtpItem != null) { - MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream()); - if (streamReady) { - logger.warn("[语音对讲] 进行中: {}", channelId); - event.call("语音对讲进行中"); - return; - } else { - stopTalk(device, channelId); - } - } - talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> { logger.info("[语音对讲] 收到设备发来的流"); }, eventResult -> { @@ -1603,7 +1571,11 @@ public class PlayServiceImpl implements IPlayService { @Override public void stopTalk(Device device, String channelId, Boolean streamIsReady) { logger.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channelId); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); + if (audioBroadcastCatch == null || audioBroadcastCatch.getCallId() == null) { + return; + } + SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId()); if (sendRtpItem == null) { logger.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止"); return; @@ -1635,7 +1607,7 @@ public class PlayServiceImpl implements IPlayService { logger.info("[语音对讲] 停止消息发送失败,可能已经停止"); } } - redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null); + streamSendManager.remove(sendRtpItem); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index 2c304c2a..7856523e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -103,7 +103,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener { zlmServerFactory.stopSendRtpStream(mediaInfo, param); if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, - sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getDestId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);