完善语音对讲

结构优化
648540858 2023-02-22 18:06:52 +08:00
parent a209d17390
commit 100252a253
6 changed files with 74 additions and 42 deletions

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.conf.redis;
import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.service.redisMsg.*; import com.genersoft.iot.vmp.service.redisMsg.*;
import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -13,8 +14,6 @@ import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer;
/** /**
* @description:Redis使spring-data-redisapplication.ymlredis * @description:Redis使spring-data-redisapplication.ymlredis

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
/** /**
@ -10,10 +11,18 @@ import gov.nist.javax.sip.message.SIPResponse;
public class AudioBroadcastCatch { public class AudioBroadcastCatch {
public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status) { public AudioBroadcastCatch(String deviceId,
String channelId,
AudioBroadcastCatchStatus status,
MediaServerItem mediaServerItem,
String app,
String stream) {
this.deviceId = deviceId; this.deviceId = deviceId;
this.channelId = channelId; this.channelId = channelId;
this.status = status; this.status = status;
this.mediaServerItem = mediaServerItem;
this.app = app;
this.stream = stream;
} }
public AudioBroadcastCatch() { public AudioBroadcastCatch() {
@ -29,6 +38,21 @@ public class AudioBroadcastCatch {
*/ */
private String channelId; private String channelId;
/**
* 使
*/
private MediaServerItem mediaServerItem;
/**
*
*/
private String app;
/**
* ID
*/
private String stream;
/** /**
* 广 * 广
*/ */
@ -68,6 +92,30 @@ public class AudioBroadcastCatch {
return sipTransactionInfo; return sipTransactionInfo;
} }
public MediaServerItem getMediaServerItem() {
return mediaServerItem;
}
public void setMediaServerItem(MediaServerItem mediaServerItem) {
this.mediaServerItem = mediaServerItem;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) { public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) {
this.sipTransactionInfo = sipTransactionInfo; this.sipTransactionInfo = sipTransactionInfo;
} }

View File

