[RPC] 增加订阅支持
parent
317f771fa2
commit
0cebbee0a9
|
@ -26,6 +26,7 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
||||||
import com.genersoft.iot.vmp.service.ISendRtpServerService;
|
import com.genersoft.iot.vmp.service.ISendRtpServerService;
|
||||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||||
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
|
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
|
||||||
|
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||||
|
@ -99,6 +100,9 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||||
@Autowired
|
@Autowired
|
||||||
private AudioBroadcastManager audioBroadcastManager;
|
private AudioBroadcastManager audioBroadcastManager;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IRedisRpcService redisRpcService;
|
||||||
|
|
||||||
private Device getDeviceByDeviceIdFromDb(String deviceId) {
|
private Device getDeviceByDeviceIdFromDb(String deviceId) {
|
||||||
return deviceMapper.getDeviceByDeviceId(deviceId);
|
return deviceMapper.getDeviceByDeviceId(deviceId);
|
||||||
}
|
}
|
||||||
|
@ -532,10 +536,14 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||||
public void subscribeCatalog(int id, int cycle) {
|
public void subscribeCatalog(int id, int cycle) {
|
||||||
Device device = deviceMapper.query(id);
|
Device device = deviceMapper.query(id);
|
||||||
Assert.notNull(device, "未找到设备");
|
Assert.notNull(device, "未找到设备");
|
||||||
|
|
||||||
if (device.getSubscribeCycleForCatalog() == cycle) {
|
if (device.getSubscribeCycleForCatalog() == cycle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (!userSetting.getServerId().equals(device.getServerId())) {
|
||||||
|
redisRpcService.subscribeCatalog(id, cycle);
|
||||||
|
return;
|
||||||
|
}
|
||||||
// 目录订阅相关的信息
|
// 目录订阅相关的信息
|
||||||
if (device.getSubscribeCycleForCatalog() > 0) {
|
if (device.getSubscribeCycleForCatalog() > 0) {
|
||||||
// 订阅周期不同,则先取消
|
// 订阅周期不同,则先取消
|
||||||
|
@ -565,7 +573,10 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||||
if (device.getSubscribeCycleForMobilePosition() == cycle) {
|
if (device.getSubscribeCycleForMobilePosition() == cycle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (!userSetting.getServerId().equals(device.getServerId())) {
|
||||||
|
redisRpcService.subscribeMobilePosition(id, cycle, interval);
|
||||||
|
return;
|
||||||
|
}
|
||||||
// 目录订阅相关的信息
|
// 目录订阅相关的信息
|
||||||
if (device.getSubscribeCycleForMobilePosition() > 0) {
|
if (device.getSubscribeCycleForMobilePosition() > 0) {
|
||||||
// 订阅周期已经开启,则先取消
|
// 订阅周期已经开启,则先取消
|
||||||
|
|
|
@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
|
||||||
import com.genersoft.iot.vmp.common.CommonCallback;
|
import com.genersoft.iot.vmp.common.CommonCallback;
|
||||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
|
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
|
||||||
|
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||||
|
|
||||||
public interface IRedisRpcService {
|
public interface IRedisRpcService {
|
||||||
|
@ -24,4 +25,7 @@ public interface IRedisRpcService {
|
||||||
long onStreamOnlineEvent(String app, String stream, CommonCallback<StreamInfo> callback);
|
long onStreamOnlineEvent(String app, String stream, CommonCallback<StreamInfo> callback);
|
||||||
void unPushStreamOnlineEvent(String app, String stream);
|
void unPushStreamOnlineEvent(String app, String stream);
|
||||||
|
|
||||||
|
void subscribeCatalog(int id, int cycle);
|
||||||
|
|
||||||
|
void subscribeMobilePosition(int id, int cycle, int interval);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
package com.genersoft.iot.vmp.service.redisMsg.control;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||||
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
|
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
||||||
|
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
||||||
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
|
||||||
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
||||||
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.service.IPTZService;
|
||||||
|
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
||||||
|
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
|
||||||
|
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
|
||||||
|
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
|
||||||
|
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||||
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.sip.message.Response;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
@RedisRpcController("device")
|
||||||
|
public class RedisRpcGbDeviceController extends RpcController {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private UserSetting userSetting;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RedisTemplate<Object, Object> redisTemplate;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IDeviceService deviceService;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private void sendResponse(RedisRpcResponse response){
|
||||||
|
log.info("[redis-rpc] >> {}", response);
|
||||||
|
response.setToId(userSetting.getServerId());
|
||||||
|
RedisRpcMessage message = new RedisRpcMessage();
|
||||||
|
message.setResponse(response);
|
||||||
|
redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 目录订阅
|
||||||
|
*/
|
||||||
|
@RedisRpcMapping("subscribeCatalog")
|
||||||
|
public RedisRpcResponse subscribeCatalog(RedisRpcRequest request) {
|
||||||
|
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
|
||||||
|
int id = paramJson.getIntValue("id");
|
||||||
|
int cycle = paramJson.getIntValue("cycle");
|
||||||
|
|
||||||
|
RedisRpcResponse response = request.getResponse();
|
||||||
|
|
||||||
|
if (id <= 0) {
|
||||||
|
response.setStatusCode(ErrorCode.ERROR400.getCode());
|
||||||
|
response.setBody("param error");
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
deviceService.subscribeCatalog(id, cycle);
|
||||||
|
response.setStatusCode(ErrorCode.SUCCESS.getCode());
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 移动位置订阅
|
||||||
|
*/
|
||||||
|
@RedisRpcMapping("subscribeMobilePosition")
|
||||||
|
public RedisRpcResponse subscribeMobilePosition(RedisRpcRequest request) {
|
||||||
|
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
|
||||||
|
int id = paramJson.getIntValue("id");
|
||||||
|
int cycle = paramJson.getIntValue("cycle");
|
||||||
|
int interval = paramJson.getIntValue("interval");
|
||||||
|
|
||||||
|
RedisRpcResponse response = request.getResponse();
|
||||||
|
|
||||||
|
if (id <= 0) {
|
||||||
|
response.setStatusCode(ErrorCode.ERROR400.getCode());
|
||||||
|
response.setBody("param error");
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
deviceService.subscribeMobilePosition(id, cycle, interval);
|
||||||
|
response.setStatusCode(ErrorCode.SUCCESS.getCode());
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package com.genersoft.iot.vmp.service.redisMsg.service;
|
package com.genersoft.iot.vmp.service.redisMsg.service;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSON;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.genersoft.iot.vmp.common.CommonCallback;
|
import com.genersoft.iot.vmp.common.CommonCallback;
|
||||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
|
@ -201,4 +202,23 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
|
||||||
RedisRpcRequest request = buildRequest("streamPush/unPushStreamOnlineEvent", streamInfoParam);
|
RedisRpcRequest request = buildRequest("streamPush/unPushStreamOnlineEvent", streamInfoParam);
|
||||||
redisRpcConfig.request(request, 10);
|
redisRpcConfig.request(request, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void subscribeCatalog(int id, int cycle) {
|
||||||
|
JSONObject jsonObject = new JSONObject();
|
||||||
|
jsonObject.put("id", id);
|
||||||
|
jsonObject.put("cycle", cycle);
|
||||||
|
RedisRpcRequest request = buildRequest("device/subscribeCatalog", jsonObject);
|
||||||
|
redisRpcConfig.request(request, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void subscribeMobilePosition(int id, int cycle, int interval) {
|
||||||
|
JSONObject jsonObject = new JSONObject();
|
||||||
|
jsonObject.put("id", id);
|
||||||
|
jsonObject.put("cycle", cycle);
|
||||||
|
jsonObject.put("interval", cycle);
|
||||||
|
RedisRpcRequest request = buildRequest("device/subscribeMobilePosition", jsonObject);
|
||||||
|
redisRpcConfig.request(request, 10);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue