优化RPC时间定义
parent
ee590d6597
commit
304e596440
|
@ -171,19 +171,25 @@ public class RedisRpcConfig implements MessageListener {
|
||||||
private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
|
private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
|
||||||
private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
|
private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
|
public RedisRpcResponse request(RedisRpcRequest request, long timeOut) {
|
||||||
|
return request(request, timeOut, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RedisRpcResponse request(RedisRpcRequest request, long timeOut, TimeUnit timeUnit) {
|
||||||
request.setSn((long) random.nextInt(1000) + 1);
|
request.setSn((long) random.nextInt(1000) + 1);
|
||||||
SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
|
SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
sendRequest(request);
|
sendRequest(request);
|
||||||
return subscribe.poll(timeOut, TimeUnit.SECONDS);
|
return subscribe.poll(timeOut, timeUnit);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
|
log.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
|
||||||
|
RedisRpcResponse redisRpcResponse = new RedisRpcResponse();
|
||||||
|
redisRpcResponse.setStatusCode(Response.BUSY_HERE);
|
||||||
|
return redisRpcResponse;
|
||||||
} finally {
|
} finally {
|
||||||
this.unsubscribe(request.getSn());
|
this.unsubscribe(request.getSn());
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
|
public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
|
||||||
|
@ -236,6 +242,7 @@ public class RedisRpcConfig implements MessageListener {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// @Scheduled(fixedRate = 1000) //每1秒执行一次
|
// @Scheduled(fixedRate = 1000) //每1秒执行一次
|
||||||
// public void execute(){
|
// public void execute(){
|
||||||
// logger.info("callbacks的长度: " + callbacks.size());
|
// logger.info("callbacks的长度: " + callbacks.size());
|
||||||
|
|
|
@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
|
||||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
|
||||||
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
|
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
|
||||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||||
|
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
||||||
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
|
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
|
||||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
@ -17,6 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import javax.sip.message.Response;
|
import javax.sip.message.Response;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
|
@ -42,13 +44,13 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
|
||||||
public void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback) {
|
public void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback) {
|
||||||
RedisRpcRequest request = buildRequest("channel/play", channelId);
|
RedisRpcRequest request = buildRequest("channel/play", channelId);
|
||||||
request.setToId(serverId);
|
request.setToId(serverId);
|
||||||
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout());
|
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MICROSECONDS);
|
||||||
if (response == null) {
|
if (response == null) {
|
||||||
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
|
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
|
||||||
}else {
|
}else {
|
||||||
if (response.getStatusCode() == Response.OK) {
|
if (response.getStatusCode() == Response.OK) {
|
||||||
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
|
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
|
||||||
callback.run(response.getStatusCode(), "success", streamInfo);
|
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||||
}else {
|
}else {
|
||||||
callback.run(response.getStatusCode(), response.getBody().toString(), null);
|
callback.run(response.getStatusCode(), response.getBody().toString(), null);
|
||||||
}
|
}
|
||||||
|
@ -63,7 +65,7 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
|
||||||
jsonObject.put("inviteSessionType", type);
|
jsonObject.put("inviteSessionType", type);
|
||||||
RedisRpcRequest request = buildRequest("channel/stop", jsonObject.toJSONString());
|
RedisRpcRequest request = buildRequest("channel/stop", jsonObject.toJSONString());
|
||||||
request.setToId(serverId);
|
request.setToId(serverId);
|
||||||
RedisRpcResponse response = redisRpcConfig.request(request, 50);
|
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MICROSECONDS);
|
||||||
if (response == null) {
|
if (response == null) {
|
||||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
|
||||||
}else {
|
}else {
|
||||||
|
|
Loading…
Reference in New Issue