From 3afdfff5b29df492a4913d02e49999c0646193c7 Mon Sep 17 00:00:00 2001
From: 648540858 <648540858@qq.com>
Date: Wed, 11 Dec 2024 16:59:48 +0800
Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E7=82=B9=E6=92=AD=E5=90=8C?=
 =?UTF-8?q?=E4=B8=80=E4=B8=AAredis=E4=B8=8B=E7=9A=84=E5=85=B6=E4=BB=96wvp?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../gb28181/controller/PlayController.java    | 27 +++++-----
 .../service/IGbChannelRpcPlayService.java     |  8 +++
 .../iot/vmp/gb28181/service/IPlayService.java |  3 ++
 .../gb28181/service/impl/PlayRpcService.java  | 54 +++++++++++++++++++
 .../gb28181/service/impl/PlayServiceImpl.java | 11 +++-
 .../redisMsg/control/RedisRpcController.java  | 47 +++++++++++++++-
 6 files changed, 136 insertions(+), 14 deletions(-)
 create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java
 create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java

diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java
index 003478f90..d9f7e7317 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/PlayController.java
@@ -10,15 +10,13 @@ import com.genersoft.iot.vmp.conf.security.JwtUtils;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
 import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
-import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
-import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
-import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
-import com.genersoft.iot.vmp.gb28181.service.IPlayService;
+import com.genersoft.iot.vmp.gb28181.service.*;
 import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.bean.ErrorCallback;
 import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
