From 93afd46d0f0b1d7d8b3981492252febbcf8e91c4 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 12 Dec 2024 17:46:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0RPC=E5=BD=95=E5=83=8F?= =?UTF-8?q?=E5=9B=9E=E6=94=BE=E5=92=8C=E5=BD=95=E5=83=8F=E4=B8=8B=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...InviteInfo.java => InviteMessageInfo.java} | 2 +- .../service/IGbChannelPlayService.java | 4 +- .../impl/GbChannelPlayServiceImpl.java | 4 +- .../gb28181/service/impl/PlayServiceImpl.java | 24 +++-- .../request/impl/InviteRequestProcessor.java | 10 +- .../redisMsg/IRedisRpcPlayService.java | 4 + .../RedisRpcChannelPlayController.java | 93 ++++++++++++++++++- .../service/RedisRpcPlayServiceImpl.java | 45 +++++++++ 8 files changed, 167 insertions(+), 19 deletions(-) rename src/main/java/com/genersoft/iot/vmp/gb28181/bean/{InviteInfo.java => InviteMessageInfo.java} (93%) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java similarity index 93% rename from src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java index 57e83bd1..beadb690 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java @@ -4,7 +4,7 @@ import lombok.Data; // 从INVITE消息中解析需要的信息 @Data -public class InviteInfo { +public class InviteMessageInfo { private String requesterId; private String targetChannelId; private String sourceChannelId; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java index 7658f635..967661a8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java @@ -3,13 +3,13 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; -import com.genersoft.iot.vmp.gb28181.bean.InviteInfo; +import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.service.bean.ErrorCallback; public interface IGbChannelPlayService { - void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback callback); + void start(CommonGBChannel channel, InviteMessageInfo inviteInfo, Platform platform, ErrorCallback callback); void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java index 618b72a1..c1c009d3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java @@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; -import com.genersoft.iot.vmp.gb28181.bean.InviteInfo; +import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.PlayException; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; @@ -33,7 +33,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { @Override - public void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback callback) { + public void start(CommonGBChannel channel, InviteMessageInfo inviteInfo, Platform platform, ErrorCallback callback) { if (channel == null || inviteInfo == null || callback == null) { log.warn("[通用通道点播] 参数异常, channel: {}, inviteInfo: {}, callback: {}", channel != null, inviteInfo != null, callback != null); throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 70bd6d81..868b600c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -295,14 +295,14 @@ public class PlayServiceImpl implements IPlayService { // 判断设备是否属于当前平台, 如果不属于则发起自动调用 if (!userSetting.getServerId().equals(device.getServerId())) { redisRpcPlayService.play(device.getServerId(), channel.getId(), callback); - }else { - MediaServer mediaServerItem = getNewMediaServerItem(device); - if (mediaServerItem == null) { - log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId()); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); - } - play(mediaServerItem, device, channel, null, callback); + return; } + MediaServer mediaServerItem = getNewMediaServerItem(device); + if (mediaServerItem == null) { + log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); + } + play(mediaServerItem, device, channel, null, callback); } @Override @@ -746,6 +746,11 @@ public class PlayServiceImpl implements IPlayService { if (channel == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在"); } + if (!userSetting.getServerId().equals(device.getServerId())) { + redisRpcPlayService.playback(device.getServerId(), channel.getId(), startTime, endTime, callback); + return; + } + MediaServer newMediaServerItem = getNewMediaServerItem(device); if (newMediaServerItem == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的节点"); @@ -954,6 +959,11 @@ public class PlayServiceImpl implements IPlayService { @Override public void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { + if (!userSetting.getServerId().equals(device.getServerId())) { + redisRpcPlayService.download(device.getServerId(), channel.getId(), startTime, endTime, downloadSpeed, callback); + return; + } + MediaServer newMediaServerItem = this.getNewMediaServerItem(device); if (newMediaServerItem == null) { callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 6459841e..aed5d873 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -121,7 +121,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements SIPRequest request = (SIPRequest)evt.getRequest(); try { - InviteInfo inviteInfo = decode(evt); + InviteMessageInfo inviteInfo = decode(evt); // 查询请求是否来自上级平台\设备 Platform platform = platformService.queryPlatformByServerGBId(inviteInfo.getRequesterId()); @@ -247,9 +247,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - private InviteInfo decode(RequestEvent evt) throws SdpException { + private InviteMessageInfo decode(RequestEvent evt) throws SdpException { - InviteInfo inviteInfo = new InviteInfo(); + InviteMessageInfo inviteInfo = new InviteMessageInfo(); SIPRequest request = (SIPRequest)evt.getRequest(); String[] channelIdArrayFromSub = SipUtils.getChannelIdFromRequest(request); @@ -349,7 +349,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } - private String createSendSdp(SendRtpInfo sendRtpItem, InviteInfo inviteInfo, String sdpIp) { + private String createSendSdp(SendRtpInfo sendRtpItem, InviteMessageInfo inviteInfo, String sdpIp) { StringBuilder content = new StringBuilder(200); content.append("v=0\r\n"); content.append("o=" + inviteInfo.getTargetChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); @@ -393,7 +393,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - public void inviteFromDeviceHandle(SIPRequest request, InviteInfo inviteInfo) { + public void inviteFromDeviceHandle(SIPRequest request, InviteMessageInfo inviteInfo) { if (inviteInfo.getSourceChannelId() == null) { log.warn("来自设备的Invite请求,无法从请求信息中确定请求来自的通道,已忽略,requesterId: {}", inviteInfo.getRequesterId()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java index 42957aa5..4c1bc066 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java @@ -10,4 +10,8 @@ public interface IRedisRpcPlayService { void play(String serverId, Integer channelId, ErrorCallback callback); void stop(String serverId, InviteSessionType type, int channelId, String stream); + + void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback callback); + + void download(String serverId, Integer id, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java index 587f2198..f28aa6f1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.redisMsg.control; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; @@ -8,12 +9,15 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; +import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController; import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping; import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController; +import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; @@ -68,7 +72,9 @@ public class RedisRpcChannelPlayController extends RpcController { return response; } - channelPlayService.play(channel, null, (code, msg, data) ->{ + InviteMessageInfo inviteInfo = new InviteMessageInfo(); + inviteInfo.setSessionName("Play"); + channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{ if (code == InviteErrorCode.SUCCESS.getCode()) { response.setStatusCode(Response.OK); response.setBody(data); @@ -87,7 +93,6 @@ public class RedisRpcChannelPlayController extends RpcController { */ @RedisRpcMapping("stop") public RedisRpcResponse stop(RedisRpcRequest request) { - System.out.println(request.getParam().toString()); JSONObject jsonObject = JSONObject.parseObject(request.getParam().toString()); RedisRpcResponse response = request.getResponse(); @@ -119,4 +124,88 @@ public class RedisRpcChannelPlayController extends RpcController { return response; } + /** + * 录像回放国标设备 + */ + @RedisRpcMapping("playback") + public RedisRpcResponse playbackChannel(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + int channelId = paramJson.getIntValue("channelId"); + String startTime = paramJson.getString("startTime"); + String endTime = paramJson.getString("endTime"); + RedisRpcResponse response = request.getResponse(); + + if (channelId <= 0) { + response.setStatusCode(Response.BAD_REQUEST); + response.setBody("param error"); + return response; + } + // 获取对应的设备和通道信息 + CommonGBChannel channel = channelService.getOne(channelId); + if (channel == null) { + response.setStatusCode(Response.BAD_REQUEST); + response.setBody("param error"); + return response; + } + + InviteMessageInfo inviteInfo = new InviteMessageInfo(); + inviteInfo.setSessionName("Playback"); + inviteInfo.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)); + inviteInfo.setStopTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime)); + channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{ + if (code == InviteErrorCode.SUCCESS.getCode()) { + response.setStatusCode(Response.OK); + response.setBody(data); + }else { + response.setStatusCode(code); + } + // 手动发送结果 + sendResponse(response); + }); + return null; + } + + /** + * 录像回放国标设备 + */ + @RedisRpcMapping("download") + public RedisRpcResponse downloadChannel(RedisRpcRequest request) { + JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); + int channelId = paramJson.getIntValue("channelId"); + String startTime = paramJson.getString("startTime"); + String endTime = paramJson.getString("endTime"); + int downloadSpeed = paramJson.getIntValue("downloadSpeed"); + RedisRpcResponse response = request.getResponse(); + + if (channelId <= 0) { + response.setStatusCode(Response.BAD_REQUEST); + response.setBody("param error"); + return response; + } + // 获取对应的设备和通道信息 + CommonGBChannel channel = channelService.getOne(channelId); + if (channel == null) { + response.setStatusCode(Response.BAD_REQUEST); + response.setBody("param error"); + return response; + } + + InviteMessageInfo inviteInfo = new InviteMessageInfo(); + inviteInfo.setSessionName("Download"); + inviteInfo.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)); + inviteInfo.setStopTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime)); + inviteInfo.setDownloadSpeed(downloadSpeed + ""); + channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{ + if (code == InviteErrorCode.SUCCESS.getCode()) { + response.setStatusCode(Response.OK); + response.setBody(data); + }else { + response.setStatusCode(code); + } + // 手动发送结果 + sendResponse(response); + }); + return null; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index e6f8a040..623aae13 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -74,5 +74,50 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { } } } + + @Override + public void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback callback) { + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("channelId", channelId); + jsonObject.put("startTime", startTime); + jsonObject.put("endTime", endTime); + RedisRpcRequest request = buildRequest("channel/playback", jsonObject.toString()); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MILLISECONDS); + if (response == null) { + callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); + }else { + if (response.getStatusCode() == Response.OK) { + StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + }else { + callback.run(response.getStatusCode(), response.getBody().toString(), null); + } + } + } + + @Override + public void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("channelId", channelId); + jsonObject.put("startTime", startTime); + jsonObject.put("endTime", endTime); + jsonObject.put("downloadSpeed", downloadSpeed); + RedisRpcRequest request = buildRequest("channel/download", jsonObject.toString()); + request.setToId(serverId); + RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MILLISECONDS); + if (response == null) { + callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); + }else { + if (response.getStatusCode() == Response.OK) { + StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + }else { + callback.run(response.getStatusCode(), response.getBody().toString(), null); + } + } + } }