添加对海康平台录像回放的兼容,修复录像信息发送失败, 级联平台支持开启rtcp保活

pull/608/head
648540858 2022-09-05 17:10:21 +08:00
parent 0134488428
commit d5e8aa62a1
15 changed files with 202 additions and 69 deletions

View File

@ -113,7 +113,6 @@ public class ParentPlatform {
/**
* RTCP
* TODO ,
*/
@Schema(description = "RTCP流保活")
private boolean rtcp;

View File

@ -103,7 +103,7 @@ public interface ISIPCommander {
* @param startTime ,yyyy-MM-dd HH:mm:ss
* @param endTime ,yyyy-MM-dd HH:mm:ss
*/
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,InviteStreamCallback inviteStreamCallback, InviteStreamCallback event, SipSubscribe.Event errorEvent);
void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInf, Device device, String channelId, String startTime, String endTime,InviteStreamCallback inviteStreamCallback, InviteStreamCallback event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent);
/**
*

View File

@ -456,7 +456,7 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playbackStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event errorEvent) {
SipSubscribe.Event okEvent,SipSubscribe.Event errorEvent) {
try {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
@ -535,10 +535,11 @@ public class SIPCommander implements ISIPCommander {
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
transmitRequest(device, request, errorEvent, okEvent -> {
ResponseEvent responseEvent = (ResponseEvent) okEvent.event;
transmitRequest(device, request, errorEvent, event -> {
ResponseEvent responseEvent = (ResponseEvent) event.event;
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction(), VideoStreamSessionManager.SessionType.playback);
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog);
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), event.dialog);
okEvent.response(event);
});
if (inviteStreamCallback != null) {
inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));

View File

@ -115,6 +115,11 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
if (!sendRtpItem.isTcp() && parentPlatform.isRtcp()) {
// 开启rtcp保活
param.put("udp_rtcp_timeout", "1");
}
if (mediaInfo == null) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(),

View File

@ -98,8 +98,8 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("收到bye:停止向上级推流:" + streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null);
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) {
logger.info("收到bye: {} 无其它观看者,通知设备停止推流", streamId);

View File

@ -563,6 +563,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
} else if ("push".equals(gbStream.getStreamType())) {
if (!platform.isStartOfflinePush()) {
// 平台设置中关闭了拉起离线的推流则直接回复
responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable");
return;
}
@ -599,7 +600,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
app, stream, channelId, mediaTransmissionTCP);
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
logger.warn("上级点时创建sendRTPItem失败可能是服务器端口资源不足");
try {
responseAck(evt, Response.BUSY_HERE);
} catch (SipException e) {

View File

@ -50,7 +50,7 @@ public class AssistRESTfulUtils {
if (mediaServerItem == null) {
return null;
}
if (mediaServerItem.getRecordAssistPort() > 0) {
if (mediaServerItem.getRecordAssistPort() <= 0) {
logger.warn("未启用Assist服务");
return null;
}

View File

@ -19,8 +19,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
@ -544,6 +542,8 @@ public class ZLMHttpHookListener {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId());
}
}
}
@ -573,13 +573,19 @@ public class ZLMHttpHookListener {
return ret;
}else {
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
if (streamProxyItem != null && streamProxyItem.isEnable_remove_none_reader()) {
ret.put("close", true);
streamProxyService.del(app, streamId);
String url = streamProxyItem.getUrl() != null?streamProxyItem.getUrl():streamProxyItem.getSrc_url();
logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, streamId, url);
}else {
ret.put("close", false);
if (streamProxyItem != null ) {
if (streamProxyItem.isEnable_remove_none_reader()) {
// 无人观看自动移除
ret.put("close", true);
streamProxyService.del(app, streamId);
String url = streamProxyItem.getUrl() != null?streamProxyItem.getUrl():streamProxyItem.getSrc_url();
logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, streamId, url);
}else if (streamProxyItem.isEnable_disable_none_reader()) {
// 无人观看停用
ret.put("close", true);
}else {
ret.put("close", false);
}
}
return ret;
}
@ -649,6 +655,39 @@ public class ZLMHttpHookListener {
return ret;
}
/**
* rtp(startSendRtp)
*/
@ResponseBody
@PostMapping(value = "/on_send_rtp_stopped", produces = "application/json;charset=UTF-8")
public JSONObject onSendRtpStopped(HttpServletRequest request, @RequestBody JSONObject jsonObject){
logger.info("[ ZLM HOOK ]on_send_rtp_stopped API调用参数" + jsonObject);
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("msg", "success");
// 查找对应的上级推流,发送停止
String app = jsonObject.getString("app");
if (!"rtp".equals(app)) {
return ret;
}
String stream = jsonObject.getString("stream");
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(stream);
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
sendRtpItem.getCallId(), sendRtpItem.getStreamId());
}
}
return ret;
}
private Map<String, String> urlParamToMap(String params) {
HashMap<String, String> map = new HashMap<>();
if (ObjectUtils.isEmpty(params)) {

View File

@ -37,6 +37,9 @@ public class StreamProxyItem extends GbStream {
private boolean enable_mp4;
@Schema(description = "是否 无人观看时删除")
private boolean enable_remove_none_reader;
@Schema(description = "是否 无人观看时不启用")
private boolean enable_disable_none_reader;
@Schema(description = "上级平台国标ID")
private String platformGbId;
@Schema(description = "创建时间")
@ -177,4 +180,11 @@ public class StreamProxyItem extends GbStream {
this.enable_remove_none_reader = enable_remove_none_reader;
}
public boolean isEnable_disable_none_reader() {
return enable_disable_none_reader;
}
public void setEnable_disable_none_reader(boolean enable_disable_none_reader) {
this.enable_disable_none_reader = enable_disable_none_reader;
}
}

View File

@ -531,6 +531,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex));
param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex));
param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
param.put("hook.on_send_rtp_stopped",String.format("%s/on_send_rtp_stopped", hookPrex));
if (mediaServerItem.getRecordAssistPort() > 0) {
param.put("hook.on_record_mp4",String.format("http://127.0.0.1:%s/api/record/on_record_mp4", mediaServerItem.getRecordAssistPort()));
}else {

View File

@ -73,7 +73,6 @@ public class MediaServiceImpl implements IMediaService {
}else {
streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null);
}
}
}
return streamInfo;

