临时提交
parent
5d36c2a337
commit
678bb33562
|
@ -771,7 +771,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
|
redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
|
||||||
// 设置超时
|
// 设置超时
|
||||||
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
|
dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
|
||||||
redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
|
redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem);
|
||||||
logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
|
logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
|
||||||
try {
|
try {
|
||||||
responseAck(request, Response.REQUEST_TIMEOUT); // 超时
|
responseAck(request, Response.REQUEST_TIMEOUT); // 超时
|
||||||
|
@ -834,7 +834,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
||||||
redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
|
redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
|
||||||
if (response.getCode() != 0) {
|
if (response.getCode() != 0) {
|
||||||
dynamicTask.stop(sendRtpItem.getCallId());
|
dynamicTask.stop(sendRtpItem.getCallId());
|
||||||
redisRpcService.stopWaitePushStreamOnline(sendRtpItem);
|
redisRpcService.stopWaitePushStreamOnline(sendRtpItem.getRedisKey(), sendRtpItem);
|
||||||
try {
|
try {
|
||||||
responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
|
responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
|
||||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
|
|
|
@ -14,7 +14,7 @@ public interface IRedisRpcService {
|
||||||
|
|
||||||
void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
|
void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<String> callback);
|
||||||
|
|
||||||
void stopWaitePushStreamOnline(SendRtpItem sendRtpItem);
|
void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem);
|
||||||
|
|
||||||
void rtpSendStopped(String sendRtpItemKey);
|
void rtpSendStopped(String sendRtpItemKey);
|
||||||
|
|
||||||
|
|
|
@ -174,7 +174,14 @@ public class RedisRpcController {
|
||||||
* 停止监听流上线
|
* 停止监听流上线
|
||||||
*/
|
*/
|
||||||
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
|
public RedisRpcResponse stopWaitePushStreamOnline(RedisRpcRequest request) {
|
||||||
SendRtpItem sendRtpItem = JSONObject.parseObject(request.getParam().toString(), SendRtpItem.class);
|
String sendRtpItemKey = request.getParam().toString();
|
||||||
|
SendRtpItem sendRtpItem = (SendRtpItem) redisTemplate.opsForValue().get(sendRtpItemKey);
|
||||||
|
if (sendRtpItem == null) {
|
||||||
|
logger.info("[redis-rpc] 停止监听流上线, 未找到redis中的发流信息, key:{}", sendRtpItemKey);
|
||||||
|
RedisRpcResponse response = request.getResponse();
|
||||||
|
response.setStatusCode(200);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
logger.info("[redis-rpc] 停止监听流上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
|
||||||
|
|
||||||
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
// 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
|
||||||
|
|
|
@ -126,12 +126,12 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stopWaitePushStreamOnline(SendRtpItem sendRtpItem) {
|
public void stopWaitePushStreamOnline(String sendRtpItemKey, SendRtpItem sendRtpItem) {
|
||||||
logger.info("[停止WVP监听流上线] {}/{}, key:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem);
|
logger.info("[停止WVP监听流上线] {}/{}, key:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItemKey);
|
||||||
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
|
HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
|
||||||
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
|
sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
|
||||||
hookSubscribe.removeSubscribe(hook);
|
hookSubscribe.removeSubscribe(hook);
|
||||||
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItem);
|
RedisRpcRequest request = buildRequest("stopWaitePushStreamOnline", sendRtpItemKey);
|
||||||
request.setToId(sendRtpItem.getServerId());
|
request.setToId(sendRtpItem.getServerId());
|
||||||
redisRpcConfig.request(request, 10);
|
redisRpcConfig.request(request, 10);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue