diff --git a/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java b/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java index 4e7b3b29..b7bed5b8 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java @@ -3,11 +3,13 @@ package com.genersoft.iot.vmp.common; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.service.bean.CommonGbChannelType; import com.genersoft.iot.vmp.service.impl.CommonGbChannelServiceImpl; import com.genersoft.iot.vmp.utils.DateUtil; import io.swagger.v3.oas.annotations.media.Schema; import org.apache.commons.lang3.math.NumberUtils; +import org.checkerframework.checker.units.qual.C; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -611,6 +613,17 @@ public class CommonGbChannel { this.createTime = createTime; } + public static CommonGbChannel getInstance(StreamProxy streamProxy) { + CommonGbChannel commonGbChannel = new CommonGbChannel(); + commonGbChannel.setType(CommonGbChannelType.PROXY); + commonGbChannel.setCommonGbDeviceID(streamProxy.getGbId()); + commonGbChannel.setCommonGbName(streamProxy.getName()); + commonGbChannel.setCommonGbLongitude(streamProxy.getLongitude()); + commonGbChannel.setCommonGbLatitude(streamProxy.getLatitude()); + commonGbChannel.setCreateTime(DateUtil.getNow()); + commonGbChannel.setUpdateTime(DateUtil.getNow()); + return commonGbChannel; + } public static CommonGbChannel getInstance(List syncKeys, DeviceChannel deviceChannel){ CommonGbChannel commonGbChannel = new CommonGbChannel(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 29ea3dd0..bc17d59a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -161,7 +161,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements MediaServerItem mediaServerItem = null; StreamPush streamPushItem = null; - StreamProxyItem proxyByAppAndStream =null; + StreamProxy proxyByAppAndStream =null; // 不是通道可能是直播流 if (channel != null && gbStream == null) { // 通道存在,发100,TRYING diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 3c76883a..9aed1c82 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -20,7 +20,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.dto.HookType; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.media.zlm.dto.hook.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -199,7 +199,7 @@ public class ZLMHttpHookListener { } // 推流鉴权的处理 if (!"rtp".equals(param.getApp())) { - StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + StreamProxy stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (stream != null) { HookResultForOnPublish result = HookResultForOnPublish.SUCCESS(); result.setEnable_audio(stream.isEnableAudio()); @@ -539,7 +539,7 @@ public class ZLMHttpHookListener { } else { // 非国标流 推流/拉流代理 // 拉流代理 - StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyItem != null) { if (streamProxyItem.isEnableRemoveNoneReader()) { // 无人观看自动移除 @@ -678,7 +678,7 @@ public class ZLMHttpHookListener { } else { // 拉流代理 - StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + StreamProxy streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { streamProxyService.start(param.getApp(), param.getStream()); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java similarity index 70% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java rename to src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java index 07bd1319..3a438998 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java @@ -6,7 +6,10 @@ import io.swagger.v3.oas.annotations.media.Schema; * @author lin */ @Schema(description = "拉流代理的信息") -public class StreamProxyItem { +public class StreamProxy { + + @Schema(description = "ID") + private int id; @Schema(description = "类型") private String type; @@ -43,12 +46,44 @@ public class StreamProxyItem { @Schema(description = "拉流代理时zlm返回的key,用于停止拉流代理") private String streamKey; + @Schema(description = "国标ID") + private String gbId; + + @Schema(description = "名称") + private String name; + @Schema(description = "经度") + private double longitude; + @Schema(description = "纬度") + private double latitude; + @Schema(description = "状态") + private boolean status; + @Schema(description = "创建时间") + private String createTime; + @Schema(description = "更新时间") + private String updateTime; + /** * 国标通用信息ID */ @Schema(description = "国标通用信息ID") private int commonGbChannelId; + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getCreateTime() { + return createTime; + } + + public String getUpdateTime() { + return updateTime; + } + public String getType() { return type; } @@ -189,4 +224,60 @@ public class StreamProxyItem { public void setCommonGbChannelId(int commonGbChannelId) { this.commonGbChannelId = commonGbChannelId; } + + public String getGbId() { + return gbId; + } + + public void setGbId(String gbId) { + this.gbId = gbId; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public double getLongitude() { + return longitude; + } + + public void setLongitude(double longitude) { + this.longitude = longitude; + } + + public double getLatitude() { + return latitude; + } + + public void setLatitude(double latitude) { + this.latitude = latitude; + } + + public boolean isStatus() { + return status; + } + + public void setStatus(boolean status) { + this.status = status; + } + + public String isCreateTime() { + return createTime; + } + + public void setCreateTime(String createTime) { + this.createTime = createTime; + } + + public String isUpdateTime() { + return updateTime; + } + + public void setUpdateTime(String updateTime) { + this.updateTime = updateTime; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java index 255db999..0d608fd2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java @@ -82,4 +82,7 @@ public interface ICommonGbChannelService { void batchDelete(List allCommonChannelsForDelete); + void deleteById(int commonGbChannelId); + + void deleteByIdList(List commonChannelIdList); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index c4968a74..73cabf60 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.github.pagehelper.PageInfo; @@ -14,21 +14,21 @@ public interface IStreamProxyService { * 保存视频代理 * @param param */ - void save(StreamProxyItem param, GeneralCallback callback); + void save(StreamProxy param, GeneralCallback callback); /** * 添加视频代理到zlm * @param param * @return */ - JSONObject addStreamProxyToZlm(StreamProxyItem param); + JSONObject addStreamProxyToZlm(StreamProxy param); /** * 从zlm移除视频代理 * @param param * @return */ - JSONObject removeStreamProxyFromZlm(StreamProxyItem param); + JSONObject removeStreamProxyFromZlm(StreamProxy param); /** * 分页查询 @@ -36,7 +36,7 @@ public interface IStreamProxyService { * @param count * @return */ - PageInfo getAll(Integer page, Integer count); + PageInfo getAll(Integer page, Integer count); /** * 删除视频代理 @@ -81,7 +81,7 @@ public interface IStreamProxyService { * 根据app与stream获取streamProxy * @return */ - StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); + StreamProxy getStreamProxyByAppAndStream(String app, String streamId); /** @@ -103,7 +103,7 @@ public interface IStreamProxyService { /** * 更新代理流 */ - boolean updateStreamProxy(StreamProxyItem streamProxyItem); + boolean updateStreamProxy(StreamProxy streamProxyItem); /** * 获取统计信息 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java index 83719588..a4002869 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java @@ -765,4 +765,9 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { } // TODO 向国标级联发送catalog } + + @Override + public void deleteById(int commonGbChannelId) { + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImpl.java index 8c55986e..e2c23065 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImpl.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.service.impl; import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IDeviceAlarmService; import com.genersoft.iot.vmp.storager.dao.DeviceAlarmMapper; import com.github.pagehelper.PageHelper; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 7fbe7691..ce147a27 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; @@ -15,17 +16,15 @@ import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.service.IGbStreamService; +import com.genersoft.iot.vmp.service.ICommonGbChannelService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; -import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -38,12 +37,10 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; /** * 视频代理业务 @@ -78,10 +75,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { private UserSetting userSetting; @Autowired - private GbStreamMapper gbStreamMapper; - - @Autowired - private PlatformGbStreamMapper platformGbStreamMapper; + private ICommonGbChannelService commonGbChannelService; @Autowired private EventPublisher eventPublisher; @@ -89,9 +83,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private ParentPlatformMapper parentPlatformMapper; - @Autowired - private IGbStreamService gbStreamService; - @Autowired private IMediaServerService mediaServerService; @@ -109,7 +100,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override - public void save(StreamProxyItem param, GeneralCallback callback) { + @Transactional + public void save(StreamProxy param, GeneralCallback callback) { MediaServerItem mediaInfo; if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); @@ -155,16 +147,30 @@ public class StreamProxyServiceImpl implements IStreamProxyService { param.setDstUrl(dstUrl); logger.info("[拉流代理] 输出地址为:{}", dstUrl); param.setMediaServerId(mediaInfo.getId()); - boolean saveResult; // 更新 - if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) { - saveResult = updateStreamProxy(param); + StreamProxy streamProxyInDb = videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()); + if (streamProxyInDb != null) { + if (streamProxyInDb.getCommonGbChannelId() == 0 && param.getGbId() != null ) { + // 新增通用通道 + CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param); + commonGbChannelService.add(commonGbChannel); + param.setCommonGbChannelId(commonGbChannel.getCommonGbId()); + } + if (streamProxyInDb.getCommonGbChannelId() > 0 && param.getGbId() == null ) { + // 移除通用通道 + commonGbChannelService.deleteById(streamProxyInDb.getCommonGbChannelId()); + } + param.setUpdateTime(DateUtil.getNow()); + streamProxyMapper.update(param); }else { // 新增 - saveResult = addStreamProxy(param); - } - if (!saveResult) { - callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); - return; + // 新增通用通道 + CommonGbChannel commonGbChannel = CommonGbChannel.getInstance(param); + commonGbChannelService.add(commonGbChannel); + param.setCommonGbChannelId(commonGbChannel.getCommonGbId()); + + param.setCreateTime(DateUtil.getNow()); + param.setUpdateTime(DateUtil.getNow()); + streamProxyMapper.add(param); } HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { @@ -232,79 +238,19 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return null; } - /** - * 新增代理流 - * @param streamProxyItem - * @return - */ - private boolean addStreamProxy(StreamProxyItem streamProxyItem) { - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - boolean result = false; - streamProxyItem.setStreamType("proxy"); - streamProxyItem.setStatus(true); - String now = DateUtil.getNow(); - streamProxyItem.setCreateTime(now); - try { - if (streamProxyMapper.add(streamProxyItem) > 0) { - if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) { - if (gbStreamMapper.add(streamProxyItem) < 0) { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - } - }else { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - result = true; - dataSourceTransactionManager.commit(transactionStatus); //手动提交 - }catch (Exception e) { - logger.error("向数据库添加流代理失败:", e); - dataSourceTransactionManager.rollback(transactionStatus); - } - - - return result; - } - /** * 更新代理流 * @param streamProxyItem * @return */ @Override - public boolean updateStreamProxy(StreamProxyItem streamProxyItem) { - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - boolean result = false; - streamProxyItem.setStreamType("proxy"); - try { - if (streamProxyMapper.update(streamProxyItem) > 0) { - if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) { - if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - } - } else { - //事务回滚 - dataSourceTransactionManager.rollback(transactionStatus); - return false; - } - - dataSourceTransactionManager.commit(transactionStatus); //手动提交 - result = true; - }catch (Exception e) { - logger.error("未处理的异常 ", e); - dataSourceTransactionManager.rollback(transactionStatus); - } - return result; + public boolean updateStreamProxy(StreamProxy streamProxyItem) { + streamProxyItem.setCreateTime(DateUtil.getNow()); + return streamProxyMapper.update(streamProxyItem) > 0; } @Override - public JSONObject addStreamProxyToZlm(StreamProxyItem param) { + public JSONObject addStreamProxyToZlm(StreamProxy param) { JSONObject result = null; MediaServerItem mediaServerItem = null; if (param.getMediaServerId() == null) { @@ -347,7 +293,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override - public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) { + public JSONObject removeStreamProxyFromZlm(StreamProxy param) { if (param ==null) { return null; } @@ -362,19 +308,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override - public PageInfo getAll(Integer page, Integer count) { + public PageInfo getAll(Integer page, Integer count) { return videoManagerStorager.queryStreamProxyList(page, count); } @Override public void del(String app, String stream) { - StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream); + StreamProxy streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream); if (streamProxyItem != null) { - gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL); - - // 如果关联了国标那么移除关联 - platformGbStreamMapper.delByAppAndStream(app, stream); - gbStreamMapper.del(app, stream); + if (streamProxyItem.getCommonGbChannelId() > 0) { + commonGbChannelService.deleteById(streamProxyItem.getCommonGbChannelId()); + } videoManagerStorager.deleteStreamProxy(app, stream); redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream); JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); @@ -389,7 +333,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public boolean start(String app, String stream) { boolean result = false; - StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream); + StreamProxy streamProxy = videoManagerStorager.queryStreamProxy(app, stream); if (streamProxy != null && !streamProxy.isEnable() ) { JSONObject jsonObject = addStreamProxyToZlm(streamProxy); if (jsonObject == null) { @@ -410,7 +354,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public boolean stop(String app, String stream) { boolean result = false; - StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream); + StreamProxy streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream); if (streamProxyDto != null && streamProxyDto.isEnable()) { JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto); if (jsonObject != null && jsonObject.getInteger("code") == 0) { @@ -440,16 +384,24 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override - public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) { + public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) { return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId); } @Override public void zlmServerOnline(String mediaServerId) { // 移除开启了无人观看自动移除的流 - List streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId); - if (streamProxyItemList.size() > 0) { - gbStreamMapper.batchDel(streamProxyItemList); + List streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId); + List commonChannelIdList = new ArrayList<>(); + if (!streamProxyItemList.isEmpty()) { + streamProxyItemList.stream().forEach(streamProxy -> { + if (streamProxy.getCommonGbChannelId() > 0) { + commonChannelIdList.add(streamProxy.getCommonGbChannelId()); + } + }); + } + if (!commonChannelIdList.isEmpty()) { + commonGbChannelService.deleteByIdList(commonChannelIdList); } streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); @@ -457,9 +409,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { // syncPullStream(mediaServerId); // 恢复流代理, 只查找这个这个流媒体 - List streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer( + List streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer( mediaServerId, true); - for (StreamProxyItem streamProxyDto : streamProxyListForEnable) { + for (StreamProxy streamProxyDto : streamProxyListForEnable) { logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto); if (jsonObject == null) { @@ -475,9 +427,17 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public void zlmServerOffline(String mediaServerId) { // 移除开启了无人观看自动移除的流 - List streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId); - if (streamProxyItemList.size() > 0) { - gbStreamMapper.batchDel(streamProxyItemList); + List streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId); + List commonChannelIdList = new ArrayList<>(); + if (!streamProxyItemList.isEmpty()) { + streamProxyItemList.stream().forEach(streamProxy -> { + if (streamProxy.getCommonGbChannelId() > 0) { + commonChannelIdList.add(streamProxy.getCommonGbChannelId()); + } + }); + } + if (!commonChannelIdList.isEmpty()) { + commonGbChannelService.deleteByIdList(commonChannelIdList); } streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); // 其他的流设置离线 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index c1d3e307..48aa6613 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -1,7 +1,7 @@ package com.genersoft.iot.vmp.storager; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; @@ -172,7 +172,7 @@ public interface IVideoManagerStorage { * @param stream * @return */ - public StreamProxyItem queryStreamProxy(String app, String stream); + public StreamProxy queryStreamProxy(String app, String stream); /** * 获取代理流 @@ -180,7 +180,7 @@ public interface IVideoManagerStorage { * @param count * @return */ - PageInfo queryStreamProxyList(Integer page, Integer count); + PageInfo queryStreamProxyList(Integer page, Integer count); /** * 根据国标ID获取平台关联的直播流 @@ -215,7 +215,7 @@ public interface IVideoManagerStorage { * @param enable 启用/不启用 * @return */ - List getStreamProxyListForEnableInMediaServer(String id, boolean enable); + List getStreamProxyListForEnableInMediaServer(String id, boolean enable); /** * 通道上线 @@ -235,7 +235,7 @@ public interface IVideoManagerStorage { * @param streamId * @return */ - StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); + StreamProxy getStreamProxyByAppAndStream(String app, String streamId); int updateStreamGPS(List gpsMsgInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 6ad36cef..4c0864f6 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -1,6 +1,6 @@ package com.genersoft.iot.vmp.storager.dao; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -16,7 +16,7 @@ public interface StreamProxyMapper { "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " + "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " + "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )") - int add(StreamProxyItem streamProxyDto); + int add(StreamProxy streamProxyDto); @Update("UPDATE wvp_stream_proxy " + "SET type=#{type}, " + @@ -38,29 +38,29 @@ public interface StreamProxyMapper { "enable_disable_none_reader=#{enableDisableNoneReader}, " + "enable_mp4=#{enableMp4} " + "WHERE app=#{app} AND stream=#{stream}") - int update(StreamProxyItem streamProxyDto); + int update(StreamProxy streamProxyDto); @Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}") int del(String app, String stream); @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc") - List selectAll(); + List selectAll(); @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc") - List selectForEnable(boolean enable); + List selectForEnable(boolean enable); @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc") - StreamProxyItem selectOne(@Param("app") String app, @Param("stream") String stream); + StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream); @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st " + "LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + "WHERE st.enable=#{enable} and st.media_server_id= #{id} order by st.create_time desc") - List selectForEnableInMediaServer( @Param("id") String id, @Param("enable") boolean enable); + List selectForEnableInMediaServer(@Param("id") String id, @Param("enable") boolean enable); @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st " + "LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + "WHERE st.media_server_id= #{id} order by st.create_time desc") - List selectInMediaServer(String id); + List selectInMediaServer(String id); @Update("UPDATE wvp_stream_proxy " + "SET status=#{status} " + @@ -76,7 +76,7 @@ public interface StreamProxyMapper { void deleteAutoRemoveItemByMediaServerId(String mediaServerId); @Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable_remove_none_reader=true AND st.media_server_id=#{mediaServerId} order by st.create_time desc") - List selectAutoRemoveItemByMediaServerId(String mediaServerId); + List selectAutoRemoveItemByMediaServerId(String mediaServerId); @Select("select count(1) as total, sum(status) as online from wvp_stream_proxy") ResourceBaseInfo getOverview(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 5dd4518d..86603162 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -4,11 +4,9 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo; @@ -23,13 +21,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.util.CollectionUtils; -import org.springframework.util.ObjectUtils; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; /** * 视频设备数据存储-jdbc实现 @@ -312,9 +305,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { * @return */ @Override - public PageInfo queryStreamProxyList(Integer page, Integer count) { + public PageInfo queryStreamProxyList(Integer page, Integer count) { PageHelper.startPage(page, count); - List all = streamProxyMapper.selectAll(); + List all = streamProxyMapper.selectAll(); return new PageInfo<>(all); } @@ -346,7 +339,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { * @return */ @Override - public StreamProxyItem queryStreamProxy(String app, String stream){ + public StreamProxy queryStreamProxy(String app, String stream){ return streamProxyMapper.selectOne(app, stream); } @@ -368,24 +361,15 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { } @Override - public List getStreamProxyListForEnableInMediaServer(String id, boolean enable) { + public List getStreamProxyListForEnableInMediaServer(String id, boolean enable) { return streamProxyMapper.selectForEnableInMediaServer(id, enable); } @Override - public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) { + public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) { return streamProxyMapper.selectOne(app, streamId); } - private PlatformCatalog getTopCatalog(String id, String platformId) { - PlatformCatalog catalog = catalogMapper.selectByPlatFormAndCatalogId(platformId, id); - if (catalog.getParentId().equals(platformId)) { - return catalog; - }else { - return getTopCatalog(catalog.getParentId(), platformId); - } - } - @Override public int updateStreamGPS(List gpsMsgInfos) { return gbStreamMapper.updateStreamGPS(gpsMsgInfos); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index e28ca11a..ce3c835e 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -59,10 +59,10 @@ public class StreamProxyController { @Parameter(name = "online", description = "是否在线") @GetMapping(value = "/list") @ResponseBody - public PageInfo list(@RequestParam(required = false)Integer page, - @RequestParam(required = false)Integer count, - @RequestParam(required = false)String query, - @RequestParam(required = false)Boolean online ){ + public PageInfo list(@RequestParam(required = false)Integer page, + @RequestParam(required = false)Integer count, + @RequestParam(required = false)String query, + @RequestParam(required = false)Boolean online ){ return streamProxyService.getAll(page, count); } @@ -72,7 +72,7 @@ public class StreamProxyController { @Parameter(name = "stream", description = "流Id") @GetMapping(value = "/one") @ResponseBody - public StreamProxyItem one(String app, String stream){ + public StreamProxy one(String app, String stream){ return streamProxyService.getStreamProxyByAppAndStream(app, stream); } @@ -82,7 +82,7 @@ public class StreamProxyController { }) @PostMapping(value = "/save") @ResponseBody - public DeferredResult save(@RequestBody StreamProxyItem param){ + public DeferredResult save(@RequestBody StreamProxy param){ logger.info("添加代理: " + JSONObject.toJSONString(param)); if (ObjectUtils.isEmpty(param.getMediaServerId())) { param.setMediaServerId("auto"); @@ -96,35 +96,27 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getGbId())) { param.setGbId(null); } - StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); + StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyItem != null) { streamProxyService.del(param.getApp(), param.getStream()); } - RequestMessage requestMessage = new RequestMessage(); - String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream(); - requestMessage.setKey(key); - String uuid = UUID.randomUUID().toString(); - requestMessage.setId(uuid); DeferredResult result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); // 录像查询以channelId作为deviceId查询 - resultHolder.put(key, uuid, result); result.onTimeout(()->{ WVPResult wvpResult = new WVPResult<>(); wvpResult.setCode(ErrorCode.ERROR100.getCode()); wvpResult.setMsg("超时"); - requestMessage.setData(wvpResult); - resultHolder.invokeAllResult(requestMessage); + result.setResult(wvpResult); }); streamProxyService.save(param, (code, msg, streamInfo) -> { logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg); if (code == ErrorCode.SUCCESS.getCode()) { - requestMessage.setData(new StreamContent(streamInfo)); + result.setResult(new StreamContent(streamInfo)); }else { - requestMessage.setData(WVPResult.fail(code, msg)); + result.setResult(WVPResult.fail(code, msg)); } - resultHolder.invokeAllResult(requestMessage); }); return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index bbe345c5..3eade8e8 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -163,7 +163,6 @@ public class StreamPushController { //获取文件流 InputStream inputStream = null; try { - String name = file.getName(); inputStream = file.getInputStream(); } catch (IOException e) { logger.error("未处理的异常 ", e);