优化国标级联的端口保持

pull/908/head
648540858 2023-06-21 14:03:31 +08:00
parent 3350b65259
commit 7b24d51db9
20 changed files with 168 additions and 74 deletions

View File

@ -1,13 +1,14 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@ -19,6 +20,7 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
@ -276,9 +278,9 @@ public class SIPCommander implements ISIPCommander {
logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
if (event != null) {
event.response(mediaServerItemInUse, json);
event.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribe);
}
});
@ -466,9 +468,9 @@ public class SIPCommander implements ISIPCommander {
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
// 添加订阅
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
if (hookEvent != null) {
hookEvent.response(mediaServerItemInUse, json);
hookEvent.response(mediaServerItemInUse, hookParam);
}
subscribe.removeSubscribe(hookSubscribe);
});
@ -569,15 +571,15 @@ public class SIPCommander implements ISIPCommander {
// 添加订阅
CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
String callId= newCallIdHeader.getCallId();
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.debug("sipc 添加订阅===callId {}",callId);
hookEvent.response(mediaServerItemInUse, json);
hookEvent.response(mediaServerItemInUse, hookParam);
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("regist", false);
hookSubscribe.getContent().put("schema", "rtsp");
// 添加流注销的订阅注销了后向设备发送bye
subscribe.addSubscribe(hookSubscribe,
(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> {
(mediaServerItemForEnd, hookParam1) -> {
logger.info("[录像]下载结束, 发送BYE");
try {
streamByeCmd(device, channelId, ssrcInfo.getStream(), callId);

View File

@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
@ -646,10 +647,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
// 监听流上线
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, responseJSON) -> {
String app = responseJSON.getString("app");
String stream = responseJSON.getString("stream");
logger.info("[上级点播]拉流代理已经就绪, {}/{}", app, stream);
zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
dynamicTask.stop(callIdHeader.getCallId());
pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);

View File

@ -124,14 +124,12 @@ public class ZLMHttpHookListener {
@PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) {
// logger.info("[ZLM HOOK] 收到zlm心跳" + param.getMediaServerId());
taskExecutor.execute(() -> {
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
JSONObject json = (JSONObject) JSON.toJSON(param);
if (subscribes != null && subscribes.size() > 0) {
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json);
subscribe.response(null, param);
}
}
});
@ -158,7 +156,7 @@ public class ZLMHttpHookListener {
if (subscribe != null) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
subscribe.response(mediaInfo, json);
subscribe.response(mediaInfo, param);
}
}
});
@ -234,7 +232,7 @@ public class ZLMHttpHookListener {
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) {
if (mediaInfo != null) {
subscribe.response(mediaInfo, json);
subscribe.response(mediaInfo, param);
} else {
new HookResultForOnPublish(1, "zlm not register");
}
@ -306,7 +304,7 @@ public class ZLMHttpHookListener {
return;
}
if (subscribe != null) {
subscribe.response(mediaInfo, json);
subscribe.response(mediaInfo, param);
}
List<OnStreamChangedHookParam.MediaTrack> tracks = param.getTracks();
@ -649,7 +647,7 @@ public class ZLMHttpHookListener {
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
if (subscribes != null && subscribes.size() > 0) {
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, jsonObject);
subscribe.response(null, zlmServerConfig);
}
}
mediaServerService.zlmServerOnline(zlmServerConfig);
@ -704,7 +702,7 @@ public class ZLMHttpHookListener {
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout);
if (subscribes != null && subscribes.size() > 0) {
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json);
subscribe.response(null, param);
}
}
});

View File

@ -270,6 +270,11 @@ public class ZLMRESTfulUtils {
}
public JSONObject openRtpServer(MediaServerItem mediaServerItem, Map<String, Object> param){
System.out.println("==============openRtpServer=================");
for (String s : param.keySet()) {
System.out.println(s + "-->" + param.get(s));
}
System.out.println("===============================");
return sendPost(mediaServerItem, "openRtpServer",param, null);
}

View File

