From 31dcde7cc1ffe0d784504fa2cca8080c41768ea3 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 11 Dec 2023 22:49:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/CommonGbChannel.java | 24 ++ .../iot/vmp/gb28181/bean/SubscribeHolder.java | 4 +- .../MobilePositionSubscribeHandlerTask.java | 4 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../vmp/media/zlm/ZLMMediaListManager.java | 39 +-- .../iot/vmp/media/zlm/dto/StreamPush.java | 25 ++ .../vmp/service/ICommonGbChannelService.java | 2 + .../vmp/service/IDeviceChannelService.java | 1 - .../iot/vmp/service/IGroupService.java | 4 + .../vmp/service/IPlatformChannelService.java | 2 - .../iot/vmp/service/IPlatformService.java | 2 +- .../iot/vmp/service/IResourceService.java | 5 + .../iot/vmp/service/IStreamPushService.java | 7 +- .../impl/CommonGbChannelServiceImpl.java | 7 + .../vmp/service/impl/DeviceServiceImpl.java | 36 +-- .../vmp/service/impl/GroupServiceImpl.java | 6 + .../vmp/service/impl/PlatformServiceImpl.java | 4 +- .../service/impl/StreamPushServiceImpl.java | 235 +++++++++--------- .../impl/StreamPushUploadFileHandler.java | 42 +--- .../RedisPushStreamStatusListMsgListener.java | 8 +- .../redisMsg/RedisStreamMsgListener.java | 2 +- .../vmp/storager/IVideoManagerStorage.java | 1 - .../vmp/storager/dao/DeviceChannelMapper.java | 7 +- .../iot/vmp/storager/dao/GroupMapper.java | 5 + .../vmp/storager/dao/StreamPushMapper.java | 7 +- .../impl/VideoManagerStorageImpl.java | 5 - .../gb28181/platform/PlatformController.java | 15 -- .../streamPush/StreamPushController.java | 7 +- 28 files changed, 256 insertions(+), 252 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java b/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java index 5529ab5d..009c1bf0 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java @@ -3,11 +3,13 @@ package com.genersoft.iot.vmp.common; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; import com.genersoft.iot.vmp.service.impl.CommonGbChannelServiceImpl; import com.genersoft.iot.vmp.utils.DateUtil; import io.swagger.v3.oas.annotations.media.Schema; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.math.NumberUtils; import org.checkerframework.checker.units.qual.C; import org.slf4j.Logger; @@ -762,6 +764,28 @@ public class CommonGbChannel { return commonGbChannel; } + public static CommonGbChannel getInstance(StreamPush streamPush){ + CommonGbChannel commonGbChannel = new CommonGbChannel(); + commonGbChannel.setCommonGbDeviceID(streamPush.getGbId()); + commonGbChannel.setType(CommonGbChannelType.PUSH); + if (!ObjectUtils.isEmpty(streamPush.getName().trim())) { + commonGbChannel.setCommonGbName(streamPush.getName().trim()); + } + if (streamPush.getLongitude() > 0) { + commonGbChannel.setCommonGbLongitude(streamPush.getLongitude()); + } + if (streamPush.getLatitude() > 0) { + commonGbChannel.setCommonGbLatitude(streamPush.getLatitude()); + } + if (!ObjectUtils.isEmpty(streamPush.getGroupDeviceId())) { + commonGbChannel.setCommonGbBusinessGroupID(streamPush.getGroupDeviceId()); + } + commonGbChannel.setUpdateTime(DateUtil.getNow()); + commonGbChannel.setCreateTime(DateUtil.getNow()); + + return commonGbChannel; + } + public Integer getSVCSpaceSupportMode() { return SVCSpaceSupportMode; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 83622a64..0389df06 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -36,7 +36,7 @@ public class SubscribeHolder { // 添加订阅到期 String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; // 添加任务处理订阅过期 - dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), + dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(platformId), subscribeInfo.getExpires() * 1000); } } @@ -68,7 +68,7 @@ public class SubscribeHolder { if (subscribeInfo.getExpires() > 0) { // 添加任务处理订阅过期 dynamicTask.startDelay(taskOverdueKey, () -> { - removeMobilePositionSubscribe(subscribeInfo.getId()); + removeMobilePositionSubscribe(platformId); }, subscribeInfo.getExpires() * 1000); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index 2e792c1c..a7e99789 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -24,10 +24,10 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { private IPlatformService platformService; - private String platformId; + private Integer platformId; - public MobilePositionSubscribeHandlerTask(String platformId) { + public MobilePositionSubscribeHandlerTask(Integer platformId) { this.platformService = SpringBeanFactory.getBean("platformServiceImpl"); this.platformId = platformId; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 9aed1c82..e8865fef 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -417,7 +417,7 @@ public class ZLMHttpHookListener { if (gbStream != null) { // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); } - zlmMediaListManager.removeMedia(param.getApp(), param.getStream()); + zlmMediaListManager.streamOffline(param.getApp(), param.getStream()); } GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); if (gbStream != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 2b93e637..1b93e9ac 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -5,12 +5,11 @@ import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IResourceService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamPushMapper; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; @@ -39,17 +38,11 @@ public class ZLMMediaListManager { @Autowired private IVideoManagerStorage storager; - @Autowired - private GbStreamMapper gbStreamMapper; - - @Autowired - private PlatformGbStreamMapper platformGbStreamMapper; - @Autowired private IStreamPushService streamPushService; @Autowired - private IStreamProxyService streamProxyService; + private Map resourceServiceMap; @Autowired private StreamPushMapper streamPushMapper; @@ -71,16 +64,16 @@ public class ZLMMediaListManager { public StreamPush addPush(OnStreamChangedHookParam onStreamChangedHookParam) { StreamPush transform = streamPushService.transform(onStreamChangedHookParam); StreamPush pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); - transform.setPushIng(onStreamChangedHookParam.isRegist()); - transform.setUpdateTime(DateUtil.getNow()); - transform.setPushTime(DateUtil.getNow()); - transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId())); + if (pushInDb == null) { - transform.setCreateTime(DateUtil.getNow()); - streamPushMapper.add(transform); + + streamPushService.add(transform); }else { - streamPushMapper.update(transform); - gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId()); + pushInDb.setPushIng(onStreamChangedHookParam.isRegist()); + pushInDb.setUpdateTime(DateUtil.getNow()); + pushInDb.setPushTime(DateUtil.getNow()); + pushInDb.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId())); + streamPushService.update(pushInDb); } ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream()); if ( channelOnlineEventLister != null) { @@ -111,16 +104,10 @@ public class ZLMMediaListManager { } } - public int removeMedia(String app, String streamId) { - // 查找是否关联了国标, 关联了不删除, 置为离线 - GbStream gbStream = gbStreamMapper.selectOne(app, streamId); - int result; - if (gbStream == null) { - result = storager.removeMedia(app, streamId); - }else { - result =storager.mediaOffline(app, streamId); + public void streamOffline(String app, String streamId) { + for (String key : resourceServiceMap.keySet()) { + resourceServiceMap.get(key).streamOffline(app, streamId); } - return result; } public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java index e4e4d99c..ad96d5a3 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.media.zlm.dto; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import io.swagger.v3.oas.annotations.media.Schema; import org.jetbrains.annotations.NotNull; @@ -108,6 +109,22 @@ public class StreamPush implements Comparable{ @Schema(description = "状态") private boolean status; + @Schema(description = "分组国标编号") + private String groupDeviceId; + + public static StreamPush getInstance(StreamPushExcelDto streamPushExcelDto) { + StreamPush streamPush = new StreamPush(); + streamPush.setApp(streamPushExcelDto.getApp()); + streamPush.setStream(streamPushExcelDto.getStream()); + streamPush.setStatus(streamPushExcelDto.isStatus()); + streamPush.setCreateTime(DateUtil.getNow()); + streamPush.setUpdateTime(DateUtil.getNow()); + streamPush.setGbId(streamPushExcelDto.getGbId()); + streamPush.setName(streamPushExcelDto.getName()); + streamPush.setGroupDeviceId(streamPushExcelDto.getCatalogId()); + return streamPush; + } + @Override public int compareTo(@NotNull StreamPush streamPushItem) { @@ -266,5 +283,13 @@ public class StreamPush implements Comparable{ public void setStatus(boolean status) { this.status = status; } + + public String getGroupDeviceId() { + return groupDeviceId; + } + + public void setGroupDeviceId(String groupDeviceId) { + this.groupDeviceId = groupDeviceId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java index 0d608fd2..f91bce54 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java @@ -85,4 +85,6 @@ public interface ICommonGbChannelService { void deleteById(int commonGbChannelId); void deleteByIdList(List commonChannelIdList); + + void offlineForList(List onlinePushers); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index 966ba882..0db27910 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; -import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import java.util.List; diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGroupService.java b/src/main/java/com/genersoft/iot/vmp/service/IGroupService.java index acf42e4e..e52f6f9e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IGroupService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IGroupService.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.service.bean.Group; import com.github.pagehelper.PageInfo; import java.util.List; +import java.util.Map; /** * 业务分组 @@ -71,4 +72,7 @@ public interface IGroupService { * 查询子节点 */ PageInfo queryChildGroupList(String groupParentId, int page, int count); + + Map getAllGroupMap(); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java index df95f1df..e18e1407 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformChannelService.java @@ -1,8 +1,6 @@ package com.genersoft.iot.vmp.service; -import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import java.util.List; diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java index dcfd2913..17cbae4a 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java @@ -54,7 +54,7 @@ public interface IPlatformService { * 向上级平台发送位置订阅 * @param platformId 平台 */ - void sendNotifyMobilePosition(String platformId); + void sendNotifyMobilePosition(Integer platformId); void addSimulatedSubscribeInfo(ParentPlatform parentPlatform); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java b/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java index 541c4536..7ba811a6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java @@ -37,4 +37,9 @@ public interface IResourceService { */ boolean ptzControl(CommonGbChannel commonGbChannel, String command, Integer horizonSpeed, Integer verticalSpeed, Integer zoomSpeed); + + /** + * 流离线 + */ + void streamOffline(String app, String streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 329c6512..92e29990 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; +import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageInfo; import java.util.List; @@ -65,7 +66,7 @@ public interface IStreamPushService { /** * 导入时批量增加 */ - void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll); + void batchAddForUpload(List streamPushItems); /** * 全部离线 @@ -85,7 +86,7 @@ public interface IStreamPushService { /** * 增加推流 */ - boolean add(StreamPush stream, CommonGbChannel commonGbChannel); + boolean add(StreamPush stream); /** * 获取全部的app+Streanm 用于判断推流列表是新增还是修改 @@ -100,4 +101,6 @@ public interface IStreamPushService { ResourceBaseInfo getOverview(); void batchUpdate(List streamPushItemForUpdate); + + void update(StreamPush transform); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java index 20402993..f340195d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java @@ -777,4 +777,11 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { commonGbChannelMapper.deleteByIdList(commonChannelIdList); // TODO 向国标级联发送catalog } + + @Override + public void offlineForList(List onlinePushers) { + + + // TODO 向国标级联发送catalog + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 697d5c7d..90dc72c5 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -12,14 +12,10 @@ import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; -import com.genersoft.iot.vmp.service.IDeviceChannelService; -import com.genersoft.iot.vmp.service.IDeviceService; -import com.genersoft.iot.vmp.service.IInviteStreamService; -import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.BaseTree; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; @@ -30,6 +26,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; @@ -69,8 +66,6 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private DeviceMapper deviceMapper; - @Autowired - private PlatformChannelMapper platformChannelMapper; @Autowired private IDeviceChannelService deviceChannelService; @@ -96,6 +91,9 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private IMediaServerService mediaServerService; + @Autowired + private ICommonGbChannelService commonGbChannelService; + @Override public void online(Device device, SipTransactionInfo sipTransactionInfo) { logger.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); @@ -571,25 +569,13 @@ public class DeviceServiceImpl implements IDeviceService { } @Override + @Transactional public boolean delete(String deviceId) { - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - boolean result = false; - try { - platformChannelMapper.delChannelForDeviceId(deviceId); - deviceChannelMapper.cleanChannelsByDeviceId(deviceId); - if ( deviceMapper.del(deviceId) < 0 ) { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - } - result = true; - dataSourceTransactionManager.commit(transactionStatus); //手动提交 - }catch (Exception e) { - dataSourceTransactionManager.rollback(transactionStatus); - } - if (result) { - redisCatchStorage.removeDevice(deviceId); - } - return result; + List commonChannelIdList = deviceChannelMapper.getCommonChannelIdList(deviceId); + commonGbChannelService.deleteByIdList(commonChannelIdList); + deviceChannelMapper.cleanChannelsByDeviceId(deviceId); + deviceMapper.del(deviceId); + return true; } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java index 007215f7..00db0a1b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GroupServiceImpl.java @@ -25,6 +25,7 @@ import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.List; +import java.util.Map; @Service public class GroupServiceImpl implements IGroupService { @@ -270,4 +271,9 @@ public class GroupServiceImpl implements IGroupService { return new PageInfo<>(groupList); } + + @Override + public Map getAllGroupMap() { + return groupMapper.queryAllForMap(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 9e86a499..fd40a28d 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -385,8 +385,8 @@ public class PlatformServiceImpl implements IPlatformService { } @Override - public void sendNotifyMobilePosition(String platformId) { - ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId); + public void sendNotifyMobilePosition(Integer platformId) { + ParentPlatform platform = platformMapper.getParentPlatById(platformId); if (platform == null) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 96fa16c9..dfc6111f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -3,8 +3,8 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; +import com.genersoft.iot.vmp.common.BatchLimit; import com.genersoft.iot.vmp.common.CommonGbChannel; -import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; @@ -13,15 +13,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; -import com.genersoft.iot.vmp.service.ICommonGbChannelService; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; +import com.genersoft.iot.vmp.service.bean.Group; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; +import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.slf4j.Logger; @@ -66,13 +66,14 @@ public class StreamPushServiceImpl implements IStreamPushService { private ICommonGbChannelService commonGbChannelService; @Autowired - DataSourceTransactionManager dataSourceTransactionManager; + private DataSourceTransactionManager dataSourceTransactionManager; @Autowired - TransactionDefinition transactionDefinition; + private TransactionDefinition transactionDefinition; @Autowired - private MediaConfig mediaConfig; + private IGroupService groupService; + @Override @@ -163,14 +164,14 @@ public class StreamPushServiceImpl implements IStreamPushService { // redis记录 List onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); Map streamInfoPushItemMap = new HashMap<>(); - if (pushList.size() > 0) { + if (!pushList.isEmpty()) { for (StreamPush streamPushItem : pushList) { if (streamPushItem.getCommonGbChannelId() > 0) { pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); } } } - if (onStreamChangedHookParams.size() > 0) { + if (!onStreamChangedHookParams.isEmpty()) { for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) { streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam); } @@ -203,13 +204,12 @@ public class StreamPushServiceImpl implements IStreamPushService { } } List offlinePushItems = new ArrayList<>(pushItemMap.values()); - if (offlinePushItems.size() > 0) { + if (!offlinePushItems.isEmpty()) { String type = "PUSH"; - int runLimit = 300; - if (offlinePushItems.size() > runLimit) { - for (int i = 0; i < offlinePushItems.size(); i += runLimit) { - int toIndex = i + runLimit; - if (i + runLimit > offlinePushItems.size()) { + if (offlinePushItems.size() > BatchLimit.count) { + for (int i = 0; i < offlinePushItems.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > offlinePushItems.size()) { toIndex = offlinePushItems.size(); } List streamPushItemsSub = offlinePushItems.subList(i, toIndex); @@ -221,7 +221,7 @@ public class StreamPushServiceImpl implements IStreamPushService { } Collection offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values(); - if (offlineOnStreamChangedHookParamList.size() > 0) { + if (!offlineOnStreamChangedHookParamList.isEmpty()) { String type = "PUSH"; for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) { JSONObject jsonObject = new JSONObject(); @@ -237,7 +237,7 @@ public class StreamPushServiceImpl implements IStreamPushService { } Collection streamAuthorityInfos = streamAuthorityInfoInfoMap.values(); - if (streamAuthorityInfos.size() > 0) { + if (!streamAuthorityInfos.isEmpty()) { for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) { // 移除redis内流的信息 redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream()); @@ -258,7 +258,7 @@ public class StreamPushServiceImpl implements IStreamPushService { String type = "PUSH"; // 发送redis消息 List streamInfoList = redisCatchStorage.getStreams(mediaServerId, type); - if (streamInfoList.size() > 0) { + if (!streamInfoList.isEmpty()) { for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) { // 移除redis内流的信息 redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); @@ -282,10 +282,40 @@ public class StreamPushServiceImpl implements IStreamPushService { @Transactional public void batchAdd(List streamPushItems) { // 把存在国标Id的写入同步资源库 + List commonGbChannelList = new ArrayList<>(); + List streamPushListForChannel = new ArrayList<>(); + List streamPushListWithoutChannel = new ArrayList<>(); + // 将含有国标编号的推流数据与没有国标编号的进行拆分,拆分先将通用通道存储,得到每个通道的ID,赋值给推流信息后再将所有推流信息存入 + streamPushItems.stream().forEach(streamPush -> { + if (!ObjectUtils.isEmpty(streamPush.getGbId())) { + CommonGbChannel channel = CommonGbChannel.getInstance(streamPush); + commonGbChannelList.add(channel); + streamPushListForChannel.add(streamPush); + }else { + streamPushListWithoutChannel.add(streamPush); + } + }); + if (!commonGbChannelList.isEmpty()) { - streamPushMapper.addAll(streamPushItems); - + commonGbChannelService.batchAdd(commonGbChannelList); + for (int i = 0; i < commonGbChannelList.size(); i++) { + streamPushListForChannel.get(i).setCommonGbChannelId(commonGbChannelList.get(i).getCommonGbId()); + } + streamPushListWithoutChannel.addAll(streamPushListForChannel); + } + if (streamPushListWithoutChannel.size() > BatchLimit.count) { + for (int i = 0; i < streamPushListWithoutChannel.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > streamPushListWithoutChannel.size()) { + toIndex = streamPushListWithoutChannel.size(); + } + List streamPushItemsSub = streamPushListWithoutChannel.subList(i, toIndex); + streamPushMapper.addAll(streamPushItemsSub); + } + }else { + streamPushMapper.addAll(streamPushListWithoutChannel); + } } @Override @@ -294,91 +324,45 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll ) { - // 存储数据到stream_push表 - streamPushMapper.addAll(streamPushItems); - List streamPushItemForGbStream = streamPushItems.stream() - .filter(streamPushItem-> streamPushItem.getGbId() != null) - .collect(Collectors.toList()); - // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里 - if (streamPushItemForGbStream.size() > 0) { - gbStreamMapper.batchAdd(streamPushItemForGbStream); + @Transactional + public void batchAddForUpload(List streamPushExcelDtoList) { + // 插入国标通用通道得到通道ID + List commonGbChannelList = new ArrayList<>(); + List streamPushListForChannel = new ArrayList<>(); + List streamPushListWithoutChannel = new ArrayList<>(); + Map groupMap = groupService.getAllGroupMap(); + streamPushExcelDtoList.stream().forEach(streamPushExcelDto -> { + StreamPush streamPush = StreamPush.getInstance(streamPushExcelDto); + + if (!ObjectUtils.isEmpty(streamPushExcelDto.getGbId().trim())) { + CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(streamPush); + if (!ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId()) + && groupMap.containsKey(streamPushExcelDto.getCatalogId())) { + commonGbChannel.setCommonGbBusinessGroupID(streamPushExcelDto.getCatalogId()); + } + commonGbChannelList.add(commonGbChannel); + streamPushListForChannel.add(streamPush); + }else { + streamPushListWithoutChannel.add(streamPush); + + } + }); + commonGbChannelService.batchAdd(commonGbChannelList); + for (int i = 0; i < commonGbChannelList.size(); i++) { + streamPushListForChannel.get(i).setCommonGbChannelId(commonGbChannelList.get(i).getCommonGbId()); } - // 去除没有ID也就是没有存储到数据库的数据 - List streamPushItemsForPlatform = streamPushItemForGbStream.stream() - .filter(streamPushItem-> streamPushItem.getGbStreamId() != null) - .collect(Collectors.toList()); - - if (streamPushItemsForPlatform.size() > 0) { - // 获取所有平台,平台和目录信息一般不会特别大量。 - List parentPlatformList = parentPlatformMapper.getParentPlatformList(); - Map> platformInfoMap = new HashMap<>(); - if (parentPlatformList.size() == 0) { - return; - } - for (ParentPlatform platform : parentPlatformList) { - Map catalogMap = new HashMap<>(); - - // 创建根节点 - PlatformCatalog platformCatalog = new PlatformCatalog(); - platformCatalog.setId(platform.getServerGBId()); - catalogMap.put(platform.getServerGBId(), platformCatalog); - - // 查询所有节点信息 - List platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId()); - if (platformCatalogs.size() > 0) { - for (PlatformCatalog catalog : platformCatalogs) { - catalogMap.put(catalog.getId(), catalog); - } - } - platformInfoMap.put(platform.getServerGBId(), catalogMap); - } - List streamPushItemListFroPlatform = new ArrayList<>(); - Map> platformForEvent = new HashMap<>(); - // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入 - for (StreamPush streamPushItem : streamPushItemsForPlatform) { - List platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream()); - if (platFormInfoList != null && platFormInfoList.size() > 0) { - for (String[] platFormInfoArray : platFormInfoList) { - StreamPush streamPushItemForPlatform = new StreamPush(); - streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); - if (platFormInfoArray.length > 0) { - // 数组 platFormInfoArray 0 为平台ID。 1为目录ID - // 不存在这个平台,则忽略导入此关联关系 - if (platformInfoMap.get(platFormInfoArray[0]) == null - || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) { - logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] ); - continue; - } - streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); - List gbStreamList = platformForEvent.get(platFormInfoArray[0]); - if (gbStreamList == null) { - gbStreamList = new ArrayList<>(); - platformForEvent.put(platFormInfoArray[0], gbStreamList); - } - // 为发送通知整理数据 - streamPushItemForPlatform.setName(streamPushItem.getName()); - streamPushItemForPlatform.setApp(streamPushItem.getApp()); - streamPushItemForPlatform.setStream(streamPushItem.getStream()); - streamPushItemForPlatform.setGbId(streamPushItem.getGbId()); - gbStreamList.add(streamPushItemForPlatform); - } - if (platFormInfoArray.length > 1) { - streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); - } - streamPushItemListFroPlatform.add(streamPushItemForPlatform); - } - - } - } - if (!streamPushItemListFroPlatform.isEmpty()) { - platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); - // 发送通知 - for (String platformId : platformForEvent.keySet()) { - eventPublisher.catalogEventPublishForStream( - platformId, platformForEvent.get(platformId), CatalogEvent.ADD); + streamPushListWithoutChannel.addAll(streamPushListForChannel); + if (streamPushListWithoutChannel.size() > BatchLimit.count) { + for (int i = 0; i < streamPushListWithoutChannel.size(); i += BatchLimit.count) { + int toIndex = i + BatchLimit.count; + if (i + BatchLimit.count > streamPushListWithoutChannel.size()) { + toIndex = streamPushListWithoutChannel.size(); } + List streamPushItemsSub = streamPushListWithoutChannel.subList(i, toIndex); + streamPushMapper.addAll(streamPushItemsSub); } + }else { + streamPushMapper.addAll(streamPushListWithoutChannel); } } @@ -401,14 +385,12 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public void allStreamOffline() { - List onlinePushers = streamPushMapper.getOnlinePusherForGb(); - if (onlinePushers.size() == 0) { - return; - } + List onlinePushers = streamPushMapper.getOnlinePusherForGb(); streamPushMapper.setAllStreamOffline(); + if (!onlinePushers.isEmpty()) { + commonGbChannelService.offlineForList(onlinePushers); + } - // 发送通知 - eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); } @Override @@ -431,19 +413,15 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override @Transactional - public boolean add(StreamPush stream, CommonGbChannel commonGbChannel) { - assert !ObjectUtils.isEmpty(commonGbChannel.getCommonGbDeviceID()); - assert !ObjectUtils.isEmpty(commonGbChannel.getCommonGbName()); + public boolean add(StreamPush stream) { String now = DateUtil.getNow(); - commonGbChannel.setCreateTime(now); - commonGbChannel.setUpdateTime(now); - commonGbChannel.setType(CommonGbChannelType.PUSH); - - commonGbChannelService.add(commonGbChannel); - if (commonGbChannel.getCommonGbId() > 0) { - stream.setCommonGbChannelId(commonGbChannel.getCommonGbId()); - }else { - return false; + CommonGbChannel commonGbChannel = null; + if (!ObjectUtils.isEmpty(stream.getGbId())) { + commonGbChannel = CommonGbChannel.getInstance(stream); + commonGbChannelService.add(commonGbChannel); + if (commonGbChannel.getCommonGbId() > 0) { + stream.setCommonGbChannelId(commonGbChannel.getCommonGbId()); + } } stream.setUpdateTime(now); stream.setCreateTime(now); @@ -451,6 +429,23 @@ public class StreamPushServiceImpl implements IStreamPushService { return streamPushMapper.add(stream) > 1; } + @Override + @Transactional + public void update(StreamPush streamPush) { + assert streamPush.getId() > 0; + StreamPush streamPushIDb = streamPushMapper.query(streamPush.getId()); + assert streamPushIDb != null; + if (streamPushIDb.getCommonGbChannelId() > 0 && streamPush.getCommonGbChannelId() == 0) { + commonGbChannelService.deleteById(streamPushIDb.getCommonGbChannelId()); + } + if (streamPushIDb.getCommonGbChannelId() == 0 && streamPush.getCommonGbChannelId() > 0) { + CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(streamPush); + commonGbChannelService.add(commonGbChannel); + } + streamPush.setUpdateTime(DateUtil.getNow()); + streamPushMapper.update(streamPush); + } + @Override public Map getAllAppAndStream() { return streamPushMapper.getAllAppAndStream(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java index 5d9a219c..5920dc56 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java @@ -32,12 +32,12 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener streamPushItems = new ArrayList<>(); + private final List streamPushItems = new ArrayList<>(); /** * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 */ - private final Map streamPushItemForSave = new HashMap<>(); + private final Map streamPushItemForSave = new HashMap<>(); /** * 用于存储按照APP+Stream为KEY, 平台ID+目录Id 为value的数据,用于存储到gb_stream表后获取app+Stream对应的平台与目录信息,然后存入关联表 @@ -81,7 +81,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener allAppAndStreams = pushService.getAllAppAndStream(); - if (allAppAndStreams.size() > 0) { + if (!allAppAndStreams.isEmpty()) { for (String allAppAndStream : allAppAndStreams.keySet()) { pushMapInDb.put(allAppAndStream, allAppAndStream); } @@ -126,37 +126,9 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener platformList = streamPushItemsForPlatform.get(streamPushItem.getApp() + streamPushItem.getStream()); - if (platformList == null) { - platformList = new ArrayList<>(); - streamPushItemsForPlatform.put(streamPushItem.getApp() + streamPushItem.getStream(), platformList); - } - String platformId = streamPushExcelDto.getPlatformId(); - String catalogId = streamPushExcelDto.getCatalogId(); - if (ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId())) { - catalogId = null; - } - String[] platFormInfoArray = new String[]{platformId, catalogId}; - platformList.add(platFormInfoArray); - } + streamPushItems.add(streamPushExcelDto); + streamPushItemForSave.put(streamPushExcelDto.getApp() + streamPushExcelDto.getStream(), streamPushExcelDto); loadedSize ++; if (loadedSize > 1000) { @@ -182,9 +154,9 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener 0) { + if (!streamPushItemForSave.isEmpty()) { // 向数据库查询是否存在重复的app - pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values()), streamPushItemsForPlatform); + pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values())); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java index 3209bb74..52c672dd 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -3,10 +3,8 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; -import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.dto.StreamPushDto; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +76,11 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { if (!streamPush.getName().equals(streamPushBoInDB.getName()) || !streamPush.getGbId().equals(streamPushBoInDB.getGbId()) || !streamPush.isStatus() == streamPushBoInDB.isStatus()) { - streamPushItemForUpdate.add(streamPush); + streamPushBoInDB.setName(streamPush.getName()); + streamPushBoInDB.setGbId(streamPush.getGbId()); + streamPushBoInDB.setStatus(streamPush.isStatus()); + + streamPushItemForUpdate.add(streamPushBoInDB); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java index f5f29487..7fdaaaf8 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java @@ -79,7 +79,7 @@ public class RedisStreamMsgListener implements MessageListener { if (register) { zlmMediaListManager.addPush(onStreamChangedHookParam); }else { - zlmMediaListManager.removeMedia(app, stream); + zlmMediaListManager.streamOffline(app, stream); } }catch (Exception e) { logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index 48aa6613..d43092ec 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; -import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; import com.github.pagehelper.PageInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index f7bd028a..e843aa3f 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -464,5 +464,10 @@ public interface DeviceChannelMapper { " where common_gb_channel_id = #{commonGbId}") int removeCommonChannelId(@Param("commonGbId") int commonGbId); - + @Select(value = {" "}) + List getCommonChannelIdList(@Param("deviceId") String deviceId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GroupMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GroupMapper.java index 62473245..9ce47e88 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GroupMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GroupMapper.java @@ -163,4 +163,9 @@ public interface GroupMapper { "" + ""}) int updateAll(@Param("groupList") List groupList); + + @MapKey("commonGroupDeviceId") + @Select("select * from wvp_common_group") + Map queryAllForMap(); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index eac48fb4..f201fece 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -166,8 +166,8 @@ public interface StreamPushMapper { ")") void online(List onlineStreams); - @Select("SELECT gs.* FROM wvp_stream_push sp left join wvp_gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream where sp.status = true") - List getOnlinePusherForGb(); + @Select("SELECT common_gb_channel_id FROM wvp_stream_push gb_id > 0") + List getOnlinePusherForGb(); @Update("UPDATE wvp_stream_push SET status=0") void setAllStreamOffline(); @@ -192,4 +192,7 @@ public interface StreamPushMapper { "" + "") List getListIn(List streamPushItems); + + @Select("select* from wvp_stream_push where id = #{id}") + StreamPush query(@Param("id") Integer id); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 86603162..a0a79977 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -5,13 +5,11 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; -import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -66,9 +64,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @Autowired private StreamPushMapper streamPushMapper; - @Autowired - private GbStreamMapper gbStreamMapper; - @Autowired private UserSetting userSetting; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java index 5aff760d..45aa360b 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java @@ -1,23 +1,13 @@ package com.genersoft.iot.vmp.vmanager.gb28181.platform; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.*; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; -import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.UpdateChannelParam; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -29,11 +19,6 @@ import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; import com.genersoft.iot.vmp.conf.SipConfig; -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.List; - /** * 级联平台管理 */ diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index 3eade8e8..8760a5fb 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -244,15 +244,12 @@ public class StreamPushController { @PostMapping(value = "/add") @ResponseBody @Operation(summary = "添加推流信息") - public void add(@RequestBody StreamPushWithCommonChannelParam param){ + public void add(@RequestBody StreamPush param){ if (ObjectUtils.isEmpty(param.getApp()) && ObjectUtils.isEmpty(param.getStream())) { throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空"); } - StreamPush streamPushItem = new StreamPush(); - streamPushItem.setApp(param.getApp()); - streamPushItem.setStream(param.getStream()); - if (!streamPushService.add(streamPushItem, param)) { + if (!streamPushService.add(param)) { throw new ControllerException(ErrorCode.ERROR100); } }