修复语音对讲

pull/1642/head
648540858 2024-09-11 16:59:52 +08:00
parent 91a8dc59a8
commit e078360928
11 changed files with 194 additions and 180 deletions

View File

@ -6,7 +6,8 @@ import lombok.Data;
@Data
public class InviteInfo {
private String requesterId;
private String channelId;
private String targetChannelId;
private String sourceChannelId;
private String sessionName;
private String ssrc;
private boolean tcp;

View File

@ -459,6 +459,7 @@ public class DeviceServiceImpl implements IDeviceService {
if (!ObjectUtils.isEmpty(device.getStreamMode())) {
deviceInStore.setStreamMode(device.getStreamMode());
}
deviceInStore.setBroadcastPushAfterAck(device.isBroadcastPushAfterAck());
// 目录订阅相关的信息
if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
if (device.getSubscribeCycleForCatalog() > 0) {

View File

@ -293,12 +293,14 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
public void call(InviteSessionType type, Integer channelId, String stream, int code, String msg, StreamInfo data) {
String key = buildSubStreamKey(type, channelId, stream);
List<ErrorCallback<StreamInfo>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
if (callbacks == null || callbacks.isEmpty()) {
return;
}
for (ErrorCallback<StreamInfo> callback : callbacks) {
if (callback != null) {
callback.run(code, msg, data);
}
}
inviteErrorCallbackMap.remove(key);
}

View File

@ -127,7 +127,7 @@ public class PlayServiceImpl implements IPlayService {
@Async("taskExecutor")
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("broadcast".equals(event.getApp())) {
if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
if (event.getStream().indexOf("_") > 0) {
String[] streamArray = event.getStream().split("_");
if (streamArray.length == 2) {

View File

@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -52,17 +51,11 @@ public class AudioBroadcastManager {
public List<AudioBroadcastCatch> getByDeviceId(String deviceId) {
List<AudioBroadcastCatch> audioBroadcastCatchList= new ArrayList<>();
if (SipUtils.isFrontEnd(deviceId)) {
if (data.get(deviceId) != null) {
audioBroadcastCatchList.add(data.get(deviceId));
}
}else {
for (AudioBroadcastCatch broadcastCatch : data.values()) {
if (broadcastCatch.getDeviceId().equals(deviceId)) {
audioBroadcastCatchList.add(broadcastCatch);
}
}
}
return audioBroadcastCatchList;
}

View File

@ -164,11 +164,12 @@ public class SIPRequestHeaderProvider {
// SipURI requestLine = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress());
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), SipUtils.getNewViaTag());
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), transactionInfo.getViaBranch());
// viaHeader.setRPort();
viaHeaders.add(viaHeader);
//from
SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(),sipConfig.getDomain());
// SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(),sipConfig.getDomain());
SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), sipLayer.getLocalIp(device.getLocalIp()) + ":" + sipConfig.getPort());
Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, transactionInfo.getFromTag());
//to

View File

@ -46,6 +46,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.List;
import java.util.Vector;
/**
@ -143,12 +144,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 查询请求是否来自上级平台\设备
Platform platform = platformService.queryPlatformByServerGBId(inviteInfo.getRequesterId());
if (platform == null) {
inviteFromDeviceHandle(request, inviteInfo.getRequesterId(), inviteInfo.getChannelId());
inviteFromDeviceHandle(request, inviteInfo);
} else {
// 查询平台下是否有该通道
CommonGBChannel channel= channelService.queryOneWithPlatform(platform.getId(), inviteInfo.getChannelId());
CommonGBChannel channel= channelService.queryOneWithPlatform(platform.getId(), inviteInfo.getTargetChannelId());
if (channel == null) {
log.info("[上级INVITE] 通道不存在返回404: {}", inviteInfo.getChannelId());
log.info("[上级INVITE] 通道不存在返回404: {}", inviteInfo.getTargetChannelId());
try {
// 通道不存在发404资源不存在
responseAck(request, Response.NOT_FOUND);
@ -593,7 +594,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
InviteInfo inviteInfo = new InviteInfo();
SIPRequest request = (SIPRequest)evt.getRequest();
String channelIdFromSub = SipUtils.getChannelIdFromRequest(request);
String[] channelIdArrayFromSub = SipUtils.getChannelIdFromRequest(request);
// 解析sdp消息, 使用jainsip 自带的sdp解析方式
String contentString = new String(request.getRawContent());
@ -605,7 +606,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
URIField uriField = (URIField)sdp.getURI();
channelIdFromSdp = uriField.getURI().split(":")[0];
}
final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp : channelIdFromSub;
final String channelId = StringUtils.isNotBlank(channelIdFromSdp) ? channelIdFromSdp :
(channelIdArrayFromSub != null? channelIdArrayFromSub[0]: null);
String requesterId = SipUtils.getUserIdFromFromHeader(request);
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
if (requesterId == null || channelId == null) {
@ -615,7 +617,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
log.info("[INVITE] 来源ID: {}, callId: {}, 来自:{}{}",
requesterId, callIdHeader.getCallId(), request.getRemoteAddress(), request.getRemotePort());
inviteInfo.setRequesterId(requesterId);
inviteInfo.setChannelId(channelId);
inviteInfo.setTargetChannelId(channelId);
if (channelIdArrayFromSub != null && channelIdArrayFromSub.length == 2) {
inviteInfo.setSourceChannelId(channelIdArrayFromSub[1]);
}
inviteInfo.setSessionName(sessionName);
inviteInfo.setSsrc(gb28181Sdp.getSsrc());
inviteInfo.setCallId(callIdHeader.getCallId());
@ -702,7 +707,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private String createSendSdp(SendRtpInfo sendRtpItem, InviteInfo inviteInfo, String sdpIp) {
StringBuilder content = new StringBuilder(200);
content.append("v=0\r\n");
content.append("o=" + inviteInfo.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("o=" + inviteInfo.getTargetChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("s=" + inviteInfo.getSessionName() + "\r\n");
content.append("c=IN IP4 " + sdpIp + "\r\n");
if ("Playback".equalsIgnoreCase(inviteInfo.getSessionName())) {
@ -743,30 +748,31 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
public void inviteFromDeviceHandle(SIPRequest request, String requesterId, String channelId) {
String realChannelId = null;
public void inviteFromDeviceHandle(SIPRequest request, InviteInfo inviteInfo) {
if (inviteInfo.getSourceChannelId() == null) {
log.warn("来自设备的Invite请求无法从请求信息中确定请求来自的通道已忽略requesterId {}", inviteInfo.getRequesterId());
try {
responseAck(request, Response.FORBIDDEN);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 来自设备的Invite请求无法从请求信息中确定所属设备 FORBIDDEN: {}", e.getMessage());
}
return;
}
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = redisCatchStorage.getDevice(requesterId);
Device device = redisCatchStorage.getDevice(inviteInfo.getRequesterId());
// 判断requesterId是设备还是通道
if (device == null) {
device = storager.queryVideoDeviceByChannelId(requesterId);
realChannelId = requesterId;
}else {
realChannelId = channelId;
device = storager.queryVideoDeviceByChannelId(inviteInfo.getRequesterId());
}
if (device == null) {
// 检查channelID是否可用
device = redisCatchStorage.getDevice(channelId);
if (device == null) {
device = storager.queryVideoDeviceByChannelId(channelId);
realChannelId = channelId;
}
device = storager.queryVideoDeviceByChannelId(inviteInfo.getSourceChannelId());
}
if (device == null) {
log.warn("来自设备的Invite请求无法从请求信息中确定所属设备已忽略requesterId {}/{}", requesterId, channelId);
log.warn("来自设备的Invite请求无法从请求信息中确定所属设备已忽略requesterId {}/{}", inviteInfo.getRequesterId(),
inviteInfo.getSourceChannelId());
try {
responseAck(request, Response.FORBIDDEN);
} catch (SipException | InvalidArgumentException | ParseException e) {
@ -774,19 +780,24 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
return;
}
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), realChannelId);
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), inviteInfo.getSourceChannelId());
if (deviceChannel == null) {
log.warn("来自设备的Invite请求无法从请求信息中确定所属通道已忽略requesterId {}/{}", requesterId, realChannelId);
List<AudioBroadcastCatch> audioBroadcastCatchList = audioBroadcastManager.getByDeviceId(device.getDeviceId());
if (audioBroadcastCatchList.isEmpty()) {
log.warn("来自设备的Invite请求无法从请求信息中确定所属通道已忽略requesterId {}/{}", inviteInfo.getRequesterId(), inviteInfo.getSourceChannelId());
try {
responseAck(request, Response.FORBIDDEN);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 来自设备的Invite请求无法从请求信息中确定所属设备 FORBIDDEN: {}", e.getMessage());
}
return;
}else {
deviceChannel = deviceChannelService.getOneById(audioBroadcastCatchList.get(0).getChannelId());
}
}
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(deviceChannel.getId());
if (broadcastCatch == null) {
log.warn("来自设备的Invite请求非语音广播已忽略requesterId {}/{}", requesterId, channelId);
log.warn("来自设备的Invite请求非语音广播已忽略requesterId {}/{}", inviteInfo.getRequesterId(), inviteInfo.getSourceChannelId());
try {
responseAck(request, Response.FORBIDDEN);
} catch (SipException | InvalidArgumentException | ParseException e) {
@ -794,8 +805,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
return;
}
if (device != null) {
log.info("收到设备" + requesterId + "的语音广播Invite请求");
log.info("收到设备" + inviteInfo.getRequesterId() + "的语音广播Invite请求");
String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId();
if (!SipUtils.isFrontEnd(device.getDeviceId())) {
key += broadcastCatch.getChannelId();
@ -856,7 +866,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
return;
}
String addressStr = sdp.getOrigin().getAddress();
log.info("设备{}请求语音流,地址:{}:{}ssrc{}, {}", requesterId, addressStr, port, gb28181Sdp.getSsrc(),
log.info("设备{}请求语音流,地址:{}:{}ssrc{}, {}", inviteInfo.getRequesterId(), addressStr, port, gb28181Sdp.getSsrc(),
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP");
MediaServer mediaServerItem = broadcastCatch.getMediaServerItem();
@ -870,11 +880,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
return;
}
log.info("设备{}请求语音流, 收流地址:{}:{}ssrc{}, {}, 对讲方式:{}", requesterId, addressStr, port, gb28181Sdp.getSsrc(),
log.info("设备{}请求语音流, 收流地址:{}:{}ssrc{}, {}, 对讲方式:{}", inviteInfo.getRequesterId(), addressStr, port, gb28181Sdp.getSsrc(),
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), inviteInfo.getRequesterId(),
device.getDeviceId(), deviceChannel.getId(),
mediaTransmissionTCP, false);
@ -923,15 +933,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
log.error("[SDP解析异常]", e);
playService.stopAudioBroadcast(device, deviceChannel);
}
} else {
log.warn("来自无效设备/平台的请求");
try {
responseAck(request, Response.BAD_REQUEST);
; // 不支持的格式发415
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] invite 来自无效设备/平台的请求, {}", e.getMessage());
}
}
}
SIPResponse sendOk(Device device, DeviceChannel channel, SendRtpInfo sendRtpItem, SessionDescription sdp, SIPRequest request, MediaServer mediaServerItem, boolean mediaTransmissionTCP, String ssrc) {

View File

@ -18,7 +18,7 @@ import javax.sdp.SessionDescription;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
import javax.sip.header.FromHeader;
import javax.sip.header.Header;
import javax.sip.header.SubjectHeader;
import javax.sip.header.UserAgentHeader;
import javax.sip.message.Request;
import java.text.ParseException;
@ -44,13 +44,22 @@ public class SipUtils {
/**
* subjectchannelId
* */
public static String getChannelIdFromRequest(Request request) {
Header subject = request.getHeader("subject");
public static String[] getChannelIdFromRequest(Request request) {
SubjectHeader subject = (Subject)request.getHeader("subject");
if (subject == null) {
// 如果缺失subject
return null;
}
return ((Subject) subject).getSubject().split(":")[0];
String[] result = new String[2];
String subjectStr = subject.getSubject();
if (subjectStr.indexOf(",") > 0) {
String[] subjectSplit = subjectStr.split(",");
result[0] = subjectSplit[0].split(":")[0];
result[1] = subjectSplit[1].split(":")[0];
}else {
result[0] = subjectStr.split(":")[0];
}
return result;
}
public static String getUserIdFromFromHeader(FromHeader fromHeader) {

View File

@ -855,6 +855,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[startSendRtpStream] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
sendRtpItem.setRtcp(true);
log.info("[开始推流] {}/{}, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);

View File

@ -376,7 +376,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
param.put("is_udp", sendRtpItem.isTcp() ? "0" : "1");
if (!sendRtpItem.isTcp()) {
// udp模式下开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "500" : "0");
}
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
@ -384,6 +384,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}
log.info("[推流结果]{} ,参数: {}",jsonObject, JSONObject.toJSONString(param));
}
@Override

View File

@ -457,8 +457,11 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
List<CommonGBChannel> commonGBChannelList = new ArrayList<>();
for (StreamPush streamPush : streamPushList) {
CommonGBChannel commonGBChannel = streamPush.buildCommonGBChannel();
if (commonGBChannel != null) {
commonGBChannelList.add(streamPush.buildCommonGBChannel());
}
}
gbChannelService.offline(commonGBChannelList);
}