临时提交

结构优化
648540858 2023-12-11 22:49:48 +08:00
parent 92669f192f
commit 31dcde7cc1
28 changed files with 256 additions and 252 deletions

View File

@ -3,11 +3,13 @@ package com.genersoft.iot.vmp.common;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
import com.genersoft.iot.vmp.service.impl.CommonGbChannelServiceImpl;
import com.genersoft.iot.vmp.utils.DateUtil;
import io.swagger.v3.oas.annotations.media.Schema;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.checkerframework.checker.units.qual.C;
import org.slf4j.Logger;
@ -762,6 +764,28 @@ public class CommonGbChannel {
return commonGbChannel;
}
public static CommonGbChannel getInstance(StreamPush streamPush){
CommonGbChannel commonGbChannel = new CommonGbChannel();
commonGbChannel.setCommonGbDeviceID(streamPush.getGbId());
commonGbChannel.setType(CommonGbChannelType.PUSH);
if (!ObjectUtils.isEmpty(streamPush.getName().trim())) {
commonGbChannel.setCommonGbName(streamPush.getName().trim());
}
if (streamPush.getLongitude() > 0) {
commonGbChannel.setCommonGbLongitude(streamPush.getLongitude());
}
if (streamPush.getLatitude() > 0) {
commonGbChannel.setCommonGbLatitude(streamPush.getLatitude());
}
if (!ObjectUtils.isEmpty(streamPush.getGroupDeviceId())) {
commonGbChannel.setCommonGbBusinessGroupID(streamPush.getGroupDeviceId());
}
commonGbChannel.setUpdateTime(DateUtil.getNow());
commonGbChannel.setCreateTime(DateUtil.getNow());
return commonGbChannel;
}
public Integer getSVCSpaceSupportMode() {
return SVCSpaceSupportMode;
}

View File

@ -36,7 +36,7 @@ public class SubscribeHolder {
// 添加订阅到期
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(platformId),
subscribeInfo.getExpires() * 1000);
}
}
@ -68,7 +68,7 @@ public class SubscribeHolder {
if (subscribeInfo.getExpires() > 0) {
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
removeMobilePositionSubscribe(subscribeInfo.getId());
removeMobilePositionSubscribe(platformId);
},
subscribeInfo.getExpires() * 1000);
}

View File

