diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 0097ce0a..feb66b47 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -10,6 +10,9 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -348,25 +351,19 @@ public class SIPCommander implements ISIPCommander { @Override public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) { - String streamId = ssrcInfo.getStream(); + String stream = ssrcInfo.getStream(); try { if (device == null) { return; } String streamMode = device.getStreamMode().toUpperCase(); - logger.info("{} 分配的ZLM为: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", streamId); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (event != null) { event.response(mediaServerItemInUse, json); + subscribe.removeSubscribe(hookSubscribe); } }); // @@ -440,7 +437,7 @@ public class SIPCommander implements ISIPCommander { errorEvent.response(e); }), e ->{ // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 - streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play); + streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play); streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog); okEvent.response(e); }); @@ -530,21 +527,14 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId()); // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (hookEvent != null) { InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()); hookEvent.call(inviteStreamInfo); } + subscribe.removeSubscribe(hookSubscribe); }); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); @@ -643,21 +633,15 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId()); // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); - subscribeKey.put("regist", false); - subscribeKey.put("schema", "rtmp"); + subscribe.removeSubscribe(hookSubscribe); + hookSubscribe.getContent().put("regist", false); + hookSubscribe.getContent().put("schema", "rtmp"); // 添加流注销的订阅,注销了后向设备发送bye - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{ ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId()); if (transaction != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index b1f0fecb..18654dd1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -102,7 +102,7 @@ public class ZLMHttpHookListener { logger.debug("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString()); } String mediaServerId = json.getString("mediaServerId"); - List subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive); + List subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive); if (subscribes != null && subscribes.size() > 0) { for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { subscribe.response(null, json); @@ -168,7 +168,7 @@ public class ZLMHttpHookListener { logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param)); } String mediaServerId = param.getMediaServerId(); - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json); + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json); if (subscribe != null ) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { @@ -253,7 +253,7 @@ public class ZLMHttpHookListener { } - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json); + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json); if (subscribe != null) { if (mediaInfo != null) { subscribe.response(mediaInfo, json); @@ -377,7 +377,7 @@ public class ZLMHttpHookListener { logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString()); } String mediaServerId = json.getString("mediaServerId"); - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json); + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json); if (subscribe != null ) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { @@ -403,7 +403,7 @@ public class ZLMHttpHookListener { logger.info("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item)); String mediaServerId = item.getMediaServerId(); JSONObject json = (JSONObject) JSON.toJSON(item); - ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json); + ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json); if (subscribe != null ) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); if (mediaInfo != null) { @@ -614,7 +614,7 @@ public class ZLMHttpHookListener { } String remoteAddr = request.getRemoteAddr(); jsonObject.put("ip", remoteAddr); - List subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started); + List subscribes = this.subscribe.getSubscribes(HookType.on_server_started); if (subscribes != null && subscribes.size() > 0) { for (ZLMHttpHookSubscribe.Event subscribe : subscribes) { subscribe.response(null, jsonObject); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java index ffd8ec90..57b6d81f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java @@ -1,12 +1,16 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; +import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.time.Instant; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * @description:针对 ZLMediaServer的hook事件订阅 @@ -16,51 +20,39 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class ZLMHttpHookSubscribe { - public enum HookType{ - on_flow_report, - on_http_access, - on_play, - on_publish, - on_record_mp4, - on_rtsp_auth, - on_rtsp_realm, - on_shell_login, - on_stream_changed, - on_stream_none_reader, - on_stream_not_found, - on_server_started, - on_server_keepalive - } - @FunctionalInterface public interface Event{ void response(MediaServerItem mediaServerItem, JSONObject response); } - private Map> allSubscribes = new ConcurrentHashMap<>(); + private Map> allSubscribes = new ConcurrentHashMap<>(); - public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) { - allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event); + public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) { + if (hookSubscribe.getExpires() == null) { + // 默认5分钟过期 + Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5)); + hookSubscribe.setExpires(expiresInstant); + } + allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event); } - public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) { + public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) { ZLMHttpHookSubscribe.Event event= null; - Map eventMap = allSubscribes.get(type); + Map eventMap = allSubscribes.get(type); if (eventMap == null) { return null; } - for (JSONObject key : eventMap.keySet()) { + for (IHookSubscribe key : eventMap.keySet()) { Boolean result = null; - for (String s : key.keySet()) { + for (String s : key.getContent().keySet()) { if (result == null) { - result = key.getString(s).equals(hookResponse.getString(s)); + result = key.getContent().getString(s).equals(hookResponse.getString(s)); }else { - if (key.getString(s) == null) { + if (key.getContent().getString(s) == null) { continue; } - result = result && key.getString(s).equals(hookResponse.getString(s)); + result = result && key.getContent().getString(s).equals(hookResponse.getString(s)); } - } if (null != result && result) { event = eventMap.get(key); @@ -69,26 +61,30 @@ public class ZLMHttpHookSubscribe { return event; } - public void removeSubscribe(HookType type, JSONObject hookResponse) { - Map eventMap = allSubscribes.get(type); + public void removeSubscribe(IHookSubscribe hookSubscribe) { + Map eventMap = allSubscribes.get(hookSubscribe.getHookType()); if (eventMap == null) { return; } - Set> entries = eventMap.entrySet(); + Set> entries = eventMap.entrySet(); if (entries.size() > 0) { - List> entriesToRemove = new ArrayList<>(); - for (Map.Entry entry : entries) { - JSONObject key = entry.getKey(); + List> entriesToRemove = new ArrayList<>(); + for (Map.Entry entry : entries) { + JSONObject content = entry.getKey().getContent(); + if (content == null || content.size() == 0) { + entriesToRemove.add(entry); + continue; + } Boolean result = null; - for (String s : key.keySet()) { + for (String s : content.keySet()) { if (result == null) { - result = key.getString(s).equals(hookResponse.getString(s)); + result = content.getString(s).equals(hookSubscribe.getContent().getString(s)); }else { - if (key.getString(s) == null) { + if (content.getString(s) == null) { continue; } - result = result && key.getString(s).equals(hookResponse.getString(s)); + result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s)); } } if (null != result && result){ @@ -97,7 +93,7 @@ public class ZLMHttpHookSubscribe { } if (!CollectionUtils.isEmpty(entriesToRemove)) { - for (Map.Entry entry : entriesToRemove) { + for (Map.Entry entry : entriesToRemove) { entries.remove(entry); } } @@ -111,17 +107,25 @@ public class ZLMHttpHookSubscribe { * @return */ public List getSubscribes(HookType type) { - // ZLMHttpHookSubscribe.Event event= null; - Map eventMap = allSubscribes.get(type); + Map eventMap = allSubscribes.get(type); if (eventMap == null) { return null; } List result = new ArrayList<>(); - for (JSONObject key : eventMap.keySet()) { + for (IHookSubscribe key : eventMap.keySet()) { result.add(eventMap.get(key)); } return result; } + public List getAll(){ + ArrayList result = new ArrayList<>(); + Collection> values = allSubscribes.values(); + for (Map value : values) { + result.addAll(value.keySet()); + } + return result; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index 1bfb7300..b24d0a1a 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -6,22 +6,22 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamProxyService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; +import java.time.Instant; import java.util.*; +import java.util.concurrent.TimeUnit; @Component @Order(value=1) @@ -37,18 +37,12 @@ public class ZLMRunner implements CommandLineRunner { @Autowired private ZLMHttpHookSubscribe hookSubscribe; - @Autowired - private IStreamProxyService streamProxyService; - @Autowired private EventPublisher publisher; @Autowired private IMediaServerService mediaServerService; - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Autowired private MediaConfig mediaConfig; @@ -67,17 +61,25 @@ public class ZLMRunner implements CommandLineRunner { mediaServerService.updateToDatabase(mediaSerItem); } mediaServerService.syncCatchFromDatabase(); + HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started(); +// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60)); +// hookSubscribeForStreamChange.setExpires(expiresInstant); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(), + hookSubscribe.addSubscribe(hookSubscribeForServerStarted, (MediaServerItem mediaServerItem, JSONObject response)->{ ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class); if (zlmServerConfig !=null ) { if (startGetMedia != null) { startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId()); + if (startGetMedia.size() == 0) { + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); + } } } }); + + // 获取zlm信息 logger.info("[zlm] 等待默认zlm中..."); @@ -103,7 +105,6 @@ public class ZLMRunner implements CommandLineRunner { } startGetMedia = null; } - hookSubscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started, new JSONObject()); // TODO 清理数据库中与redis不匹配的zlm }, 60 * 1000 ); } @@ -116,6 +117,9 @@ public class ZLMRunner implements CommandLineRunner { zlmServerConfigFirst.setIp(mediaServerItem.getIp()); zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort()); startGetMedia.remove(mediaServerItem.getId()); + if (startGetMedia.size() == 0) { + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); + } mediaServerService.zlmServerOnline(zlmServerConfigFirst); }else { logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接", @@ -130,6 +134,9 @@ public class ZLMRunner implements CommandLineRunner { zlmServerConfig.setIp(mediaServerItem.getIp()); zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort()); startGetMedia.remove(mediaServerItem.getId()); + if (startGetMedia.size() == 0) { + hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started()); + } mediaServerService.zlmServerOnline(zlmServerConfig); } }, 2000); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java new file mode 100644 index 00000000..92172f3a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java @@ -0,0 +1,33 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + + +import com.alibaba.fastjson.JSONObject; + +/** + * hook 订阅工厂 + * @author lin + */ +public class HookSubscribeFactory { + + public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) { + HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange(); + JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject(); + subscribeKey.put("app", app); + subscribeKey.put("stream", stream); + subscribeKey.put("regist", regist); + if (scheam != null) { + subscribeKey.put("schema", scheam); + } + subscribeKey.put("mediaServerId", mediaServerId); + hookSubscribe.setContent(subscribeKey); + + return hookSubscribe; + } + + public static HookSubscribeForServerStarted on_server_started() { + HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted(); + hookSubscribe.setContent(new JSONObject()); + + return hookSubscribe; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java new file mode 100644 index 00000000..0b781e6a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java @@ -0,0 +1,44 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.annotation.JSONField; + +import java.time.Instant; + +/** + * hook订阅-流变化 + * @author lin + */ +public class HookSubscribeForServerStarted implements IHookSubscribe{ + + private HookType hookType = HookType.on_server_started; + + private JSONObject content; + + @JSONField(format="yyyy-MM-dd HH:mm:ss") + private Instant expires; + + @Override + public HookType getHookType() { + return hookType; + } + + @Override + public JSONObject getContent() { + return content; + } + + public void setContent(JSONObject content) { + this.content = content; + } + + @Override + public Instant getExpires() { + return expires; + } + + @Override + public void setExpires(Instant expires) { + this.expires = expires; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java new file mode 100644 index 00000000..d5b2fb81 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.annotation.JSONField; + +import java.time.Instant; + +/** + * hook订阅-流变化 + * @author lin + */ +public class HookSubscribeForStreamChange implements IHookSubscribe{ + + private HookType hookType = HookType.on_stream_changed; + + private JSONObject content; + + private Instant expires; + + @Override + public HookType getHookType() { + return hookType; + } + + @Override + public JSONObject getContent() { + return content; + } + + public void setContent(JSONObject content) { + this.content = content; + } + + @Override + public Instant getExpires() { + return expires; + } + + @Override + public void setExpires(Instant expires) { + this.expires = expires; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java new file mode 100644 index 00000000..797ab81b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java @@ -0,0 +1,23 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +/** + * hook类型 + * @author lin + */ + +public enum HookType { + + on_flow_report, + on_http_access, + on_play, + on_publish, + on_record_mp4, + on_rtsp_auth, + on_rtsp_realm, + on_shell_login, + on_stream_changed, + on_stream_none_reader, + on_stream_not_found, + on_server_started, + on_server_keepalive +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java new file mode 100644 index 00000000..5f2ca338 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java @@ -0,0 +1,36 @@ +package com.genersoft.iot.vmp.media.zlm.dto; + +import com.alibaba.fastjson.JSONObject; + +import java.time.Instant; + +/** + * zlm hook事件的参数 + * @author lin + */ +public interface IHookSubscribe { + + /** + * 获取hook类型 + * @return hook类型 + */ + HookType getHookType(); + + /** + * 获取hook的具体内容 + * @return hook的具体内容 + */ + JSONObject getContent(); + + /** + * 设置过期时间 + * @param instant 过期时间 + */ + void setExpires(Instant instant); + + /** + * 获取过期时间 + * @return 过期时间 + */ + Instant getExpires(); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java deleted file mode 100644 index 9d15c1f4..00000000 --- a/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.genersoft.iot.vmp.service; - -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.List; - - -/** - * 定时查找redis中的GPS推送消息,并保存到对应的流中 - */ -@Component -public class StreamGPSSubscribeTask { - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IVideoManagerStorage storager; - - - @Scheduled(fixedRate = 30 * 1000) //每30秒执行一次 - public void execute(){ - List gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); - if (gpsMsgInfo.size() > 0) { - storager.updateStreamGPS(gpsMsgInfo); - for (GPSMsgInfo msgInfo : gpsMsgInfo) { - msgInfo.setStored(true); - redisCatchStorage.updateGpsMsgInfo(msgInfo); - } - } - - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 6c909491..c0f42704 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -35,6 +35,10 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; @@ -305,16 +309,10 @@ public class PlayServiceImpl implements IPlayService { // 单端口模式streamId也有变化,需要重新设置监听 if (!mediaServerItem.isRtpEnable()) { // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", stream); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey); - subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject response)->{ + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); + hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{ logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString()); dynamicTask.stop(timeOutTaskKey); // hook响应 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java index 638ea41d..a4fa6357 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java @@ -8,6 +8,9 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; @@ -270,14 +273,9 @@ public class RedisGbPlayMsgListener implements MessageListener { }, userSetting.getPlatformPlayTimeout()); // 添加订阅 - JSONObject subscribeKey = new JSONObject(); - subscribeKey.put("app", content.getApp()); - subscribeKey.put("stream", content.getStream()); - subscribeKey.put("regist", true); - subscribeKey.put("schema", "rtmp"); - subscribeKey.put("mediaServerId", mediaServerItem.getId()); - subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, - (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtmp", mediaServerItem.getId()); + + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ dynamicTask.stop(taskKey); responseSendItem(mediaServerItem, content, toId, serial); }); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java index d5a26e7b..4e94d68a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,9 +12,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -30,6 +33,9 @@ public class RedisGpsMsgListener implements MessageListener { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IVideoManagerStorage storager; + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -46,10 +52,26 @@ public class RedisGpsMsgListener implements MessageListener { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); + // 只是放入redis缓存起来 redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } taskQueueHandlerRun = false; }); } } + + /** + * 定时将经纬度更新到数据库 + */ + @Scheduled(fixedRate = 2 * 1000) //每2秒执行一次 + public void execute(){ + List gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); + if (gpsMsgInfo.size() > 0) { + storager.updateStreamGPS(gpsMsgInfo); + for (GPSMsgInfo msgInfo : gpsMsgInfo) { + msgInfo.setStored(true); + redisCatchStorage.updateGpsMsgInfo(msgInfo); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java index faed2c82..2311d4be 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java @@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.VersionInfo; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.utils.SpringBeanFactory; @@ -38,7 +40,7 @@ import java.util.Set; public class ServerController { @Autowired - private ConfigurableApplicationContext context; + private ZLMHttpHookSubscribe zlmHttpHookSubscribe; @Autowired private IMediaServerService mediaServerService; @@ -254,6 +256,18 @@ public class ServerController { return result; } + @ApiOperation("获取当前所有hook") + @GetMapping(value = "/hooks") + @ResponseBody + public WVPResult> getHooks(){ + WVPResult> result = new WVPResult<>(); + result.setCode(0); + result.setMsg("success"); + List all = zlmHttpHookSubscribe.getAll(); + result.setData(all); + return result; + } + // @ApiOperation("当前进行中的动态任务") // @GetMapping(value = "/dynamicTask") // @ResponseBody