临时提交

pull/1642/head
648540858 2024-09-04 18:01:54 +08:00
parent 9a0f1512c9
commit ad5b66eb71
18 changed files with 161 additions and 476 deletions

View File

@ -1,20 +1,22 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.gb28181.controller.bean.AudioBroadcastEvent; import com.genersoft.iot.vmp.gb28181.controller.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import lombok.Data;
/** /**
* 广 * 广
* @author lin * @author lin
*/ */
@Data
public class AudioBroadcastCatch { public class AudioBroadcastCatch {
public AudioBroadcastCatch( public AudioBroadcastCatch(
String deviceId, String deviceId,
String channelId, Integer channelId,
MediaServer mediaServerItem, MediaServer mediaServerItem,
String app, String app,
String stream, String stream,
@ -43,7 +45,7 @@ public class AudioBroadcastCatch {
/** /**
* *
*/ */
private String channelId; private Integer channelId;
/** /**
* *
@ -81,79 +83,7 @@ public class AudioBroadcastCatch {
private AudioBroadcastEvent event; private AudioBroadcastEvent event;
public String getDeviceId() { public void setSipTransactionInfoByRequest(SIPResponse sipResponse) {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public AudioBroadcastCatchStatus getStatus() {
return status;
}
public void setStatus(AudioBroadcastCatchStatus status) {
this.status = status;
}
public SipTransactionInfo getSipTransactionInfo() {
return sipTransactionInfo;
}
public MediaServer getMediaServerItem() {
return mediaServerItem;
}
public void setMediaServerItem(MediaServer mediaServerItem) {
this.mediaServerItem = mediaServerItem;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public boolean isFromPlatform() {
return isFromPlatform;
}
public void setFromPlatform(boolean fromPlatform) {
isFromPlatform = fromPlatform;
}
public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) {
this.sipTransactionInfo = sipTransactionInfo;
}
public AudioBroadcastEvent getEvent() {
return event;
}
public void setEvent(AudioBroadcastEvent event) {
this.event = event;
}
public void setSipTransactionInfoByRequset(SIPResponse sipResponse) {
this.sipTransactionInfo = new SipTransactionInfo(sipResponse); this.sipTransactionInfo = new SipTransactionInfo(sipResponse);
} }
} }

View File

@ -235,7 +235,7 @@ public class DeviceServiceImpl implements IDeviceService {
removeCatalogSubscribe(device, null); removeCatalogSubscribe(device, null);
removeMobilePositionSubscribe(device, null); removeMobilePositionSubscribe(device, null);
List<AudioBroadcastCatch> audioBroadcastCatches = audioBroadcastManager.get(deviceId); List<AudioBroadcastCatch> audioBroadcastCatches = audioBroadcastManager.getByDeviceId(deviceId);
if (audioBroadcastCatches.size() > 0) { if (audioBroadcastCatches.size() > 0) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) {

View File

@ -112,7 +112,6 @@ public class PlatformServiceImpl implements IPlatformService {
if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) { if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp())) {
String platformId = sendRtpItem.getPlatformId(); String platformId = sendRtpItem.getPlatformId();
Platform platform = platformMapper.getParentPlatByServerGBId(platformId); Platform platform = platformMapper.getParentPlatByServerGBId(platformId);
try { try {
if (platform != null) { if (platform != null) {
commanderForPlatform.streamByeCmd(platform, sendRtpItem); commanderForPlatform.streamByeCmd(platform, sendRtpItem);
@ -552,7 +551,7 @@ public class PlatformServiceImpl implements IPlatformService {
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc()); log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try { try {
commanderForPlatform.streamByeCmd(platform, channel.getGbDeviceId(), ssrcInfo.getStream(), null, null); commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage()); log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally { } finally {
@ -564,7 +563,7 @@ public class PlatformServiceImpl implements IPlatformService {
} }
} }
}, userSetting.getPlayTimeout()); }, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channel.getGbDeviceId(), mediaServerItem, ssrcInfo, (hookData)->{ commanderForPlatform.broadcastInviteCmd(platform, channel, mediaServerItem, ssrcInfo, (hookData)->{
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId()); log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
dynamicTask.stop(timeOutTaskKey); dynamicTask.stop(timeOutTaskKey);
// hook响应 // hook响应

View File

@ -180,11 +180,11 @@ public class PlayServiceImpl implements IPlayService {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId()); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), event.getStream(), sendRtpItem.getCallId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST) if (sendRtpItem.getPlayType().equals(InviteStreamType.BROADCAST)
|| sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) { || sendRtpItem.getPlayType().equals(InviteStreamType.TALK)) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null) { if (audioBroadcastCatch != null) {
// 来自上级平台的停止对讲 // 来自上级平台的停止对讲
log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); audioBroadcastManager.del(sendRtpItem.getChannelId());
} }
} }
} }
@ -1284,9 +1284,9 @@ public class PlayServiceImpl implements IPlayService {
log.info("[停止对讲] 设备:{}, 通道:{}", deviceId, channelId); log.info("[停止对讲] 设备:{}, 通道:{}", deviceId, channelId);
List<AudioBroadcastCatch> audioBroadcastCatchList = new ArrayList<>(); List<AudioBroadcastCatch> audioBroadcastCatchList = new ArrayList<>();
if (channelId == null) { if (channelId == null) {
audioBroadcastCatchList.addAll(audioBroadcastManager.get(deviceId)); audioBroadcastCatchList.addAll(audioBroadcastManager.getByDeviceId(deviceId));
} else { } else {
audioBroadcastCatchList.add(audioBroadcastManager.get(deviceId, channelId)); audioBroadcastCatchList.add(audioBroadcastManager.getByDeviceId(deviceId, channelId));
} }
if (audioBroadcastCatchList.size() > 0) { if (audioBroadcastCatchList.size() > 0) {
for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) { for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatchList) {
@ -1450,7 +1450,7 @@ public class PlayServiceImpl implements IPlayService {
public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, Platform platform, CallIdHeader callIdHeader) { public void startSendRtpStreamFailHand(SendRtpItem sendRtpItem, Platform platform, CallIdHeader callIdHeader) {
if (sendRtpItem.isOnlyAudio()) { if (sendRtpItem.isOnlyAudio()) {
Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getDeviceId()); Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getDeviceId());
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
if (audioBroadcastCatch != null) { if (audioBroadcastCatch != null) {
try { try {
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), audioBroadcastCatch.getSipTransactionInfo(), null);

View File

@ -7,10 +7,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* 广 * 广
@ -23,81 +24,42 @@ public class AudioBroadcastManager {
@Autowired @Autowired
private SipConfig config; private SipConfig config;
public static Map<String, AudioBroadcastCatch> data = new ConcurrentHashMap<>(); public static Map<Integer, AudioBroadcastCatch> data = new ConcurrentHashMap<>();
public void update(AudioBroadcastCatch audioBroadcastCatch) { public void update(AudioBroadcastCatch audioBroadcastCatch) {
if (SipUtils.isFrontEnd(audioBroadcastCatch.getDeviceId())) { data.put(audioBroadcastCatch.getChannelId(), audioBroadcastCatch);
audioBroadcastCatch.setChannelId(audioBroadcastCatch.getDeviceId());
data.put(audioBroadcastCatch.getDeviceId(), audioBroadcastCatch);
}else {
data.put(audioBroadcastCatch.getDeviceId() + audioBroadcastCatch.getChannelId(), audioBroadcastCatch);
}
} }
public void del(String deviceId, String channelId) { public void del(Integer channelId) {
if (SipUtils.isFrontEnd(deviceId)) { data.remove(channelId);
data.remove(deviceId);
}else {
data.remove(deviceId + channelId);
}
} }
public void delByDeviceId(String deviceId) {
for (String key : data.keySet()) {
if (key.startsWith(deviceId)) {
data.remove(key);
}
}
}
public List<AudioBroadcastCatch> getAll(){ public List<AudioBroadcastCatch> getAll(){
Collection<AudioBroadcastCatch> values = data.values(); Collection<AudioBroadcastCatch> values = data.values();
return new ArrayList<>(values); return new ArrayList<>(values);
} }
public boolean exit(String deviceId, String channelId) { public boolean exit(Integer channelId) {
for (String key : data.keySet()) { return data.containsKey(channelId);
if (SipUtils.isFrontEnd(deviceId)) {
return key.equals(deviceId);
}else {
return key.equals(deviceId + channelId);
}
}
return false;
} }
public AudioBroadcastCatch get(String deviceId, String channelId) { public AudioBroadcastCatch get(Integer channelId) {
AudioBroadcastCatch audioBroadcastCatch; return data.get(channelId);
if (SipUtils.isFrontEnd(deviceId)) {
audioBroadcastCatch = data.get(deviceId);
}else {
audioBroadcastCatch = data.get(deviceId + channelId);
}
if (audioBroadcastCatch == null) {
Stream<AudioBroadcastCatch> allAudioBroadcastCatchStreamForDevice = data.values().stream().filter(
audioBroadcastCatchItem -> Objects.equals(audioBroadcastCatchItem.getDeviceId(), deviceId));
List<AudioBroadcastCatch> audioBroadcastCatchList = allAudioBroadcastCatchStreamForDevice.collect(Collectors.toList());
if (audioBroadcastCatchList.size() == 1 && Objects.equals(config.getId(), channelId)) {
audioBroadcastCatch = audioBroadcastCatchList.get(0);
}
} }
return audioBroadcastCatch; public List<AudioBroadcastCatch> getByDeviceId(String deviceId) {
}
public List<AudioBroadcastCatch> get(String deviceId) {
List<AudioBroadcastCatch> audioBroadcastCatchList= new ArrayList<>(); List<AudioBroadcastCatch> audioBroadcastCatchList= new ArrayList<>();
if (SipUtils.isFrontEnd(deviceId)) { if (SipUtils.isFrontEnd(deviceId)) {
if (data.get(deviceId) != null) { if (data.get(deviceId) != null) {
audioBroadcastCatchList.add(data.get(deviceId)); audioBroadcastCatchList.add(data.get(deviceId));
} }
}else { }else {
for (String key : data.keySet()) { for (AudioBroadcastCatch broadcastCatch : data.values()) {
if (key.startsWith(deviceId)) { if (broadcastCatch.getDeviceId().equals(deviceId)) {
audioBroadcastCatchList.add(data.get(key)); audioBroadcastCatchList.add(broadcastCatch);
} }
} }
} }

View File

@ -38,7 +38,7 @@ public class VideoStreamSessionManager {
* @param mediaServerId 使ID * @param mediaServerId 使ID
* @param response * @param response
*/ */
public void put(String deviceId, String channelId, String callId, String stream, String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type){ public void put(String deviceId, Integer channelId, String callId, String stream, String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type){
SsrcTransaction ssrcTransaction = new SsrcTransaction(); SsrcTransaction ssrcTransaction = new SsrcTransaction();
ssrcTransaction.setDeviceId(deviceId); ssrcTransaction.setDeviceId(deviceId);
ssrcTransaction.setChannelId(channelId); ssrcTransaction.setChannelId(channelId);

View File

@ -152,9 +152,9 @@ public interface ISIPCommanderForPlatform {
void streamByeCmd(Platform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException; void streamByeCmd(Platform platform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException;
void streamByeCmd(Platform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void broadcastInviteCmd(Platform platform, String channelId, MediaServer mediaServerItem, void broadcastInviteCmd(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent, SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException; SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException;

View File

@ -656,17 +656,17 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
@Override @Override
public void streamByeCmd(Platform platform, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException { public void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(platform.getServerGBId(), channelId, callId, stream); SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(channel.getGbId(), callId, stream);
if (ssrcTransaction == null) { if (ssrcTransaction == null) {
throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channelId, callId, stream); throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channel.getGbDeviceId(), callId, stream);
} }
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channelId, ssrcTransaction.getSipTransactionInfo()); Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo());
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), byteRequest, null, okEvent); sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), byteRequest, null, okEvent);
} }
@ -694,7 +694,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
@Override @Override
public void broadcastInviteCmd(Platform platform, String channelId, MediaServer mediaServerItem, public void broadcastInviteCmd(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent, SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException { SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException {
String stream = ssrcInfo.getStream(); String stream = ssrcInfo.getStream();
@ -715,7 +715,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("o=" + channel.getGbDeviceId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("s=Play\r\n"); content.append("s=Play\r\n");
content.append("c=IN IP4 " + sdpIp + "\r\n"); content.append("c=IN IP4 " + sdpIp + "\r\n");
content.append("t=0 0\r\n"); content.append("t=0 0\r\n");
@ -742,18 +742,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getTransport()); CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(platform.getDeviceIp()), platform.getTransport());
Request request = headerProviderPlatformProvider.createInviteRequest(platform, channelId, Request request = headerProviderPlatformProvider.createInviteRequest(platform, channel.getGbDeviceId(),
content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), ssrcInfo.getSsrc(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), ssrcInfo.getSsrc(),
callIdHeader); callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); streamSession.remove(platform.getServerGBId(), channel.getGbId(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
subscribe.removeSubscribe(hook); subscribe.removeSubscribe(hook);
errorEvent.response(e); errorEvent.response(e);
}), e -> { }), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event; ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse(); SIPResponse response = (SIPResponse) responseEvent.getResponse();
streamSession.put(platform.getServerGBId(), channelId, callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST); streamSession.put(platform.getServerGBId(), channel.getGbId(), callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST);
okEvent.response(e); okEvent.response(e);
}); });
} }

View File

@ -152,11 +152,11 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
} }
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer != null) { if (mediaServer != null) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
// 来自上级平台的停止对讲 // 来自上级平台的停止对讲
log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); audioBroadcastManager.del(sendRtpItem.getChannelId());
} }
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId); MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
@ -231,7 +231,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
case TALK: case TALK:
// 查找来源的对讲设备,发送停止 // 查找来源的对讲设备,发送停止
Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId()); Device sourceDevice = storager.queryVideoDeviceByPlatformIdAndChannelId(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId());
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(ssrcTransaction.getDeviceId(), channel.getDeviceId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(ssrcTransaction.getDeviceId(), channel.getDeviceId());
if (sourceDevice != null) { if (sourceDevice != null) {
playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channel.getDeviceId()); playService.stopAudioBroadcast(sourceDevice.getDeviceId(), channel.getDeviceId());
} }

View File

@ -1,7 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
@ -9,10 +7,7 @@ import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@ -21,15 +16,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -56,8 +47,6 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.util.Map;
import java.util.Random;
import java.util.Vector; import java.util.Vector;
/** /**
@ -76,6 +65,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired @Autowired
private IVideoManagerStorage storager; private IVideoManagerStorage storager;
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired @Autowired
private IGbChannelService channelService; private IGbChannelService channelService;
@ -193,7 +185,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(streamInfo.getMediaServer(), SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(streamInfo.getMediaServer(),
inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(), inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(),
streamInfo.getApp(), streamInfo.getStream(), streamInfo.getApp(), streamInfo.getStream(),
channel.getGbDeviceId(), inviteInfo.isTcp(), platform.isRtcp()); channel.getGbId(), inviteInfo.isTcp(), platform.isRtcp());
if (inviteInfo.isTcp() && inviteInfo.isTcpActive()) { if (inviteInfo.isTcp() && inviteInfo.isTcpActive()) {
sendRtpItem.setTcpActive(true); sendRtpItem.setTcpActive(true);
} }
@ -744,263 +736,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
} }
private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, Platform parentPlatform,
JSONObject jsonObject, Map<String, Object> param, CallIdHeader callIdHeader) {
if (jsonObject == null) {
log.error("下级TCP被动启动监听失败: 请检查ZLM服务");
} else if (jsonObject.getInteger("code") == 0) {
log.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject);
log.info("启动监听TCP被动推流成功[ {}/{} ]{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
} else {
log.error("启动监听TCP被动推流失败: {}, 参数:{}",jsonObject.getString("msg"), JSON.toJSONString(param));
}
}
/**
*
*/
private void sendProxyStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, Platform platform, SIPRequest request) {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaInfo != null) {
// 自平台内容
int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
if (localPort == 0) {
log.warn("服务器端口资源不足");
try {
responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
}
return;
}
sendRtpItem.setPlayType(InviteStreamType.PROXY);
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
}
}
private void sendPushStream(SendRtpItem sendRtpItem, MediaServer mediaServerItem, Platform platform, SIPRequest request) {
// 推流
if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
if (mediaInfo != null ) {
// 自平台内容
int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
if (localPort == 0) {
log.warn("服务器端口资源不足");
try {
responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
}
return;
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
} else {
// 不在线 拉起
notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
}
} else {
// 其他平台内容
otherWvpPushStream(sendRtpItem, request, platform);
}
}
/**
* 线
*/
private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, Platform platform, SIPRequest request) {
// TODO 控制启用以使设备上线
log.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
// 监听流上线
Hook hook = Hook.getInstance(HookType.on_media_arrival, sendRtpItem.getApp(), sendRtpItem.getStream(), mediaServerItem.getId());
hookSubscribe.addSubscribe(hook, (hookData)->{
log.info("[上级点播]拉流代理已经就绪, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
dynamicTask.stop(sendRtpItem.getCallId());
sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
});
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
log.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
hookSubscribe.removeSubscribe(hook);
}, userSetting.getPlatformPlayTimeout());
boolean start = streamProxyService.startByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
if (!start) {
try {
responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline");
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
}
hookSubscribe.removeSubscribe(hook);
dynamicTask.stop(sendRtpItem.getCallId());
}
}
/**
* 线
*/
private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServer mediaServerItem, Platform platform, SIPRequest request) {
// 发送redis消息以使设备上线流上线后被
log.info("[ app={}, stream={} ]通道未推流发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(),
platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
// 设置超时
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
log.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
try {
responseAck(request, Response.REQUEST_TIMEOUT); // 超时
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("未处理的异常 ", e);
}
}, userSetting.getPlatformPlayTimeout());
//
long key = redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemKey) -> {
dynamicTask.stop(sendRtpItem.getCallId());
if (sendRtpItemKey == null) {
log.warn("[级联点播] 等待推流得到结果未空: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
try {
responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("未处理的异常 ", e);
}
return;
}
SendRtpItem sendRtpItemFromRedis = (SendRtpItem)redisTemplate.opsForValue().get(sendRtpItemKey);
if (sendRtpItemFromRedis == null) {
log.warn("[级联点播] 等待推流, 未找到redis中缓存的发流信息 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
try {
responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("未处理的异常 ", e);
}
return;
}
if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
log.info("[级联点播] 等待的推流在本平台上线 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
if (localPort == 0) {
log.warn("上级点时创建sendRTPItem失败可能是服务器端口资源不足");
try {
responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("未处理的异常 ", e);
}
return;
}
sendRtpItem.setLocalPort(localPort);
if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
sendRtpItem.setLocalIp(platform.getSendStreamIp());
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
} else {
// 其他平台内容
otherWvpPushStream(sendRtpItemFromRedis, request, platform);
}
});
// 添加回复的拒绝或者错误的通知
// redis消息例如 PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":1,"msg":"失败","app":"1","stream":"2"}'
redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
if (response.getCode() != 0) {
dynamicTask.stop(sendRtpItem.getCallId());
redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
redisRpcService.removeCallback(key);
try {
responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
}
}
});
}
/**
* wvp
*/
private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, Platform platform) {
log.info("[级联点播] 来自其他wvp的推流 {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem.getRedisKey());
if (sendRtpItem == null) {
return;
}
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
}
public SIPResponse sendStreamAck(SIPRequest request, SendRtpItem sendRtpItem, Platform platform) {
String sdpIp = sendRtpItem.getLocalIp();
if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
sdpIp = platform.getSendStreamIp();
}
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 " + sdpIp + "\r\n");
content.append("t=0 0\r\n");
// 非严格模式端口不统一, 增加兼容性修改为一个不为0的端口
int localPort = sendRtpItem.getLocalPort();
if (localPort == 0) {
localPort = new Random().nextInt(65535) + 1;
}
content.append("m=video " + localPort + " RTP/AVP 96\r\n");
content.append("a=sendonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n");
if (sendRtpItem.isTcp()) {
content.append("a=connection:new\r\n");
if (!sendRtpItem.isTcpActive()) {
content.append("a=setup:active\r\n");
} else {
content.append("a=setup:passive\r\n");
}
}
content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
content.append("f=\r\n");
try {
return responseSdpAck(request, content.toString(), platform);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("未处理的异常 ", e);
}
return null;
}
public void inviteFromDeviceHandle(SIPRequest request, String requesterId, String channelId) { public void inviteFromDeviceHandle(SIPRequest request, String requesterId, String channelId) {
String realChannelId = null; String realChannelId = null;
@ -1032,8 +767,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
return; return;
} }
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), realChannelId);
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), realChannelId); if (deviceChannel == null) {
log.warn("来自设备的Invite请求无法从请求信息中确定所属通道已忽略requesterId {}/{}", requesterId, realChannelId);
try {
responseAck(request, Response.FORBIDDEN);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 来自设备的Invite请求无法从请求信息中确定所属设备 FORBIDDEN: {}", e.getMessage());
}
return;
}
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(deviceChannel.getId());
if (broadcastCatch == null) { if (broadcastCatch == null) {
log.warn("来自设备的Invite请求非语音广播已忽略requesterId {}/{}", requesterId, channelId); log.warn("来自设备的Invite请求非语音广播已忽略requesterId {}/{}", requesterId, channelId);
try { try {
@ -1054,7 +798,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.TRYING); responseAck(request, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage()); log.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId());
return; return;
} }
String contentString = new String(request.getRawContent()); String contentString = new String(request.getRawContent());
@ -1099,7 +843,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415 responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage()); log.error("[命令发送失败] invite 不支持的媒体格式: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId());
return; return;
} }
return; return;
@ -1115,7 +859,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.BUSY_HERE); responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage()); log.error("[命令发送失败] invite 未找到可用的zlm: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId());
} }
return; return;
} }
@ -1124,7 +868,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId, SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
device.getDeviceId(), broadcastCatch.getChannelId(), device.getDeviceId(), deviceChannel.getId(),
mediaTransmissionTCP, false); mediaTransmissionTCP, false);
if (sendRtpItem == null) { if (sendRtpItem == null) {
@ -1133,7 +877,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(request, Response.BUSY_HERE); responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage()); log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId());
return; return;
} }
return; return;
@ -1168,11 +912,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
log.error("[命令发送失败] 语音通话 回复410失败 {}", e.getMessage()); log.error("[命令发送失败] 语音通话 回复410失败 {}", e.getMessage());
return; return;
} }
playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId());
} }
} catch (SdpException e) { } catch (SdpException e) {
log.error("[SDP解析异常]", e); log.error("[SDP解析异常]", e);
playService.stopAudioBroadcast(device.getDeviceId(), broadcastCatch.getChannelId()); playService.stopAudioBroadcast(device.getDeviceId(), deviceChannel.getDeviceId());
} }
} else { } else {
log.warn("来自无效设备/平台的请求"); log.warn("来自无效设备/平台的请求");
@ -1224,10 +968,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sipResponse = responseSdpAck(request, content.toString(), parentPlatform); sipResponse = responseSdpAck(request, content.toString(), parentPlatform);
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), sendRtpItem.getChannelId()); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId());
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
audioBroadcastCatch.setSipTransactionInfoByRequset(sipResponse); audioBroadcastCatch.setSipTransactionInfoByRequest(sipResponse);
audioBroadcastManager.update(audioBroadcastCatch); audioBroadcastManager.update(audioBroadcastCatch);
streamSession.put(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST); streamSession.put(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST);
// 开启发流大华在收到200OK后就会开始建立连接 // 开启发流大华在收到200OK后就会开始建立连接