View File

@ -291,7 +291,7 @@ public class PlayServiceImpl implements IPlayService {
}
logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[SIP 消息] SSRC修正 {}->{}", ssrc, ssrcInResponse);
logger.info("[点播消息] SSRC修正 {}->{}", ssrc, ssrcInResponse);
if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
// ssrc 不可用
@ -441,37 +441,92 @@ public class PlayServiceImpl implements IPlayService {
resultHolder.exist(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid);
}, userSetting.getPlayTimeout());
SipSubscribe.Event errorEvent = event -> {
dynamicTask.stop(playBackTimeOutTaskKey);
requestMessage.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)));
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
playBackResult.setData(requestMessage);
playBackResult.setEvent(event);
playBackCallback.call(playBackResult);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
};
InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> {
logger.info("收到回放订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
dynamicTask.stop(playBackTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
if (streamInfo == null) {
logger.warn("设备回放API调用失败");
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg("设备回放API调用失败");
playBackCallback.call(playBackResult);
return;
}
redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
WVPResult<StreamInfo> success = WVPResult.success(streamInfo);
requestMessage.setData(success);
playBackResult.setCode(ErrorCode.SUCCESS.getCode());
playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
playBackResult.setData(requestMessage);
playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
playBackResult.setResponse(inviteStreamInfo.getResponse());
playBackCallback.call(playBackResult);
};
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
(InviteStreamInfo inviteStreamInfo) -> {
logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
dynamicTask.stop(playBackTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
if (streamInfo == null) {
logger.warn("设备回放API调用失败");
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg("设备回放API调用失败");
playBackCallback.call(playBackResult);
return;
hookEvent, eventResult -> {
if (eventResult.type == SipSubscribe.EventResultType.response) {
ResponseEvent responseEvent = (ResponseEvent)eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent());
// 获取ssrc
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
return;
}
logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
// ssrc 不可用
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";
eventResult.statusCode = 400;
errorEvent.response(eventResult);
return;
}
// 单端口模式streamId也有变化需要重新设置监听
if (!mediaServerItem.isRtpEnable()) {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
dynamicTask.stop(playBackTimeOutTaskKey);
// hook响应
onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
});
}
// 关闭rtp server
mediaServerService.closeRTPServer(device.getDeviceId(), channelId, ssrcInfo.getStream());
// 重新开启ssrc server
mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
}
}
}
redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
WVPResult<StreamInfo> success = WVPResult.success(streamInfo);
requestMessage.setData(success);
playBackResult.setCode(ErrorCode.SUCCESS.getCode());
playBackResult.setMsg(ErrorCode.SUCCESS.getMsg());
playBackResult.setData(requestMessage);
playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
playBackResult.setResponse(inviteStreamInfo.getResponse());
playBackCallback.call(playBackResult);
}, event -> {
dynamicTask.stop(playBackTimeOutTaskKey);
requestMessage.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)));
playBackResult.setCode(ErrorCode.ERROR100.getCode());
playBackResult.setMsg(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
playBackResult.setData(requestMessage);
playBackResult.setEvent(event);
playBackCallback.call(playBackResult);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
});
}, errorEvent);
return result;
}

View File

@ -236,4 +236,6 @@ public interface IRedisCatchStorage {
void sendStreamPushRequestedMsgForStatus();
List<SendRtpItem> querySendRTPServerByChnnelId(String channelId);
List<SendRtpItem> querySendRTPServerByStream(String stream);
}

View File

@ -387,6 +387,24 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return result;
}
@Override
public List<SendRtpItem> querySendRTPServerByStream(String stream) {
if (stream == null) {
return null;
}
String platformGbId = "*";
String callId = "*";
String channelId = "*";
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_" + platformGbId
+ "_" + channelId + "_" + stream + "_" + callId;
List<Object> scan = RedisUtil.scan(key);
List<SendRtpItem> result = new ArrayList<>();
for (Object o : scan) {
result.add((SendRtpItem) RedisUtil.get((String) o));
}
return result;
}
@Override
public List<SendRtpItem> querySendRTPServer(String platformGbId) {
if (platformGbId == null) {

View File

@ -37,13 +37,13 @@
<el-form-item label="本地端口" prop="devicePort">
<el-input v-model="platform.devicePort" :disabled="true" type="number"></el-input>
</el-form-item>
<el-form-item label="SIP认证用户名" prop="username">
<el-input v-model="platform.username"></el-input>
</el-form-item>
</el-form>
</el-col>
<el-col :span="12">
<el-form ref="platform2" :rules="rules" :model="platform" label-width="160px">
<el-form-item label="SIP认证用户名" prop="username">
<el-input v-model="platform.username"></el-input>
</el-form-item>
<el-form-item label="行政区划" prop="administrativeDivision">
<el-input v-model="platform.administrativeDivision" clearable></el-input>
</el-form-item>
@ -79,7 +79,7 @@
</el-select>
</el-form-item>
<el-form-item label="目录结构" prop="treeType" >
<el-select v-model="platform.treeType" style="width: 100%" >
<el-select v-model="platform.treeType" style="width: 100%" @change="treeTypeChange">
<el-option key="WGS84" label="行政区划" value="CivilCode"></el-option>
<el-option key="GCJ02" label="业务分组" value="BusinessGroup"></el-option>
</el-select>
@ -98,6 +98,7 @@
<el-checkbox label="启用" v-model="platform.enable" @change="checkExpires"></el-checkbox>
<el-checkbox label="云台控制" v-model="platform.ptz"></el-checkbox>
<el-checkbox label="拉起离线推流" v-model="platform.startOfflinePush"></el-checkbox>
<el-checkbox label="RTCP保活" v-model="platform.rtcp" @change="rtcpCheckBoxChange"></el-checkbox>
</el-form-item>
<el-form-item>
<el-button type="primary" @click="onSubmit">{{
@ -251,21 +252,7 @@ export default {
},
onSubmit: function () {
if (this.onSubmit_text === "保存") {
this.$confirm("修改目录结构会导致关联目录与通道数据被清空", '提示', {
dangerouslyUseHTMLString: true,
confirmButtonText: '确定',
cancelButtonText: '取消',
center: true,
type: 'warning'
}).then(() => {
this.saveForm()
}).catch(() => {
});
}else {
this.saveForm()
}
this.saveForm()
},
saveForm: function (){
this.$axios({
@ -343,6 +330,22 @@ export default {
if (this.platform.enable && this.platform.expires == "0") {
this.platform.expires = "300";
}
},
rtcpCheckBoxChange: function (result){
if (result) {
this.$message({
showClose: true,
message: "开启RTCP保活需要上级平台支持可以避免无效推流",
type: "warning",
});
}
},
treeTypeChange: function (){
this.$message({
showClose: true,
message: "修改目录结构会导致关联目录与通道数据被清空,保存后生效",
type: "warning",
});
}
},
};