From d72f57c772b8995be99a91a384a821e211bb4d0b Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 24 Nov 2023 10:12:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E9=80=9A=E7=94=A8=E9=80=9A?= =?UTF-8?q?=E9=81=93=E5=9B=BD=E6=A0=87=E7=B1=BB=E5=9E=8B=E7=9A=84=E6=92=AD?= =?UTF-8?q?=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/CommonGbChannel.java | 8 + .../gb28181/GB28181ResourceServiceImpl.java | 123 ++++++++ .../callback/DeferredResultHolder.java | 3 + .../vmp/service/ICommonGbChannelService.java | 10 +- .../vmp/service/IDeviceChannelService.java | 1 + .../iot/vmp/service/IPlayService.java | 1 + .../vmp/service/IResourcePlayCallback.java | 19 ++ .../iot/vmp/service/IResourceService.java | 40 +++ .../impl/CommonGbChannelServiceImpl.java | 27 +- .../impl/DeviceChannelServiceImpl.java | 59 ++-- .../iot/vmp/service/impl/PlayServiceImpl.java | 33 ++- .../vmp/storager/dao/DeviceChannelMapper.java | 17 ++ .../channel/CommonChannelController.java | 274 +++++++++++------- .../vmanager/gb28181/play/PlayController.java | 22 +- 14 files changed, 474 insertions(+), 163 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/IResourcePlayCallback.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/IResourceService.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java b/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java index 8d607e82..3730cc54 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/common/CommonGbChannel.java @@ -1,7 +1,10 @@ package com.genersoft.iot.vmp.common; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import io.swagger.v3.oas.annotations.media.Schema; +import java.util.List; + public class CommonGbChannel { @@ -597,4 +600,9 @@ public class CommonGbChannel { public void setCreateTime(String createTime) { this.createTime = createTime; } + + + public static CommonGbChannel getInstance(List syncKeys, DeviceChannel channel){ + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java new file mode 100644 index 00000000..83317dec --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/GB28181ResourceServiceImpl.java @@ -0,0 +1,123 @@ +package com.genersoft.iot.vmp.gb28181; + +import com.genersoft.iot.vmp.common.CommonGbChannel; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.IResourcePlayCallback; +import com.genersoft.iot.vmp.service.IResourceService; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; +import com.genersoft.iot.vmp.storager.dao.DeviceMapper; +import com.genersoft.iot.vmp.storager.impl.RedisCatchStorageImpl; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.StreamContent; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.net.MalformedURLException; +import java.net.URL; + +/** + * 国标的资源实现类 + */ +@Service("28181") +public class GB28181ResourceServiceImpl implements IResourceService { + + private final Logger logger = LoggerFactory.getLogger(GB28181ResourceServiceImpl.class); + + public static final String resourceType = "28181"; + + @Autowired + private DeviceMapper deviceMapper; + + @Autowired + private DeviceChannelMapper deviceChannelMapper; + + @Autowired + private IPlayService playService; + + @Override + public boolean deleteChannel(CommonGbChannel commonGbChannel) { + if (!GB28181ResourceServiceImpl.resourceType.equals(commonGbChannel.getType())) { + logger.warn("[资源类-国标28181] 收到移除通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); + return false; + } + return deviceChannelMapper.removeCommonChannelId(commonGbChannel.getCommonGbId()) > 0; + } + + @Override + public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { + assert callback != null; + if (!GB28181ResourceServiceImpl.resourceType.equals(commonGbChannel.getType())) { + logger.warn("[资源类-国标28181] 收到播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); + callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + return; + } + DeviceChannel channel = deviceChannelMapper.getChannelByCommonChannelId(commonGbChannel.getCommonGbId()); + if (channel == null) { + logger.warn("[资源类-国标28181] 收到播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId()); + callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + return; + } + Device device = deviceMapper.getDeviceByDeviceId(channel.getDeviceId()); + if (device == null) { + logger.warn("[资源类-国标28181] 收到播放通道: {} 时未找到通道 {} 所属的国标设备", + commonGbChannel.getCommonGbId(), channel.getDeviceId()); + callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + return; + } + MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); + playService.play(mediaServerItem, channel.getDeviceId(), channel.getChannelId(), null, (code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + if (data != null) { + StreamInfo streamInfo = (StreamInfo)data; + callback.call(commonGbChannel, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + } + }else { + callback.call(commonGbChannel, code, msg, null); + } + }); + } + + @Override + public void stopPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) { + if (!GB28181ResourceServiceImpl.resourceType.equals(commonGbChannel.getType())) { + logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); + if (callback != null) { + callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + } + return; + } + DeviceChannel channel = deviceChannelMapper.getChannelByCommonChannelId(commonGbChannel.getCommonGbId()); + if (channel == null) { + logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId()); + if (callback != null) { + callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); + } + return; + } + try { + playService.stop(channel.getDeviceId(), channel.getChannelId()); + } catch (ControllerException exception) { + if (callback != null) { + callback.call(commonGbChannel, exception.getCode(), exception.getMsg(), null); + } + } + } + + @Override + public boolean ptzControl(CommonGbChannel commonGbChannel, + String command, Integer horizonSpeed, + Integer verticalSpeed, Integer zoomSpeed) { + return false; + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index 8d0ed7fb..936aa9f2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -55,6 +55,9 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_SNAP= "CALLBACK_SNAP"; + + public static final String CALLBACK_CHANNEL_PLAY = "CALLBACK_CHANNEL_PLAY"; + private Map> map = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java index 771b3f11..a4abf428 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICommonGbChannelService.java @@ -12,6 +12,10 @@ import com.github.pagehelper.PageInfo; import java.util.ArrayList; import java.util.List; +/** + * 通用通道资源管理 + * 接入的资源通过调用这个类实现将自己本身的数据添加到通用通道当中 + */ public interface ICommonGbChannelService { CommonGbChannel getChannel(String channelId); @@ -56,8 +60,6 @@ public interface ICommonGbChannelService { PageInfo queryChannelList(String query, int page, int count); - String getRandomCode(Gb28181CodeType type); - List getIndustryCodeList(); List getDeviceTypeList(); @@ -71,4 +73,8 @@ public interface ICommonGbChannelService { void removeFromRegion(UpdateCommonChannelToRegion params); void updateChannelToRegion(UpdateCommonChannelToRegion params); + + void startPlay(CommonGbChannel channel, IResourcePlayCallback callback); + + void stopPlay(CommonGbChannel channel, IResourcePlayCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index f109d4b4..016ce7e5 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -92,4 +92,5 @@ public interface IDeviceChannelService { * 重置通道 */ boolean resetChannels(Device device, List deviceChannelList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 1effe96c..664ea78f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -45,4 +45,5 @@ public interface IPlayService { void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback); + void stop(String deviceId, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IResourcePlayCallback.java b/src/main/java/com/genersoft/iot/vmp/service/IResourcePlayCallback.java new file mode 100644 index 00000000..08f3fce7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/IResourcePlayCallback.java @@ -0,0 +1,19 @@ +package com.genersoft.iot.vmp.service; + +import com.genersoft.iot.vmp.common.CommonGbChannel; +import com.genersoft.iot.vmp.common.StreamInfo; + +/** + * 资源播放回调 + */ +public interface IResourcePlayCallback { + + /** + * 资源播放回调 + * @param commonGbChannel 通道 + * @param code + * @param message + * @param streamInfo + */ + void call(CommonGbChannel commonGbChannel, int code, String message, StreamInfo streamInfo); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java b/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java new file mode 100644 index 00000000..541c4536 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/IResourceService.java @@ -0,0 +1,40 @@ +package com.genersoft.iot.vmp.service; + +import com.genersoft.iot.vmp.common.CommonGbChannel; +import com.genersoft.iot.vmp.common.StreamInfo; + +/** + * 同用资源接入接口,待接入的资源实现此接口即可自动接入, + * 包括GIS,分屏播放,国标级联等功能 + */ +public interface IResourceService { + + + /** + * 通知资源类通道删除 + */ + boolean deleteChannel(CommonGbChannel commonGbChannel); + + /** + * 开始播放通道 + */ + void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback); + + /** + * 停止播放通道 + */ + void stopPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback); + + + /** + * 云台控制 + * @param commonGbChannel 通道 + * @param command 控制指令,允许值: left, right, up, down, upleft, upright, downleft, downright, zoomin, zoomout, stop + * @param horizonSpeed 水平速度 0-255 + * @param verticalSpeed 垂直速度 0-255 + * @param zoomSpeed 缩放速度 + * @return 结果 + */ + boolean ptzControl(CommonGbChannel commonGbChannel, String command, + Integer horizonSpeed, Integer verticalSpeed, Integer zoomSpeed); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java index 4f342c70..df390ac5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CommonGbChannelServiceImpl.java @@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.service.ICommonGbChannelService; +import com.genersoft.iot.vmp.service.IResourcePlayCallback; +import com.genersoft.iot.vmp.service.IResourceService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.storager.dao.CommonGbChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; @@ -57,6 +59,9 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { @Autowired private CivilCodeFileConf civilCodeFileConf; + @Autowired + private Map resourceServiceMap; + @Override public CommonGbChannel getChannel(String channelId) { @@ -634,12 +639,6 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { return new PageInfo<>(all); } - @Override - public String getRandomCode(Gb28181CodeType type) { - - return ""; - } - @Override public List getIndustryCodeList() { IndustryCodeTypeEnum[] values = IndustryCodeTypeEnum.values(); @@ -702,4 +701,20 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService { public void updateChannelToRegion(UpdateCommonChannelToRegion params) { commonGbChannelMapper.updateChannelToRegion(params); } + + @Override + public void startPlay(CommonGbChannel channel, IResourcePlayCallback callback) { + IResourceService resourceService = resourceServiceMap.get(channel.getType()); + assert resourceService != null; + resourceService.startPlay(channel, callback); + } + + @Override + public void stopPlay(CommonGbChannel channel, IResourcePlayCallback callback) { + IResourceService resourceService = resourceServiceMap.get(channel.getType()); + assert resourceService != null; + resourceService.stopPlay(channel,callback); + } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 601be2fb..5eb64f2f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -59,6 +59,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Autowired ICommonGbChannelService commonGbChannelService; + @Override public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) { if (deviceChannel.getLongitude()*deviceChannel.getLatitude() > 0) { @@ -97,7 +98,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { public void updateChannel(String deviceId, DeviceChannel channel) { String channelId = channel.getChannelId(); channel.setDeviceId(deviceId); -// StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { channel.setStreamId(inviteInfo.getStreamInfo().getStream()); @@ -283,25 +283,24 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { if (CollectionUtils.isEmpty(deviceChannelList)) { return false; } - List allChannels = channelMapper.queryAllChannels(device.getDeviceId()); - Map allChannelMap = new ConcurrentHashMap<>(); - if (!allChannels.isEmpty()) { - allChannels.stream().forEach(deviceChannel -> { - allChannelMap.put(deviceChannel.getChannelId(), deviceChannel); - }); - } - // 数据去重 + Map allChannelMap = channelMapper.queryAllChannelsForMap(device.getDeviceId()); + + // 存储数据,方便对数据去重 List channels = new ArrayList<>(); - List updateChannels = new ArrayList<>(); + // 存储需要更新的数据 + List updateChannelsForInfo = new ArrayList<>(); + List updateChannelsForStatus = new ArrayList<>(); + // 存储需要需要新增的数据库 List addChannels = new ArrayList<>(); StringBuilder stringBuilder = new StringBuilder(); Map subContMap = new HashMap<>(); - // 数据去重 + Set gbIdSet = new HashSet<>(); for (DeviceChannel deviceChannel : deviceChannelList) { + // 数据去重 if (gbIdSet.contains(deviceChannel.getChannelId())) { stringBuilder.append(deviceChannel.getChannelId()).append(","); continue; @@ -311,17 +310,21 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId()); deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio()); deviceChannel.setCommonGbChannelId(allChannelMap.get(deviceChannel.getChannelId()).getCommonGbChannelId()); - if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){ - List strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId()); - if (!CollectionUtils.isEmpty(strings)){ - strings.forEach(platformId->{ - eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()? CatalogEvent.ON:CatalogEvent.OFF); - }); - } - - } + deviceChannel.setCommonGbChannelId(allChannelMap.get(deviceChannel.getChannelId()).getCommonGbChannelId()); deviceChannel.setUpdateTime(DateUtil.getNow()); - updateChannels.add(deviceChannel); + // 同步时发现状态变化 + if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){ + // TODO 应该通知给commonChannel + updateChannelsForStatus.add(deviceChannel); +// List strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId()); +// if (!CollectionUtils.isEmpty(strings)){ +// strings.forEach(platformId->{ +// eventPublisher.catalogEventPublish(platformId, deviceChannel, deviceChannel.isStatus()? CatalogEvent.ON:CatalogEvent.OFF); +// }); +// } + }else { + updateChannelsForInfo.add(deviceChannel); + } }else { deviceChannel.setCreateTime(DateUtil.getNow()); deviceChannel.setUpdateTime(DateUtil.getNow()); @@ -385,17 +388,17 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { result = channelMapper.batchAdd(addChannels) < 0; } } - if (!updateChannels.isEmpty()) { - if (updateChannels.size() > limitCount) { - for (int i = 0; i < updateChannels.size(); i += limitCount) { + if (!updateChannelsForInfo.isEmpty()) { + if (updateChannelsForInfo.size() > limitCount) { + for (int i = 0; i < updateChannelsForInfo.size(); i += limitCount) { int toIndex = i + limitCount; - if (i + limitCount > updateChannels.size()) { - toIndex = updateChannels.size(); + if (i + limitCount > updateChannelsForInfo.size()) { + toIndex = updateChannelsForInfo.size(); } - result = result || channelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0; + result = result || channelMapper.batchUpdate(updateChannelsForInfo.subList(i, toIndex)) < 0; } }else { - result = result || channelMapper.batchUpdate(updateChannels) < 0; + result = result || channelMapper.batchUpdate(updateChannelsForInfo) < 0; } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 0f735f24..497a80e1 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -32,6 +32,7 @@ import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.storager.dao.DeviceMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import gov.nist.javax.sip.message.SIPResponse; @@ -74,9 +75,6 @@ public class PlayServiceImpl implements IPlayService { @Autowired private IInviteStreamService inviteStreamService; - @Autowired - private DeferredResultHolder resultHolder; - @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -96,7 +94,7 @@ public class PlayServiceImpl implements IPlayService { private VideoStreamSessionManager streamSession; @Autowired - private IDeviceService deviceService; + private DeviceMapper deviceMapper; @Autowired private UserSetting userSetting; @@ -298,6 +296,29 @@ public class PlayServiceImpl implements IPlayService { } } + @Override + public void stop(String deviceId, String channelId) { + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备[" + deviceId + "]不存在"); + } + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); + if (inviteInfo == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到"); + } + if (InviteSessionStatus.ok == inviteInfo.getStatus()) { + try { + logger.info("[停止点播] {}/{}", deviceId, channelId); + cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null); + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + } + inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); + storager.stopPlay(deviceId, channelId); + } + private void tcpActiveHandler(Device device, String channelId, String contentString, MediaServerItem mediaServerItem, String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback callback){ @@ -828,7 +849,7 @@ public class PlayServiceImpl implements IPlayService { if (allSsrc.size() > 0) { for (SsrcTransaction ssrcTransaction : allSsrc) { if (ssrcTransaction.getMediaServerId().equals(mediaServerId)) { - Device device = deviceService.getDevice(ssrcTransaction.getDeviceId()); + Device device = deviceMapper.getDeviceByDeviceId(ssrcTransaction.getDeviceId()); if (device == null) { continue; } @@ -944,7 +965,7 @@ public class PlayServiceImpl implements IPlayService { @Override public void getSnap(String deviceId, String channelId, String fileName, ErrorCallback errorCallback) { - Device device = deviceService.getDevice(deviceId); + Device device = deviceMapper.getDeviceByDeviceId(deviceId); if (device == null) { errorCallback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), null); return; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index ef8b7726..05243b30 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -10,6 +10,7 @@ import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; /** * 用于存储设备通道信息 @@ -405,6 +406,9 @@ public interface DeviceChannelMapper { @Select("select * from wvp_device_channel where device_id = #{deviceId}") List queryAllChannels(String deviceId); + @MapKey("channelId") + @Select("select * from wvp_device_channel where device_id = #{deviceId}") + Map queryAllChannelsForMap(String deviceId); @Select("select channelId" + ", device_id" + @@ -473,4 +477,17 @@ public interface DeviceChannelMapper { " where wdc.device_id = #{deviceId}") int updateCommonChannelId(@Param("deviceId") String deviceId); + @Select(value = {" "}) + DeviceChannel getChannelByCommonChannelId(@Param("commonGbId") int commonGbId); + + @Update(" update wvp_device_channel " + + " set common_gb_channel_id = null " + + " where common_gb_channel_id = #{commonGbId}") + int removeCommonChannelId(@Param("commonGbId") int commonGbId); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java index 3d503560..90859a39 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/channel/CommonChannelController.java @@ -1,11 +1,19 @@ package com.genersoft.iot.vmp.vmanager.channel; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonGbChannel; +import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.ICommonGbChannelService; import com.genersoft.iot.vmp.service.bean.*; -import com.genersoft.iot.vmp.vmanager.bean.UpdateCommonChannelToGroup; -import com.genersoft.iot.vmp.vmanager.bean.UpdateCommonChannelToRegion; +import com.genersoft.iot.vmp.vmanager.bean.*; import com.github.pagehelper.PageInfo; import com.google.common.collect.Lists; import io.swagger.v3.oas.annotations.Operation; @@ -16,9 +24,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; +import javax.servlet.http.HttpServletRequest; +import java.net.MalformedURLException; +import java.net.URL; import java.util.Arrays; import java.util.List; +import java.util.UUID; @Tag(name = "通用国标通道") @@ -31,23 +44,70 @@ public class CommonChannelController { @Autowired private ICommonGbChannelService commonGbChannelService; + @Autowired + private DeferredResultHolder resultHolder; - @Operation(summary = "查询区域下的通道") - @Parameter(name = "regionDeviceId", description = "区域的编号", required = true) - @Parameter(name = "query", description = "要搜索的内容", required = false) - @Parameter(name = "page", description = "当前页", required = true) - @Parameter(name = "count", description = "每页查询数量", required = true) - @GetMapping("/region/list") - public PageInfo getChannelsInRegion( - @RequestParam(required = true) String regionDeviceId, - @RequestParam(required = false) String query, - @RequestParam(required = true) int page, - @RequestParam(required = true) int count - ) { - return commonGbChannelService.getChannelsInRegion(regionDeviceId, query, page, count); + @Autowired + private UserSetting userSetting; + + + /** + * 从下级设备中同步通道 TODO 存疑 放在国标的接口里可能更合适 + * + * @param deviceId 设备编号 + */ + @GetMapping("/sync/device") + @Operation(summary = "从下级设备中同步通道") + @Parameter(name = "deviceId", description = "设备编号") + @Parameter(name = "syncKeys", description = "选择性同步的字段") + public boolean syncFromDevice(String deviceId, String[] syncKeys, + @RequestParam(required = false) Boolean syncGroup, + @RequestParam(required = false) Boolean syncRegion) { + return commonGbChannelService.syncChannelFromGb28181Device(deviceId, Lists.newArrayList(syncKeys), syncGroup, syncRegion); } - @Operation(summary = "查询分组下的通道") + + @Operation(summary = "更新通道信息") + @Parameter(name = "CommonGbChannel", description = "commonGbChannel", required = true) + @ResponseBody + @GetMapping("/update") + public void update( + @RequestParam(required = false) CommonGbChannel commonGbChannel + ){ + commonGbChannelService.update(commonGbChannel); + } + + /** + * TODO 存疑 可以单独创建一个controller + */ + @Operation(summary = "获取行业编码列表") + @ResponseBody + @GetMapping("/industry/list") + public List getIndustryCodeList(){ + return commonGbChannelService.getIndustryCodeList(); + } + + /** + * TODO 存疑 可以单独创建一个controller + */ + @Operation(summary = "获取编码列表") + @ResponseBody + @GetMapping("/type/list") + public List getDeviceTypeList(){ + return commonGbChannelService.getDeviceTypeList(); + } + + /** + * TODO 存疑 可以单独创建一个controller + */ + @Operation(summary = "获取编码列表") + @ResponseBody + @GetMapping("/network/identification/list") + public List getNetworkIdentificationTypeList(){ + return commonGbChannelService.getNetworkIdentificationTypeList(); + } + + @Operation(summary = "查询分组或区域下的通道") @Parameter(name = "groupDeviceId", description = "分组的编号", required = false) @Parameter(name = "regionDeviceId", description = "区域的编号", required = false) @Parameter(name = "query", description = "要搜索的内容", required = false) @@ -86,81 +146,21 @@ public class CommonChannelController { inGroup, inRegion, type); } - /** - * 从下级设备中同步通道 - * - * @param deviceId 设备编号 - */ - @GetMapping("/sync/device") - @Operation(summary = "从下级设备中同步通道") - @Parameter(name = "deviceId", description = "设备编号") - @Parameter(name = "syncKeys", description = "选择性同步的字段") - public boolean syncFromDevice(String deviceId, String[] syncKeys, - @RequestParam(required = false) Boolean syncGroup, - @RequestParam(required = false) Boolean syncRegion) { - return commonGbChannelService.syncChannelFromGb28181Device(deviceId, Lists.newArrayList(syncKeys), syncGroup, syncRegion); + @Operation(summary = "为区域添加分组") + @ResponseBody + @PostMapping("/region/update") + public void updateChannelToRegion(@RequestBody UpdateCommonChannelToRegion params){ + assert params.getCommonGbCivilCode() != null; + assert !params.getCommonGbIds().isEmpty(); + commonGbChannelService.updateChannelToRegion(params); } - -// @Operation(summary = "分页查询通道") -// @Parameter(name = "query", description = "要搜索的内容", required = false) -// @Parameter(name = "page", description = "当前页", required = true) -// @Parameter(name = "count", description = "每页查询数量", required = true) -// @ResponseBody -// @GetMapping("/list") -// public PageInfo queryChannelList( -// @RequestParam(required = false) String query, -// @RequestParam(required = true) int page, -// @RequestParam(required = true) int count ){ -// -// return commonGbChannelService.queryChannelList(query, page, count); -// } - - @Operation(summary = "更新通道") - @Parameter(name = "CommonGbChannel", description = "commonGbChannel", required = true) + @Operation(summary = "从区域中移除通道") @ResponseBody - @GetMapping("/update") - public void update( - @RequestParam(required = false) CommonGbChannel commonGbChannel - ){ - commonGbChannelService.update(commonGbChannel); - } - - @Operation(summary = "获取一个随机的可用国标编号") - @Parameter(name = "type", description = "类型: " + - "CIVIL_CODE_PROVINCE 省级编号 " + - "CIVIL_CODE_CIT 市级编号" + - "CIVIL_CODE_GRASS_ROOTS 区级编号" + - "CIVIL_CODE_GRASS_ROOTS 基层接入单位编号 " + - "BUSINESS_GROUP 业务分组 " + - "VIRTUAL_ORGANIZATION 虚拟组织 ", required = true) - @ResponseBody - @GetMapping("/code/random") - public String getRandomCode( - @RequestParam(required = true) Gb28181CodeType type - ){ - return commonGbChannelService.getRandomCode(type); - } - - @Operation(summary = "获取行业编码列表") - @ResponseBody - @GetMapping("/industry/list") - public List getIndustryCodeList(){ - return commonGbChannelService.getIndustryCodeList(); - } - - @Operation(summary = "获取编码列表") - @ResponseBody - @GetMapping("/type/list") - public List getDeviceTypeList(){ - return commonGbChannelService.getDeviceTypeList(); - } - - @Operation(summary = "获取编码列表") - @ResponseBody - @GetMapping("/network/identification/list") - public List getNetworkIdentificationTypeList(){ - return commonGbChannelService.getNetworkIdentificationTypeList(); + @PostMapping("/region/remove") + public void removeFromRegion(@RequestBody UpdateCommonChannelToRegion params){ + assert params.getCommonGbCivilCode() != null || !params.getCommonGbIds().isEmpty(); + commonGbChannelService.removeFromRegion(params); } @Operation(summary = "为通道添加分组") @@ -180,22 +180,96 @@ public class CommonChannelController { commonGbChannelService.removeFromGroup(params); } - @Operation(summary = "从区域中移除通道") + + @Operation(summary = "播放通道") + @Parameter(name = "channelDeviceId", description = "通道国标编号", required = true) @ResponseBody - @PostMapping("/region/remove") - public void removeFromRegion(@RequestBody UpdateCommonChannelToRegion params){ - assert params.getCommonGbCivilCode() != null || !params.getCommonGbIds().isEmpty(); - commonGbChannelService.removeFromRegion(params); + @PostMapping("/play") + public DeferredResult> play(HttpServletRequest request, String channelDeviceId){ + logger.info("[播放通道] channelDeviceId:{} ", channelDeviceId); + assert !ObjectUtils.isEmpty(channelDeviceId); + + CommonGbChannel channel = commonGbChannelService.getChannel(channelDeviceId); + assert channel != null; + + DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + + result.onTimeout(()->{ + logger.info("[播放通道] 超时 channelDeviceId:{} ", channelDeviceId); + // 释放rtpserver + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("播放通道超时"); + result.setResult(wvpResult); + commonGbChannelService.stopPlay(channel, null); + }); + commonGbChannelService.startPlay(channel, (callbackChannel, code, message, streamInfo) -> { + if (code == ErrorCode.SUCCESS.getCode()) { + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.SUCCESS.getCode()); + wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); + + if (streamInfo != null) { + if (userSetting.getUseSourceIpAsStreamIp()) { + streamInfo = streamInfo.clone();//深拷贝 + String host; + try { + URL url = new URL(request.getRequestURL().toString()); + host = url.getHost(); + } catch (MalformedURLException e) { + host = request.getLocalAddr(); + } + streamInfo.channgeStreamIp(host); + } + wvpResult.setData(new StreamContent(streamInfo)); + } + }else { + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(code); + wvpResult.setMsg(message); + result.setResult(wvpResult); + commonGbChannelService.stopPlay(channel, null); + } + }); + return result; } - @Operation(summary = "为通道添加分组") - @ResponseBody - @PostMapping("/region/update") - public void updateChannelToRegion(@RequestBody UpdateCommonChannelToRegion params){ - assert params.getCommonGbCivilCode() != null; - assert !params.getCommonGbIds().isEmpty(); - commonGbChannelService.updateChannelToRegion(params); + + @Operation(summary = "停止播放通道") + @Parameter(name = "channelDeviceId", description = "通道国标编号", required = true) + @GetMapping("/stopPlay") + public void playStop(String channelDeviceId) { + + logger.info("[停止播放通道] channelDeviceId:{} ", channelDeviceId); + + assert !ObjectUtils.isEmpty(channelDeviceId); + + CommonGbChannel channel = commonGbChannelService.getChannel(channelDeviceId); + assert channel != null; + + DeferredResult> result = new DeferredResult<>(); + + result.onTimeout(()->{ + logger.info("[停止播放通道] 超时 channelDeviceId:{} ", channelDeviceId); + // 释放rtpserver + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("停止播放通道超时"); + result.setResult(wvpResult); + commonGbChannelService.stopPlay(channel, null); + }); + + commonGbChannelService.stopPlay(channel, (commonGbChannel, code, message, streamInfo) -> { + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(code); + wvpResult.setMsg(message); + result.setResult(wvpResult); + }); } + // 将通道共享到上级平台 + + // 从上级平台共享中移除通道 + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 6db7e4b1..f433befc 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -164,27 +164,7 @@ public class PlayController { if (deviceId == null || channelId == null) { throw new ControllerException(ErrorCode.ERROR400); } - - Device device = storager.queryVideoDevice(deviceId); - if (device == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备[" + deviceId + "]不存在"); - } - - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); - if (inviteInfo == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到"); - } - if (InviteSessionStatus.ok == inviteInfo.getStatus()) { - try { - logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null); - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); - } - } - inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); - storager.stopPlay(deviceId, channelId); + playService.stop(deviceId, channelId); JSONObject json = new JSONObject(); json.put("deviceId", deviceId);