优化RPC时间定义

dev/数据库统合
648540858 2024-12-12 17:00:58 +08:00
parent 2e254c6c8c
commit b0a1a8d141
2 changed files with 15 additions and 6 deletions

View File

@ -171,19 +171,25 @@ public class RedisRpcConfig implements MessageListener {
private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
public RedisRpcResponse request(RedisRpcRequest request, long timeOut) {
return request(request, timeOut, TimeUnit.SECONDS);
}
public RedisRpcResponse request(RedisRpcRequest request, long timeOut, TimeUnit timeUnit) {
request.setSn((long) random.nextInt(1000) + 1);
SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
try {
sendRequest(request);
return subscribe.poll(timeOut, TimeUnit.SECONDS);
return subscribe.poll(timeOut, timeUnit);
} catch (InterruptedException e) {
log.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
RedisRpcResponse redisRpcResponse = new RedisRpcResponse();
redisRpcResponse.setStatusCode(Response.BUSY_HERE);
return redisRpcResponse;
} finally {
this.unsubscribe(request.getSn());
}
return null;
}
public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
@ -236,6 +242,7 @@ public class RedisRpcConfig implements MessageListener {
// @Scheduled(fixedRate = 1000) //每1秒执行一次
// public void execute(){
// logger.info("callbacks的长度: " + callbacks.size());

View File

@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
@ -17,6 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.sip.message.Response;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
@ -42,13 +44,13 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
public void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("channel/play", channelId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout());
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MICROSECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == Response.OK) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(response.getStatusCode(), "success", streamInfo);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
@ -63,7 +65,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
jsonObject.put("inviteSessionType", type);
RedisRpcRequest request = buildRequest("channel/stop", jsonObject.toJSONString());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MICROSECONDS);
if (response == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {