diff --git a/sql/2.6.9更新.sql b/sql/2.6.9更新.sql new file mode 100644 index 00000000..f8f44d95 --- /dev/null +++ b/sql/2.6.9更新.sql @@ -0,0 +1,8 @@ +alter table wvp_device_channel + change stream_id stream_id varying(255) + +alter table wvp_platform + add auto_push_channel bool default false + +alter table wvp_stream_proxy + add stream_key character varying(255) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 39dff931..2ffbfe40 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -89,17 +89,17 @@ public class CatalogSubscribeTask implements ISubscribeTask { ResponseEvent event = (ResponseEvent) eventResult.event; if (event.getResponse().getRawContent() != null) { // 成功 - logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId()); + logger.info("[取消目录订阅]成功: {}", device.getDeviceId()); }else { // 成功 - logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId()); + logger.info("[取消目录订阅]成功: {}", device.getDeviceId()); } },eventResult -> { // 失败 - logger.warn("[取消目录订阅订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); + logger.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); }); } catch (InvalidArgumentException | SipException | ParseException e) { - logger.error("[命令发送失败] 取消目录订阅订阅: {}", e.getMessage()); + logger.error("[命令发送失败] 取消目录订阅: {}", e.getMessage()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index dbe49d5d..d35c6a63 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -132,7 +132,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements if (CmdType.CATALOG.equals(cmd)) { logger.info("接收到Catalog通知"); - processNotifyCatalogList(take.getEvt()); notifyRequestForCatalogProcessor.process(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("接收到Alarm通知"); @@ -371,114 +370,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } } - /*** - * 处理catalog设备目录列表Notify - * - * @param evt - */ - private void processNotifyCatalogList(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event, civilCodeFileConf); - if (channel == null) { - logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); - continue; - } - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - channel.setDeviceId(device.getDeviceId()); - logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 上线 - logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - storager.deviceChannelOnline(deviceId, channel.getChannelId()); - break; - case CatalogEvent.OFF : - // 离线 - logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - }else { - logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - } - break; - case CatalogEvent.VLOST: - // 视频丢失 - logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - storager.deviceChannelOffline(deviceId, channel.getChannelId()); - }else { - logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - } - break; - case CatalogEvent.DEFECT: - // 故障 - break; - case CatalogEvent.ADD: - // 增加 - logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - deviceChannelService.updateChannel(deviceId, channel); - break; - case CatalogEvent.DEL: - // 删除 - logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - storager.delChannel(deviceId, channel.getChannelId()); - break; - case CatalogEvent.UPDATE: - // 更新 - logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - deviceChannelService.updateChannel(deviceId, channel); - break; - default: - logger.warn("[ NotifyCatalog ] event not found : {}", event ); - - } - // 转发变化信息 - eventPublisher.catalogEventPublish(null, channel, event); - - } - } - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); - } - } - public void setCmder(SIPCommander cmder) { } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 84e9e7e6..52bc9028 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -215,6 +215,21 @@ public class ZLMRESTfulUtils { } } + public JSONObject isMediaOnline(MediaServerItem mediaServerItem, String app, String stream, String schema){ + Map param = new HashMap<>(); + if (app != null) { + param.put("app",app); + } + if (stream != null) { + param.put("stream",stream); + } + if (schema != null) { + param.put("schema",schema); + } + param.put("vhost","__defaultVhost__"); + return sendPost(mediaServerItem, "isMediaOnline", param, null); + } + public JSONObject getMediaList(MediaServerItem mediaServerItem, String app, String stream, String schema, RequestCallback callback){ Map param = new HashMap<>(); if (app != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index 6e594024..4a781f31 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IMediaServerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 0d99eccd..79f20391 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -520,16 +520,18 @@ public class DeviceServiceImpl implements IDeviceService { // 目录订阅相关的信息 - if (device.getSubscribeCycleForCatalog() > 0) { - if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { - deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + if (deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) { + if (device.getSubscribeCycleForCatalog() > 0) { + // 若已开启订阅,但订阅周期不同,则先取消 + if (deviceInStore.getSubscribeCycleForCatalog() != 0) { + removeCatalogSubscribe(deviceInStore); + } // 开启订阅 - addCatalogSubscribe(deviceInStore); - } - }else if (device.getSubscribeCycleForCatalog() == 0) { - if (deviceInStore.getSubscribeCycleForCatalog() != 0) { deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); + addCatalogSubscribe(deviceInStore); + }else if (device.getSubscribeCycleForCatalog() == 0) { // 取消订阅 + deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog()); removeCatalogSubscribe(deviceInStore); } } @@ -544,6 +546,8 @@ public class DeviceServiceImpl implements IDeviceService { } }else if (device.getSubscribeCycleForMobilePosition() == 0) { if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) { + deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval()); + deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition()); // 取消订阅 removeMobilePositionSubscribe(deviceInStore); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java index d630a2c0..752d0631 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java @@ -257,7 +257,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { ":" + inviteInfo.getDeviceId() + ":" + inviteInfo.getChannelId() + ":" + inviteInfo.getStream() + - ":" + inviteInfo.getSsrcInfo().getSsrc(); + ":" + ssrc; if (inviteInfoInDb.getSsrcInfo() != null) { inviteInfoInDb.getSsrcInfo().setSsrc(ssrc); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 7fbe7691..eac543a3 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -35,15 +35,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; /** * 视频代理业务 @@ -560,4 +564,43 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return new ResourceBaseInfo(total, online); } + + + @Scheduled(cron = "* 0/10 * * * ?") + public void asyncCheckStreamProxyStatus() { + + List all = mediaServerService.getAllOnline(); + + if (CollectionUtils.isEmpty(all)){ + return; + } + + Map serverItemMap = all.stream().collect(Collectors.toMap(MediaServerItem::getId, Function.identity(), (m1, m2) -> m1)); + + List list = videoManagerStorager.getStreamProxyListForEnable(true); + + if (CollectionUtils.isEmpty(list)){ + return; + } + + for (StreamProxyItem streamProxyItem : list) { + + MediaServerItem mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId()); + + // TODO 支持其他 schema + JSONObject mediaInfo = zlmresTfulUtils.isMediaOnline(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream(), "rtsp"); + + if (mediaInfo == null){ + streamProxyItem.setStatus(false); + } else { + if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) { + streamProxyItem.setStatus(true); + } else { + streamProxyItem.setStatus(false); + } + } + + updateStreamProxy(streamProxyItem); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index bc34162b..32e9bdbe 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -506,6 +506,9 @@ public class StreamPushServiceImpl implements IStreamPushService { stream.setUpdateTime(DateUtil.getNow()); stream.setCreateTime(DateUtil.getNow()); stream.setServerId(userSetting.getServerId()); + stream.setMediaServerId(mediaConfig.getId()); + stream.setSelf(true); + stream.setPushIng(true); // 放在事务内执行 boolean result = false;