@ -903,8 +903,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = redisCatchStorage.getDevice(requesterId); Device device = redisCatchStorage.getDevice(requesterId);
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId); AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(requesterId, channelId);
if (audioBroadcastCatch == null) { if (broadcastCatch == null) {
logger.warn("来自设备的Invite请求非语音广播已忽略requesterId {}/{}", requesterId, channelId); logger.warn("来自设备的Invite请求非语音广播已忽略requesterId {}/{}", requesterId, channelId);
try { try {
responseAck(request, Response.FORBIDDEN); responseAck(request, Response.FORBIDDEN);
@ -915,13 +915,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
if (device != null) { if (device != null) {
logger.info("收到设备" + requesterId + "的语音广播Invite请求"); logger.info("收到设备" + requesterId + "的语音广播Invite请求");
String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + audioBroadcastCatch.getChannelId(); String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + broadcastCatch.getChannelId();
dynamicTask.stop(key); dynamicTask.stop(key);
try { try {
responseAck(request, Response.TRYING); responseAck(request, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId());
return; return;
} }
String contentString = new String(request.getRawContent()); String contentString = new String(request.getRawContent());
@ -977,7 +977,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415 responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); logger.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId());
return; return;
} }
return; return;
@ -986,19 +986,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}, {}", requesterId, addressStr, port, ssrc, logger.info("设备{}请求语音流,地址:{}:{}ssrc{}, {}", requesterId, addressStr, port, ssrc,
mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP"); mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP");
MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); MediaServerItem mediaServerItem = broadcastCatch.getMediaServerItem();
if (mediaServerItem == null) {
logger.warn("未找到可用的zlm");
try {
responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId());
}
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), audioBroadcastCatch.getChannelId(), device.getDeviceId(), broadcastCatch.getChannelId(),
mediaTransmissionTCP, false); mediaTransmissionTCP, false);
if (sendRtpItem == null) { if (sendRtpItem == null) {
@ -1007,22 +997,20 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.BUSY_HERE); responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId());
return; return;
} }
return; return;
} }
String app = "broadcast";
String stream = device.getDeviceId() + "_" + audioBroadcastCatch.getChannelId();
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
sendRtpItem.setPlayType(InviteStreamType.TALK); sendRtpItem.setPlayType(InviteStreamType.TALK);
sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setPlatformId(requesterId); sendRtpItem.setPlatformId(requesterId);
sendRtpItem.setStatus(1); sendRtpItem.setStatus(1);
sendRtpItem.setApp(app); sendRtpItem.setApp(broadcastCatch.getApp());
sendRtpItem.setStreamId(stream); sendRtpItem.setStreamId(broadcastCatch.getStream());
sendRtpItem.setPt(8); sendRtpItem.setPt(8);
sendRtpItem.setUsePs(false); sendRtpItem.setUsePs(false);
sendRtpItem.setRtcp(false); sendRtpItem.setRtcp(false);
@ -1034,22 +1022,22 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream); Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream());
if (streamReady) { if (streamReady) {
sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc); sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc);
}else { }else {
logger.warn("[语音通话] 未发现待推送的流,app={},stream={}", app, stream); logger.warn("[语音通话] 未发现待推送的流,app={},stream={}", broadcastCatch.getApp(), broadcastCatch.getStream());
try { try {
responseAck(request, Response.GONE); responseAck(request, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 语音通话 回复410失败 {}", e.getMessage()); logger.error("[命令发送失败] 语音通话 回复410失败 {}", e.getMessage());
return; return;
} }
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId());
} }
} catch (SdpException e) { } catch (SdpException e) {
logger.error("[SDP解析异常]", e); logger.error("[SDP解析异常]", e);
playService.stopAudioBroadcast(device.getDeviceId(), audioBroadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId());
} }
} else { } else {
logger.warn("来自无效设备/平台的请求"); logger.warn("来自无效设备/平台的请求");

View File

@ -274,18 +274,17 @@ public class ZLMHttpHookListener {
logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
} }
MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
JSONObject json = (JSONObject) JSON.toJSON(param); JSONObject json = (JSONObject) JSON.toJSON(param);
taskExecutor.execute(() -> { taskExecutor.execute(() -> {
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
if (subscribe != null) { if (subscribe != null) {
MediaServerItem mediaInfo = mediaServerService.getOne(param.getMediaServerId());
if (mediaInfo != null) { if (mediaInfo != null) {
subscribe.response(mediaInfo, json); subscribe.response(mediaInfo, json);
} }
} }
// 流消失移除redis play // 流消失移除redis play
List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
if (param.isRegist()) { if (param.isRegist()) {
if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal()
@ -343,7 +342,7 @@ public class ZLMHttpHookListener {
} }
// 开启语音对讲通道 // 开启语音对讲通道
try { try {
playService.audioBroadcastCmd(device, channelId, 60, (msg)->{ playService.audioBroadcastCmd(device, channelId, 60, mediaInfo, param.getApp(), param.getStream(), (msg)->{
logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId); logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
}); });
} catch (InvalidArgumentException | ParseException | SipException e) { } catch (InvalidArgumentException | ParseException | SipException e) {
@ -375,7 +374,7 @@ public class ZLMHttpHookListener {
if (sendRtpItem == null) { if (sendRtpItem == null) {
// TODO 可能数据错误,重新开启语音通道 // TODO 可能数据错误,重新开启语音通道
}else { }else {
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
logger.info("rtp/{}开始向上级推流, 目标={}:{}SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); logger.info("rtp/{}开始向上级推流, 目标={}:{}SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
Map<String, Object> sendParam = new HashMap<>(12); Map<String, Object> sendParam = new HashMap<>(12);
sendParam.put("vhost","__defaultVhost__"); sendParam.put("vhost","__defaultVhost__");
@ -389,12 +388,12 @@ public class ZLMHttpHookListener {
JSONObject jsonObject; JSONObject jsonObject;
if (sendRtpItem.isTcpActive()) { if (sendRtpItem.isTcpActive()) {
jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaInfo, sendParam); jsonObject = zlmrtpServerFactory.startSendRtpPassive(mediaServerItem, sendParam);
} else { } else {
sendParam.put("is_udp", sendRtpItem.isTcp() ? "0" : "1"); sendParam.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
sendParam.put("dst_url", sendRtpItem.getIp()); sendParam.put("dst_url", sendRtpItem.getIp());
sendParam.put("dst_port", sendRtpItem.getPort()); sendParam.put("dst_port", sendRtpItem.getPort());
jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, sendParam); jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaServerItem, sendParam);
} }
if (jsonObject != null && jsonObject.getInteger("code") == 0) { if (jsonObject != null && jsonObject.getInteger("code") == 0) {
logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId); logger.info("[语音对讲] 自动推流成功, device: {}, channel: {}", deviceId, channelId);

View File

@ -12,9 +12,7 @@ import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult; import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
@ -62,7 +60,7 @@ public interface IPlayService {
AudioBroadcastResult audioBroadcast(Device device, String channelId); AudioBroadcastResult audioBroadcast(Device device, String channelId);
void stopAudioBroadcast(String deviceId, String channelId); void stopAudioBroadcast(String deviceId, String channelId);
void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException; void audioBroadcastCmd(Device device, String channelId, int timeout, MediaServerItem mediaServerItem, String sourceApp, String sourceStream, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException;
void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;

View File

@ -1011,7 +1011,7 @@ public class PlayServiceImpl implements IPlayService {
} }
@Override @Override
public void audioBroadcastCmd(Device device, String channelId, int timeout, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException { public void audioBroadcastCmd(Device device, String channelId, int timeout, MediaServerItem mediaServerItem, String sourceApp, String sourceStream, AudioBroadcastEvent event) throws InvalidArgumentException, ParseException, SipException {
if (device == null || channelId == null) { if (device == null || channelId == null) {
return; return;
} }
@ -1027,7 +1027,6 @@ public class PlayServiceImpl implements IPlayService {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) { if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
// 查询流是否存在,不存在则认为是异常状态 // 查询流是否存在,不存在则认为是异常状态
MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId()); Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId());
if (streamReady) { if (streamReady) {
logger.warn("语音广播已经开启: {}", channelId); logger.warn("语音广播已经开启: {}", channelId);
@ -1042,7 +1041,8 @@ public class PlayServiceImpl implements IPlayService {
// 发送通知 // 发送通知
cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> { cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
// 发送成功 // 发送成功
AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready); AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId,
AudioBroadcastCatchStatus.Ready, mediaServerItem, sourceApp, sourceStream);
audioBroadcastManager.update(audioBroadcastCatch); audioBroadcastManager.update(audioBroadcastCatch);
}, eventResultForError -> { }, eventResultForError -> {
// 发送失败 // 发送失败