[RPC] 添加拉流代理支持

dev/数据库统合
648540858 2024-12-31 11:19:01 +08:00
parent 26ee38aebe
commit a324e4c95a
7 changed files with 150 additions and 21 deletions

View File

@ -24,4 +24,9 @@ public interface IRedisRpcPlayService {
String frontEndCommand(String serverId, Integer channelId, int cmdCode, int parameter1, int parameter2, int combindCode2);
void playPush(Integer id, ErrorCallback<StreamInfo> callback);
StreamInfo playProxy(String serverId, int id);
void stopProxy(String serverId, int id);
}

View File

@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
public interface IRedisRpcService {
@ -29,6 +28,4 @@ public interface IRedisRpcService {
void subscribeMobilePosition(int id, int cycle, int interval);
void play(Integer id, ErrorCallback<StreamInfo> callback);
}

View File

@ -0,0 +1,95 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RedisRpcController("streamProxy")
public class RedisRpcStreamProxyController extends RpcController {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private IStreamProxyPlayService streamProxyPlayService;
@Autowired
private IStreamProxyService streamProxyService;
private void sendResponse(RedisRpcResponse response){
log.info("[redis-rpc] >> {}", response);
response.setToId(userSetting.getServerId());
RedisRpcMessage message = new RedisRpcMessage();
message.setResponse(response);
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
}
/**
*
*/
@RedisRpcMapping("play")
public RedisRpcResponse play(RedisRpcRequest request) {
int id = Integer.parseInt(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
StreamProxy streamProxy = streamProxyService.getStreamProxy(id);
if (streamProxy == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
StreamInfo streamInfo = streamProxyPlayService.startProxy(streamProxy);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(streamInfo));
return response;
}
/**
*
*/
@RedisRpcMapping("stop")
public RedisRpcResponse stop(RedisRpcRequest request) {
int id = Integer.parseInt(request.getParam().toString());
RedisRpcResponse response = request.getResponse();
if (id <= 0) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
StreamProxy streamProxy = streamProxyService.getStreamProxy(id);
if (streamProxy == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
streamProxyPlayService.stopProxy(streamProxy);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response;
}
}

View File

@ -189,5 +189,42 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
}
return null;
}
@Override
public void playPush(Integer id, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("streamPush/play", id);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout());
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
@Override
public StreamInfo playProxy(String serverId, int id) {
RedisRpcRequest request = buildRequest("streamProxy/play", id);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout());
if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
return JSON.parseObject(response.getBody().toString(), StreamInfo.class);
}
return null;
}
@Override
public void stopProxy(String serverId, int id) {
RedisRpcRequest request = buildRequest("streamProxy/stop", id);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout());
if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
log.info("[rpc 拉流代理] 停止成功: id: {}", id);
}else {
log.info("[rpc 拉流代理] 停止失败 id: {}", id);
}
}
}

View File

@ -223,20 +223,4 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
RedisRpcRequest request = buildRequest("device/subscribeMobilePosition", jsonObject);
redisRpcConfig.request(request, 10);
}
@Override
public void play(Integer id, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("streamPush/play", id);
RedisRpcResponse response = redisRpcConfig.request(request, 10);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
}

View File

@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
@ -31,6 +32,9 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
@Autowired
private UserSetting userSetting;
@Autowired
private IRedisRpcPlayService redisRpcPlayService;
@Override
public StreamInfo start(int id) {
StreamProxy streamProxy = streamProxyMapper.select(id);
@ -47,7 +51,7 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
return null;
}
if (!userSetting.getServerId().equals(streamProxy.getServerId())) {
return redisRpcService.play(id, callback);
return redisRpcPlayService.playProxy(streamProxy.getServerId(), streamProxy.getId());
}
MediaServer mediaServer;
@ -74,6 +78,9 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
if (!userSetting.getServerId().equals(streamProxy.getServerId())) {
redisRpcPlayService.stopProxy(streamProxy.getServerId(), streamProxy.getId());
}
stopProxy(streamProxy);
}

View File

@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -45,6 +46,9 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService {
@Autowired
private IRedisRpcService redisRpcService;
@Autowired
private IRedisRpcPlayService redisRpcPlayService;
@Autowired
private RedisPushStreamResponseListener redisPushStreamResponseListener;
@ -54,7 +58,7 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService {
Assert.notNull(streamPush, "推流信息未找到");
if (!userSetting.getServerId().equals(streamPush.getServerId())) {
redisRpcService.play(id, callback);
redisRpcPlayService.playPush(id, callback);
return;
}