优化子码流切换逻辑,去除主子多开逻辑。优化单端口模式ssrc变化导致的无法移出点播缓存 #895

pull/894/head
648540858 2023-06-22 22:35:35 +08:00
parent f4e5e03962
commit 155838ce2e
20 changed files with 165 additions and 624 deletions

View File

@ -29,7 +29,7 @@ public class InviteInfo {
private StreamInfo streamInfo; private StreamInfo streamInfo;
public static InviteInfo getinviteInfo(String deviceId, String channelId, String stream, SSRCInfo ssrcInfo, public static InviteInfo getInviteInfo(String deviceId, String channelId, String stream, SSRCInfo ssrcInfo,
String receiveIp, Integer receivePort, String streamMode, String receiveIp, Integer receivePort, String streamMode,
InviteSessionType type, InviteSessionStatus status) { InviteSessionType type, InviteSessionStatus status) {
InviteInfo inviteInfo = new InviteInfo(); InviteInfo inviteInfo = new InviteInfo();
@ -138,25 +138,6 @@ public class InviteInfo {
this.subStream = subStream; this.subStream = subStream;
} }
public static InviteInfo getInviteInfo(String deviceId, String channelId,Boolean isSubStream, String stream, SSRCInfo ssrcInfo,
String receiveIp, Integer receivePort, String streamMode,
InviteSessionType type, InviteSessionStatus status) {
InviteInfo inviteInfo = new InviteInfo();
inviteInfo.setDeviceId(deviceId);
inviteInfo.setChannelId(channelId);
inviteInfo.setStream(stream);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setReceiveIp(receiveIp);
inviteInfo.setReceivePort(receivePort);
inviteInfo.setStreamMode(streamMode);
inviteInfo.setType(type);
inviteInfo.setStatus(status);
if(isSubStream != null){
inviteInfo.setSubStream(isSubStream);
}
return inviteInfo;
}
/*=========================设备主子码流逻辑END====================*/

View File

@ -542,17 +542,5 @@ public class StreamInfo implements Serializable, Cloneable{
this.subStream = subStream; this.subStream = subStream;
} }
public static String getPlayStream(String deviceId,String channelId,boolean isSubStream){
String streamId;
if(isSubStream){
streamId = String.format("%s_%s_%s","sub",deviceId, channelId);
}else {
streamId = String.format("%s_%s_%s","main", deviceId, channelId);
}
return streamId;
}
/*=========================设备主子码流逻辑END====================*/
} }

View File

@ -45,7 +45,6 @@ public class DynamicTask {
* @return * @return
*/ */
public void startCron(String key, Runnable task, int cycleForCatalog) { public void startCron(String key, Runnable task, int cycleForCatalog) {
System.out.println(cycleForCatalog);
ScheduledFuture<?> future = futureMap.get(key); ScheduledFuture<?> future = futureMap.get(key);
if (future != null) { if (future != null) {
if (future.isCancelled()) { if (future.isCancelled()) {

View File

@ -156,29 +156,5 @@ public class DeferredResultHolder {
} }
} }
/*============================设备主子码流逻辑START========================*/
public static String getPlayKey(String deviceId,String channelId,boolean deviceSwitchSubStream,boolean isSubStream){
String key = null;
if(deviceSwitchSubStream){
key = CALLBACK_CMD_PLAY + isSubStream + deviceId + channelId;
}else {
key = CALLBACK_CMD_PLAY +deviceId + channelId;
}
return key;
}
public static String getSnapKey(String deviceId,String channelId,boolean deviceSwitchSubStream,boolean isSubStream){
String key = null;
if(deviceSwitchSubStream){
key = CALLBACK_CMD_SNAP + isSubStream + deviceId + channelId;
}else {
key = CALLBACK_CMD_SNAP +deviceId + channelId;
}
return key;
}
/*============================设备主子码流逻辑END========================*/
} }

View File

@ -98,7 +98,7 @@ public interface ISIPCommander {
* @param device * @param device
* @param channelId * @param channelId
*/ */
void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/** /**
* *

View File

@ -268,7 +268,7 @@ public class SIPCommander implements ISIPCommander {
* @param errorEvent sip * @param errorEvent sip
*/ */
@Override @Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream, public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
String stream = ssrcInfo.getStream(); String stream = ssrcInfo.getStream();
@ -345,13 +345,13 @@ public class SIPCommander implements ISIPCommander {
if( device.isSwitchPrimarySubStream() ){ if( device.isSwitchPrimarySubStream() ){
if("TP-LINK".equals(device.getManufacturer())){ if("TP-LINK".equals(device.getManufacturer())){
if (isSubStream){ if (device.isSwitchPrimarySubStream()){
content.append("a=streamMode:sub\r\n"); content.append("a=streamMode:sub\r\n");
}else { }else {
content.append("a=streamMode:main\r\n"); content.append("a=streamMode:main\r\n");
} }
}else { }else {
if (isSubStream){ if (device.isSwitchPrimarySubStream()){
content.append("a=streamprofile:1\r\n"); content.append("a=streamprofile:1\r\n");
}else { }else {
content.append("a=streamprofile:0\r\n"); content.append("a=streamprofile:0\r\n");
@ -374,11 +374,8 @@ public class SIPCommander implements ISIPCommander {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event; ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse(); SIPResponse response = (SIPResponse) responseEvent.getResponse();
if(device.isSwitchPrimarySubStream()){ streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
streamSession.put(device.getDeviceId(), channelId, "switch-play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY); InviteSessionType.PLAY);
}else {
streamSession.put(device.getDeviceId(), channelId, "play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY);
}
okEvent.response(e); okEvent.response(e);
}); });
} }

View File

@ -142,13 +142,8 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
// 可能是设备主动停止 // 可能是设备主动停止
Device device = storager.queryVideoDeviceByChannelId(platformGbId); Device device = storager.queryVideoDeviceByChannelId(platformGbId);
if (device != null) { if (device != null) {
SsrcTransaction ssrcTransactionForPlay = null; storager.stopPlay(device.getDeviceId(), channelId);
if (device.isSwitchPrimarySubStream() ) { SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "switch-play", null);
} else {
storager.stopPlay(device.getDeviceId(), channelId);
ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
}
if (ssrcTransactionForPlay != null){ if (ssrcTransactionForPlay != null){
if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){ if (ssrcTransactionForPlay.getCallId().equals(callIdHeader.getCallId())){
// 释放ssrc // 释放ssrc
@ -158,16 +153,8 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
} }
streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream());
} }
InviteInfo inviteInfo = null; InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
if (device.isSwitchPrimarySubStream() ) { inviteStreamService.removeInviteInfo(inviteInfo);
String streamType = ssrcTransactionForPlay.getStream().split("_")[0];
boolean isSubStream = "sub".equals(streamType);
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
inviteStreamService.removeInviteInfo(inviteInfo.getType(),inviteInfo.getDeviceId(),inviteInfo.getChannelId(),isSubStream,inviteInfo.getStream());
}else {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfo(inviteInfo);
}
if (inviteInfo != null) { if (inviteInfo != null) {
if (inviteInfo.getStreamInfo() != null) { if (inviteInfo.getStreamInfo() != null) {
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream()); mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServerId(), inviteInfo.getStream());

View File

@ -490,7 +490,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
sendRtpItem.setStreamId(streamId); sendRtpItem.setStreamId(streamId);
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.play(mediaServerItem, device.getDeviceId(), channelId,false, ((code, msg, data) -> { playService.play(mediaServerItem, device.getDeviceId(), channelId, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()){ if (code == InviteErrorCode.SUCCESS.getCode()){
hookEvent.run(code, msg, data); hookEvent.run(code, msg, data);
}else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){ }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){

View File

@ -252,7 +252,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
} }
private void executeSave(){ private void executeSave(){
System.out.println("定时存储数据");
executeSaveForUpdate(); executeSaveForUpdate();
executeSaveForDelete(); executeSaveForDelete();
executeSaveForOnline(); executeSaveForOnline();

View File

@ -250,7 +250,6 @@ public class SipUtils {
if (ObjectUtils.isEmpty(timeStr)){ if (ObjectUtils.isEmpty(timeStr)){
return null; return null;
} }
System.out.println(timeStr);
LocalDateTime localDateTime; LocalDateTime localDateTime;
try { try {
localDateTime = LocalDateTime.parse(timeStr); localDateTime = LocalDateTime.parse(timeStr);

View File

@ -343,19 +343,10 @@ public class ZLMHttpHookListener {
} }
if ("rtp".equals(param.getApp()) && !param.isRegist()) { if ("rtp".equals(param.getApp()) && !param.isRegist()) {
if(param.getStream().split("_").length == 3){ InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
boolean isSubStream = "sub".equals(param.getStream().split("_")[0]); if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream(), isSubStream); inviteStreamService.removeInviteInfo(inviteInfo);
if(inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY )){ storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
inviteStreamService.removeInviteInfo(inviteInfo.getType(),inviteInfo.getDeviceId(),
inviteInfo.getChannelId(),inviteInfo.isSubStream(),inviteInfo.getStream());
}
}else {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
inviteStreamService.removeInviteInfo(inviteInfo);
storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
}
} }
} else { } else {
if (!"rtp".equals(param.getApp())) { if (!"rtp".equals(param.getApp())) {
@ -483,15 +474,8 @@ public class ZLMHttpHookListener {
Device device = deviceService.getDevice(inviteInfo.getDeviceId()); Device device = deviceService.getDevice(inviteInfo.getDeviceId());
if (device != null) { if (device != null) {
try { try {
InviteInfo info = null; InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
if(device.isSwitchPrimarySubStream()){ inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
boolean isSubStream = "sub".equals(param.getStream().split("_")[0]);
info = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(),isSubStream, inviteInfo.getStream());
}else {
info = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
}
if (info != null) { if (info != null) {
cmder.streamByeCmd(device, inviteInfo.getChannelId(), cmder.streamByeCmd(device, inviteInfo.getChannelId(),
inviteInfo.getStream(), null); inviteInfo.getStream(), null);
@ -502,15 +486,9 @@ public class ZLMHttpHookListener {
} }
} }
if(device.isSwitchPrimarySubStream()){ inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
boolean isSubStream = "sub".equals(param.getStream().split("_")[0]); inviteInfo.getChannelId(), inviteInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
inviteInfo.getChannelId(),isSubStream, inviteInfo.getStream());
}else {
inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
inviteInfo.getChannelId(), inviteInfo.getStream());
storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
}
return ret; return ret;
} }
} else { } else {
@ -563,26 +541,12 @@ public class ZLMHttpHookListener {
if ("rtp".equals(param.getApp())) { if ("rtp".equals(param.getApp())) {
String[] s = param.getStream().split("_"); String[] s = param.getStream().split("_");
if (!mediaInfo.isRtpEnable() ) { if (!mediaInfo.isRtpEnable() || s.length != 2) {
defaultResult.setResult(HookResult.SUCCESS());
return defaultResult;
}else if(s.length != 2 && s.length != 3 ){
defaultResult.setResult(HookResult.SUCCESS()); defaultResult.setResult(HookResult.SUCCESS());
return defaultResult; return defaultResult;
} }
String deviceId = null; String deviceId = s[0];
String channelId = null; String channelId = s[1];
boolean isSubStream = false;
if (s[0].length() < 20) {
if ("sub".equals(s[0])) {
isSubStream = true;
}
deviceId = s[1];
channelId = s[2];
} else {
deviceId = s[0];
channelId = s[1];
}
Device device = redisCatchStorage.getDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) { if (device == null) {
defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg())); defaultResult.setResult(new HookResult(ErrorCode.ERROR404.getCode(), ErrorCode.ERROR404.getMsg()));
@ -596,7 +560,7 @@ public class ZLMHttpHookListener {
logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); logger.info("[ZLM HOOK] 流未找到, 发起自动点播:{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.getPlayKey(deviceId, channelId, device.isSwitchPrimarySubStream(), isSubStream); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
boolean exist = resultHolder.exist(key, null); boolean exist = resultHolder.exist(key, null);
msg.setKey(key); msg.setKey(key);
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
@ -614,7 +578,7 @@ public class ZLMHttpHookListener {
resultHolder.put(key, uuid, result); resultHolder.put(key, uuid, result);
if (!exist) { if (!exist) {
playService.play(mediaInfo, deviceId, channelId,isSubStream, (code, message, data) -> { playService.play(mediaInfo, deviceId, channelId, (code, message, data) -> {
msg.setData(new HookResult(code, message)); msg.setData(new HookResult(code, message));
resultHolder.invokeResult(msg); resultHolder.invokeResult(msg);
}); });

View File

@ -270,11 +270,6 @@ public class ZLMRESTfulUtils {
} }
public JSONObject openRtpServer(MediaServerItem mediaServerItem, Map<String, Object> param){ public JSONObject openRtpServer(MediaServerItem mediaServerItem, Map<String, Object> param){
System.out.println("==============openRtpServer=================");
for (String s : param.keySet()) {
System.out.println(s + "-->" + param.get(s));
}
System.out.println("===============================");
return sendPost(mediaServerItem, "openRtpServer",param, null); return sendPost(mediaServerItem, "openRtpServer",param, null);
} }

View File

@ -227,7 +227,6 @@ public class ZLMRTPServerFactory {
int localPort = 0; int localPort = 0;
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
System.out.println("createSendRtpItem1");
localPort = keepPort(serverItem, ssrc, localPort); localPort = keepPort(serverItem, ssrc, localPort);
if (localPort == 0) { if (localPort == 0) {
return null; return null;
@ -264,7 +263,6 @@ public class ZLMRTPServerFactory {
// 默认为随机端口 // 默认为随机端口
int localPort = 0; int localPort = 0;
if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) {
System.out.println("createSendRtpItem2");
localPort = keepPort(serverItem, ssrc, localPort); localPort = keepPort(serverItem, ssrc, localPort);
if (localPort == 0) { if (localPort == 0) {
return null; return null;
@ -294,9 +292,7 @@ public class ZLMRTPServerFactory {
param.put("port", localPort); param.put("port", localPort);
param.put("enable_tcp", 1); param.put("enable_tcp", 1);
param.put("stream_id", ssrc); param.put("stream_id", ssrc);
System.out.println("用于收流");
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
System.out.println(jsonObject);
if (jsonObject.getInteger("code") == 0) { if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port"); localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());

View File

@ -4,8 +4,6 @@ import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import java.util.List;
/** /**
* *
*/ */
@ -16,6 +14,8 @@ public interface IInviteStreamService {
*/ */
void updateInviteInfo(InviteInfo inviteInfo); void updateInviteInfo(InviteInfo inviteInfo);
InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream);
/** /**
* *
*/ */
@ -74,48 +74,5 @@ public interface IInviteStreamService {
int getStreamInfoCount(String mediaServerId); int getStreamInfoCount(String mediaServerId);
/*======================设备主子码流逻辑START=========================*/
/**
*
*/
InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type,
String deviceId,
String channelId,boolean isSubStream);
void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId,boolean isSubStream);
InviteInfo getInviteInfo(InviteSessionType type,
String deviceId,
String channelId,
boolean isSubStream,
String stream);
void removeInviteInfo(InviteSessionType type,
String deviceId,
String channelId,
boolean isSubStream,
String stream);
void once(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream, ErrorCallback<Object> callback);
void call(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream, int code, String msg, Object data);
void updateInviteInfoSub(InviteInfo inviteInfo);
/**
*
*/
InviteInfo getInviteInfoByStream(InviteSessionType type, String stream,boolean isSubStream);
/**
*
*/
List<Object> getInviteInfos(InviteSessionType type,
String deviceId,
String channelId,
String stream);
/*======================设备主子码流逻辑END=========================*/
} }

View File

@ -16,9 +16,9 @@ import java.text.ParseException;
*/ */
public interface IPlayService { public interface IPlayService {
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream, void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ErrorCallback<Object> callback); ErrorCallback<Object> callback);
SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId,boolean isSubStream, ErrorCallback<Object> callback); SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback);
MediaServerItem getNewMediaServerItem(Device device); MediaServerItem getNewMediaServerItem(Device device);
@ -43,5 +43,5 @@ public interface IPlayService {
void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException; void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException;
void getSnap(String deviceId, String channelId, String fileName,boolean isSubStream, ErrorCallback errorCallback); void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback);
} }

View File

@ -84,6 +84,24 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
redisTemplate.opsForValue().set(key, inviteInfoForUpdate); redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
} }
@Override
public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
if (inviteInfoInDb == null) {
return null;
}
removeInviteInfo(inviteInfoInDb);
String key = VideoManagerConstants.INVITE_PREFIX +
"_" + inviteInfo.getType() +
"_" + inviteInfo.getDeviceId() +
"_" + inviteInfo.getChannelId() +
"_" + stream;
inviteInfoInDb.setStream(stream);
redisTemplate.opsForValue().set(key, inviteInfoInDb);
return inviteInfoInDb;
}
@Override @Override
public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = VideoManagerConstants.INVITE_PREFIX + String key = VideoManagerConstants.INVITE_PREFIX +
@ -152,19 +170,6 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
} }
@Override
public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
String key = buildKey(type, deviceId, channelId, stream);
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
return;
}
for (ErrorCallback<Object> callback : callbacks) {
callback.run(code, msg, data);
}
inviteErrorCallbackMap.remove(key);
}
private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) { private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = type + "_" + deviceId + "_" + channelId; String key = type + "_" + deviceId + "_" + channelId;
// 如果ssrc未null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite // 如果ssrc未null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite
@ -199,69 +204,9 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
return count; return count;
} }
/*======================设备主子码流逻辑START=========================*/
@Override @Override
public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId, boolean isSubStream) { public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
return getInviteInfo(type, deviceId, channelId,isSubStream, null); String key = buildSubStreamKey(type, deviceId, channelId, stream);
}
@Override
public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId, boolean isSubStream) {
removeInviteInfo(inviteSessionType, deviceId, channelId,isSubStream, null);
}
@Override
public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId,boolean isSubStream, String stream) {
String key = VideoManagerConstants.INVITE_PREFIX +
"_" + (type != null ? type : "*") +
"_" + (isSubStream ? "sub" : "main") +
"_" + (deviceId != null ? deviceId : "*") +
"_" + (channelId != null ? channelId : "*") +
"_" + (stream != null ? stream : "*");
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
if (scanResult.size() != 1) {
return null;
}
return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
}
@Override
public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) {
String scanKey = VideoManagerConstants.INVITE_PREFIX +
"_" + (type != null ? type : "*") +
"_" + (isSubStream ? "sub" : "main") +
"_" + (deviceId != null ? deviceId : "*") +
"_" + (channelId != null ? channelId : "*") +
"_" + (stream != null ? stream : "*");
List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
if (scanResult.size() > 0) {
for (Object keyObj : scanResult) {
String key = (String) keyObj;
InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
if (inviteInfo == null) {
continue;
}
redisTemplate.delete(key);
inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
}
}
}
@Override
public void once(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, ErrorCallback<Object> callback) {
String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream);
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) {
callbacks = new CopyOnWriteArrayList<>();
inviteErrorCallbackMap.put(key, callbacks);
}
callbacks.add(callback);
}
@Override
public void call(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream, int code, String msg, Object data) {
String key = buildSubStreamKey(type, deviceId, channelId,isSubStream, stream);
List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key); List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
if (callbacks == null) { if (callbacks == null) {
return; return;
@ -273,89 +218,12 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
} }
private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, boolean isSubStream, String stream) { private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = type + "_" + (isSubStream ? "sub":"main") + "_" + deviceId + "_" + channelId; String key = type + "_" + "_" + deviceId + "_" + channelId;
// 如果ssrc为null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite // 如果ssrc为null那么可以实现一个通道只能一次操作ssrc不为null则可以支持一个通道多次invite
if (stream != null) { if (stream != null) {
key += ("_" + stream); key += ("_" + stream);
} }
return key; return key;
} }
@Override
public void updateInviteInfoSub(InviteInfo inviteInfo) {
if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
return;
}
InviteInfo inviteInfoForUpdate = null;
if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
if (inviteInfo.getDeviceId() == null
|| inviteInfo.getChannelId() == null
|| inviteInfo.getType() == null
|| inviteInfo.getStream() == null
) {
return;
}
inviteInfoForUpdate = inviteInfo;
} else {
InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
inviteInfo.getChannelId(),inviteInfo.isSubStream(), inviteInfo.getStream());
if (inviteInfoInRedis == null) {
logger.warn("[更新Invite信息]未从缓存中读取到Invite信息 deviceId: {}, channel: {}, stream: {}",
inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
return;
}
if (inviteInfo.getStreamInfo() != null) {
inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
}
if (inviteInfo.getSsrcInfo() != null) {
inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
}
if (inviteInfo.getStreamMode() != null) {
inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
}
if (inviteInfo.getReceiveIp() != null) {
inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
}
if (inviteInfo.getReceivePort() != null) {
inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
}
if (inviteInfo.getStatus() != null) {
inviteInfoInRedis.setStatus(inviteInfo.getStatus());
}
inviteInfoForUpdate = inviteInfoInRedis;
}
String key = VideoManagerConstants.INVITE_PREFIX +
"_" + inviteInfoForUpdate.getType() +
"_" + (inviteInfoForUpdate.isSubStream() ? "sub":"main") +
"_" + inviteInfoForUpdate.getDeviceId() +
"_" + inviteInfoForUpdate.getChannelId() +
"_" + inviteInfoForUpdate.getStream();
redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
}
@Override
public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream, boolean isSubStream) {
return getInviteInfo(type, null, null,isSubStream, stream);
}
@Override
public List<Object> getInviteInfos(InviteSessionType type, String deviceId, String channelId, String stream) {
String key = VideoManagerConstants.INVITE_PREFIX +
"_" + (type != null ? type : "*") +
"_" + (deviceId != null ? deviceId : "*") +
"_" + (channelId != null ? channelId : "*") +
"_" + (stream != null ? stream : "*");
List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
return scanResult;
}
/*======================设备主子码流逻辑END=========================*/
} }