View File

@ -127,7 +127,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
try { try {
platformService.broadcastInvite(platform, channel, mediaServerForMinimumLoad, (hookData)->{ platformService.broadcastInvite(platform, channel, mediaServerForMinimumLoad, (hookData)->{
// 上级平台推流成功 // 上级平台推流成功
AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); AudioBroadcastCatch broadcastCatch = audioBroadcastManager.getByDeviceId(device.getDeviceId(), targetId);
if (broadcastCatch != null ) { if (broadcastCatch != null ) {
if (playService.audioBroadcastInUse(device, targetId)) { if (playService.audioBroadcastInUse(device, targetId)) {
log.info("[国标级联] 语音喊话 设备正在使用中 platform {} channel: {}", log.info("[国标级联] 语音喊话 设备正在使用中 platform {} channel: {}",

View File

@ -70,7 +70,7 @@ public class BroadcastResponseMessageHandler extends SIPRequestProcessorParent i
// 回复200 OK // 回复200 OK
responseAck(request, Response.OK); responseAck(request, Response.OK);
if (result.equalsIgnoreCase("OK")) { if (result.equalsIgnoreCase("OK")) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(device.getDeviceId(), channelId); AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.getByDeviceId(device.getDeviceId(), channelId);
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite); audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.WaiteInvite);
audioBroadcastManager.update(audioBroadcastCatch); audioBroadcastManager.update(audioBroadcastCatch);
}else { }else {

View File

@ -146,10 +146,10 @@ public interface IMediaServerService {
void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem); void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem);
SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp); SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean mediaTransmissionTCP, boolean rtcp);
SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp); String app, String stream, Integer channelId, boolean tcp, boolean rtcp);
MediaServer getMediaServerByAppAndStream(String app, String stream); MediaServer getMediaServerByAppAndStream(String app, String stream);

View File

@ -852,7 +852,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
} }
@Override @Override
public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) { public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, Integer channelId, boolean isTcp, boolean rtcp) {
int localPort = sendRtpPortManager.getNextPort(mediaServer); int localPort = sendRtpPortManager.getNextPort(mediaServer);
if (localPort == 0) { if (localPort == 0) {
return null; return null;
@ -861,7 +861,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
sendRtpItem.setIp(ip); sendRtpItem.setIp(ip);
sendRtpItem.setPort(port); sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc); sendRtpItem.setSsrc(ssrc);
sendRtpItem.setPlatformId(deviceId);
sendRtpItem.setDeviceId(deviceId); sendRtpItem.setDeviceId(deviceId);
sendRtpItem.setChannelId(channelId); sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(isTcp); sendRtpItem.setTcp(isTcp);
@ -875,7 +874,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override @Override
public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId, public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp){ String app, String stream, Integer channelId, boolean tcp, boolean rtcp){
int localPort = sendRtpPortManager.getNextPort(serverItem); int localPort = sendRtpPortManager.getNextPort(serverItem);
if (localPort <= 0) { if (localPort <= 0) {

View File

@ -0,0 +1,4 @@
package com.genersoft.iot.vmp.service;
public interface IReceiveRtpServerService {
}

View File

@ -0,0 +1,19 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import lombok.Data;
@Data
public class RTPServerParam {
MediaServer mediaServerItem;
String streamId;
String presetSsrc;
boolean ssrcCheck;
boolean isPlayback;
Integer port;
Boolean onlyAuto;
Boolean disableAudio;
Boolean reUsePort;
Integer tcpMode;
}

View File

@ -0,0 +1,62 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
@Service
public class RtpServerServiceImpl implements IReceiveRtpServerService {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private DynamicTask dynamicTask;
/**
*
*/
@Async("taskExecutor")
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
}
/**
*
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
}
@Override
public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<StreamInfo> callback) {
// 开启流到来的监听
// 设置流超时的定时任务
// 调用节点,开启端口监听
}
}

View File

@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
@ -28,10 +27,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
/** /**
* wvprpc RedisRpcConfig * wvprpc RedisRpcConfig
*/ */
@ -303,35 +298,6 @@ public class RedisRpcController {
return response; return response;
} }
/**
* wvp
*/
public RedisRpcResponse rtpSendStopped(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString();
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
RedisRpcResponse response = request.getResponse();
response.setStatusCode(200);
if (sendRtpItem == null) {
log.info("[redis-rpc] 推流已经停止, 未找到redis中的发流信息 key{}", sendRtpItemKey);
return response;
}
log.info("[redis-rpc] 推流已经停止: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
String platformId = sendRtpItem.getPlatformId();
Platform platform = platformService.queryPlatformByServerGBId(platformId);
if (platform == null) {
return response;
}
try {
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStream());
redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem, platform);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 发送BYE: {}", e.getMessage());
}
return response;
}
private void sendResponse(RedisRpcResponse response){ private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response); log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId()); response.setToId(userSetting.getServerId());