From 7db5fabc51db8abdaf1665b13a2b91695b7eb004 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Mon, 11 Mar 2024 17:15:52 +0800
Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../gb28181/GB28181ResourceServiceImpl.java | 4 +-
.../vmp/gb28181/bean/AudioBroadcastCatch.java | 14 ++
.../transmit/cmd/impl/SIPCommander.java | 70 +---------
.../cmd/impl/SIPCommanderFroPlatform.java | 3 +
.../request/impl/AckRequestProcessor.java | 10 +-
.../request/impl/ByeRequestProcessor.java | 16 +--
.../request/impl/InviteRequestProcessor.java | 2 +-
.../impl/info/InfoRequestProcessor.java | 2 +-
.../iot/vmp/media/zlm/IStreamSendManager.java | 2 -
.../vmp/media/zlm/ZLMHttpHookListener.java | 4 +-
.../media/zlm/impl/StreamSendManagerImpl.java | 11 +-
.../iot/vmp/service/IMediaService.java | 9 --
.../vmp/service/impl/DeviceServiceImpl.java | 29 ++--
.../vmp/service/impl/MediaServiceImpl.java | 4 +-
.../vmp/service/impl/PlatformServiceImpl.java | 32 +----
.../iot/vmp/service/impl/PlayServiceImpl.java | 130 +++++++-----------
.../RedisPushStreamCloseResponseListener.java | 2 +-
17 files changed, 118 insertions(+), 226 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java
index 84f0519b..10d822d6 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java
@@ -347,7 +347,7 @@ public class GB28181ResourceServiceImpl implements IResourceService {
DateUtil.urlFormatter.format(startTime) + "_" + DateUtil.urlFormatter.format(stopTime);
MediaServerItem mediaServerItem = playService.getNewMediaServerItem(checkResult.device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null,
- checkResult.device.isSsrcCheck(), true, 0, false, checkResult.device.getStreamModeForParam());
+ checkResult.device.isSsrcCheck(), true, 0, false, false, checkResult.device.getStreamModeForParam());
playService.playBack(mediaServerItem, ssrcInfo, checkResult.channel.getDeviceId(), checkResult.channel.getChannelId(),
startTimeStr, endTimeStr, (code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
@@ -378,7 +378,7 @@ public class GB28181ResourceServiceImpl implements IResourceService {
DateUtil.urlFormatter.format(startTime) + "_" + DateUtil.urlFormatter.format(stopTime);
MediaServerItem mediaServerItem = playService.getNewMediaServerItem(checkResult.device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null,
- checkResult.device.isSsrcCheck(), true, 0, false, checkResult.device.getStreamModeForParam());
+ checkResult.device.isSsrcCheck(), true, 0, false, false, checkResult.device.getStreamModeForParam());
playService.download(mediaServerItem, ssrcInfo, checkResult.channel.getDeviceId(), checkResult.channel.getChannelId(),
startTimeStr, endTimeStr, downloadSpeed, (code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
index 814d9847..b001cef7 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AudioBroadcastCatch.java
@@ -45,6 +45,12 @@ public class AudioBroadcastCatch {
*/
private String channelId;
+
+ /**
+ * callId
+ */
+ private String callId;
+
/**
* 流媒体信息
*/
@@ -156,4 +162,12 @@ public class AudioBroadcastCatch {
public void setSipTransactionInfoByRequset(SIPResponse sipResponse) {
this.sipTransactionInfo = new SipTransactionInfo(sipResponse);
}
+
+ public String getCallId() {
+ return callId;
+ }
+
+ public void setCallId(String callId) {
+ this.callId = callId;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 55c664c6..e3b9b664 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -7,10 +7,6 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
-import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
-import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.bean.command.PTZCommand;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@@ -38,11 +34,13 @@ import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
-import javax.sip.*;
+import javax.sip.InvalidArgumentException;
+import javax.sip.ResponseEvent;
+import javax.sip.SipException;
+import javax.sip.SipFactory;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import java.text.ParseException;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -85,8 +83,8 @@ public class SIPCommander implements ISIPCommander {
@Override
- public void ptzZoomCmd(Device device, String channelId, int inOut, int zoomSpeed) throws InvalidArgumentException, ParseException, SipException {
- ptzCmd(device, channelId, 0, 0, inOut, 0, zoomSpeed);
+ public void ptzCmd(Device device, String channelId, PTZCommand ptzCommand) throws InvalidArgumentException, SipException, ParseException {
+
}
/**
@@ -120,39 +118,6 @@ public class SIPCommander implements ISIPCommander {
return builder.toString();
}
- /**
- * 云台控制,支持方向与缩放控制
- *
- * @param device 控制设备
- * @param channelId 预览通道
- * @param leftRight 镜头左移右移 0:停止 1:左移 2:右移
- * @param upDown 镜头上移下移 0:停止 1:上移 2:下移
- * @param inOut 镜头放大缩小 0:停止 1:缩小 2:放大
- * @param moveSpeed 镜头移动速度
- * @param zoomSpeed 镜头缩放速度
- */
- @Override
- public void ptzCmd(Device device, String channelId, int leftRight, int upDown, int inOut, int moveSpeed,
- int zoomSpeed) throws InvalidArgumentException, SipException, ParseException {
- String cmdStr = SipUtils.cmdString(leftRight, upDown, inOut, moveSpeed, zoomSpeed);
- StringBuilder ptzXml = new StringBuilder(200);
- String charset = device.getCharset();
- ptzXml.append("\r\n");
- ptzXml.append("\r\n");
- ptzXml.append("DeviceControl\r\n");
- ptzXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n");
- ptzXml.append("" + channelId + "\r\n");
- ptzXml.append("" + cmdStr + "\r\n");
- ptzXml.append("\r\n");
- ptzXml.append("5\r\n");
- ptzXml.append("\r\n");
- ptzXml.append("\r\n");
-
- Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
-
- sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()),request);
- }
-
/**
* 前端控制,包括PTZ指令、FI指令、预置位指令、巡航指令、扫描指令和辅助开关指令
*
@@ -634,28 +599,7 @@ public class SIPCommander implements ISIPCommander {
* @param device 视频设备
*/
@Override
- public void audioBroadcastCmd(Device device) throws InvalidArgumentException, SipException, ParseException {
-
- StringBuffer broadcastXml = new StringBuffer(200);
- String charset = device.getCharset();
- broadcastXml.append("\r\n");
- broadcastXml.append("\r\n");
- broadcastXml.append("Broadcast\r\n");
- broadcastXml.append("" + SipUtils.getNewSn() + "\r\n");
- broadcastXml.append("" + sipConfig.getId() + "\r\n");
- broadcastXml.append("" + device.getDeviceId() + "\r\n");
- broadcastXml.append("\r\n");
-
-
-
- Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
- sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request);
-
- }
-
- @Override
- public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
-
+ public void audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
StringBuffer broadcastXml = new StringBuffer(200);
String charset = device.getCharset();
broadcastXml.append("\r\n");
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
index ef59494e..fa7e52f8 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -93,6 +93,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Autowired
private GitUtil gitUtil;
+ @Autowired
+ private SipLayer sipLayer;
+
@Override
public void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException {
register(parentPlatform, null, null, errorEvent, okEvent, true);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 9ec18afc..daca5c78 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -20,6 +19,7 @@ import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -29,9 +29,6 @@ import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
-import javax.sip.header.FromHeader;
-import javax.sip.header.HeaderAddress;
-import javax.sip.header.ToHeader;
import java.util.HashMap;
import java.util.Map;
@@ -92,7 +89,10 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
public void process(RequestEvent evt) {
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
dynamicTask.stop(callIdHeader.getCallId());
- String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
+ SIPRequest sipRequest = (SIPRequest) evt.getRequest();
+ String toUserId = ((SipURI) sipRequest.getToHeader().getAddress().getURI()).getUser();
+ String fromUserId = ((SipURI) sipRequest.getFromHeader().getAddress().getURI()).getUser();
+
SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId());
if (sendRtpItem == null) {
logger.warn("[收到ACK]:未找到来自{},目标为({})的推流信息",fromUserId, toUserId);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index 7da6d466..da3d313c 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -123,7 +123,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId());
if (sendRtpItem != null){
logger.info("[收到bye] 来自{}, 停止通道:{}", sendRtpItem.getDestId(), sendRtpItem.getChannelId());
- String streamId = sendRtpItem.getStreamId();
+ String streamId = sendRtpItem.getStream();
Map param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
@@ -145,32 +145,32 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getDestId());
if (platform != null) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getDestId(), platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(platform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);
}else {
- logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getPlatformId());
+ logger.info("[上级平台停止观看] 未找到平台{}的信息,发送redis消息失败", sendRtpItem.getDestId());
}
}
- AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getSourceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
// 来自上级平台的停止对讲
- logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
- audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ logger.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getSourceId(), sendRtpItem.getChannelId());
+ audioBroadcastManager.del(sendRtpItem.getSourceId(), sendRtpItem.getChannelId());
}
int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) {
logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
- Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
+ Device device = deviceService.getDevice(sendRtpItem.getSourceId());
if (device == null) {
logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
}
try {
- logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ logger.info("[停止点播] {}/{}", sendRtpItem.getSourceId(), sendRtpItem.getChannelId());
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 7a3295ee..615a93d3 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -932,7 +932,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sipResponse = responseSdpAck(request, content.toString(), parentPlatform);
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), sendRtpItem.getChannelId());
-
+ audioBroadcastCatch.setCallId(request.getCallIdHeader().getCallId());
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse);
audioBroadcastManager.update(audioBroadcastCatch);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java
index 8a394fca..adaf0030 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/info/InfoRequestProcessor.java
@@ -114,7 +114,7 @@ public class InfoRequestProcessor extends SIPRequestProcessorParent implements I
if ("Application".equalsIgnoreCase(contentType) && "MANSRTSP".equalsIgnoreCase(contentSubType)) {
SendRtpItem sendRtpItem = streamSendManager.getByCallId(callIdHeader.getCallId());
- String streamId = sendRtpItem.getStreamId();
+ String streamId = sendRtpItem.getStream();
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo) {
responseAck(request, Response.NOT_FOUND, "stream " + streamId + " not found");
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java
index 275ca7c8..6983fbb7 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/IStreamSendManager.java
@@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import java.util.List;
@@ -30,5 +29,4 @@ public interface IStreamSendManager {
void remove(SendRtpItem sendRtpItem);
void remove(List sendRtpItemList);
-
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index 008560f4..dc30c9ea 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -395,9 +395,7 @@ public class ZLMHttpHookListener {
mediaServerService.removeCount(param.getMediaServerId());
}
- int updateStatusResult = streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
- if (updateStatusResult > 0) {
- }
+ streamProxyService.updatePullingStatus(param.isRegist(), param.getApp(), param.getStream());
if ("rtp".equals(param.getApp())) {
if (!param.isRegist()) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java
index 5ed2e031..b55a9ce6 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/impl/StreamSendManagerImpl.java
@@ -1,11 +1,8 @@
package com.genersoft.iot.vmp.media.zlm.impl;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@@ -39,8 +36,8 @@ public class StreamSendManagerImpl implements IStreamSendManager {
if (sendRtpItem.getCallId() != null) {
redisTemplate.opsForValue().set(getCallIdKey(sendRtpItem.getCallId()), dateId);
}
- if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) {
- redisTemplate.opsForSet().add(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()), dateId);
+ if (sendRtpItem.getApp() != null && sendRtpItem.getStream() != null) {
+ redisTemplate.opsForSet().add(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStream()), dateId);
}
if (sendRtpItem.getMediaServerId() != null) {
redisTemplate.opsForSet().add(getMediaServerIdKey(sendRtpItem.getMediaServerId()), dateId);
@@ -156,8 +153,8 @@ public class StreamSendManagerImpl implements IStreamSendManager {
if (sendRtpItem.getCallId() != null) {
redisTemplate.delete(getCallIdKey(sendRtpItem.getCallId()));
}
- if (sendRtpItem.getApp() != null && sendRtpItem.getStreamId() != null) {
- redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStreamId()), dateId);
+ if (sendRtpItem.getApp() != null && sendRtpItem.getStream() != null) {
+ redisTemplate.opsForSet().remove(getAppAndStreamKey(sendRtpItem.getApp(), sendRtpItem.getStream()), dateId);
}
if (sendRtpItem.getMediaServerId() != null) {
redisTemplate.opsForSet().remove(getMediaServerIdKey(sendRtpItem.getMediaServerId()), dateId);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
index d6cfdb4e..3b528e27 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service;
-import com.alibaba.fastjson2.JSONArray;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -34,14 +33,6 @@ public interface IMediaService {
*/
StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks, String callId);
- /**
- * 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况
- * @param app
- * @param stream
- * @return
- */
- StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr, String callId);
-
/**
* 查看流是否已经注册
*/
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
index 41bac415..3b16b5aa 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -12,16 +12,11 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
-import com.genersoft.iot.vmp.service.*;
-import com.genersoft.iot.vmp.service.bean.Group;
+import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.service.IDeviceChannelService;
-import com.genersoft.iot.vmp.service.IDeviceService;
-import com.genersoft.iot.vmp.service.IInviteStreamService;
-import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
@@ -36,7 +31,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
@@ -56,8 +50,6 @@ public class DeviceServiceImpl implements IDeviceService {
private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
- @Autowired
- private SIPCommander cmder;
@Autowired
private DynamicTask dynamicTask;
@@ -110,6 +102,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
+ @Autowired
+ private IStreamSendManager streamSendManager;
+
@Override
public void online(Device device, SipTransactionInfo sipTransactionInfo) {
logger.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
@@ -250,13 +245,17 @@ public class DeviceServiceImpl implements IDeviceService {
removeCatalogSubscribe(device, null);
removeMobilePositionSubscribe(device, null);
+ // 清理语音对讲
List audioBroadcastCatches = audioBroadcastManager.get(deviceId);
- if (audioBroadcastCatches.size() > 0) {
+ if (!audioBroadcastCatches.isEmpty()) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) {
-
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
+ if (ObjectUtils.isEmpty(audioBroadcastCatch.getCallId())) {
+ audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId());
+ continue;
+ }
+ SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId());
if (sendRtpItem != null) {
- redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
+ streamSendManager.remove(sendRtpItem);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
@@ -264,8 +263,8 @@ public class DeviceServiceImpl implements IDeviceService {
param.put("stream", sendRtpItem.getStream());
zlmresTfulUtils.stopSendRtp(mediaInfo, param);
}
-
audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId());
+
}
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
index a2ff2525..8c0c506b 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
-import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
@@ -13,8 +12,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import org.springframework.beans.factory.annotation.Autowired;
@@ -142,3 +139,4 @@ public class MediaServiceImpl implements IMediaService {
return result;
}
}
+
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index ef49145a..63a9d675 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,12 +1,8 @@
package com.genersoft.iot.vmp.service.impl;
-import com.genersoft.iot.vmp.common.CommonGbChannel;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.common.InviteInfo;
-import com.genersoft.iot.vmp.common.InviteSessionStatus;
-import com.genersoft.iot.vmp.common.InviteSessionType;
import com.baomidou.dynamic.datasource.annotation.DS;
+import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@@ -19,26 +15,19 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.service.IInviteStreamService;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IPlatformChannelService;
-import com.genersoft.iot.vmp.service.IPlatformService;
-import com.genersoft.iot.vmp.service.IPlayService;
+import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.dao.*;
+import com.genersoft.iot.vmp.storager.dao.CommonChannelMapper;
+import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
-import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,17 +37,8 @@ import org.springframework.stereotype.Service;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
-import javax.sip.PeerUnavailableException;
import javax.sip.SipException;
import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
import java.util.*;
/**
@@ -86,8 +66,6 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private IStreamSendManager streamSendManager;
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
@Autowired
private SSRCFactory ssrcFactory;
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 263df61b..10f2e121 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -18,51 +18,25 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import com.genersoft.iot.vmp.media.zlm.IStreamSendManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.*;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.service.IInviteStreamService;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IMediaService;
-import com.genersoft.iot.vmp.service.IPlayService;
-import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
-import com.genersoft.iot.vmp.service.bean.ErrorCallback;
-import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
-import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.service.bean.ErrorCallback;
-import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
-import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
-import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
-import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
-import com.genersoft.iot.vmp.service.bean.ErrorCallback;
-import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
-import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
-import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
import com.genersoft.iot.vmp.utils.CloudRecordUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@@ -265,8 +239,8 @@ public class PlayServiceImpl implements IPlayService {
sendRtpItem.setApp("talk");
sendRtpItem.setStream(stream);
sendRtpItem.setSsrc(playSsrc);
- sendRtpItem.setDeviceId(device.getDeviceId());
- sendRtpItem.setPlatformId(device.getDeviceId());
+ sendRtpItem.setDestId(device.getDeviceId());
+ sendRtpItem.setSourceId(sipConfig.getId());
sendRtpItem.setChannelId(channelId);
sendRtpItem.setRtcp(false);
sendRtpItem.setMediaServerId(mediaServerItem.getId());
@@ -352,7 +326,7 @@ public class PlayServiceImpl implements IPlayService {
sendRtpItem.setFromTag(response.getFromTag());
sendRtpItem.setToTag(response.getToTag());
sendRtpItem.setCallId(response.getCallIdHeader().getCallId());
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
+ streamSendManager.update(sendRtpItem);
streamSession.put(device.getDeviceId(), channelId, "talk",
sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(),
@@ -453,8 +427,8 @@ public class PlayServiceImpl implements IPlayService {
streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 取消订阅消息监听
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
- subscribe.removeSubscribe(hookSubscribe);
+ HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
+ subscribe.removeSubscribe(hookSubscribeForStreamChange);
}
}else {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}",
@@ -517,8 +491,8 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
- streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
- inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
+ streamSession.remove(device.getDeviceId(), channel.getChannelId(), ssrcInfo.getStream());
+ inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId());
try {
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
@@ -527,7 +501,7 @@ public class PlayServiceImpl implements IPlayService {
logger.warn("[invite] 发送回调失败", e);
}
try {
- inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
+ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
}catch (Exception e) {
@@ -551,7 +525,7 @@ public class PlayServiceImpl implements IPlayService {
logger.warn("[invite] 发送回调失败", exception);
}
try {
- inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
+ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channel.getChannelId(), null,
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
}catch (Exception exception) {
@@ -1236,9 +1210,10 @@ public class PlayServiceImpl implements IPlayService {
event.call("开启语音广播的时候未找到通道");
return false;
}
+ AudioBroadcastCatch audioBroadcastCatchInCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
// 查询通道使用状态
- if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
+ if (audioBroadcastCatchInCatch != null && audioBroadcastCatchInCatch.getCallId() != null) {
+ SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatchInCatch.getCallId());
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态
Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -1280,8 +1255,9 @@ public class PlayServiceImpl implements IPlayService {
@Override
public boolean audioBroadcastInUse(Device device, String channelId) {
- if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
+ if (audioBroadcastCatch != null) {
+ SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId());
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态
MediaServerItem mediaServerServiceOne = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -1300,34 +1276,38 @@ public class PlayServiceImpl implements IPlayService {
public void stopAudioBroadcast(String deviceId, String channelId) {
logger.info("[停止对讲] 设备:{}, 通道:{}", deviceId, channelId);
List audioBroadcastCatchList = new ArrayList<>();
+ Device device = deviceService.getDevice(deviceId);
+ if (device == null) {
+ return;
+ }
if (channelId == null) {
audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId));
} else {
audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId));
}
- if (audioBroadcastCatchList.size() > 0) {
+ if (!audioBroadcastCatchList.isEmpty()) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) {
- Device device = deviceService.getDevice(deviceId);
- if (device == null || audioBroadcastCatch == null) {
- return;
+ if (audioBroadcastCatch == null) {
+ continue;
}
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
- if (sendRtpItem != null) {
- redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
- MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- Map param = new HashMap<>();
- param.put("vhost", "__defaultVhost__");
- param.put("app", sendRtpItem.getApp());
- param.put("stream", sendRtpItem.getStream());
- zlmresTfulUtils.stopSendRtp(mediaInfo, param);
- try {
- cmder.streamByeCmdForDeviceInvite(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
- } catch (InvalidArgumentException | ParseException | SipException |
- SsrcTransactionNotFoundException e) {
- logger.error("[消息发送失败] 发送语音喊话BYE失败");
+ if (audioBroadcastCatch.getCallId() != null) {
+ SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId());
+ if (sendRtpItem != null) {
+ streamSendManager.remove(sendRtpItem);
+ MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ Map param = new HashMap<>();
+ param.put("vhost", "__defaultVhost__");
+ param.put("app", sendRtpItem.getApp());
+ param.put("stream", sendRtpItem.getStream());
+ zlmresTfulUtils.stopSendRtp(mediaInfo, param);
+ try {
+ cmder.streamByeCmdForDeviceInvite(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
+ } catch (InvalidArgumentException | ParseException | SipException |
+ SsrcTransactionNotFoundException e) {
+ logger.error("[消息发送失败] 发送语音喊话BYE失败");
+ }
}
}
-
audioBroadcastManager.del(deviceId, channelId);
}
}
@@ -1513,8 +1493,8 @@ public class PlayServiceImpl implements IPlayService {
} else {
logger.error("RTP推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSONObject.toJSONString(param));
if (sendRtpItem.isOnlyAudio()) {
- Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
- AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+ Device device = deviceService.getDevice(sendRtpItem.getDestId());
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDestId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null) {
try {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);
@@ -1551,8 +1531,9 @@ public class PlayServiceImpl implements IPlayService {
return;
}
// 查询通道使用状态
- if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
+ if (audioBroadcastCatch != null && audioBroadcastCatch.getCallId() != null) {
+ SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId());
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态
MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@@ -1567,19 +1548,6 @@ public class PlayServiceImpl implements IPlayService {
}
}
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, stream, null);
- if (sendRtpItem != null) {
- MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, "rtp", sendRtpItem.getReceiveStream());
- if (streamReady) {
- logger.warn("[语音对讲] 进行中: {}", channelId);
- event.call("语音对讲进行中");
- return;
- } else {
- stopTalk(device, channelId);
- }
- }
-
talk(mediaServerItem, device, channelId, stream, (mediaServerItem1, hookParam) -> {
logger.info("[语音对讲] 收到设备发来的流");
}, eventResult -> {
@@ -1603,7 +1571,11 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void stopTalk(Device device, String channelId, Boolean streamIsReady) {
logger.info("[语音对讲] 停止, {}/{}", device.getDeviceId(), channelId);
- SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
+ AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
+ if (audioBroadcastCatch == null || audioBroadcastCatch.getCallId() == null) {
+ return;
+ }
+ SendRtpItem sendRtpItem = streamSendManager.getByCallId(audioBroadcastCatch.getCallId());
if (sendRtpItem == null) {
logger.info("[语音对讲] 停止失败, 未找到发送信息,可能已经停止");
return;
@@ -1635,7 +1607,7 @@ public class PlayServiceImpl implements IPlayService {
logger.info("[语音对讲] 停止消息发送失败,可能已经停止");
}
}
- redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId,null, null);
+ streamSendManager.remove(sendRtpItem);
}
@Override
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
index 2c304c2a..7856523e 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
@@ -103,7 +103,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
- sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(),
+ sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),
sendRtpItem.getDestId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel);