初步实现语音喊话

结构优化
648540858 2022-05-09 18:15:30 +08:00
parent 9f0ef439cc
commit c2aaae9325
20 changed files with 760 additions and 178 deletions

View File

@ -0,0 +1,59 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 广
* @author lin
*/
public class AudioBroadcastCatch {
public AudioBroadcastCatch(String deviceId, String channelId, AudioBroadcastCatchStatus status) {
this.deviceId = deviceId;
this.channelId = channelId;
this.status = status;
}
public AudioBroadcastCatch() {
}
/**
*
*/
private String deviceId;
/**
*
*/
private String channelId;
/**
* 广
*/
private AudioBroadcastCatchStatus status;
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public AudioBroadcastCatchStatus getStatus() {
return status;
}
public void setStatus(AudioBroadcastCatchStatus status) {
this.status = status;
}
}

View File

@ -0,0 +1,15 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 广
* @author lin
*/
public enum AudioBroadcastCatchStatus {
// 发送语音广播消息等待对方回复语音广播
Ready,
// 收到回复等待invite消息
WaiteInvite,
// 收到invite消息
Ok,
}

View File

@ -134,16 +134,6 @@ public class Device {
*/
private boolean ssrcCheck;
/**
*
*/
private String audioChannelForReceive;
/**
*
*/
private String audioChannelForSend;
public String getDeviceId() {
return deviceId;
@ -345,11 +335,4 @@ public class Device {
this.ssrcCheck = ssrcCheck;
}
public String getAudioChannelForReceive() {
return audioChannelForReceive;
}
public void setAudioChannelForReceive(String audioChannelForReceive) {
this.audioChannelForReceive = audioChannelForReceive;
}
}

View File

@ -0,0 +1,59 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 广
* @author lin
*/
@Component
public class AudioBroadcastManager {
public static Map<String, AudioBroadcastCatch> data = new ConcurrentHashMap<>();
public void add(AudioBroadcastCatch audioBroadcastCatch) {
this.update(audioBroadcastCatch);
}
public void update(AudioBroadcastCatch audioBroadcastCatch) {
data.put(audioBroadcastCatch.getDeviceId() + audioBroadcastCatch.getChannelId(), audioBroadcastCatch);
}
public void del(String deviceId, String channelId) {
data.remove(deviceId + channelId);
}
public void delByDeviceId(String deviceId) {
for (String key : data.keySet()) {
if (key.startsWith(deviceId)) {
data.remove(key);
}
}
}
public List<AudioBroadcastCatch> getAll(){
Collection<AudioBroadcastCatch> values = data.values();
return new ArrayList<>(values);
}
public boolean exit(String deviceId, String channelId) {
for (String key : data.keySet()) {
if (key.equals(deviceId + channelId)) {
return true;
}
}
return false;
}
public AudioBroadcastCatch get(String deviceId, String channelId) {
return data.get(deviceId + channelId);
}
}

View File

@ -20,9 +20,6 @@ public class CatalogDataCatch {
public static Map<String, CatalogData> data = new ConcurrentHashMap<>();
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private IVideoManagerStorage storager;

View File

@ -6,8 +6,12 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog;
import javax.sip.Dialog;
import javax.sip.SipException;
import java.text.ParseException;
/**
* @description:
@ -123,6 +127,7 @@ public interface ISIPCommander {
*/
void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent);
void streamByeCmd(String deviceId, String channelId, String stream, String callId);
void streamByeCmd(SIPDialog dialog, SIPRequest request, SipSubscribe.Event okEvent) throws SipException, ParseException;
/**
*
@ -144,21 +149,13 @@ public interface ISIPCommander {
*/
void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed);
/**
* 广
*
* @param device
* @param channelId
*/
boolean audioBroadcastCmd(Device device,String channelId);
/**
* 广
*
* @param device
*/
void audioBroadcastCmd(Device device, SipSubscribe.Event okEvent);
boolean audioBroadcastCmd(Device device);
boolean audioBroadcastCmd(Device device, String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent);
/**
*

View File

@ -733,9 +733,17 @@ public class SIPCommander implements ISIPCommander {
}
}
streamByeCmd(dialog, (SIPRequest)transaction.getRequest(), okEvent);
} catch (SipException | ParseException e) {
e.printStackTrace();
}
}
@Override
public void streamByeCmd(SIPDialog dialog, SIPRequest request, SipSubscribe.Event okEvent) throws SipException, ParseException {
Request byeRequest = dialog.createRequest(Request.BYE);
SipURI byeURI = (SipURI) byeRequest.getRequestURI();
SIPRequest request = (SIPRequest)transaction.getRequest();
byeURI.setHost(request.getRemoteAddress().getHostAddress());
byeURI.setPort(request.getRemotePort());
ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME);
@ -753,22 +761,6 @@ public class SIPCommander implements ISIPCommander {
}
dialog.sendRequest(clientTransaction);
} catch (SipException | ParseException e) {
e.printStackTrace();
}
}
/**
* 广
*
* @param device
* @param channelId
*/
@Override
public boolean audioBroadcastCmd(Device device, String channelId) {
// 改为新的实现
return false;
}
/**
@ -777,7 +769,7 @@ public class SIPCommander implements ISIPCommander {
* @param device
*/
@Override
public boolean audioBroadcastCmd(Device device) {
public boolean audioBroadcastCmd(Device device,String channelId, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
try {
StringBuffer broadcastXml = new StringBuffer(200);
String charset = device.getCharset();
@ -786,7 +778,7 @@ public class SIPCommander implements ISIPCommander {
broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n");
broadcastXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n");
broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
broadcastXml.append("<TargetID>" + channelId + "</TargetID>\r\n");
broadcastXml.append("</Notify>\r\n");
String tm = Long.toString(System.currentTimeMillis());
@ -795,38 +787,13 @@ public class SIPCommander implements ISIPCommander {
: udpSipProvider.getNewCallId();
Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), "z9hG4bK-ViaBcst-" + tm, "FromBcst" + tm, null, callIdHeader);
transmitRequest(device, request);
transmitRequest(device, request, errorEvent, okEvent);
return true;
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
}
return false;
}
@Override
public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) {
try {
StringBuffer broadcastXml = new StringBuffer(200);
String charset = device.getCharset();
broadcastXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
broadcastXml.append("<Notify>\r\n");
broadcastXml.append("<CmdType>Broadcast</CmdType>\r\n");
broadcastXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
broadcastXml.append("<SourceID>" + sipConfig.getId() + "</SourceID>\r\n");
broadcastXml.append("<TargetID>" + device.getDeviceId() + "</TargetID>\r\n");
broadcastXml.append("</Notify>\r\n");
String tm = Long.toString(System.currentTimeMillis());
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request = headerProvider.createMessageRequest(device, broadcastXml.toString(), "z9hG4bK-ViaBcst-" + tm, "FromBcst" + tm, null, callIdHeader);
transmitRequest(device, request, errorEvent);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
}
}
/**
*

View File

@ -94,6 +94,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
param.put("dst_port", sendRtpItem.getPort());
param.put("is_udp", is_Udp);
param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", 8);
param.put("use_ps", 0);
param.put("only_audio", 1);
zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);

View File

@ -2,21 +2,27 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@ -24,8 +30,12 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@ -41,6 +51,7 @@ import javax.sip.message.Response;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Vector;
/**
@ -73,7 +84,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private IPlayService playService;
@Autowired
private ISIPCommander commander;
private AudioBroadcastManager audioBroadcastManager;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@ -93,6 +104,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private ZLMMediaListManager mediaListManager;
@Autowired
private DeferredResultHolder resultHolder;
@Autowired
private ZLMHttpHookSubscribe subscribe;
@Autowired
private SipConfig config;
@Override
public void afterPropertiesSet() throws Exception {
@ -126,7 +146,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 查询请求是否来自上级平台\设备
ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
if (platform == null) {
inviteFromDeviceHandle(evt, requesterId);
inviteFromDeviceHandle(evt, requesterId, channelId);
}else {
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
@ -542,10 +562,25 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
public void inviteFromDeviceHandle(RequestEvent evt, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException {
// 兼容奇葩的海康这里使用的不是通道编号而是本平台编号
// if (channelId.equals(config.getId())) {
// List<AudioBroadcastCatch> all = audioBroadcastManager.getAll();
// for (AudioBroadcastCatch audioBroadcastCatch : all) {
// if (audioBroadcastCatch.getDeviceId().equals(requesterId)) {
// channelId = audioBroadcastCatch.getChannelId();
// }
// }
// }
// // 兼容失败
// if (channelId.equals(config.getId())) {
// responseAck(evt, Response.BAD_REQUEST);
// return;
// }
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = redisCatchStorage.getDevice(requesterId);
Request request = evt.getRequest();
if (device != null) {
logger.info("收到设备" + requesterId + "的语音广播Invite请求");
@ -558,7 +593,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
int ssrcIndex = contentString.indexOf("y=");
if (ssrcIndex > 0) {
substring = contentString.substring(0, ssrcIndex);
ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim();
}
ssrcIndex = substring.indexOf("f=");
if (ssrcIndex > 0) {
@ -568,6 +603,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
// 查看是否支持PS 负载96
int port = -1;
//boolean recvonly = false;
@ -602,10 +638,150 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
return;
}
String username = sdp.getOrigin().getUsername();
String sessionName = sdp.getSessionName().getValue();
String addressStr = sdp.getOrigin().getAddress();
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}", username, addressStr, port, ssrc);
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}", requesterId, addressStr, port, ssrc);
MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
if (mediaServerItem == null) {
logger.warn("未找到可用的zlm");
responseAck(evt, Response.BUSY_HERE);
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId,
mediaTransmissionTCP);
sendRtpItem.setTcp(mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(evt, Response.BUSY_HERE);
return;
}
String app = "broadcast";
String stream = device.getDeviceId() + "_" + channelId;
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
sendRtpItem.setPlayType(InviteStreamType.PLAY);
sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setPlatformId(requesterId);
sendRtpItem.setStatus(1);
sendRtpItem.setApp(app);
sendRtpItem.setStreamId(stream);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// hook监听等待设备推流上来
// 添加订阅
JSONObject subscribeKey = new JSONObject();
subscribeKey.put("app", app);
subscribeKey.put("stream", stream);
subscribeKey.put("regist", true);
subscribeKey.put("schema", "rtmp");
subscribeKey.put("mediaServerId", mediaServerItem.getId());
String finalSsrc = ssrc;
String waiteStreamTimeoutTaskKey = "waite-stream-" + device.getDeviceId() + channelId;
if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) {
logger.info("发现已经在推流");
dynamicTask.stop(waiteStreamTimeoutTaskKey);
sendRtpItem.setStatus(2);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
content.append("t=0 0\r\n");
content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
content.append("a=sendonly\r\n");
content.append("a=rtpmap:8 PCMA/8000\r\n");
content.append("y="+ finalSsrc + "\r\n");
content.append("f=v/////a/1/8/1\r\n");
ParentPlatform parentPlatform = new ParentPlatform();
parentPlatform.setServerIP(device.getIp());
parentPlatform.setServerPort(device.getPort());
parentPlatform.setServerGBId(device.getDeviceId());
try {
responseSdpAck(evt, content.toString(), parentPlatform);
} catch (SipException e) {
throw new RuntimeException(e);
} catch (InvalidArgumentException e) {
throw new RuntimeException(e);
} catch (ParseException e) {
throw new RuntimeException(e);
}
}else {
// 设置等待推流的超时; 默认20s
String finalChannelId = channelId;
dynamicTask.startDelay(waiteStreamTimeoutTaskKey, ()->{
logger.info("等待推流超时: {}/{}", app, stream);
if (audioBroadcastManager.exit(device.getDeviceId(), finalChannelId)) {
audioBroadcastManager.del(device.getDeviceId(), finalChannelId);
}else {
// 兼容海康使用了错误的通道ID的情况
audioBroadcastManager.delByDeviceId(device.getDeviceId());
}
// 发送bye
try {
cmder.streamByeCmd((SIPDialog)evt.getServerTransaction().getDialog(), (SIPRequest) evt.getRequest(), null);
} catch (SipException e) {
throw new RuntimeException(e);
} catch (ParseException e) {
throw new RuntimeException(e);
}
}, 20*1000);
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
sendRtpItem.setStatus(2);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+ finalChannelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
content.append("t=0 0\r\n");
content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
content.append("a=sendonly\r\n");
content.append("a=rtpmap:8 PCMA/8000\r\n");
content.append("y="+ finalSsrc + "\r\n");
content.append("f=v/////a/1/8/1\r\n");
ParentPlatform parentPlatform = new ParentPlatform();
parentPlatform.setServerIP(device.getIp());
parentPlatform.setServerPort(device.getPort());
parentPlatform.setServerGBId(device.getDeviceId());
try {
responseSdpAck(evt, content.toString(), parentPlatform);
} catch (SipException e) {
throw new RuntimeException(e);
} catch (InvalidArgumentException e) {
throw new RuntimeException(e);
} catch (ParseException e) {
throw new RuntimeException(e);
}
});
}
String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId;
dynamicTask.stop(timeOutTaskKey);
String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId();
WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
wvpResult.setMsg("success");
AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
audioBroadcastResult.setApp(app);
audioBroadcastResult.setStream(stream);
audioBroadcastResult.setMediaServerItem(new MediaServerItemLite(mediaServerItem));
audioBroadcastResult.setCodec("G.711");
wvpResult.setData(audioBroadcastResult);
RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key);
requestMessage.setData(wvpResult);
resultHolder.invokeAllResult(requestMessage);
} else {
logger.warn("来自无效设备/平台的请求");
responseAck(evt, Response.BAD_REQUEST);

View File

@ -6,7 +6,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -23,6 +27,10 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
String cmd = getText(element, "CmdType");
if (cmd == null) {
handNullCmd(evt);
return;
}
IMessageHandler messageHandler = messageHandlerMap.get(cmd);
if (messageHandler != null) {
messageHandler.handForDevice(evt, device, element);
@ -37,4 +45,17 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
messageHandler.handForPlatform(evt, parentPlatform, element);
}
}
public void handNullCmd(RequestEvent evt){
try {
responseAck(evt, Response.OK);
} catch (SipException e) {
throw new RuntimeException(e);
} catch (InvalidArgumentException e) {
throw new RuntimeException(e);
} catch (ParseException e) {
throw new RuntimeException(e);
}
return;
}
}

View File

@ -1,8 +1,11 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatchStatus;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@ -36,6 +39,9 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Override
public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this);
@ -45,21 +51,16 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
try {
String channelId = getText(rootElement, "DeviceID");
String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + device.getDeviceId() + channelId;
// 回复200 OK
responseAck(evt, Response.OK);
// 此处是对本平台发出Broadcast指令的应答
JSONObject json = new JSONObject();
XmlUtil.node2Json(rootElement, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
if (!audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
// 回复410
responseAck(evt, Response.GONE);
return;
}
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
logger.info("收到语音广播的回复:{}/{}", device.getDeviceId(), channelId );
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId);
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite);
audioBroadcastManager.update(audioBroadcastCatch);
responseAck(evt, Response.OK);
} catch (ParseException | SipException | InvalidArgumentException e) {
e.printStackTrace();
}

View File

@ -271,7 +271,7 @@ public class ZLMRTPServerFactory {
*
*/
public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId);
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", streamId);
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
}

View File

@ -0,0 +1,197 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import org.springframework.util.StringUtils;
import java.util.HashMap;
/**
* MediaServerItem便
*/
public class MediaServerItemLite {
private String id;
private String ip;
private String hookIp;
private String sdpIp;
private String streamIp;
private int httpPort;
private int httpSSlPort;
private int rtmpPort;
private int rtmpSSlPort;
private int rtpProxyPort;
private int rtspPort;
private int rtspSSLPort;
private String secret;
private int streamNoneReaderDelayMS;
private int hookAliveInterval;
private int recordAssistPort;
public MediaServerItemLite(MediaServerItem mediaServerItem) {
this.id = mediaServerItem.getId();
this.ip = mediaServerItem.getIp();
this.hookIp = mediaServerItem.getHookIp();
this.sdpIp = mediaServerItem.getSdpIp();
this.streamIp = mediaServerItem.getStreamIp();
this.httpPort = mediaServerItem.getHttpPort();
this.httpSSlPort = mediaServerItem.getHttpSSlPort();
this.rtmpPort = mediaServerItem.getRtmpPort();
this.rtmpSSlPort = mediaServerItem.getRtmpSSlPort();
this.rtpProxyPort = mediaServerItem.getRtpProxyPort();
this.rtspPort = mediaServerItem.getRtspPort();
this.rtspSSLPort = mediaServerItem.getRtspSSLPort();
this.secret = mediaServerItem.getSecret();
this.streamNoneReaderDelayMS = mediaServerItem.getStreamNoneReaderDelayMS();
this.hookAliveInterval = mediaServerItem.getHookAliveInterval();
this.streamNoneReaderDelayMS = mediaServerItem.getStreamNoneReaderDelayMS();
this.recordAssistPort = mediaServerItem.getRecordAssistPort();
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getHookIp() {
return hookIp;
}
public void setHookIp(String hookIp) {
this.hookIp = hookIp;
}
public String getSdpIp() {
return sdpIp;
}
public void setSdpIp(String sdpIp) {
this.sdpIp = sdpIp;
}
public String getStreamIp() {
return streamIp;
}
public void setStreamIp(String streamIp) {
this.streamIp = streamIp;
}
public int getHttpPort() {
return httpPort;
}
public void setHttpPort(int httpPort) {
this.httpPort = httpPort;
}
public int getHttpSSlPort() {
return httpSSlPort;
}
public void setHttpSSlPort(int httpSSlPort) {
this.httpSSlPort = httpSSlPort;
}
public int getRtmpPort() {
return rtmpPort;
}
public void setRtmpPort(int rtmpPort) {
this.rtmpPort = rtmpPort;
}
public int getRtmpSSlPort() {
return rtmpSSlPort;
}
public void setRtmpSSlPort(int rtmpSSlPort) {
this.rtmpSSlPort = rtmpSSlPort;
}
public int getRtpProxyPort() {
return rtpProxyPort;
}
public void setRtpProxyPort(int rtpProxyPort) {
this.rtpProxyPort = rtpProxyPort;
}
public int getRtspPort() {
return rtspPort;
}
public void setRtspPort(int rtspPort) {
this.rtspPort = rtspPort;
}
public int getRtspSSLPort() {
return rtspSSLPort;
}
public void setRtspSSLPort(int rtspSSLPort) {
this.rtspSSLPort = rtspSSLPort;
}
public String getSecret() {
return secret;
}
public void setSecret(String secret) {
this.secret = secret;
}
public int getStreamNoneReaderDelayMS() {
return streamNoneReaderDelayMS;
}
public void setStreamNoneReaderDelayMS(int streamNoneReaderDelayMS) {
this.streamNoneReaderDelayMS = streamNoneReaderDelayMS;
}
public int getHookAliveInterval() {
return hookAliveInterval;
}
public void setHookAliveInterval(int hookAliveInterval) {
this.hookAliveInterval = hookAliveInterval;
}
public int getRecordAssistPort() {
return recordAssistPort;
}
public void setRecordAssistPort(int recordAssistPort) {
this.recordAssistPort = recordAssistPort;
}
}

View File

@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;
@ -40,4 +41,6 @@ public interface IPlayService {
DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);
void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event);
}

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
@ -26,7 +27,9 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService;
@ -57,6 +60,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private SIPCommander cmder;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Autowired
private SIPCommanderFroPlatform sipCommanderFroPlatform;
@ -621,4 +627,42 @@ public class PlayServiceImpl implements IPlayService {
}
}
}
@Override
public void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event) {
if (device == null || channelId == null) {
return;
}
DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
if (deviceChannel == null) {
logger.warn("开启语音广播的时候未找到通道: {}", channelId);
event.call("开启语音广播的时候未找到通道");
return;
}
// 查询通道使用状态
if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
logger.warn("语音广播已经开启: {}", channelId);
event.call("语音广播已经开启");
return;
}
String timeOutTaskKey = "audio-broadcast-" + device.getDeviceId() + channelId;
dynamicTask.startDelay(timeOutTaskKey, ()->{
logger.error("语音广播发送超时: {}:{}", device.getDeviceId(), channelId);
event.call("语音广播发送超时");
audioBroadcastManager.del(device.getDeviceId(), channelId);
}, timeout * 1000);
// 发送通知
cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
// 发送成功
AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready);
audioBroadcastManager.add(audioBroadcastCatch);
}, eventResultForError -> {
dynamicTask.stop(timeOutTaskKey);
// 发送失败
logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
event.call("语音广播发送失败");
audioBroadcastManager.del(device.getDeviceId(), channelId);
});
}
}

View File

@ -37,8 +37,6 @@ public interface DeviceMapper {
"subscribeCycleForMobilePosition," +
"mobilePositionSubmissionInterval," +
"subscribeCycleForAlarm," +
"audioChannelForReceive," +
"audioChannelForSend," +
"ssrcCheck," +
"online" +
") VALUES (" +
@ -62,8 +60,6 @@ public interface DeviceMapper {
"#{subscribeCycleForMobilePosition}," +
"#{mobilePositionSubmissionInterval}," +
"#{subscribeCycleForAlarm}," +
"#{audioChannelForReceive}," +
"#{audioChannelForSend}," +
"#{ssrcCheck}," +
"#{online}" +
")")
@ -90,8 +86,6 @@ public interface DeviceMapper {
"<if test=\"subscribeCycleForMobilePosition != null\">, subscribeCycleForMobilePosition=${subscribeCycleForMobilePosition}</if>" +
"<if test=\"mobilePositionSubmissionInterval != null\">, mobilePositionSubmissionInterval=${mobilePositionSubmissionInterval}</if>" +
"<if test=\"subscribeCycleForAlarm != null\">, subscribeCycleForAlarm=${subscribeCycleForAlarm}</if>" +
"<if test=\"audioChannelForReceive != null\">, audioChannelForReceive=#{audioChannelForReceive}</if>" +
"<if test=\"audioChannelForSend != null\">, audioChannelForSend=#{audioChannelForSend}</if>" +
"<if test=\"ssrcCheck != null\">, ssrcCheck=${ssrcCheck}</if>" +
"WHERE deviceId='${deviceId}'"+
" </script>"})

View File

@ -0,0 +1,62 @@
package com.genersoft.iot.vmp.vmanager.bean;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItemLite;
/**
* @author lin
*/
public class AudioBroadcastResult {
/**
*
*/
private MediaServerItemLite mediaServerItem;
/**
*
*/
private String codec;
/**
* zlm
*/
private String app;
/**
* zlmID
*/
private String stream;
public MediaServerItemLite getMediaServerItem() {
return mediaServerItem;
}
public void setMediaServerItem(MediaServerItemLite mediaServerItem) {
this.mediaServerItem = mediaServerItem;
}
public String getCodec() {
return codec;
}
public void setCodec(String codec) {
this.codec = codec;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
}

View File

@ -11,6 +11,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.service.IMediaService;
@ -39,6 +40,9 @@ import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
import java.util.UUID;
/**
* @author lin
*/
@Api(tags = "国标设备点播")
@CrossOrigin
@RestController
@ -102,7 +106,7 @@ public class PlayController {
logger.debug(String.format("设备预览/回放停止API调用streamId%s_%s", deviceId, channelId ));
String uuid = UUID.randomUUID().toString();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
// 录像查询以channelId作为deviceId查询
String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId;
@ -123,7 +127,7 @@ public class PlayController {
RequestMessage msgForSuccess = new RequestMessage();
msgForSuccess.setId(uuid);
msgForSuccess.setKey(key);
msgForSuccess.setData(String.format("success"));
msgForSuccess.setData("success");
resultHolder.invokeAllResult(msgForSuccess);
});
@ -251,81 +255,73 @@ public class PlayController {
@ApiOperation("语音广播命令")
@ApiImplicitParams({
@ApiImplicitParam(name = "deviceId", value = "设备Id", dataTypeClass = String.class),
@ApiImplicitParam(name = "channelForSend", value = "设备用于发送语音数据的通道", dataTypeClass = String.class),
@ApiImplicitParam(name = "channelForReceive", value = "设备用于接收语音数据的通道", dataTypeClass = String.class),
@ApiImplicitParam(name = "channelId", value = "通道Id", dataTypeClass = String.class),
@ApiImplicitParam(name = "timeout", value = "推流超时时间(秒)", dataTypeClass = Integer.class),
})
@GetMapping("/broadcast/{deviceId}")
@PostMapping("/broadcast/{deviceId}")
public DeferredResult<ResponseEntity<String>> broadcastApi(@PathVariable String deviceId,
String channelForSend,
String channelForReceive) {
@GetMapping("/broadcast/{deviceId}/{channelId}")
@PostMapping("/broadcast/{deviceId}/{channelId}")
public DeferredResult<WVPResult<AudioBroadcastResult>> broadcastApi(@PathVariable String deviceId, @PathVariable String channelId, Integer timeout) {
if (logger.isDebugEnabled()) {
logger.debug("语音广播API调用");
}
Device device = storager.queryVideoDevice(deviceId);
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(3 * 1000L);
if (device == null) {
WVPResult<AudioBroadcastResult> result = new WVPResult<>();
result.setCode(-1);
result.setMsg("未找到设备: " + deviceId);
DeferredResult<WVPResult<AudioBroadcastResult>> deferredResult = new DeferredResult<>();
deferredResult.setResult(result);
return deferredResult;
}
if (channelId == null) {
WVPResult<AudioBroadcastResult> result = new WVPResult<>();
result.setCode(-1);
result.setMsg("未找到通道: " + channelId);
DeferredResult<WVPResult<AudioBroadcastResult>> deferredResult = new DeferredResult<>();
deferredResult.setResult(result);
return deferredResult;
}
String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId;
if (resultHolder.exist(key, null)) {
result.setResult(new ResponseEntity<>("设备使用中",HttpStatus.OK));
return result;
WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("设备使用中");
DeferredResult<WVPResult<AudioBroadcastResult>> deferredResult = new DeferredResult<>();
deferredResult.setResult(wvpResult);
return deferredResult;
}
// playService.audioBroadcast(deviceId, channelForSend, channelForReceive);
if (timeout == null){
timeout = 30;
}
DeferredResult<WVPResult<AudioBroadcastResult>> result = new DeferredResult<>(timeout.longValue()*1000 + 2000);
String uuid = UUID.randomUUID().toString();
if (device == null) {
resultHolder.put(key, key, result);
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setId(uuid);
JSONObject json = new JSONObject();
json.put("DeviceID", deviceId);
json.put("CmdType", "Broadcast");
json.put("Result", "Failed");
json.put("Description", "Device 不存在");
msg.setData(json);
resultHolder.invokeResult(msg);
return result;
}
cmder.audioBroadcastCmd(device, (event) -> {
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setId(uuid);
JSONObject json = new JSONObject();
json.put("DeviceID", deviceId);
json.put("CmdType", "Broadcast");
json.put("Result", "Failed");
json.put("Description", String.format("语音广播操作失败,错误码: %s, %s", event.statusCode, event.msg));
msg.setData(json);
resultHolder.invokeResult(msg);
result.onTimeout(()->{
WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("请求超时");
RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key);
requestMessage.setData(wvpResult);
resultHolder.invokeAllResult(requestMessage);
});
result.onTimeout(() -> {
logger.warn(String.format("语音广播操作超时, 设备未返回应答指令"));
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setId(uuid);
JSONObject json = new JSONObject();
json.put("DeviceID", deviceId);
json.put("CmdType", "Broadcast");
json.put("Result", "Failed");
json.put("Error", "Timeout. Device did not response to broadcast command.");
msg.setData(json);
resultHolder.invokeResult(msg);
playService.audioBroadcast(device, channelId, timeout, (msg)->{
WVPResult<AudioBroadcastResult> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg(msg);
RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key);
requestMessage.setData(wvpResult);
resultHolder.invokeAllResult(requestMessage);
});
resultHolder.put(key, uuid, result);
return result;
}
@ApiOperation("获取所有的ssrc")
@GetMapping("/ssrc")
public WVPResult<JSONObject> getSSRC() {
public WVPResult<JSONObject> getSsrc() {
if (logger.isDebugEnabled()) {
logger.debug("获取所有的ssrc");
}

View File

@ -0,0 +1,9 @@
package com.genersoft.iot.vmp.vmanager.gb28181.play.bean;
/**
* @author lin
*/
public interface AudioBroadcastEvent {
void call(String msg);
}

View File

@ -37,9 +37,6 @@
</el-select>
</el-form-item>
<el-form-item label="语音发送通道" prop="name">
<el-input v-model="form.audioChannelForSend" clearable></el-input>
</el-form-item>
<el-form-item label="语音接收送通道" prop="name">
<el-input v-model="form.audioChannelForReceive" clearable></el-input>
</el-form-item>
<el-form-item label="目录订阅" title="0为取消订阅" prop="subscribeCycleForCatalog" >
@ -105,6 +102,8 @@ export default {
})
},
onSubmit: function () {
console.log("onSubmit");
console.log(this.form);
this.form.subscribeCycleForCatalog = this.form.subscribeCycleForCatalog||0
this.form.subscribeCycleForMobilePosition = this.form.subscribeCycleForMobilePosition||0
this.form.mobilePositionSubmissionInterval = this.form.mobilePositionSubmissionInterval||0
@ -124,7 +123,7 @@ export default {
});
}
}).catch(function (error) {
console.error(error);
console.log(error);
});
},
close: function () {