@@ -64,6 +62,9 @@ public class PlayController {
 	@Autowired
 	private IPlayService playService;
 
+	@Autowired
+	private IGbChannelRpcPlayService playRpcService;
+
 	@Autowired
 	private IMediaServerService mediaServerService;
 
@@ -88,14 +89,10 @@ public class PlayController {
 		Assert.notNull(channelId, "通道国标编号不可为NULL");
 		// 获取可用的zlm
 		Device device = deviceService.getDeviceByDeviceId(deviceId);
-
 		Assert.notNull(deviceId, "设备不存在");
 		DeviceChannel channel = deviceChannelService.getOne(deviceId, channelId);
 		Assert.notNull(channel, "通道不存在");
 
-
-		MediaServer newMediaServerItem = playService.getNewMediaServerItem(device);
-
 		RequestMessage requestMessage = new RequestMessage();
 		String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
 		requestMessage.setKey(key);
@@ -118,7 +115,7 @@ public class PlayController {
 		// 录像查询以channelId作为deviceId查询
 		resultHolder.put(key, uuid, result);
 
-		playService.play(newMediaServerItem, deviceId, channelId, null, (code, msg, streamInfo) -> {
+		ErrorCallback<StreamInfo> callback  = (code, msg, streamInfo) -> {
 			WVPResult<StreamContent> wvpResult = new WVPResult<>();
 			if (code == InviteErrorCode.SUCCESS.getCode()) {
 				wvpResult.setCode(ErrorCode.SUCCESS.getCode());
@@ -136,8 +133,8 @@ public class PlayController {
 						}
 						streamInfo.channgeStreamIp(host);
 					}
-					if (!ObjectUtils.isEmpty(newMediaServerItem.getTranscodeSuffix()) && !"null".equalsIgnoreCase(newMediaServerItem.getTranscodeSuffix())) {
-						streamInfo.setStream(streamInfo.getStream() + "_" + newMediaServerItem.getTranscodeSuffix());
+					if (!ObjectUtils.isEmpty(streamInfo.getMediaServer().getTranscodeSuffix()) && !"null".equalsIgnoreCase(streamInfo.getMediaServer().getTranscodeSuffix())) {
+						streamInfo.setStream(streamInfo.getStream() + "_" + streamInfo.getMediaServer().getTranscodeSuffix());
 					}
 					wvpResult.setData(new StreamContent(streamInfo));
 				}else {
@@ -151,7 +148,13 @@ public class PlayController {
 			requestMessage.setData(wvpResult);
 			// 此处必须释放所有请求
 			resultHolder.invokeAllResult(requestMessage);
-		});
+		};
+		// 判断设备是否属于当前平台, 如果不属于则发起自动调用
+		if (userSetting.getServerId().equals(device.getServerId())) {
+			playRpcService.play(device.getServerId(), channel.getId(), callback);
+		}else {
+			playService.play(device, channel, callback);
+		}
 		return result;
 	}
 
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java
new file mode 100644
index 000000000..1847e4307
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelRpcPlayService.java
@@ -0,0 +1,8 @@
+package com.genersoft.iot.vmp.gb28181.service;
+
+import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.service.bean.ErrorCallback;
+
+public interface IGbChannelRpcPlayService {
+    void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java
index aa2ca3ab6..4a1403a04 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java
@@ -25,6 +25,8 @@ public interface IPlayService {
 
     SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<StreamInfo> callback);
 
+    void play(Device device, DeviceChannel channel, ErrorCallback<StreamInfo> callback);
+
     StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel);
 
     MediaServer getNewMediaServerItem(Device device);
@@ -70,4 +72,5 @@ public interface IPlayService {
 
     void download(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, ErrorCallback<StreamInfo> callback);
 
+
 }
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java
new file mode 100644
index 000000000..96eb3ced6
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayRpcService.java
@@ -0,0 +1,54 @@
+package com.genersoft.iot.vmp.gb28181.service.impl;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.service.IGbChannelRpcPlayService;
+import com.genersoft.iot.vmp.service.bean.ErrorCallback;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.sip.message.Response;
+
+@Slf4j
+@Service("playRpcService")
+public class PlayRpcService implements IGbChannelRpcPlayService {
+
+    @Autowired
+    private RedisRpcConfig redisRpcConfig;
+
+    @Autowired
+    private UserSetting userSetting;
+
+
+    private RedisRpcRequest buildRequest(String uri, Object param) {
+        RedisRpcRequest request = new RedisRpcRequest();
+        request.setFromId(userSetting.getServerId());
+        request.setParam(param);
+        request.setUri(uri);
+        return request;
+    }
+
+    @Override
+    public void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback) {
+        log.info("[点播其他WVP的设备] 通道Id:{}", channelId);
+        RedisRpcRequest request = buildRequest("playChannel", channelId);
+        request.setToId(serverId);
+        RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout());
+        if (response == null) {
+            callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
+        }else {
+            if (response.getStatusCode() == Response.OK) {
+                StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
+                callback.run(response.getStatusCode(), "success", streamInfo);
+            }else {
+                callback.run(response.getStatusCode(), response.getBody().toString(), null);
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
index 2c9b777b8..8b3d9f067 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
@@ -63,7 +63,7 @@ import java.util.Vector;
 
 @SuppressWarnings(value = {"rawtypes", "unchecked"})
 @Slf4j
-@Service
+@Service("playService")
 public class PlayServiceImpl implements IPlayService {
 
     @Autowired
@@ -285,6 +285,15 @@ public class PlayServiceImpl implements IPlayService {
         }
     }
 
+    @Override
+    public void play(Device device, DeviceChannel channel, ErrorCallback<StreamInfo> callback) {
+        MediaServer mediaServerItem = getNewMediaServerItem(device);
+        if (mediaServerItem == null) {
+            log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId());
+            throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
+        }
+        play(mediaServerItem, device, channel, null, callback);
+    }
 
     @Override
     public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<StreamInfo> callback) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
index b74df842f..946178bdf 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
@@ -8,7 +8,10 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
 import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
 import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
 import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
+import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
+import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
 import com.genersoft.iot.vmp.media.bean.MediaServer;
@@ -17,6 +20,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
 import com.genersoft.iot.vmp.media.event.hook.HookType;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.ISendRtpServerService;
+import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import lombok.extern.slf4j.Slf4j;
@@ -24,6 +28,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
+import javax.sip.message.Response;
+
 /**
  * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
  */
@@ -49,6 +55,12 @@ public class RedisRpcController {
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
 
+    @Autowired
+    private IGbChannelService channelService;
+
+    @Autowired
+    private IGbChannelPlayService channelPlayService;
+
 
     /**
      * 获取发流的信息
@@ -255,7 +267,7 @@ public class RedisRpcController {
         String callId = request.getParam().toString();
         SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
         RedisRpcResponse response = request.getResponse();
-        response.setStatusCode(200);
+        response.setStatusCode(Response.OK);
         if (sendRtpItem == null) {
             log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息, key:{}", callId);
             WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
@@ -290,4 +302,37 @@ public class RedisRpcController {
         message.setResponse(response);
         redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
     }
+
+    /**
+     * 点播国标设备
+     */
+    public RedisRpcResponse playChannel(RedisRpcRequest request) {
+        int channelId = Integer.parseInt(request.getParam().toString());
+        RedisRpcResponse response = request.getResponse();
+
+        if (channelId <= 0) {
+            response.setStatusCode(Response.BAD_REQUEST);
+            response.setBody("param error");
+            return response;
+        }
+        // 获取对应的设备和通道信息
+        CommonGBChannel channel = channelService.getOne(channelId);
+        if (channel == null) {
+            response.setStatusCode(Response.BAD_REQUEST);
+            response.setBody("param error");
+            return response;
+        }
+
+        channelPlayService.play(channel, null, (code, msg, data) ->{
+            if (code == InviteErrorCode.SUCCESS.getCode()) {
+                response.setStatusCode(Response.OK);
+                response.setBody(data);
+            }else {
+                response.setStatusCode(code);
+            }
+            // 手动发送结果
+            sendResponse(response);
+        });
+        return null;
+    }
 }