From b957ab61c7d7f54716f81f4cd9474238fd110e1d Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 26 Jul 2022 11:43:56 +0800 Subject: [PATCH] =?UTF-8?q?bug=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/MobilePositionSubscribeTask.java | 6 +- .../request/impl/InviteRequestProcessor.java | 18 ++--- .../request/impl/NotifyRequestProcessor.java | 12 ++-- .../notify/cmd/AlarmNotifyMessageHandler.java | 2 +- .../cmd/CatalogResponseMessageHandler.java | 4 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 2 + .../vmp/media/zlm/ZLMMediaListManager.java | 67 +++++++++---------- .../vmp/media/zlm/ZLMRTPServerFactory.java | 6 +- .../service/impl/RedisStreamMsgListener.java | 9 +-- .../streamPush/StreamPushController.java | 9 ++- web_src/src/components/PushVideoList.vue | 2 +- web_src/src/components/dialog/catalogEdit.vue | 5 +- 12 files changed, 71 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index cf1f7cf6..e43e59b9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -39,9 +39,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { dynamicTask.stop(taskKey); } sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> { -// if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) { -// dialog = eventResult.dialog; -// } + if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) { + dialog = eventResult.dialog; + } ResponseEvent event = (ResponseEvent) eventResult.event; if (event.getResponse().getRawContent() != null) { // 成功 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 01837483..fda3bff5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -419,12 +419,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } } else if (gbStream != null) { - if (streamPushItem != null && streamPushItem.isStatus()) { - // 在线状态 + if (streamPushItem != null && streamPushItem.isPushIng()) { + // 推流状态 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } else { - // 不在线 拉起 + // 未推流 拉起 notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } @@ -451,7 +451,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { // 推流 - if (streamPushItem.getServerId().equals(userSetting.getServerId())) { + if (streamPushItem.isSelf()) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); if (streamReady) { // 自平台内容 @@ -500,7 +500,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { if ("proxy".equals(gbStream.getStreamType())) { // TODO 控制启用以使设备上线 - logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { @@ -508,7 +508,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } // 发送redis消息以使设备上线 - logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); + logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), @@ -518,7 +518,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements dynamicTask.startDelay(callIdHeader.getCallId(), () -> { logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); try { - mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); responseAck(evt, Response.REQUEST_TIMEOUT); // 超时 } catch (SipException e) { e.printStackTrace(); @@ -533,7 +533,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements Boolean finalTcpActive = tcpActive; // 添加在本机上线的通知 - mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> { + mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, @@ -621,7 +621,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 离线 // 查询是否在本机上线了 StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); - if (currentStreamPushItem.isStatus()) { + if (currentStreamPushItem.isPushIng()) { // 在线状态 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index c8a221bd..e191578a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -350,17 +350,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements switch (event) { case CatalogEvent.ON: // 上线 - logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOnline(deviceId, channel.getChannelId()); break; case CatalogEvent.OFF : // 离线 - logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); break; case CatalogEvent.VLOST: // 视频丢失 - logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); break; case CatalogEvent.DEFECT: @@ -368,17 +368,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements break; case CatalogEvent.ADD: // 增加 - logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); deviceChannelService.updateChannel(deviceId, channel); break; case CatalogEvent.DEL: // 删除 - logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); storager.delChannel(deviceId, channel.getChannelId()); break; case CatalogEvent.UPDATE: // 更新 - logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); deviceChannelService.updateChannel(deviceId, channel); break; default: diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index 265694ae..20316e7e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -69,7 +69,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - logger.info("收到来自设备[{}]的报警通知", device.getDeviceId()); + logger.info("[收到报警通知]设备:{}", device.getDeviceId()); // 回复200 OK try { responseAck(evt, Response.OK); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index f66d4f8b..ff71a922 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -111,7 +111,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp int sumNum = Integer.parseInt(sumNumElement.getText()); if (sumNum == 0) { - logger.info("收到来自设备【{}】的通道: 0个", take.getDevice().getDeviceId()); + logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); // 数据已经完整接收 storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); @@ -133,7 +133,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } int sn = Integer.parseInt(snElement.getText()); catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); - logger.info("收到来自设备【{}】的通道: {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); + logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { // 数据已经完整接收 boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 1f462314..11dd8179 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -240,6 +240,8 @@ public class ZLMHttpHookListener { if (mediaInfo != null) { assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null); } + }else { + zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId()); } ret.put("code", 0); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 1b1fb9f4..f78ca4a1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; @@ -63,7 +64,13 @@ public class ZLMMediaListManager { @Autowired private UserSetting userSetting; - private Map channelOnlineEvents = new ConcurrentHashMap<>(); + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + private Map channelOnPublishEvents = new ConcurrentHashMap<>(); public StreamPushItem addPush(MediaItem mediaItem) { // 查找此直播流是否存在redis预设gbId @@ -79,9 +86,26 @@ public class ZLMMediaListManager { }else { streamPushMapper.update(transform); } + if (transform != null) { + if (getChannelOnlineEventLister(transform.getApp(), transform.getStream()) != null) { + getChannelOnlineEventLister(transform.getApp(), transform.getStream()).run(transform.getApp(), transform.getStream(), transform.getServerId()); + removedChannelOnlineEventLister(transform.getApp(), transform.getStream()); + } + } return transform; } + public void sendStreamEvent(String app, String stream, String mediaServerId) { + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + // 查看推流状态 + if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) { + if (getChannelOnlineEventLister(app, stream) != null) { + getChannelOnlineEventLister(app, stream).run(app, stream, mediaServerId); + removedChannelOnlineEventLister(app, stream); + } + } + } + public int removeMedia(String app, String streamId) { // 查找是否关联了国标, 关联了不删除, 置为离线 GbStream gbStream = gbStreamMapper.selectOne(app, streamId); @@ -89,48 +113,21 @@ public class ZLMMediaListManager { if (gbStream == null) { result = storager.removeMedia(app, streamId); }else { - // TODO 暂不设置为离线 result =storager.mediaOffline(app, streamId); } return result; } - public void addChannelOnlineEventLister(String key, ChannelOnlineEvent callback) { - this.channelOnlineEvents.put(key,callback); + public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { + this.channelOnPublishEvents.put(app + "_" + stream, callback); } - public void removedChannelOnlineEventLister(String key) { - this.channelOnlineEvents.remove(key); + public void removedChannelOnlineEventLister(String app, String stream) { + this.channelOnPublishEvents.remove(app + "_" + stream); } + public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) { + return this.channelOnPublishEvents.get(app + "_" + stream); + } - -// public void clearAllSessions() { -// logger.info("清空所有国标相关的session"); -// JSONObject allSessionJSON = zlmresTfulUtils.getAllSession(); -// ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); -// HashSet allLocalPorts = new HashSet(); -// if (allSessionJSON.getInteger("code") == 0) { -// JSONArray data = allSessionJSON.getJSONArray("data"); -// if (data.size() > 0) { -// for (int i = 0; i < data.size(); i++) { -// JSONObject sessionJOSN = data.getJSONObject(i); -// Integer local_port = sessionJOSN.getInteger("local_port"); -// if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) && -// !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){ -// allLocalPorts.add(sessionJOSN.getInteger("local_port") + ""); -// } -// } -// } -// } -// if (allLocalPorts.size() > 0) { -// List result = new ArrayList<>(allLocalPorts); -// String localPortSStr = String.join(",", result); -// zlmresTfulUtils.kickSessions(localPortSStr); -// } -// } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index cbef9ce2..1fe00e44 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -273,8 +273,10 @@ public class ZLMRTPServerFactory { * 查询待转推的流是否就绪 */ public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) { - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId); - return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); + JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId); + return (mediaInfo.getInteger("code") == 0 + && mediaInfo.getJSONArray("data") != null + && mediaInfo.getJSONArray("data").size() > 0); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java index 72b3c4e2..c5592f49 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java @@ -72,9 +72,10 @@ public class RedisStreamMsgListener implements MessageListener { mediaItem.setOriginType(0); mediaItem.setOriginTypeStr("0"); mediaItem.setOriginTypeStr("unknown"); - - zlmMediaListManager.addPush(mediaItem); - - + if (register) { + zlmMediaListManager.addPush(mediaItem); + }else { + zlmMediaListManager.removeMedia(app, stream); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index 6c111693..c978ae67 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -264,9 +264,8 @@ public class StreamPushController { }) @GetMapping(value = "/getPlayUrl") @ResponseBody - public WVPResult getPlayUrl(HttpServletRequest request, @RequestParam String app, - @RequestParam String stream, - @RequestParam(required = false) String mediaServerId){ + public WVPResult getPlayUrl(@RequestParam String app,@RequestParam String stream, + @RequestParam(required = false) String mediaServerId){ boolean authority = false; // 是否登陆用户, 登陆用户返回完整信息 LoginUser userInfo = SecurityUtils.getUserInfo(); @@ -275,7 +274,7 @@ public class StreamPushController { } WVPResult result = new WVPResult<>(); StreamPushItem push = streamPushService.getPush(app, stream); - if (!userSetting.getServerId().equals(push.getServerId()) ) { + if (push != null && !push.isSelf()) { result.setCode(-1); result.setMsg("来自其他平台的推流信息"); return result; @@ -283,7 +282,7 @@ public class StreamPushController { StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); if (streamInfo != null){ result.setCode(0); - result.setMsg("scccess"); + result.setMsg("success"); result.setData(streamInfo); }else { result.setCode(-1); diff --git a/web_src/src/components/PushVideoList.vue b/web_src/src/components/PushVideoList.vue index 866c8084..6b4707e3 100644 --- a/web_src/src/components/PushVideoList.vue +++ b/web_src/src/components/PushVideoList.vue @@ -76,7 +76,7 @@