@ -24,10 +24,10 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
private IPlatformService platformService;
private String platformId;
private Integer platformId;
public MobilePositionSubscribeHandlerTask(String platformId) {
public MobilePositionSubscribeHandlerTask(Integer platformId) {
this.platformService = SpringBeanFactory.getBean("platformServiceImpl");
this.platformId = platformId;
}

View File

@ -417,7 +417,7 @@ public class ZLMHttpHookListener {
if (gbStream != null) {
// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
}
zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
zlmMediaListManager.streamOffline(param.getApp(), param.getStream());
}
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) {

View File

@ -5,12 +5,11 @@ import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IResourceService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
@ -39,17 +38,11 @@ public class ZLMMediaListManager {
@Autowired
private IVideoManagerStorage storager;
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private IStreamPushService streamPushService;
@Autowired
private IStreamProxyService streamProxyService;
private Map<String, IResourceService> resourceServiceMap;
@Autowired
private StreamPushMapper streamPushMapper;
@ -71,16 +64,16 @@ public class ZLMMediaListManager {
public StreamPush addPush(OnStreamChangedHookParam onStreamChangedHookParam) {
StreamPush transform = streamPushService.transform(onStreamChangedHookParam);
StreamPush pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
transform.setPushIng(onStreamChangedHookParam.isRegist());
transform.setUpdateTime(DateUtil.getNow());
transform.setPushTime(DateUtil.getNow());
transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId()));
if (pushInDb == null) {
transform.setCreateTime(DateUtil.getNow());
streamPushMapper.add(transform);
streamPushService.add(transform);
}else {
streamPushMapper.update(transform);
gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId());
pushInDb.setPushIng(onStreamChangedHookParam.isRegist());
pushInDb.setUpdateTime(DateUtil.getNow());
pushInDb.setPushTime(DateUtil.getNow());
pushInDb.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId()));
streamPushService.update(pushInDb);
}
ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
if ( channelOnlineEventLister != null) {
@ -111,16 +104,10 @@ public class ZLMMediaListManager {
}
}
public int removeMedia(String app, String streamId) {
// 查找是否关联了国标, 关联了不删除, 置为离线
GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
int result;
if (gbStream == null) {
result = storager.removeMedia(app, streamId);
}else {
result =storager.mediaOffline(app, streamId);
public void streamOffline(String app, String streamId) {
for (String key : resourceServiceMap.keySet()) {
resourceServiceMap.get(key).streamOffline(app, streamId);
}
return result;
}
public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import io.swagger.v3.oas.annotations.media.Schema;
import org.jetbrains.annotations.NotNull;
@ -108,6 +109,22 @@ public class StreamPush implements Comparable<StreamPush>{
@Schema(description = "状态")
private boolean status;
@Schema(description = "分组国标编号")
private String groupDeviceId;
public static StreamPush getInstance(StreamPushExcelDto streamPushExcelDto) {
StreamPush streamPush = new StreamPush();
streamPush.setApp(streamPushExcelDto.getApp());
streamPush.setStream(streamPushExcelDto.getStream());
streamPush.setStatus(streamPushExcelDto.isStatus());
streamPush.setCreateTime(DateUtil.getNow());
streamPush.setUpdateTime(DateUtil.getNow());
streamPush.setGbId(streamPushExcelDto.getGbId());
streamPush.setName(streamPushExcelDto.getName());
streamPush.setGroupDeviceId(streamPushExcelDto.getCatalogId());
return streamPush;
}
@Override
public int compareTo(@NotNull StreamPush streamPushItem) {
@ -266,5 +283,13 @@ public class StreamPush implements Comparable<StreamPush>{
public void setStatus(boolean status) {
this.status = status;
}
public String getGroupDeviceId() {
return groupDeviceId;
}
public void setGroupDeviceId(String groupDeviceId) {
this.groupDeviceId = groupDeviceId;
}
}

View File

@ -85,4 +85,6 @@ public interface ICommonGbChannelService {
void deleteById(int commonGbChannelId);
void deleteByIdList(List<Integer> commonChannelIdList);
void offlineForList(List<Integer> onlinePushers);
}

View File

@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import java.util.List;

View File

@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.service.bean.Group;
import com.github.pagehelper.PageInfo;
import java.util.List;
import java.util.Map;
/**
*
@ -71,4 +72,7 @@ public interface IGroupService {
*
*/
PageInfo<Group> queryChildGroupList(String groupParentId, int page, int count);
Map<String, Group> getAllGroupMap();
}

View File

@ -1,8 +1,6 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import java.util.List;

View File

@ -54,7 +54,7 @@ public interface IPlatformService {
*
* @param platformId
*/
void sendNotifyMobilePosition(String platformId);
void sendNotifyMobilePosition(Integer platformId);
void addSimulatedSubscribeInfo(ParentPlatform parentPlatform);

View File

@ -37,4 +37,9 @@ public interface IResourceService {
*/
boolean ptzControl(CommonGbChannel commonGbChannel, String command,
Integer horizonSpeed, Integer verticalSpeed, Integer zoomSpeed);
/**
* 线
*/
void streamOffline(String app, String streamId);
}

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import com.github.pagehelper.PageInfo;
import java.util.List;
@ -65,7 +66,7 @@ public interface IStreamPushService {
/**
*
*/
void batchAddForUpload(List<StreamPush> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
void batchAddForUpload(List<StreamPushExcelDto> streamPushItems);
/**
* 线
@ -85,7 +86,7 @@ public interface IStreamPushService {
/**
*
*/
boolean add(StreamPush stream, CommonGbChannel commonGbChannel);
boolean add(StreamPush stream);
/**
* app+Streanm
@ -100,4 +101,6 @@ public interface IStreamPushService {
ResourceBaseInfo getOverview();
void batchUpdate(List<StreamPush> streamPushItemForUpdate);
void update(StreamPush transform);
}

View File

@ -777,4 +777,11 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
commonGbChannelMapper.deleteByIdList(commonChannelIdList);
// TODO 向国标级联发送catalog
}
@Override
public void offlineForList(List<Integer> onlinePushers) {
// TODO 向国标级联发送catalog
}
}

View File

@ -12,14 +12,10 @@ import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.BaseTree;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
@ -30,6 +26,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
@ -69,8 +66,6 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private PlatformChannelMapper platformChannelMapper;
@Autowired
private IDeviceChannelService deviceChannelService;
@ -96,6 +91,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ICommonGbChannelService commonGbChannelService;
@Override
public void online(Device device, SipTransactionInfo sipTransactionInfo) {
logger.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
@ -571,25 +569,13 @@ public class DeviceServiceImpl implements IDeviceService {
}
@Override
@Transactional
public boolean delete(String deviceId) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
try {
platformChannelMapper.delChannelForDeviceId(deviceId);
deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
if ( deviceMapper.del(deviceId) < 0 ) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
}
result = true;
dataSourceTransactionManager.commit(transactionStatus); //手动提交
}catch (Exception e) {
dataSourceTransactionManager.rollback(transactionStatus);
}
if (result) {
redisCatchStorage.removeDevice(deviceId);
}
return result;
List<Integer> commonChannelIdList = deviceChannelMapper.getCommonChannelIdList(deviceId);
commonGbChannelService.deleteByIdList(commonChannelIdList);
deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
deviceMapper.del(deviceId);
return true;
}
@Override

View File

@ -25,6 +25,7 @@ import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Service
public class GroupServiceImpl implements IGroupService {
@ -270,4 +271,9 @@ public class GroupServiceImpl implements IGroupService {
return new PageInfo<>(groupList);
}
@Override
public Map<String, Group> getAllGroupMap() {
return groupMapper.queryAllForMap();
}
}

View File

@ -385,8 +385,8 @@ public class PlatformServiceImpl implements IPlatformService {
}
@Override
public void sendNotifyMobilePosition(String platformId) {
ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
public void sendNotifyMobilePosition(Integer platformId) {
ParentPlatform platform = platformMapper.getParentPlatById(platformId);
if (platform == null) {
return;
}

View File

@ -3,8 +3,8 @@ package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.genersoft.iot.vmp.common.BatchLimit;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@ -13,15 +13,15 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.ICommonGbChannelService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
import com.genersoft.iot.vmp.service.bean.Group;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
@ -66,13 +66,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
private ICommonGbChannelService commonGbChannelService;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
private TransactionDefinition transactionDefinition;
@Autowired
private MediaConfig mediaConfig;
private IGroupService groupService;
@Override
@ -163,14 +164,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
// redis记录
List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
if (pushList.size() > 0) {
if (!pushList.isEmpty()) {
for (StreamPush streamPushItem : pushList) {
if (streamPushItem.getCommonGbChannelId() > 0) {
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
}
}
}
if (onStreamChangedHookParams.size() > 0) {
if (!onStreamChangedHookParams.isEmpty()) {
for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
}
@ -203,13 +204,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
}
List<StreamPush> offlinePushItems = new ArrayList<>(pushItemMap.values());
if (offlinePushItems.size() > 0) {
if (!offlinePushItems.isEmpty()) {
String type = "PUSH";
int runLimit = 300;
if (offlinePushItems.size() > runLimit) {
for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
int toIndex = i + runLimit;
if (i + runLimit > offlinePushItems.size()) {
if (offlinePushItems.size() > BatchLimit.count) {
for (int i = 0; i < offlinePushItems.size(); i += BatchLimit.count) {
int toIndex = i + BatchLimit.count;
if (i + BatchLimit.count > offlinePushItems.size()) {
toIndex = offlinePushItems.size();
}
List<StreamPush> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
@ -221,7 +221,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
if (offlineOnStreamChangedHookParamList.size() > 0) {
if (!offlineOnStreamChangedHookParamList.isEmpty()) {
String type = "PUSH";
for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
JSONObject jsonObject = new JSONObject();
@ -237,7 +237,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
if (streamAuthorityInfos.size() > 0) {
if (!streamAuthorityInfos.isEmpty()) {
for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
// 移除redis内流的信息
redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
@ -258,7 +258,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
String type = "PUSH";
// 发送redis消息
List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (streamInfoList.size() > 0) {
if (!streamInfoList.isEmpty()) {
for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
@ -282,10 +282,40 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Transactional
public void batchAdd(List<StreamPush> streamPushItems) {
// 把存在国标Id的写入同步资源库
List<CommonGbChannel> commonGbChannelList = new ArrayList<>();
List<StreamPush> streamPushListForChannel = new ArrayList<>();
List<StreamPush> streamPushListWithoutChannel = new ArrayList<>();
// 将含有国标编号的推流数据与没有国标编号的进行拆分拆分先将通用通道存储得到每个通道的ID赋值给推流信息后再将所有推流信息存入
streamPushItems.stream().forEach(streamPush -> {
if (!ObjectUtils.isEmpty(streamPush.getGbId())) {
CommonGbChannel channel = CommonGbChannel.getInstance(streamPush);
commonGbChannelList.add(channel);
streamPushListForChannel.add(streamPush);
}else {
streamPushListWithoutChannel.add(streamPush);
}
});
if (!commonGbChannelList.isEmpty()) {
streamPushMapper.addAll(streamPushItems);
commonGbChannelService.batchAdd(commonGbChannelList);
for (int i = 0; i < commonGbChannelList.size(); i++) {
streamPushListForChannel.get(i).setCommonGbChannelId(commonGbChannelList.get(i).getCommonGbId());
}
streamPushListWithoutChannel.addAll(streamPushListForChannel);
}
if (streamPushListWithoutChannel.size() > BatchLimit.count) {
for (int i = 0; i < streamPushListWithoutChannel.size(); i += BatchLimit.count) {
int toIndex = i + BatchLimit.count;
if (i + BatchLimit.count > streamPushListWithoutChannel.size()) {
toIndex = streamPushListWithoutChannel.size();
}
List<StreamPush> streamPushItemsSub = streamPushListWithoutChannel.subList(i, toIndex);
streamPushMapper.addAll(streamPushItemsSub);
}
}else {
streamPushMapper.addAll(streamPushListWithoutChannel);
}
}
@Override
@ -294,91 +324,45 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
@Override
public void batchAddForUpload(List<StreamPush> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
// 存储数据到stream_push表
streamPushMapper.addAll(streamPushItems);
List<StreamPush> streamPushItemForGbStream = streamPushItems.stream()
.filter(streamPushItem-> streamPushItem.getGbId() != null)
.collect(Collectors.toList());
// 存储数据到gb_stream表 id会返回到streamPushItemForGbStream里
if (streamPushItemForGbStream.size() > 0) {
gbStreamMapper.batchAdd(streamPushItemForGbStream);
@Transactional
public void batchAddForUpload(List<StreamPushExcelDto> streamPushExcelDtoList) {
// 插入国标通用通道得到通道ID
List<CommonGbChannel> commonGbChannelList = new ArrayList<>();
List<StreamPush> streamPushListForChannel = new ArrayList<>();
List<StreamPush> streamPushListWithoutChannel = new ArrayList<>();
Map<String, Group> groupMap = groupService.getAllGroupMap();
streamPushExcelDtoList.stream().forEach(streamPushExcelDto -> {
StreamPush streamPush = StreamPush.getInstance(streamPushExcelDto);
if (!ObjectUtils.isEmpty(streamPushExcelDto.getGbId().trim())) {
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(streamPush);
if (!ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId())
&& groupMap.containsKey(streamPushExcelDto.getCatalogId())) {
commonGbChannel.setCommonGbBusinessGroupID(streamPushExcelDto.getCatalogId());
}
commonGbChannelList.add(commonGbChannel);
streamPushListForChannel.add(streamPush);
}else {
streamPushListWithoutChannel.add(streamPush);
}
});
commonGbChannelService.batchAdd(commonGbChannelList);
for (int i = 0; i < commonGbChannelList.size(); i++) {
streamPushListForChannel.get(i).setCommonGbChannelId(commonGbChannelList.get(i).getCommonGbId());
}
// 去除没有ID也就是没有存储到数据库的数据
List<StreamPush> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
.filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
.collect(Collectors.toList());
if (streamPushItemsForPlatform.size() > 0) {
// 获取所有平台,平台和目录信息一般不会特别大量。
List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
if (parentPlatformList.size() == 0) {
return;
}
for (ParentPlatform platform : parentPlatformList) {
Map<String, PlatformCatalog> catalogMap = new HashMap<>();
// 创建根节点
PlatformCatalog platformCatalog = new PlatformCatalog();
platformCatalog.setId(platform.getServerGBId());
catalogMap.put(platform.getServerGBId(), platformCatalog);
// 查询所有节点信息
List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
if (platformCatalogs.size() > 0) {
for (PlatformCatalog catalog : platformCatalogs) {
catalogMap.put(catalog.getId(), catalog);
}
}
platformInfoMap.put(platform.getServerGBId(), catalogMap);
}
List<StreamPush> streamPushItemListFroPlatform = new ArrayList<>();
Map<String, List<GbStream>> platformForEvent = new HashMap<>();
// 遍历存储结果查找app+Stream->platformId+catalogId的对应关系然后执行批量写入
for (StreamPush streamPushItem : streamPushItemsForPlatform) {
List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
if (platFormInfoList != null && platFormInfoList.size() > 0) {
for (String[] platFormInfoArray : platFormInfoList) {
StreamPush streamPushItemForPlatform = new StreamPush();
streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
if (platFormInfoArray.length > 0) {
// 数组 platFormInfoArray 0 为平台ID。 1为目录ID
// 不存在这个平台,则忽略导入此关联关系
if (platformInfoMap.get(platFormInfoArray[0]) == null
|| platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
continue;
}
streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
if (gbStreamList == null) {
gbStreamList = new ArrayList<>();
platformForEvent.put(platFormInfoArray[0], gbStreamList);
}
// 为发送通知整理数据
streamPushItemForPlatform.setName(streamPushItem.getName());
streamPushItemForPlatform.setApp(streamPushItem.getApp());
streamPushItemForPlatform.setStream(streamPushItem.getStream());
streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
gbStreamList.add(streamPushItemForPlatform);
}
if (platFormInfoArray.length > 1) {
streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
}
streamPushItemListFroPlatform.add(streamPushItemForPlatform);
}
}
}
if (!streamPushItemListFroPlatform.isEmpty()) {
platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
// 发送通知
for (String platformId : platformForEvent.keySet()) {
eventPublisher.catalogEventPublishForStream(
platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
streamPushListWithoutChannel.addAll(streamPushListForChannel);
if (streamPushListWithoutChannel.size() > BatchLimit.count) {
for (int i = 0; i < streamPushListWithoutChannel.size(); i += BatchLimit.count) {
int toIndex = i + BatchLimit.count;
if (i + BatchLimit.count > streamPushListWithoutChannel.size()) {
toIndex = streamPushListWithoutChannel.size();
}
List<StreamPush> streamPushItemsSub = streamPushListWithoutChannel.subList(i, toIndex);
streamPushMapper.addAll(streamPushItemsSub);
}
}else {
streamPushMapper.addAll(streamPushListWithoutChannel);
}
}
@ -401,14 +385,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
public void allStreamOffline() {
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
if (onlinePushers.size() == 0) {
return;
}
List<Integer> onlinePushers = streamPushMapper.getOnlinePusherForGb();
streamPushMapper.setAllStreamOffline();
if (!onlinePushers.isEmpty()) {
commonGbChannelService.offlineForList(onlinePushers);
}
// 发送通知
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
}
@Override
@ -431,19 +413,15 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
@Transactional
public boolean add(StreamPush stream, CommonGbChannel commonGbChannel) {
assert !ObjectUtils.isEmpty(commonGbChannel.getCommonGbDeviceID());
assert !ObjectUtils.isEmpty(commonGbChannel.getCommonGbName());
public boolean add(StreamPush stream) {
String now = DateUtil.getNow();
commonGbChannel.setCreateTime(now);
commonGbChannel.setUpdateTime(now);
commonGbChannel.setType(CommonGbChannelType.PUSH);
commonGbChannelService.add(commonGbChannel);
if (commonGbChannel.getCommonGbId() > 0) {
stream.setCommonGbChannelId(commonGbChannel.getCommonGbId());
}else {
return false;
CommonGbChannel commonGbChannel = null;
if (!ObjectUtils.isEmpty(stream.getGbId())) {
commonGbChannel = CommonGbChannel.getInstance(stream);
commonGbChannelService.add(commonGbChannel);
if (commonGbChannel.getCommonGbId() > 0) {
stream.setCommonGbChannelId(commonGbChannel.getCommonGbId());
}
}
stream.setUpdateTime(now);
stream.setCreateTime(now);
@ -451,6 +429,23 @@ public class StreamPushServiceImpl implements IStreamPushService {
return streamPushMapper.add(stream) > 1;
}
@Override
@Transactional
public void update(StreamPush streamPush) {
assert streamPush.getId() > 0;
StreamPush streamPushIDb = streamPushMapper.query(streamPush.getId());
assert streamPushIDb != null;
if (streamPushIDb.getCommonGbChannelId() > 0 && streamPush.getCommonGbChannelId() == 0) {
commonGbChannelService.deleteById(streamPushIDb.getCommonGbChannelId());
}
if (streamPushIDb.getCommonGbChannelId() == 0 && streamPush.getCommonGbChannelId() > 0) {
CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(streamPush);
commonGbChannelService.add(commonGbChannel);
}
streamPush.setUpdateTime(DateUtil.getNow());
streamPushMapper.update(streamPush);
}
@Override
public Map<String, StreamPush> getAllAppAndStream() {
return streamPushMapper.getAllAppAndStream();

View File

@ -32,12 +32,12 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
/**
*
*/
private final List<StreamPush> streamPushItems = new ArrayList<>();
private final List<StreamPushExcelDto> streamPushItems = new ArrayList<>();
/**
* APP+Streamstream_pushgb_stream
*/
private final Map<String, StreamPush> streamPushItemForSave = new HashMap<>();
private final Map<String, StreamPushExcelDto> streamPushItemForSave = new HashMap<>();
/**
* APP+StreamKEY ID+Id valuegb_streamapp+Stream
@ -81,7 +81,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
this.errorDataHandler = errorDataHandler;
// 获取数据库已有的数据,已经存在的则忽略
Map<String, StreamPush> allAppAndStreams = pushService.getAllAppAndStream();
if (allAppAndStreams.size() > 0) {
if (!allAppAndStreams.isEmpty()) {
for (String allAppAndStream : allAppAndStreams.keySet()) {
pushMapInDb.put(allAppAndStream, allAppAndStream);
}
@ -126,37 +126,9 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
}
StreamPush streamPushItem = new StreamPush();
streamPushItem.setApp(streamPushExcelDto.getApp());
streamPushItem.setStream(streamPushExcelDto.getStream());
streamPushItem.setGbId(streamPushExcelDto.getGbId());
streamPushItem.setStatus(streamPushExcelDto.getStatus());
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setMediaServerId(defaultMediaServerId);
streamPushItem.setName(streamPushExcelDto.getName());
streamPushItem.setTotalReaderCount("0");
streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId());
streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId());
// 存入所有的通道信息
streamPushItems.add(streamPushItem);
streamPushItemForSave.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
if (!ObjectUtils.isEmpty(streamPushExcelDto.getPlatformId())) {
List<String[]> platformList = streamPushItemsForPlatform.get(streamPushItem.getApp() + streamPushItem.getStream());
if (platformList == null) {
platformList = new ArrayList<>();
streamPushItemsForPlatform.put(streamPushItem.getApp() + streamPushItem.getStream(), platformList);
}
String platformId = streamPushExcelDto.getPlatformId();
String catalogId = streamPushExcelDto.getCatalogId();
if (ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId())) {
catalogId = null;
}
String[] platFormInfoArray = new String[]{platformId, catalogId};
platformList.add(platFormInfoArray);
}
streamPushItems.add(streamPushExcelDto);
streamPushItemForSave.put(streamPushExcelDto.getApp() + streamPushExcelDto.getStream(), streamPushExcelDto);
loadedSize ++;
if (loadedSize > 1000) {
@ -182,9 +154,9 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
}
private void saveData(){
if (streamPushItemForSave.size() > 0) {
if (!streamPushItemForSave.isEmpty()) {
// 向数据库查询是否存在重复的app
pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values()), streamPushItemsForPlatform);
pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values()));
}
}
}

View File

@ -3,10 +3,8 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.dto.StreamPushDto;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -78,7 +76,11 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
if (!streamPush.getName().equals(streamPushBoInDB.getName())
|| !streamPush.getGbId().equals(streamPushBoInDB.getGbId())
|| !streamPush.isStatus() == streamPushBoInDB.isStatus()) {
streamPushItemForUpdate.add(streamPush);
streamPushBoInDB.setName(streamPush.getName());
streamPushBoInDB.setGbId(streamPush.getGbId());
streamPushBoInDB.setStatus(streamPush.isStatus());
streamPushItemForUpdate.add(streamPushBoInDB);
}
}
}

View File

@ -79,7 +79,7 @@ public class RedisStreamMsgListener implements MessageListener {
if (register) {
zlmMediaListManager.addPush(onStreamChangedHookParam);
}else {
zlmMediaListManager.removeMedia(app, stream);
zlmMediaListManager.streamOffline(app, stream);
}
}catch (Exception e) {
logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));

View File

@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import com.github.pagehelper.PageInfo;

View File

@ -464,5 +464,10 @@ public interface DeviceChannelMapper {
" where common_gb_channel_id = #{commonGbId}")
int removeCommonChannelId(@Param("commonGbId") int commonGbId);
@Select(value = {" <script>" +
"select common_gb_channel_id " +
"from wvp_device_channel " +
"where device_id=#{deviceId}" +
" </script>"})
List<Integer> getCommonChannelIdList(@Param("deviceId") String deviceId);
}

View File

@ -163,4 +163,9 @@ public interface GroupMapper {
"</foreach>" +
"</script>"})
int updateAll(@Param("groupList") List<Group> groupList);
@MapKey("commonGroupDeviceId")
@Select("select * from wvp_common_group")
Map<String, Group> queryAllForMap();
}

View File

@ -166,8 +166,8 @@ public interface StreamPushMapper {
")</script>")
void online(List<StreamPushItemFromRedis> onlineStreams);
@Select("SELECT gs.* FROM wvp_stream_push sp left join wvp_gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream where sp.status = true")
List<GbStream> getOnlinePusherForGb();
@Select("SELECT common_gb_channel_id FROM wvp_stream_push gb_id > 0")
List<Integer> getOnlinePusherForGb();
@Update("UPDATE wvp_stream_push SET status=0")
void setAllStreamOffline();
@ -192,4 +192,7 @@ public interface StreamPushMapper {
"</foreach>" +
"</script>")
List<StreamPush> getListIn(List<StreamPush> streamPushItems);
@Select("select* from wvp_stream_push where id = #{id}")
StreamPush query(@Param("id") Integer id);
}

View File

@ -5,13 +5,11 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@ -66,9 +64,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
@Autowired
private StreamPushMapper streamPushMapper;
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private UserSetting userSetting;

View File

@ -1,23 +1,13 @@
package com.genersoft.iot.vmp.vmanager.gb28181.platform;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.UpdateChannelParam;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@ -29,11 +19,6 @@ import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import com.genersoft.iot.vmp.conf.SipConfig;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
/**
*
*/

View File

@ -244,15 +244,12 @@ public class StreamPushController {
@PostMapping(value = "/add")
@ResponseBody
@Operation(summary = "添加推流信息")
public void add(@RequestBody StreamPushWithCommonChannelParam param){
public void add(@RequestBody StreamPush param){
if (ObjectUtils.isEmpty(param.getApp()) && ObjectUtils.isEmpty(param.getStream())) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空");
}
StreamPush streamPushItem = new StreamPush();
streamPushItem.setApp(param.getApp());
streamPushItem.setStream(param.getStream());
if (!streamPushService.add(streamPushItem, param)) {
if (!streamPushService.add(param)) {
throw new ControllerException(ErrorCode.ERROR100);
}
}