From a9bdb0a706e10d2dffb50ae5a8086dd744bbd976 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Tue, 10 May 2022 17:45:07 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=AF=AD=E9=9F=B3=E5=B9=BF?=
 =?UTF-8?q?=E6=92=AD=E6=B5=81=E7=A8=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../request/impl/AckRequestProcessor.java     | 15 ++++-
 .../request/impl/ByeRequestProcessor.java     |  8 +--
 .../request/impl/InviteRequestProcessor.java  | 64 +++++++++++--------
 .../iot/vmp/service/impl/PlayServiceImpl.java | 37 +++++------
 4 files changed, 71 insertions(+), 53 deletions(-)

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 6cc19a7f9..153a08aba 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -4,9 +4,8 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@@ -82,6 +81,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private ISIPCommanderForPlatform commanderForPlatform;
 
+	@Autowired
+	private AudioBroadcastManager audioBroadcastManager;
+
 
 	/**   
 	 * 处理  ACK请求
@@ -122,6 +124,13 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 			if (jsonObject == null) {
 				logger.error("RTP推流失败: 请检查ZLM服务");
 			} else if (jsonObject.getInteger("code") == 0) {
+				if (sendRtpItem.isOnlyAudio()) {
+					AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
+					audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
+					audioBroadcastCatch.setDialog((SIPDialog) evt.getDialog());
+					audioBroadcastCatch.setRequest((SIPRequest) evt.getRequest());
+					audioBroadcastManager.update(audioBroadcastCatch);
+				}
 				logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
 			} else {
 				logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index c6116e3fd..7944787a0 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -91,7 +91,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 			if (dialog.getState().equals(DialogState.TERMINATED)) {
 				String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
 				String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
-				SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId());
+				SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(platformGbId, null, null, callIdHeader.getCallId());
 				logger.info("收到bye, [{}/{}]", platformGbId, channelId);
 				if (sendRtpItem != null){
 					String streamId = sendRtpItem.getStreamId();
@@ -103,15 +103,15 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 					logger.info("收到bye:停止向上级推流:" + streamId);
 					MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
 					zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
-					redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
+					redisCatchStorage.deleteSendRTPServer(platformGbId, sendRtpItem.getChannelId(), callIdHeader.getCallId(), null);
 					int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
 					if (totalReaderCount <= 0) {
 						logger.info("收到bye: {} 无其它观看者,通知设备停止推流", streamId);
 						if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
-							cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null);
+							cmder.streamByeCmd(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId(), streamId, null);
 						}
 						if (sendRtpItem.isOnlyAudio()) {
-							playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), channelId);
+							playService.stopAudioBroadcast(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
 						}
 						if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
 							MessageForPushChannel messageForPushChannel = new MessageForPushChannel();
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index dc77afcab..e3a9b2748 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -713,6 +713,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
 				String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + audioBroadcastCatch.getChannelId();
 				dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{
 					logger.info("等待推流超时: {}/{}", app, stream);
+					subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
 					playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
 					// 发送bye
 					try {
@@ -728,35 +729,42 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
 
 				subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
 						(MediaServerItem mediaServerItemInUse, JSONObject json)->{
-							sendRtpItem.setStatus(2);
-							dynamicTask.stop(waiteStreamTimeoutTaskKey);
-							redisCatchStorage.updateSendRTPSever(sendRtpItem);
-							StringBuffer content = new StringBuffer(200);
-							content.append("v=0\r\n");
-							content.append("o="+ audioBroadcastCatch.getChannelId() +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
-							content.append("s=Play\r\n");
-							content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
-							content.append("t=0 0\r\n");
-							content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
-							content.append("a=sendonly\r\n");
-							content.append("a=rtpmap:8 PCMA/8000\r\n");
-							content.append("y="+ finalSsrc + "\r\n");
-							content.append("f=v/////a/1/8/1\r\n");
+					logger.info("收到语音对讲推流");
+					try {
+						sendRtpItem.setStatus(2);
+						redisCatchStorage.updateSendRTPSever(sendRtpItem);
+						StringBuffer content = new StringBuffer(200);
+						content.append("v=0\r\n");
+						content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion()  + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
+						content.append("s=Play\r\n");
+						content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
+						content.append("t=0 0\r\n");
+						content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
+						content.append("a=sendonly\r\n");
+						content.append("a=rtpmap:8 PCMA/8000\r\n");
+						content.append("y="+ finalSsrc + "\r\n");
+						content.append("f=v/////a/1/8/1\r\n");
 
-							ParentPlatform parentPlatform = new ParentPlatform();
-							parentPlatform.setServerIP(device.getIp());
-							parentPlatform.setServerPort(device.getPort());
-							parentPlatform.setServerGBId(device.getDeviceId());
-							try {
-								responseSdpAck(evt, content.toString(), parentPlatform);
-							} catch (SipException e) {
-								throw new RuntimeException(e);
-							} catch (InvalidArgumentException e) {
-								throw new RuntimeException(e);
-							} catch (ParseException e) {
-								throw new RuntimeException(e);
-							}
-						});
+						ParentPlatform parentPlatform = new ParentPlatform();
+						parentPlatform.setServerIP(device.getIp());
+						parentPlatform.setServerPort(device.getPort());
+						parentPlatform.setServerGBId(device.getDeviceId());
+
+						responseSdpAck(evt, content.toString(), parentPlatform);
+						Dialog dialog = evt.getDialog();
+						audioBroadcastCatch.setDialog((SIPDialog) dialog);
+						audioBroadcastCatch.setRequest((SIPRequest) request);
+						audioBroadcastManager.update(audioBroadcastCatch);
+					} catch (SipException e) {
+						throw new RuntimeException(e);
+					} catch (InvalidArgumentException e) {
+						throw new RuntimeException(e);
+					} catch (ParseException e) {
+						throw new RuntimeException(e);
+					} catch (SdpParseException e) {
+						throw new RuntimeException(e);
+					}
+				});
 			}
 			String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId();
 			WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index 934745e53..27aa7a9f9 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -675,26 +675,27 @@ public class PlayServiceImpl implements IPlayService {
         AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId);
         if (audioBroadcastCatch != null) {
             audioBroadcastManager.del(deviceId, audioBroadcastCatch.getChannelId());
-        }
-        try {
-            SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(deviceId, channelId, null, null);
-            if (sendRtpItem != null) {
-                redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
-                MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                Map<String, Object> param = new HashMap<>();
-                param.put("vhost", "__defaultVhost__");
-                param.put("app", sendRtpItem.getApp());
-                param.put("stream", sendRtpItem.getStreamId());
-                zlmresTfulUtils.stopSendRtp(mediaInfo, param);
+            try {
+                SendRtpItem sendRtpItem =  redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
+                if (sendRtpItem != null) {
+                    redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
+                    MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                    Map<String, Object> param = new HashMap<>();
+                    param.put("vhost", "__defaultVhost__");
+                    param.put("app", sendRtpItem.getApp());
+                    param.put("stream", sendRtpItem.getStreamId());
+                    zlmresTfulUtils.stopSendRtp(mediaInfo, param);
+                }
+                if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
+                    cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null);
+                }
+            } catch (SipException e) {
+                throw new RuntimeException(e);
+            } catch (ParseException e) {
+                throw new RuntimeException(e);
             }
-            if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
-                cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null);
-            }
-        } catch (SipException e) {
-            throw new RuntimeException(e);
-        } catch (ParseException e) {
-            throw new RuntimeException(e);
         }
 
+
     }
 }