修复消息超时未回复误报

pull/1652/head
648540858 2024-10-19 20:27:42 +08:00
parent a548dec2c2
commit 91e671ec4b
4 changed files with 21 additions and 11 deletions

View File

@ -29,7 +29,7 @@ public class SipSubscribe {
private final DelayQueue<SipEvent> delayQueue = new DelayQueue<>(); private final DelayQueue<SipEvent> delayQueue = new DelayQueue<>();
@Scheduled(fixedRate = 200) //每200毫秒执行 @Scheduled(fixedDelay = 200) //每200毫秒执行
public void execute(){ public void execute(){
if (delayQueue.isEmpty()) { if (delayQueue.isEmpty()) {
return; return;
@ -153,6 +153,7 @@ public class SipSubscribe {
public void addSubscribe(String key, SipEvent event) { public void addSubscribe(String key, SipEvent event) {
System.out.println(event.getDelay());
SipEvent sipEvent = subscribes.get(key); SipEvent sipEvent = subscribes.get(key);
if (sipEvent != null) { if (sipEvent != null) {
subscribes.remove(key); subscribes.remove(key);
@ -180,4 +181,8 @@ public class SipSubscribe {
public boolean isEmpty(){ public boolean isEmpty(){
return subscribes.isEmpty(); return subscribes.isEmpty();
} }
public Integer size() {
return subscribes.size();
}
} }

View File

@ -23,7 +23,7 @@ public class SipEvent implements Delayed {
private SipSubscribe.Event errorEvent; private SipSubscribe.Event errorEvent;
/** /**
* * ( )
*/ */
private long delay; private long delay;
@ -38,7 +38,7 @@ public class SipEvent implements Delayed {
@Override @Override
public long getDelay(@NotNull TimeUnit unit) { public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delay - System.currentTimeMillis(),TimeUnit.MILLISECONDS); return unit.convert(delay, TimeUnit.MILLISECONDS);
} }
@Override @Override

View File

@ -97,10 +97,12 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
CallIdHeader callIdHeader = response.getCallIdHeader(); CallIdHeader callIdHeader = response.getCallIdHeader();
if (callIdHeader != null) { if (callIdHeader != null) {
SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId());
if (sipEvent != null && sipEvent.getOkEvent() != null) { if (sipEvent != null) {
SipSubscribe.EventResult<ResponseEvent> eventResult = new SipSubscribe.EventResult<>(responseEvent); if (sipEvent.getOkEvent() != null) {
SipSubscribe.EventResult<ResponseEvent> eventResult = new SipSubscribe.EventResult<>(responseEvent);
sipEvent.getOkEvent().response(eventResult);
}
sipSubscribe.removeSubscribe(callIdHeader.getCallId()); sipSubscribe.removeSubscribe(callIdHeader.getCallId());
sipEvent.getOkEvent().response(eventResult);
} }
} }
} }
@ -118,10 +120,12 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) { if (callIdHeader != null) {
SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId());
if (sipEvent != null && sipEvent.getErrorEvent() != null) { if (sipEvent != null ) {
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); if (sipEvent.getErrorEvent() != null) {
SipSubscribe.EventResult<ResponseEvent> eventResult = new SipSubscribe.EventResult<>(responseEvent);
sipEvent.getErrorEvent().response(eventResult);
}
sipSubscribe.removeSubscribe(callIdHeader.getCallId()); sipSubscribe.removeSubscribe(callIdHeader.getCallId());
sipEvent.getErrorEvent().response(eventResult);
} }
} }
} }

View File

@ -72,17 +72,18 @@ public class SIPSender {
if (okEvent != null || errorEvent != null) { if (okEvent != null || errorEvent != null) {
CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME);
SipEvent sipEvent = SipEvent.getInstance(callIdHeader.getCallId(), eventResult -> { SipEvent sipEvent = SipEvent.getInstance(callIdHeader.getCallId(), eventResult -> {
sipSubscribe.removeSubscribe(eventResult.callId); sipSubscribe.removeSubscribe(callIdHeader.getCallId());
if(okEvent != null) { if(okEvent != null) {
okEvent.response(eventResult); okEvent.response(eventResult);
} }
}, (eventResult -> { }, (eventResult -> {
sipSubscribe.removeSubscribe(eventResult.callId); sipSubscribe.removeSubscribe(callIdHeader.getCallId());
if (errorEvent != null) { if (errorEvent != null) {
errorEvent.response(eventResult); errorEvent.response(eventResult);
} }
}), timeout == null ? sipConfig.getTimeout() : timeout); }), timeout == null ? sipConfig.getTimeout() : timeout);
sipSubscribe.addSubscribe(callIdHeader.getCallId(), sipEvent); sipSubscribe.addSubscribe(callIdHeader.getCallId(), sipEvent);
System.out.println("订阅数量" + sipSubscribe.size());
} }
if ("TCP".equals(transport)) { if ("TCP".equals(transport)) {