diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 28fa1e43..01619939 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -41,8 +41,6 @@ public class SubscribeHolder { // 添加任务处理订阅过期 dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), subscribeInfo.getExpires() * 1000); - // 发送目录订阅添加通知 - eventPublisher.catalogSubscribePutEventPublish(platformId, subscribeInfo); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlatformController.java index 5b13b30e..fd66ed78 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlatformController.java @@ -259,4 +259,24 @@ public class PlatformController { Assert.notNull(id, "平台ID不可为空"); platformChannelService.pushChannel(id); } + + @Operation(summary = "添加通道-通过设备", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @PostMapping("/channel/device/add") + @ResponseBody + public void addChannelByDevice(@RequestBody UpdateChannelParam param) { + Assert.notNull(param.getPlatformId(), "平台ID不可为空"); + Assert.notNull(param.getDeviceIds(), "设备ID不可为空"); + Assert.notEmpty(param.getDeviceIds(), "设备ID不可为空"); + platformChannelService.addChannelByDevice(param.getPlatformId(), param.getDeviceIds()); + } + + @Operation(summary = "移除通道-通过设备", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @PostMapping("/channel/device/remove") + @ResponseBody + public void removeChannelByDevice(@RequestBody UpdateChannelParam param) { + Assert.notNull(param.getPlatformId(), "平台ID不可为空"); + Assert.notNull(param.getDeviceIds(), "设备ID不可为空"); + Assert.notEmpty(param.getDeviceIds(), "设备ID不可为空"); + platformChannelService.removeChannelByDevice(param.getPlatformId(), param.getDeviceIds()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/bean/UpdateChannelParam.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/bean/UpdateChannelParam.java index 30c7c0ca..5f36002c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/bean/UpdateChannelParam.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/bean/UpdateChannelParam.java @@ -18,4 +18,7 @@ public class UpdateChannelParam { @Schema(description = "待关联的通道ID") List channelIds; + + @Schema(description = "待关联的设备ID") + List deviceIds; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java index 8f1ecaa9..3fe02c02 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java @@ -309,6 +309,13 @@ public interface CommonGBChannelMapper { @SelectProvider(type = ChannelProvider.class, method = "queryByGbDeviceIds") List queryByGbDeviceIds(List deviceIds); + @Select(value = {" "}) + List queryByGbDeviceIdsForIds(List deviceIds); + @SelectProvider(type = ChannelProvider.class, method = "queryByGroupList") List queryByGroupList(List groupList); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java index 90d94a92..44df00bd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java @@ -63,7 +63,11 @@ public interface PlatformMapper { int delete(@Param("id") Integer id); @Select(" SELECT pp.*, " + - " (SELECT count(0) FROM wvp_platform_channel pc WHERE pc.platform_id = pp.id ) as channel_count" + + " ( (SELECT count(0) FROM wvp_platform_channel pc WHERE pc.platform_id = pp.id ) + " + + " (SELECT count(0) FROM wvp_platform_group pg WHERE pg.platform_id = pp.id ) * pp.catalog_with_group + " + + " (SELECT count(0) FROM wvp_platform_region pr WHERE pr.platform_id = pp.id ) * pp.catalog_with_region + " + + " pp.catalog_with_platform " + + " ) as channel_count" + " FROM wvp_platform pp " ) List queryList(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 36541589..8355f2a9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent; import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogSubscribePutEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; @@ -106,11 +105,4 @@ public class EventPublisher { outEvent.setRecordInfo(recordInfo); applicationEventPublisher.publishEvent(outEvent); } - - public void catalogSubscribePutEventPublish(String platformId, SubscribeInfo subscribeInfo) { - CatalogSubscribePutEvent event = new CatalogSubscribePutEvent(this); - event.setPlatformId(platformId); - event.setSubscribeInfo(subscribeInfo); - applicationEventPublisher.publishEvent(event); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogSubscribePutEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogSubscribePutEvent.java deleted file mode 100644 index 905e46bd..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogSubscribePutEvent.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; - -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; -import lombok.Getter; -import lombok.Setter; -import org.springframework.context.ApplicationEvent; - -public class CatalogSubscribePutEvent extends ApplicationEvent { - - public CatalogSubscribePutEvent(Object source) { - super(source); - } - - @Getter - @Setter - private SubscribeInfo subscribeInfo; - - @Getter - @Setter - private String platformId; -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogSubscribePutEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogSubscribePutEventLister.java deleted file mode 100644 index d3b92621..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogSubscribePutEventLister.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; - -import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class CatalogSubscribePutEventLister implements ApplicationListener { - - @Autowired - private IPlatformService platformService; - - @Autowired - private EventPublisher eventPublisher; - - - @Override - public void onApplicationEvent(CatalogSubscribePutEvent event) { - - Platform platform = platformService.queryPlatformByServerGBId(event.getPlatformId()); - if (platform == null){ - return; - } - - CommonGBChannel channel = CommonGBChannel.build(platform); - - // 发送消息 - try { - // 发送catalog - eventPublisher.catalogEventPublish(platform.getId(), channel, CatalogEvent.ADD); - } catch (Exception e) { - log.warn("[推送平台信息] 发送失败,平台{}({})", platform.getName(), platform.getServerGBId(), e); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java index c191874a..fe545c29 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java @@ -30,4 +30,8 @@ public interface IPlatformChannelService { List queryByPlatform(Platform platform); void pushChannel(Integer platformId); + + void addChannelByDevice(Integer platformId, List deviceIds); + + void removeChannelByDevice(Integer platformId, List deviceIds); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index 5c7b4081..0be964f6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -51,6 +51,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { private ISIPCommanderForPlatform sipCommanderFroPlatform; + + @Override public PageInfo queryChannelList(int page, int count, String query, Boolean online, Integer platformId, Boolean hasShare) { PageHelper.startPage(page, count); @@ -58,48 +60,6 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { return new PageInfo<>(all); } - @Override - @Transactional - public int addAllChannel(Integer platformId) { - List channelListNotShare = platformChannelMapper.queryNotShare(platformId, null); - Assert.notEmpty(channelListNotShare, "所有通道已共享"); - int result = platformChannelMapper.addChannels(platformId, channelListNotShare); - if (result > 0) { - // 查询通道相关的行政区划信息是否共享,如果没共享就添加 - Set regionListNotShare = getRegionNotShareByChannelList(channelListNotShare, platformId); - if (!regionListNotShare.isEmpty()) { - int addGroupResult = platformChannelMapper.addPlatformRegion(new ArrayList<>(regionListNotShare), platformId); - if (addGroupResult > 0) { - for (Region region : regionListNotShare) { - // 分组信息排序时需要将顶层排在最后 - channelListNotShare.add(0, CommonGBChannel.build(region)); - } - } - } - - // 查询通道相关的分组信息是否共享,如果没共享就添加 - Set groupListNotShare = getGroupNotShareByChannelList(channelListNotShare, platformId); - if (!groupListNotShare.isEmpty()) { - int addGroupResult = platformChannelMapper.addPlatformGroup(new ArrayList<>(groupListNotShare), platformId); - if (addGroupResult > 0) { - for (Group group : groupListNotShare) { - // 分组信息排序时需要将顶层排在最后 - channelListNotShare.add(0, CommonGBChannel.build(group)); - } - } - } - - // 发送消息 - try { - // 发送catalog - eventPublisher.catalogEventPublish(platformId, channelListNotShare, CatalogEvent.ADD); - } catch (Exception e) { - log.warn("[关联全部通道] 发送失败,数量:{}", channelListNotShare.size(), e); - } - } - return result; - } - /** * 获取通道使用的分组中未分享的 */ @@ -231,33 +191,46 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { return channelList; } + @Override + @Transactional + public int addAllChannel(Integer platformId) { + List channelListNotShare = platformChannelMapper.queryNotShare(platformId, null); + Assert.notEmpty(channelListNotShare, "所有通道已共享"); + return addChannelList(platformId, channelListNotShare); + } + @Override @Transactional public int addChannels(Integer platformId, List channelIds) { List channelListNotShare = platformChannelMapper.queryNotShare(platformId, channelIds); Assert.notEmpty(channelListNotShare, "通道已共享"); - int result = platformChannelMapper.addChannels(platformId, channelListNotShare); + return addChannelList(platformId, channelListNotShare); + } + + @Transactional + public int addChannelList(Integer platformId, List channelList) { + int result = platformChannelMapper.addChannels(platformId, channelList); if (result > 0) { // 查询通道相关的行政区划信息是否共享,如果没共享就添加 - Set regionListNotShare = getRegionNotShareByChannelList(channelListNotShare, platformId); + Set regionListNotShare = getRegionNotShareByChannelList(channelList, platformId); if (!regionListNotShare.isEmpty()) { int addGroupResult = platformChannelMapper.addPlatformRegion(new ArrayList<>(regionListNotShare), platformId); if (addGroupResult > 0) { for (Region region : regionListNotShare) { // 分组信息排序时需要将顶层排在最后 - channelListNotShare.add(0, CommonGBChannel.build(region)); + channelList.add(0, CommonGBChannel.build(region)); } } } // 查询通道相关的分组信息是否共享,如果没共享就添加 - Set groupListNotShare = getGroupNotShareByChannelList(channelListNotShare, platformId); + Set groupListNotShare = getGroupNotShareByChannelList(channelList, platformId); if (!groupListNotShare.isEmpty()) { int addGroupResult = platformChannelMapper.addPlatformGroup(new ArrayList<>(groupListNotShare), platformId); if (addGroupResult > 0) { for (Group group : groupListNotShare) { // 分组信息排序时需要将顶层排在最后 - channelListNotShare.add(0, CommonGBChannel.build(group)); + channelList.add(0, CommonGBChannel.build(group)); } } } @@ -265,9 +238,9 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { // 发送消息 try { // 发送catalog - eventPublisher.catalogEventPublish(platformId, channelListNotShare, CatalogEvent.ADD); + eventPublisher.catalogEventPublish(platformId, channelList, CatalogEvent.ADD); } catch (Exception e) { - log.warn("[关联通道] 发送失败,数量:{}", channelListNotShare.size(), e); + log.warn("[关联通道] 发送失败,数量:{}", channelList.size(), e); } } return result; @@ -309,9 +282,20 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { @Override @Transactional - public int removeChannels(Integer platformId, List channelIds) { - List channelList = platformChannelMapper.queryShare(platformId, channelIds); - Assert.notEmpty(channelList, "所选通道未共享"); + public void addChannelByDevice(Integer platformId, List deviceIds) { + List channelList = commonGBChannelMapper.queryByGbDeviceIdsForIds(deviceIds); + addChannels(platformId, channelList); + } + + @Override + @Transactional + public void removeChannelByDevice(Integer platformId, List deviceIds) { + List channelList = commonGBChannelMapper.queryByGbDeviceIdsForIds(deviceIds); + removeChannels(platformId, channelList); + } + + @Transactional + public int removeChannelList(Integer platformId, List channelList) { int result = platformChannelMapper.removeChannelsWithPlatform(platformId, channelList); if (result > 0) { // 查询通道相关的分组信息 @@ -342,6 +326,14 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { return result; } + @Override + @Transactional + public int removeChannels(Integer platformId, List channelIds) { + List channelList = platformChannelMapper.queryShare(platformId, channelIds); + Assert.notEmpty(channelList, "所选通道未共享"); + return removeChannelList(platformId, channelList); + } + @Override @Transactional public void removeChannels(List ids) { @@ -356,12 +348,12 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { } @Override + @Transactional public void removeChannel(int channelId) { List platformList = platformChannelMapper.queryPlatFormListByChannelId(channelId); if (platformList.isEmpty()) { return; } - for (Platform platform : platformList) { ArrayList ids = new ArrayList<>(); ids.add(channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 1f39e4d7..c6536dcd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -1,14 +1,15 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; -import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; @@ -96,6 +97,8 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private PlatformChannelMapper platformChannelMapper; + @Autowired + private EventPublisher eventPublisher; /** * 流离开的处理 @@ -147,7 +150,6 @@ public class PlatformServiceImpl implements IPlatformService { } } - @Override public Platform queryPlatformByServerGBId(String platformGbId) { return platformMapper.getParentPlatByServerGBId(platformGbId); diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java index 5e26e889..4a1104b6 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; /** @@ -48,12 +49,15 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); }else { mediaServer = mediaServerService.getOne(mediaServerId); + if (mediaServer == null) { + mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + } } if (mediaServer == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); } StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); - if (mediaServerId == null) { + if (mediaServerId == null || !mediaServerId.equals(mediaServer.getId())) { streamProxy.setMediaServerId(mediaServer.getId()); streamProxyMapper.update(streamProxy); } @@ -72,15 +76,12 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { @Override public void stopProxy(StreamProxy streamProxy){ - MediaServer mediaServer; String mediaServerId = streamProxy.getMediaServerId(); - if (mediaServerId == null) { - mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); - }else { - mediaServer = mediaServerService.getOne(mediaServerId); - } + Assert.notNull(mediaServerId, "代理节点不存在"); + + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); if (mediaServer == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体节点不存在"); } if (ObjectUtils.isEmpty(streamProxy.getStreamKey())) { mediaServerService.closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream()); diff --git a/web_src/src/components/StreamProxyList.vue b/web_src/src/components/StreamProxyList.vue index 27cad5c0..c148c40e 100755 --- a/web_src/src/components/StreamProxyList.vue +++ b/web_src/src/components/StreamProxyList.vue @@ -19,12 +19,12 @@ :value="item.id"> - 推流状态: + 拉流状态: - + 添加代理 搜索ONVIF @@ -58,8 +58,8 @@ @@ -72,7 +72,7 @@ - +