View File

@ -116,43 +116,27 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId,boolean isSubStream, ErrorCallback<Object> callback) { public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, ErrorCallback<Object> callback) {
if (mediaServerItem == null) { if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
} }
Device device = redisCatchStorage.getDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
InviteInfo inviteInfo; InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if(device.isSwitchPrimarySubStream()){
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}else {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}
if (inviteInfo != null ) { if (inviteInfo != null ) {
if (inviteInfo.getStreamInfo() == null) { if (inviteInfo.getStreamInfo() == null) {
// 点播发起了但是尚未成功, 仅注册回调等待结果即可 // 点播发起了但是尚未成功, 仅注册回调等待结果即可
if(device.isSwitchPrimarySubStream()){ inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId,isSubStream, null, callback);
}else {
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
}
return inviteInfo.getSsrcInfo(); return inviteInfo.getSsrcInfo();
}else { }else {
StreamInfo streamInfo = inviteInfo.getStreamInfo(); StreamInfo streamInfo = inviteInfo.getStreamInfo();
String streamId = streamInfo.getStream(); String streamId = streamInfo.getStream();
if (streamId == null) { if (streamId == null) {
callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null); callback.run(InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null", null);
if(device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(), "点播失败, redis缓存streamId等于null",
"点播失败, redis缓存streamId等于null", null);
null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_CATCH_DATA.getCode(),
"点播失败, redis缓存streamId等于null",
null);
}
return inviteInfo.getSsrcInfo(); return inviteInfo.getSsrcInfo();
} }
String mediaServerId = streamInfo.getMediaServerId(); String mediaServerId = streamInfo.getMediaServerId();
@ -161,64 +145,38 @@ public class PlayServiceImpl implements IPlayService {
Boolean ready = zlmrtpServerFactory.isStreamReady(mediaInfo, "rtp", streamId); Boolean ready = zlmrtpServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
if (ready != null && ready) { if (ready != null && ready) {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
if(device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(),
InviteErrorCode.SUCCESS.getMsg(), streamInfo);
streamInfo);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}
return inviteInfo.getSsrcInfo(); return inviteInfo.getSsrcInfo();
}else { }else {
// 点播发起了但是尚未成功, 仅注册回调等待结果即可 // 点播发起了但是尚未成功, 仅注册回调等待结果即可
if(device.isSwitchPrimarySubStream()) { inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback);
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}else {
inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId,isSubStream, null, callback);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}
} }
} }
} }
String streamId = null; String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
if (mediaServerItem.isRtpEnable()) {
if(device.isSwitchPrimarySubStream()){
streamId = StreamInfo.getPlayStream(deviceId, channelId, isSubStream);
}else {
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam()); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
if (ssrcInfo == null) { if (ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
if(device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
null);
}
return null; return null;
} }
// TODO 记录点播的状态 // TODO 记录点播的状态
play(mediaServerItem, ssrcInfo, device, channelId,isSubStream, callback); play(mediaServerItem, ssrcInfo, device, channelId, callback);
return ssrcInfo; return ssrcInfo;
} }
@Override @Override
public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,boolean isSubStream, public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
ErrorCallback<Object> callback) { ErrorCallback<Object> callback) {
if (mediaServerItem == null || ssrcInfo == null) { if (mediaServerItem == null || ssrcInfo == null) {
@ -227,11 +185,9 @@ public class PlayServiceImpl implements IPlayService {
null); null);
return; return;
} }
if( device.isSwitchPrimarySubStream() ){ logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}",
logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流", ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(),
}else { device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
logger.info("[点播开始] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
}
//端口获取失败的ssrcInfo 没有必要发送点播指令 //端口获取失败的ssrcInfo 没有必要发送点播指令
if (ssrcInfo.getPort() <= 0) { if (ssrcInfo.getPort() <= 0) {
logger.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo); logger.info("[点播端口分配异常]deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
@ -240,50 +196,27 @@ public class PlayServiceImpl implements IPlayService {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null); callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
if(device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "点播端口分配异常", null);
}
return; return;
} }
// 初始化redis中的invite消息状态 // 初始化redis中的invite消息状态
InviteInfo inviteInfo; InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
if(device.isSwitchPrimarySubStream()){ InviteSessionStatus.ready);
// 初始化redis中的invite消息状态 inviteInfo.setSubStream(device.isSwitchPrimarySubStream());
inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId,isSubStream, ssrcInfo.getStream(), ssrcInfo, inviteStreamService.updateInviteInfo(inviteInfo);
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfoSub(inviteInfo);
}else {
// 初始化redis中的invite消息状态
inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
}
// 超时处理 // 超时处理
String timeOutTaskKey = UUID.randomUUID().toString(); String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> { dynamicTask.startDelay(timeOutTaskKey, () -> {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForTimeOut; InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
if(device.isSwitchPrimarySubStream()){
// 初始化redis中的invite消息状态
inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
}else {
// 初始化redis中的invite消息状态
inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}
if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) { if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) {
if( device.isSwitchPrimarySubStream()){ logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}",
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getSsrc()); device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
}else { ssrcInfo.getPort(), ssrcInfo.getSsrc());
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc());
}
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源 // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
// InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId); // InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId);
// if (inviteInfoForTimeout == null) { // if (inviteInfoForTimeout == null) {
@ -295,16 +228,10 @@ public class PlayServiceImpl implements IPlayService {
// // TODO 发送cancel // // TODO 发送cancel
// } // }
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
if( device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}
try { try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
@ -322,42 +249,26 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout()); }, userSetting.getPlayTimeout());
try { try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId,isSubStream, (mediaServerItemInuse, hookParam ) -> { cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInuse, hookParam ) -> {
logger.info("收到订阅消息: " + hookParam); logger.info("收到订阅消息: " + hookParam);
dynamicTask.stop(timeOutTaskKey); dynamicTask.stop(timeOutTaskKey);
// hook响应 // hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId,isSubStream); StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId);
if (streamInfo == null){ if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
if( device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}
return; return;
} }
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
if( device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(),
InviteErrorCode.SUCCESS.getMsg(), streamInfo);
streamInfo); logger.info("[点播成功] deviceId: {}, channelId: {},码流类型:{}", device.getDeviceId(),
}else { device.isSwitchPrimarySubStream() ? "辅码流" : "主码流");
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}
if( device.isSwitchPrimarySubStream() ){
logger.info("[点播成功] deviceId: {}, channelId: {},码流类型:{}", device.getDeviceId(), channelId,isSubStream ? "辅码流" : "主码流");
}else {
logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
}
String streamUrl; String streamUrl;
if (mediaServerItemInuse.getRtspPort() != 0) { if (mediaServerItemInuse.getRtspPort() != 0) {
streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream()); streamUrl = String.format("rtsp://127.0.0.1:%s/%s/%s", mediaServerItemInuse.getRtspPort(), "rtp", ssrcInfo.getStream());
@ -413,15 +324,9 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
if(device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
}
} }
} }
return; return;
@ -438,39 +343,26 @@ public class PlayServiceImpl implements IPlayService {
subscribe.removeSubscribe(hookSubscribe); subscribe.removeSubscribe(hookSubscribe);
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
hookSubscribe.getContent().put("stream", stream); hookSubscribe.getContent().put("stream", stream);
inviteInfo.setStream(stream); inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam); logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
dynamicTask.stop(timeOutTaskKey); dynamicTask.stop(timeOutTaskKey);
// hook响应 // hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId,isSubStream); StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId);
if (streamInfo == null){ if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
if( device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
}
return; return;
} }
callback.run(InviteErrorCode.SUCCESS.getCode(), callback.run(InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(), streamInfo); InviteErrorCode.SUCCESS.getMsg(), streamInfo);
if( device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(),
InviteErrorCode.SUCCESS.getMsg(), streamInfo);
streamInfo);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.SUCCESS.getCode(),
InviteErrorCode.SUCCESS.getMsg(),
streamInfo);
}
}); });
return; return;
} }
@ -493,17 +385,14 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null); "下级自定义了ssrc,重新设置收流信息失败", null);
if( device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), "下级自定义了ssrc,重新设置收流信息失败", null);
"下级自定义了ssrc,重新设置收流信息失败", null);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}
}else { }else {
if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
inviteStreamService.removeInviteInfo(inviteInfo);
}
ssrcInfo.setSsrc(ssrcInResponse); ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream()); inviteInfo.setStream(ssrcInfo.getStream());
@ -512,11 +401,7 @@ public class PlayServiceImpl implements IPlayService {
logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
} }
} }
if(device.isSwitchPrimarySubStream()){ inviteStreamService.updateInviteInfo(inviteInfo);
inviteStreamService.updateInviteInfoSub(inviteInfo);
}else {
inviteStreamService.updateInviteInfo(inviteInfo);
}
}, (event) -> { }, (event) -> {
dynamicTask.stop(timeOutTaskKey); dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
@ -527,19 +412,11 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(), callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_ERROR.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null); String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
if( device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}
}); });
} catch (InvalidArgumentException | SipException | ParseException e) { } catch (InvalidArgumentException | SipException | ParseException e) {
@ -553,52 +430,30 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null); InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
if( device.isSwitchPrimarySubStream()){ inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream, null, InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId,isSubStream); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}else {
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
}
} }
} }
private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId, boolean isSubStream) { private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId) {
StreamInfo streamInfo = null; StreamInfo streamInfo = null;
Device device = redisCatchStorage.getDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam; OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
if( device.isSwitchPrimarySubStream() ){ streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId,isSubStream);
}else {
streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
}
if (streamInfo != null) { if (streamInfo != null) {
InviteInfo inviteInfo; DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if(device.isSwitchPrimarySubStream()){ if (deviceChannel != null) {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream); deviceChannel.setStreamId(streamInfo.getStream());
}else { storager.startPlay(deviceId, channelId, streamInfo.getStream());
DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
deviceChannel.setStreamId(streamInfo.getStream());
storager.startPlay(deviceId, channelId, streamInfo.getStream());
}
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
} }
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfo != null) { if (inviteInfo != null) {
inviteInfo.setStatus(InviteSessionStatus.ok); inviteInfo.setStatus(InviteSessionStatus.ok);
inviteInfo.setStreamInfo(streamInfo); inviteInfo.setStreamInfo(streamInfo);
if(device.isSwitchPrimarySubStream()){ inviteStreamService.updateInviteInfo(inviteInfo);
inviteStreamService.updateInviteInfoSub(inviteInfo);
}else {
inviteStreamService.updateInviteInfo(inviteInfo);
}
} }
} }
return streamInfo; return streamInfo;
@ -693,7 +548,7 @@ public class PlayServiceImpl implements IPlayService {
device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(), device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
ssrcInfo.getSsrc(), device.isSsrcCheck()); ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态 // 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
InviteSessionStatus.ready); InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
@ -805,7 +660,7 @@ public class PlayServiceImpl implements IPlayService {
subscribe.removeSubscribe(hookSubscribe); subscribe.removeSubscribe(hookSubscribe);
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
hookSubscribe.getContent().put("stream", stream); hookSubscribe.getContent().put("stream", stream);
inviteInfo.setStream(stream); inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam); logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
dynamicTask.stop(playBackTimeOutTaskKey); dynamicTask.stop(playBackTimeOutTaskKey);
@ -834,6 +689,10 @@ public class PlayServiceImpl implements IPlayService {
"下级自定义了ssrc,重新设置收流信息失败", null); "下级自定义了ssrc,重新设置收流信息失败", null);
}else { }else {
if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
inviteStreamService.removeInviteInfo(inviteInfo);
}
ssrcInfo.setSsrc(ssrcInResponse); ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream()); inviteInfo.setStream(ssrcInfo.getStream());
@ -892,7 +751,7 @@ public class PlayServiceImpl implements IPlayService {
} }
logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); logger.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channelId, downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态 // 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getinviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
InviteSessionStatus.ready); InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
@ -998,7 +857,9 @@ public class PlayServiceImpl implements IPlayService {
// 添加订阅 // 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe); subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
hookSubscribe.getContent().put("stream", stream);
inviteStreamService.updateInviteInfoForStream(inviteInfo, stream);
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam); logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
dynamicTask.stop(downLoadTimeOutTaskKey); dynamicTask.stop(downLoadTimeOutTaskKey);
@ -1026,6 +887,9 @@ public class PlayServiceImpl implements IPlayService {
"下级自定义了ssrc,重新设置收流信息失败", null); "下级自定义了ssrc,重新设置收流信息失败", null);
}else { }else {
if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) {
inviteStreamService.removeInviteInfo(inviteInfo);
}
ssrcInfo.setSsrc(ssrcInResponse); ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream()); inviteInfo.setStream(ssrcInfo.getStream());
@ -1034,6 +898,7 @@ public class PlayServiceImpl implements IPlayService {
logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正");
} }
} }
inviteStreamService.updateInviteInfo(inviteInfo);
}); });
} catch (InvalidArgumentException | SipException | ParseException e) { } catch (InvalidArgumentException | SipException | ParseException e) {
logger.error("[命令发送失败] 录像下载: {}", e.getMessage()); logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
@ -1255,18 +1120,13 @@ public class PlayServiceImpl implements IPlayService {
} }
@Override @Override
public void getSnap(String deviceId, String channelId, String fileName,boolean isSubStream, ErrorCallback errorCallback) { public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) {
Device device = deviceService.getDevice(deviceId); Device device = deviceService.getDevice(deviceId);
if (device == null) { if (device == null) {
errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null); errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null);
return; return;
} }
InviteInfo inviteInfo; InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if(device.isSwitchPrimarySubStream()){
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}else {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}
if (inviteInfo != null) { if (inviteInfo != null) {
if (inviteInfo.getStreamInfo() != null) { if (inviteInfo.getStreamInfo() != null) {
// 已存在线直接截图 // 已存在线直接截图
@ -1292,11 +1152,11 @@ public class PlayServiceImpl implements IPlayService {
} }
MediaServerItem newMediaServerItem = getNewMediaServerItem(device); MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
play(newMediaServerItem, deviceId, channelId,isSubStream, (code, msg, data)->{ play(newMediaServerItem, deviceId, channelId, (code, msg, data)->{
if (code == InviteErrorCode.SUCCESS.getCode()) { if (code == InviteErrorCode.SUCCESS.getCode()) {
InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); InviteInfo inviteInfoForPlay = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) { if (inviteInfoForPlay != null && inviteInfoForPlay.getStreamInfo() != null) {
getSnap(deviceId, channelId, fileName,isSubStream, errorCallback); getSnap(deviceId, channelId, fileName, errorCallback);
}else { }else {
errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null); errorCallback.run(InviteErrorCode.FAIL.getCode(), InviteErrorCode.FAIL.getMsg(), null);
} }
@ -1306,17 +1166,4 @@ public class PlayServiceImpl implements IPlayService {
}); });
} }
/*======================设备主子码流逻辑START=========================*/
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId,boolean isSubStream) {
String streamId = hookParam.getStream();
List<OnStreamChangedHookParam.MediaTrack> tracks = hookParam.getTracks();
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
streamInfo.setDeviceID(deviceId);
streamInfo.setChannelId(channelId);
streamInfo.setSubStream(isSubStream);
return streamInfo;
}
/*======================设备主子码流逻辑END=========================*/
} }

