From e1af52fb5eaf0bcac221ba11cc90da7cd50d2895 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 30 Sep 2022 15:51:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/media/zlm/ZLMHttpHookListener.java | 11 +- .../vmp/media/zlm/ZLMRTPServerFactory.java | 17 ++ .../iot/vmp/service/impl/PlayServiceImpl.java | 263 +++++++++--------- .../vmanager/gb28181/play/PlayController.java | 8 + 4 files changed, 165 insertions(+), 134 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 5194a9aa..960a99db 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -553,7 +553,6 @@ public class ZLMHttpHookListener { if (sendRtpItem == null) { // TODO 可能数据错误,重新开启语音通道 }else { - String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); logger.info("rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); Map param = new HashMap<>(12); @@ -570,7 +569,7 @@ public class ZLMHttpHookListener { if (sendRtpItem.isTcpActive()) { jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, param); } else { - param.put("is_udp", is_Udp); + param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); param.put("dst_url", sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); @@ -581,8 +580,8 @@ public class ZLMHttpHookListener { } }else { // 开启语音对讲通道 - MediaServerItem mediaServerForMinimumLoad = mediaServerService.getMediaServerForMinimumLoad(); - playService.talk(mediaServerForMinimumLoad, device, channelId, (mediaServerItem, jsonObject)->{ + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + playService.talk(mediaServerItem, device, channelId, (mediaServer, jsonObject)->{ System.out.println("开始推流"); }, eventResult -> { System.out.println(eventResult.msg); @@ -644,7 +643,7 @@ public class ZLMHttpHookListener { } } } - if (!regist) { + if (!regist ) { List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(stream); if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { @@ -657,7 +656,7 @@ public class ZLMHttpHookListener { if (platform != null) { commanderFroPlatform.streamByeCmd(platform, sendRtpItem); }else { - if (sendRtpItem.isOnlyAudio()) { + if ("talk".equals(app) && sendRtpItem.isOnlyAudio()) { AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); if (device != null && audioBroadcastCatch != null) { // cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 2bd9a952..502418cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -323,4 +324,20 @@ public class ZLMRTPServerFactory { public void closeAllSendRtpStream() { } + + public MediaItem getMediaInfo(MediaServerItem mediaServerItem, String app, String stream) { + JSONObject json = zlmresTfulUtils.getMediaList(mediaServerItem, app, stream); + MediaItem mediaItem = null; + if (json == null || json.getInteger("code") != 0) { + return null; + } else { + JSONArray data = json.getJSONArray("data"); + if (data == null || data.size() == 0) { + return null; + }else { + mediaItem = JSONObject.toJavaObject(data.getJSONObject(0), MediaItem.class); + } + } + return mediaItem; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 68055a4d..4d814c69 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -290,148 +291,154 @@ public class PlayServiceImpl implements IPlayService { logger.info("[对讲] 端口分配异常,deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); return; } - try { - String callId = SipUtils.getNewCallId(); - cmder.talkStreamCmd(mediaServerItem, ssrcInfo, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("[对讲] 流已生成, 开始推流: " + response.toJSONString()); - dynamicTask.stop(timeOutTaskKey); - // TODO 暂不做处理 - }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { - logger.info("[对讲] 开始推流: " + json.toJSONString()); - dynamicTask.stop(timeOutTaskKey); - // 获取远程IP端口 作为回复语音流的地址 - String ip = json.getString("ip"); - Integer port = json.getInteger("port"); - logger.info("[远端设备开始推流]{}/{}, 来自ip:{}, 端口:{}", device.getDeviceId(), channelId, ip, port); - // 查看平台推流是否就绪 - Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemInuse, "talk", stream); - if (!ready) { - try { - cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); - } catch (SsrcTransactionNotFoundException e) { - timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); - } - }else { - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(), - device.getDeviceId(), channelId, - false); - sendRtpItem.setTcpActive(false); - if (sendRtpItem == null || sendRtpItem.getLocalPort() == 0) { - logger.warn("服务器端口资源不足"); - try { - cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); - } catch (InvalidArgumentException | ParseException | SipException e) { - logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); - } catch (SsrcTransactionNotFoundException e) { - timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); - } - return; - } - sendRtpItem.setCallId(callId); - sendRtpItem.setPlayType(InviteStreamType.TALK); - sendRtpItem.setStatus(1); - sendRtpItem.setIp(ip); - sendRtpItem.setPort(port); - sendRtpItem.setTcpActive(false); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - sendRtpItem.setApp("talk"); - sendRtpItem.setSsrc(ssrc); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - Map param = new HashMap<>(12); - param.put("vhost","__defaultVhost__"); - param.put("app",sendRtpItem.getApp()); - param.put("stream",sendRtpItem.getStreamId()); - param.put("ssrc", sendRtpItem.getSsrc()); - param.put("src_port", sendRtpItem.getLocalPort()); - param.put("pt", sendRtpItem.getPt()); - param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); - param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItemInuse, param); - System.out.println(11111); - System.out.println(jsonObject); - } - - }, (event) -> { -// ResponseEvent responseEvent = (ResponseEvent) event.event; -// String contentString = new String(responseEvent.getResponse().getRawContent()); -// // 获取ssrc -// int ssrcIndex = contentString.indexOf("y="); -// // 检查是否有y字段 -// if (ssrcIndex >= 0) { -// //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 -// String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); -// // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 -// if (ssrc.equals(ssrcInResponse)) { -// return; -// } -// logger.info("[对讲消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); -// if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { -// logger.info("[对讲消息] SSRC修正 {}->{}", ssrc, ssrcInResponse); + String callId = SipUtils.getNewCallId(); + boolean pushing = false; + // 查看设备是否已经在推流 +// MediaItem mediaItem = zlmrtpServerFactory.getMediaInfo(mediaServerItem, "rtp",ssrcInfo.getStream()); +// if (mediaItem != null) { +// SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, +// mediaItem.getOriginSock().getPeer_ip(), mediaItem.getOriginSock().getPeer_port(), ssrcInfo.getSsrc(), device.getDeviceId(), +// device.getDeviceId(), channelId, +// false); // -// if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) { -// // ssrc 不可用 -// // 释放ssrc +// sendRtpItem.setTcpActive(false); +// sendRtpItem.setCallId(callId); +// sendRtpItem.setPlayType(InviteStreamType.TALK); +// sendRtpItem.setStatus(1); +// sendRtpItem.setIp(mediaItem.getOriginSock().getPeer_ip()); +// sendRtpItem.setPort(mediaItem.getOriginSock().getPeer_port()); +// sendRtpItem.setTcpActive(false); +// sendRtpItem.setStreamId(ssrcInfo.getStream()); +// sendRtpItem.setApp("1000"); +// sendRtpItem.setStreamId("1000"); +// sendRtpItem.setSsrc(ssrc); +// sendRtpItem.setOnlyAudio(true); +// redisCatchStorage.updateSendRTPSever(sendRtpItem); +// +// Map param = new HashMap<>(12); +// param.put("vhost","__defaultVhost__"); +// param.put("app",sendRtpItem.getApp()); +// param.put("stream",sendRtpItem.getStreamId()); +// param.put("ssrc", sendRtpItem.getSsrc()); +// param.put("dst_url", sendRtpItem.getIp()); +// param.put("dst_port", sendRtpItem.getPort()); +// param.put("src_port", sendRtpItem.getLocalPort()); +// param.put("pt", sendRtpItem.getPt()); +// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); +// param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); +// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); +// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItem, param); +// System.out.println(2222); +// System.out.println(jsonObject); +// }else { + try { + cmder.talkStreamCmd(mediaServerItem, ssrcInfo, device, channelId, callId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { + logger.info("[对讲] 流已生成, 开始推流: " + response.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // TODO 暂不做处理 + }, (MediaServerItem mediaServerItemInuse, JSONObject json) -> { + logger.info("[对讲] 设备开始推流: " + json.toJSONString()); + dynamicTask.stop(timeOutTaskKey); + // 获取远程IP端口 作为回复语音流的地址 + String ip = json.getString("ip"); + Integer port = json.getInteger("port"); + logger.info("[设备开始推流]{}/{}, 来自ip:{}, 端口:{}", device.getDeviceId(), channelId, ip, port); + // 查看平台推流是否就绪 +// Boolean ready = zlmrtpServerFactory.isStreamReady(mediaServerItemInuse, "talk", stream); +// if (!ready) { +// try { +// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); +// } catch (InvalidArgumentException | ParseException | SipException e) { +// logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); +// } catch (SsrcTransactionNotFoundException e) { +// timeoutCallback.run(); // mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); +// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); // streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); -// event.msg = "下级自定义了ssrc,但是此ssrc不可用"; -// event.statusCode = 400; -// errorEvent.response(event); +// } +// }else { +// try { +// Thread.sleep(1000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, ip, port, ssrcInfo.getSsrc(), device.getDeviceId(), + device.getDeviceId(), channelId, + false); + + +// if (sendRtpItem.getLocalPort() == 0) { +// logger.warn("服务器端口资源不足"); +// try { +// cmder.streamByeCmd(device, channelId, finalSsrcInfo.getStream(), null); +// } catch (InvalidArgumentException | ParseException | SipException e) { +// logger.error("[对讲超时], 发送BYE失败 {}", e.getMessage()); +// } catch (SsrcTransactionNotFoundException e) { +// timeoutCallback.run(); +// mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); +// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); +// streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); +// } // return; // } -// -// // 单端口模式streamId也有变化,需要重新设置监听 -// if (!mediaServerItem.isRtpEnable()) { -// // 添加订阅 -// HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); -// subscribe.removeSubscribe(hookSubscribe); -// hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); -// subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { -// logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); -// dynamicTask.stop(timeOutTaskKey); -// // hook响应 -// onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid); -// hookEvent.response(mediaServerItemInUse, response); -// }); -// } -// // 关闭rtp server -// mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); -// // 重新开启ssrc server -// mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort()); -// + sendRtpItem.setTcpActive(false); + sendRtpItem.setCallId(callId); + sendRtpItem.setPlayType(InviteStreamType.TALK); + sendRtpItem.setStatus(1); + sendRtpItem.setIp(ip); + sendRtpItem.setPort(port); + sendRtpItem.setTcpActive(false); + sendRtpItem.setApp("1000"); + sendRtpItem.setStreamId("1000"); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setOnlyAudio(true); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",sendRtpItem.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("dst_url", sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + param.put("src_port", sendRtpItem.getLocalPort()); + param.put("pt", sendRtpItem.getPt()); + param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItemInuse, param); + System.out.println(11111); + System.out.println(sendRtpItem.getIp() + ":" + sendRtpItem.getPort()); +// System.out.println(jsonObject); // } -// } - }, (event) -> { + + }, (event) -> { + + }, (event) -> { + dynamicTask.stop(timeOutTaskKey); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + errorEvent.response(event); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { + + logger.error("[命令发送失败] 对讲消息: {}", e.getMessage()); dynamicTask.stop(timeOutTaskKey); mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); - errorEvent.response(event); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); + eventResult.msg = "命令发送失败"; + errorEvent.response(eventResult); + } +// } - logger.error("[命令发送失败] 对讲消息: {}", e.getMessage()); - dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(new CmdSendFailEvent(null)); - eventResult.msg = "命令发送失败"; - errorEvent.response(eventResult); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index bcd8bf17..baccfcae 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -232,6 +232,14 @@ public class PlayController { } + @GetMapping("/1111") + public void broadcastApi1() { + MediaServerItem defaultMediaServer = mediaServerService.getMediaServerForMinimumLoad(); + Device device = storager.queryVideoDevice("34020000001320090001"); + playService.talk(defaultMediaServer, device, "34020000001370000001", null, null, null); + + } + @Operation(summary = "停止语音广播") @Parameter(name = "deviceId", description = "设备Id", required = true)