@ -9,6 +9,8 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -225,7 +227,8 @@ public class ZLMRTPServerFactory {
int localPort = 0;
if (userSetting.getGbSendStreamStrict()) {
if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc);
System.out.println("createSendRtpItem1");
localPort = keepPort(serverItem, ssrc, localPort);
if (localPort == 0) {
return null;
}
@ -261,7 +264,8 @@ public class ZLMRTPServerFactory {
// 默认为随机端口
int localPort = 0;
if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc);
System.out.println("createSendRtpItem2");
localPort = keepPort(serverItem, ssrc, localPort);
if (localPort == 0) {
return null;
}
@ -285,30 +289,37 @@ public class ZLMRTPServerFactory {
/**
*
*/
public int keepPort(MediaServerItem serverItem, String ssrc) {
int localPort = 0;
public int keepPort(MediaServerItem serverItem, String ssrc, Integer localPort) {
Map<String, Object> param = new HashMap<>(3);
param.put("port", 0);
param.put("port", localPort);
param.put("enable_tcp", 1);
param.put("stream_id", ssrc);
System.out.println("用于收流");
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
System.out.println(jsonObject);
if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
Integer finalLocalPort = localPort;
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, JSONObject response)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
int port = keepPort(serverItem, ssrc);
(MediaServerItem mediaServerItem, HookParam hookParam)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
if (!ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) {
return;
}
int port = keepPort(serverItem, ssrc, finalLocalPort);
if (port == 0) {
logger.info("[上级点播] {}->监听端口失败,移除监听", ssrc);
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
}
});
logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
return localPort;
}else {
logger.info("[上级点播] 监听端口失败: {}", ssrc);
logger.info("[上级点播] 监听端口失败: {}->{}", ssrc, localPort);
return 0;
}
return localPort;
}
/**

View File

@ -65,8 +65,8 @@ public class ZLMRunner implements CommandLineRunner {
HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
(MediaServerItem mediaServerItem, JSONObject response)->{
ZLMServerConfig zlmServerConfig = response.to(ZLMServerConfig.class);
(mediaServerItem, hookParam)->{
ZLMServerConfig zlmServerConfig = (ZLMServerConfig)hookParam;
if (zlmServerConfig !=null ) {
if (startGetMedia != null) {
startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());

View File

@ -1,8 +1,9 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.annotation.JSONField;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
public class ZLMServerConfig {
public class ZLMServerConfig extends HookParam {
@JSONField(name = "api.apiDebug")
private String apiDebug;

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
@ -26,7 +27,7 @@ public class ZlmHttpHookSubscribe {
@FunctionalInterface
public interface Event{
void response(MediaServerItem mediaServerItem, JSONObject response);
void response(MediaServerItem mediaServerItem, HookParam hookParam);
}
private Map<HookType, Map<IHookSubscribe, ZlmHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();

View File

@ -50,4 +50,6 @@ public class HookResultForOnPublish extends HookResult{
public void setMp4_save_path(String mp4_save_path) {
this.mp4_save_path = mp4_save_path;
}
}

View File

@ -81,6 +81,15 @@ public class OnPlayHookParam extends HookParam{
@Override
public String toString() {
return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params);
return "OnPlayHookParam{" +
"id='" + id + '\'' +
", app='" + app + '\'' +
", stream='" + stream + '\'' +
", ip='" + ip + '\'' +
", params='" + params + '\'' +
", port=" + port +
", schema='" + schema + '\'' +
", vhost='" + vhost + '\'' +
'}';
}
}

View File

@ -81,6 +81,15 @@ public class OnPublishHookParam extends HookParam{
@Override
public String toString() {
return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params);
return "OnPublishHookParam{" +
"id='" + id + '\'' +
", app='" + app + '\'' +
", stream='" + stream + '\'' +
", ip='" + ip + '\'' +
", params='" + params + '\'' +
", port=" + port +
", schema='" + schema + '\'' +
", vhost='" + vhost + '\'' +
'}';
}
}

View File

@ -50,4 +50,15 @@ public class OnRtpServerTimeoutHookParam extends HookParam{
public void setSsrc(String ssrc) {
this.ssrc = ssrc;
}
@Override
public String toString() {
return "OnRtpServerTimeoutHookParam{" +
"local_port=" + local_port +
", stream_id='" + stream_id + '\'' +
", tcpMode=" + tcpMode +
", re_use_port=" + re_use_port +
", ssrc='" + ssrc + '\'' +
'}';
}
}

View File

@ -24,4 +24,12 @@ public class OnSendRtpStoppedHookParam extends HookParam{
public void setStream(String stream) {
this.stream = stream;
}
@Override
public String toString() {
return "OnSendRtpStoppedHookParam{" +
"app='" + app + '\'' +
", stream='" + stream + '\'' +
'}';
}
}

View File

@ -17,4 +17,11 @@ public class OnServerKeepaliveHookParam extends HookParam{
public void setData(ServerKeepaliveData data) {
this.data = data;
}
@Override
public String toString() {
return "OnServerKeepaliveHookParam{" +
"data=" + data +
'}';
}
}

View File

@ -430,4 +430,14 @@ public class OnStreamChangedHookParam extends HookParam{
public void setCallId(String callId) {
this.callId = callId;
}
@Override
public String toString() {
return "OnStreamChangedHookParam{" +
"regist=" + regist +
", app='" + app + '\'' +
", stream='" + stream + '\'' +
", severId='" + severId + '\'' +
'}';
}
}

View File

@ -38,4 +38,14 @@ public class OnStreamNoneReaderHookParam extends HookParam{
public void setVhost(String vhost) {
this.vhost = vhost;
}
@Override
public String toString() {
return "OnStreamNoneReaderHookParam{" +
"schema='" + schema + '\'' +
", app='" + app + '\'' +
", stream='" + stream + '\'' +
", vhost='" + vhost + '\'' +
'}';
}
}

View File

@ -81,6 +81,15 @@ public class OnStreamNotFoundHookParam extends HookParam{
@Override
public String toString() {
return String.format("%s://%s:%s/%s/%s?%s", schema, ip, port, app, stream, params);
return "OnStreamNotFoundHookParam{" +
"id='" + id + '\'' +
", app='" + app + '\'' +
", stream='" + stream + '\'' +
", ip='" + ip + '\'' +
", params='" + params + '\'' +
", port=" + port +
", schema='" + schema + '\'' +
", vhost='" + vhost + '\'' +
'}';
}
}

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
@ -25,6 +24,8 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
@ -321,11 +322,11 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout());
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId,isSubStream, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId,isSubStream, (mediaServerItemInuse, hookParam ) -> {
logger.info("收到订阅消息: " + hookParam);
dynamicTask.stop(timeOutTaskKey);
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId,isSubStream);
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInuse, hookParam, device.getDeviceId(), channelId,isSubStream);
if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
@ -438,11 +439,11 @@ public class PlayServiceImpl implements IPlayService {
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
hookSubscribe.getContent().put("stream", stream);
inviteInfo.setStream(stream);
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
dynamicTask.stop(timeOutTaskKey);
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId,isSubStream);
StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId,isSubStream);
if (streamInfo == null){
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
@ -568,13 +569,14 @@ public class PlayServiceImpl implements IPlayService {
}
}
private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId,boolean isSubStream) {
private StreamInfo onPublishHandlerForPlay(MediaServerItem mediaServerItem, HookParam hookParam, String deviceId, String channelId, boolean isSubStream) {
StreamInfo streamInfo = null;
Device device = redisCatchStorage.getDevice(deviceId);
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
if( device.isSwitchPrimarySubStream() ){
streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId,isSubStream);
streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId,isSubStream);
}else {
streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
}
if (streamInfo != null) {
InviteInfo inviteInfo;
@ -603,9 +605,9 @@ public class PlayServiceImpl implements IPlayService {
}
private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
private StreamInfo onPublishHandlerForPlayback(MediaServerItem mediaServerItem, HookParam param, String deviceId, String channelId, String startTime, String endTime) {
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) param;
StreamInfo streamInfo = onPublishHandler(mediaServerItem, streamChangedHookParam, deviceId, channelId);
if (streamInfo != null) {
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
@ -724,10 +726,10 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.removeInviteInfo(inviteInfo);
};
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
logger.info("收到回放订阅消息: " + jsonObject);
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
logger.info("收到回放订阅消息: " + hookParam);
dynamicTask.stop(playBackTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
StreamInfo streamInfo = onPublishHandlerForPlayback(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
if (streamInfo == null) {
logger.warn("设备回放API调用失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@ -804,11 +806,11 @@ public class PlayServiceImpl implements IPlayService {
String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase();
hookSubscribe.getContent().put("stream", stream);
inviteInfo.setStream(stream);
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
dynamicTask.stop(playBackTimeOutTaskKey);
// hook响应
hookEvent.response(mediaServerItemInUse, response);
hookEvent.response(mediaServerItemInUse, hookParam);
});
}
// 更新ssrc
@ -920,10 +922,10 @@ public class PlayServiceImpl implements IPlayService {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
};
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, jsonObject) -> {
logger.info("[录像下载]收到订阅消息: " + jsonObject);
ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInuse, hookParam) -> {
logger.info("[录像下载]收到订阅消息: " + hookParam);
dynamicTask.stop(downLoadTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, jsonObject, deviceId, channelId, startTime, endTime);
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItemInuse, hookParam, deviceId, channelId, startTime, endTime);
if (streamInfo == null) {
logger.warn("[录像下载] 获取流地址信息失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@ -997,10 +999,10 @@ public class PlayServiceImpl implements IPlayService {
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + response.toJSONString());
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> {
logger.info("[ZLM HOOK] ssrc修正后收到订阅消息 " + hookParam);
dynamicTask.stop(downLoadTimeOutTaskKey);
hookEvent.response(mediaServerItemInUse, response);
hookEvent.response(mediaServerItemInUse, hookParam);
});
}
@ -1090,8 +1092,9 @@ public class PlayServiceImpl implements IPlayService {
return null;
}
private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, JSONObject response, String deviceId, String channelId, String startTime, String endTime) {
StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, response, deviceId, channelId);
private StreamInfo onPublishHandlerForDownload(MediaServerItem mediaServerItemInuse, HookParam hookParam, String deviceId, String channelId, String startTime, String endTime) {
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
StreamInfo streamInfo = onPublishHandler(mediaServerItemInuse, streamChangedHookParam, deviceId, channelId);
if (streamInfo != null) {
streamInfo.setProgress(0);
streamInfo.setStartTime(startTime);
@ -1108,10 +1111,8 @@ public class PlayServiceImpl implements IPlayService {
}
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
String streamId = resonse.getString("stream");
JSONArray tracks = resonse.getJSONArray("tracks");
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null);
streamInfo.setDeviceID(deviceId);
streamInfo.setChannelId(channelId);
return streamInfo;
@ -1307,9 +1308,9 @@ public class PlayServiceImpl implements IPlayService {
/*======================设备主子码流逻辑START=========================*/
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId,boolean isSubStream) {
String streamId = resonse.getString("stream");
JSONArray tracks = resonse.getJSONArray("tracks");
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId,boolean isSubStream) {
String streamId = hookParam.getStream();
List<OnStreamChangedHookParam.MediaTrack> tracks = hookParam.getTracks();
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", streamId, tracks, null);
streamInfo.setDeviceID(deviceId);
streamInfo.setChannelId(channelId);

View File

@ -289,7 +289,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam)->{
dynamicTask.stop(taskKey);
responseSendItem(mediaServerItem, content, toId, serial);
});

View File

@ -2,4 +2,4 @@ spring:
application:
name: wvp
profiles:
active: dev
active: local