View File

@ -70,7 +70,6 @@ public class UserServiceImpl implements IUserService {
continue; continue;
} }
String checkStr = callId == null? user.getPushKey():(callId + "_" + user.getPushKey()) ; String checkStr = callId == null? user.getPushKey():(callId + "_" + user.getPushKey()) ;
System.out.println(checkStr);
String checkSign = DigestUtils.md5DigestAsHex(checkStr.getBytes()); String checkSign = DigestUtils.md5DigestAsHex(checkStr.getBytes());
if (checkSign.equals(sign)) { if (checkSign.equals(sign)) {
return true; return true;

View File

@ -88,17 +88,16 @@ public class PlayController {
@Operation(summary = "开始点播") @Operation(summary = "开始点播")
@Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true) @Parameter(name = "channelId", description = "通道国标编号", required = true)
@Parameter(name = "isSubStream", description = "是否子码流true-子码流false-主码流默认为false", required = true)
@GetMapping("/start/{deviceId}/{channelId}") @GetMapping("/start/{deviceId}/{channelId}")
public DeferredResult<WVPResult<StreamContent>> play(HttpServletRequest request, @PathVariable String deviceId, public DeferredResult<WVPResult<StreamContent>> play(HttpServletRequest request, @PathVariable String deviceId,
@PathVariable String channelId,boolean isSubStream) { @PathVariable String channelId) {
// 获取可用的zlm // 获取可用的zlm
Device device = storager.queryVideoDevice(deviceId); Device device = storager.queryVideoDevice(deviceId);
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
RequestMessage requestMessage = new RequestMessage(); RequestMessage requestMessage = new RequestMessage();
String key = DeferredResultHolder.getPlayKey(deviceId,channelId,device.isSwitchPrimarySubStream(),isSubStream); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
requestMessage.setKey(key); requestMessage.setKey(key);
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
requestMessage.setId(uuid); requestMessage.setId(uuid);
@ -117,7 +116,7 @@ public class PlayController {
// 录像查询以channelId作为deviceId查询 // 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result); resultHolder.put(key, uuid, result);
playService.play(newMediaServerItem, deviceId, channelId,isSubStream, (code, msg, data) -> { playService.play(newMediaServerItem, deviceId, channelId, (code, msg, data) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>(); WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) { if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode()); wvpResult.setCode(ErrorCode.SUCCESS.getCode());
@ -158,12 +157,7 @@ public class PlayController {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备[" + deviceId + "]不存在"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备[" + deviceId + "]不存在");
} }
InviteInfo inviteInfo =null; InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
if(device.isSwitchPrimarySubStream()){
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream);
}else {
inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
}
if (inviteInfo == null) { if (inviteInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到");
} }
@ -176,12 +170,8 @@ public class PlayController {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
} }
} }
if(device.isSwitchPrimarySubStream()){ inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId,isSubStream); storager.stopPlay(deviceId, channelId);
}else {
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId);
storager.stopPlay(deviceId, channelId);
}
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("deviceId", deviceId); json.put("deviceId", deviceId);
@ -360,9 +350,8 @@ public class PlayController {
logger.debug("获取截图: {}/{}", deviceId, channelId); logger.debug("获取截图: {}/{}", deviceId, channelId);
} }
Device device = storager.queryVideoDevice(deviceId);
DeferredResult<String> result = new DeferredResult<>(3 * 1000L); DeferredResult<String> result = new DeferredResult<>(3 * 1000L);
String key = DeferredResultHolder.getSnapKey(deviceId,channelId,device.isSwitchPrimarySubStream(),isSubStream); String key = DeferredResultHolder.CALLBACK_CMD_SNAP + deviceId;
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
resultHolder.put(key, uuid, result); resultHolder.put(key, uuid, result);
@ -371,7 +360,7 @@ public class PlayController {
message.setId(uuid); message.setId(uuid);
String fileName = deviceId + "_" + channelId + "_" + DateUtil.getNowForUrl() + "jpg"; String fileName = deviceId + "_" + channelId + "_" + DateUtil.getNowForUrl() + "jpg";
playService.getSnap(deviceId, channelId, fileName,isSubStream, (code, msg, data) -> { playService.getSnap(deviceId, channelId, fileName, (code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) { if (code == InviteErrorCode.SUCCESS.getCode()) {
message.setData(data); message.setData(data);
}else { }else {

View File

@ -122,7 +122,7 @@ public class ApiStreamController {
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
playService.play(newMediaServerItem, serial, code,false, (errorCode, msg, data) -> { playService.play(newMediaServerItem, serial, code, (errorCode, msg, data) -> {
if (errorCode == InviteErrorCode.SUCCESS.getCode()) { if (errorCode == InviteErrorCode.SUCCESS.getCode()) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, serial, code);
if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { if (inviteInfo != null && inviteInfo.getStreamInfo() != null) {