From e16ca179a6897b4dbec6b06ac5aa13d57c9c9a5f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 13 Dec 2024 17:28:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9ERPC=E5=BD=95=E5=83=8F?= =?UTF-8?q?=E5=9B=9E=E6=94=BE=E6=9A=82=E5=81=9C=E5=92=8C=E6=81=A2=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/PlaybackController.java | 5 +- .../service/IGbChannelPlayService.java | 5 +- .../gb28181/service/IGbChannelService.java | 1 + .../impl/GbChannelPlayServiceImpl.java | 18 +++++++ .../service/impl/GbChannelServiceImpl.java | 7 +-- .../gb28181/service/impl/PlayServiceImpl.java | 31 +++++++++--- .../redisMsg/IRedisRpcPlayService.java | 3 ++ .../RedisRpcChannelPlayController.java | 50 +++++++++++++++++++ .../service/RedisRpcPlayServiceImpl.java | 28 +++++++++++ 9 files changed, 135 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlaybackController.java index 210b022f..dbc0b1a7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlaybackController.java @@ -41,6 +41,8 @@ import java.net.URL; import java.text.ParseException; import java.util.UUID; +import static sun.audio.AudioDevice.device; + /** * @author lin */ @@ -166,8 +168,7 @@ public class PlaybackController { @Parameter(name = "streamId", description = "回放流ID", required = true) @GetMapping("/pause/{streamId}") public void playPause(@PathVariable String streamId) { - log.info("playPause: "+streamId); - + log.info("[回放暂停] streamId: {}", streamId); try { playService.pauseRtp(streamId); } catch (ServiceException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java index 61c0b8cb..6c5e0fcf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java @@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo; import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; public interface IGbChannelPlayService { @@ -27,4 +26,8 @@ public interface IGbChannelPlayService { void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback callback); void stopPlayPush(CommonGBChannel channel); + + void pauseRtp(String streamId); + + void resumeRtp(String streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index 441b524f..e4706e61 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -89,4 +89,5 @@ public interface IGbChannelService { PageInfo queryList(int page, int count, String query, Boolean online, Boolean hasRecordPlan, Integer channelType); void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback callback); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java index d01409a2..a9fe9dec 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IPlayService; @@ -13,7 +14,10 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; import javax.sip.message.Response; +import java.text.ParseException; @Slf4j @Service @@ -208,6 +212,20 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { } } + @Override + public void pauseRtp(String streamId) { + try { + deviceChannelPlayService.pauseRtp(streamId); + } catch (ServiceException | InvalidArgumentException | ParseException | SipException ignore) {} + } + + @Override + public void resumeRtp(String streamId) { + try { + deviceChannelPlayService.resumeRtp(streamId); + } catch (ServiceException | InvalidArgumentException | ParseException | SipException ignore) {} + } + private void downloadGbDeviceChannel(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, ErrorCallback callback){ try { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 5fb01d4b..a0ef5251 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -9,14 +9,12 @@ import com.genersoft.iot.vmp.gb28181.dao.RegionMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; @@ -27,7 +25,10 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import javax.sip.message.Response; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; @Slf4j @Service diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 868b600c..265d96be 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -1093,6 +1093,8 @@ public class PlayServiceImpl implements IPlayService { @Override public StreamInfo getDownLoadInfo(Device device, DeviceChannel channel, String stream) { + + InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channel.getId(), stream); if (inviteInfo == null) { String app = "rtp"; @@ -1369,11 +1371,20 @@ public class PlayServiceImpl implements IPlayService { @Override public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException { + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId); if (null == inviteInfo || inviteInfo.getStreamInfo() == null) { - log.warn("streamId不存在!"); - throw new ServiceException("streamId不存在"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "streamId不存在"); } + Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId()); + if (device == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在"); + } + if (!userSetting.getServerId().equals(device.getServerId())) { + redisRpcPlayService.pauseRtp(device.getServerId(), streamId); + return; + } + inviteInfo.getStreamInfo().setPause(true); inviteStreamService.updateInviteInfo(inviteInfo); MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer(); @@ -1391,7 +1402,7 @@ public class PlayServiceImpl implements IPlayService { if (!result) { throw new ServiceException("暂停RTP接收失败"); } - Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId()); + DeviceChannel channel = deviceChannelService.getOneById(inviteInfo.getChannelId()); cmder.playPauseCmd(device, channel, inviteInfo.getStreamInfo()); } @@ -1400,9 +1411,17 @@ public class PlayServiceImpl implements IPlayService { public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId); if (null == inviteInfo || inviteInfo.getStreamInfo() == null) { - log.warn("streamId不存在!"); - throw new ServiceException("streamId不存在"); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "streamId不存在"); } + Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId()); + if (device == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在"); + } + if (!userSetting.getServerId().equals(device.getServerId())) { + redisRpcPlayService.resumeRtp(device.getServerId(), streamId); + return; + } + inviteInfo.getStreamInfo().setPause(false); inviteStreamService.updateInviteInfo(inviteInfo); MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer(); @@ -1410,7 +1429,6 @@ public class PlayServiceImpl implements IPlayService { log.warn("mediaServer 不存在!"); throw new ServiceException("mediaServer不存在"); } - // zlm 暂停RTP超时检查 // 使用zlm中的流ID String streamKey = inviteInfo.getStream(); if (!mediaServerItem.isRtpEnable()) { @@ -1420,7 +1438,6 @@ public class PlayServiceImpl implements IPlayService { if (!result) { throw new ServiceException("继续RTP接收失败"); } - Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId()); DeviceChannel channel = deviceChannelService.getOneById(inviteInfo.getChannelId()); cmder.playResumeCmd(device, channel, inviteInfo.getStreamInfo()); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java index 11f80ee8..9d0376a4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java @@ -18,4 +18,7 @@ public interface IRedisRpcPlayService { void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback callback); + void pauseRtp(String serverId, String streamId); + + void resumeRtp(String serverId, String streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java index a521b6da..815f16d1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java @@ -131,6 +131,56 @@ public class RedisRpcChannelPlayController extends RpcController { return null; } + /** + * 暂停录像回放 + */ + @RedisRpcMapping("pauseRtp") + public RedisRpcResponse pauseRtp(RedisRpcRequest request) { + String streamId = request.getParam().toString(); + RedisRpcResponse response = request.getResponse(); + + if (streamId == null) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + + try { + channelPlayService.pauseRtp(streamId); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + }catch (ControllerException e) { + response.setStatusCode(ErrorCode.ERROR100.getCode()); + response.setBody(e.getMessage()); + } + + return response; + } + + /** + * 恢复录像回放 + */ + @RedisRpcMapping("resumeRtp") + public RedisRpcResponse resumeRtp(RedisRpcRequest request) { + String streamId = request.getParam().toString(); + RedisRpcResponse response = request.getResponse(); + + if (streamId == null) { + response.setStatusCode(ErrorCode.ERROR400.getCode()); + response.setBody("param error"); + return response; + } + + try { + channelPlayService.resumeRtp(streamId); + response.setStatusCode(ErrorCode.SUCCESS.getCode()); + }catch (ControllerException e) { + response.setStatusCode(ErrorCode.ERROR100.getCode()); + response.setBody(e.getMessage()); + } + + return response; + } + /** * 停止点播国标设备 diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index e82b5323..e074865d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -118,6 +118,34 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { } } + @Override + public void pauseRtp(String serverId, String streamId) { + RedisRpcRequest request = buildRequest("channel/pauseRtp", streamId); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 5, TimeUnit.SECONDS); + if (response == null) { + log.info("[RPC 暂停回放] 失败, streamId: {}", streamId); + }else { + if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) { + log.info("[RPC 暂停回放] 失败, {}, streamId: {}", response.getBody(), streamId); + } + } + } + + @Override + public void resumeRtp(String serverId, String streamId) { + RedisRpcRequest request = buildRequest("channel/resumeRtp", streamId); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, 5, TimeUnit.SECONDS); + if (response == null) { + log.info("[RPC 恢复回放] 失败, streamId: {}", streamId); + }else { + if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) { + log.info("[RPC 恢复回放] 失败, {}, streamId: {}", response.getBody(), streamId); + } + } + } + @Override public void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) {