国标级联支持拉流代理自动拉起

pull/647/head
648540858 2022-10-19 11:17:08 +08:00
parent b6bcb5f2f4
commit b25f3631ee
1 changed files with 30 additions and 12 deletions

View File

@ -502,22 +502,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} }
}else if ("proxy".equals(gbStream.getStreamType())){ }else if ("proxy".equals(gbStream.getStreamType())){
if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){ if (null != proxyByAppAndStream) {
if(proxyByAppAndStream.isStatus()){
pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}else{ }else{
//开启代理拉流 //开启代理拉流
boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
if(start1) {
pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}else{
//失败后通知
notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} }
} }
} }
} }
} }
@ -629,11 +625,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if ("proxy".equals(gbStream.getStreamType())) { if ("proxy".equals(gbStream.getStreamType())) {
// TODO 控制启用以使设备上线 // TODO 控制启用以使设备上线
logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
// 监听流上线
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, responseJSON) -> {
String app = responseJSON.getString("app");
String stream = responseJSON.getString("stream");
logger.info("[上级点播]拉流代理已经就绪, {}/{}", app, stream);
dynamicTask.stop(callIdHeader.getCallId());
pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
});
dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
}, userSetting.getPlatformPlayTimeout());
boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
if (!start) {
try { try {
responseAck(request, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage()); logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
} }
zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
dynamicTask.stop(callIdHeader.getCallId());
}
} else if ("push".equals(gbStream.getStreamType())) { } else if ("push".equals(gbStream.getStreamType())) {
if (!platform.isStartOfflinePush()) { if (!platform.isStartOfflinePush()) {
// 平台设置中关闭了拉起离线的推流则直接回复 // 平台设置中关闭了拉起离线的推流则直接回复