From 91e671ec4b35649c2d7fd083d044c52f5fb6041e Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sat, 19 Oct 2024 20:27:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=B6=88=E6=81=AF=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E6=9C=AA=E5=9B=9E=E5=A4=8D=E8=AF=AF=E6=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/event/SipSubscribe.java | 7 ++++++- .../iot/vmp/gb28181/event/sip/SipEvent.java | 4 ++-- .../gb28181/transmit/SIPProcessorObserver.java | 16 ++++++++++------ .../iot/vmp/gb28181/transmit/SIPSender.java | 5 +++-- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index 455be8df..1e00c462 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -29,7 +29,7 @@ public class SipSubscribe { private final DelayQueue delayQueue = new DelayQueue<>(); - @Scheduled(fixedRate = 200) //每200毫秒执行 + @Scheduled(fixedDelay = 200) //每200毫秒执行 public void execute(){ if (delayQueue.isEmpty()) { return; @@ -153,6 +153,7 @@ public class SipSubscribe { public void addSubscribe(String key, SipEvent event) { + System.out.println(event.getDelay()); SipEvent sipEvent = subscribes.get(key); if (sipEvent != null) { subscribes.remove(key); @@ -180,4 +181,8 @@ public class SipSubscribe { public boolean isEmpty(){ return subscribes.isEmpty(); } + + public Integer size() { + return subscribes.size(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java index 45b2626f..e6af35ca 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java @@ -23,7 +23,7 @@ public class SipEvent implements Delayed { private SipSubscribe.Event errorEvent; /** - * 超时时间 + * 超时时间(单位: 毫秒) */ private long delay; @@ -38,7 +38,7 @@ public class SipEvent implements Delayed { @Override public long getDelay(@NotNull TimeUnit unit) { - return unit.convert(delay - System.currentTimeMillis(),TimeUnit.MILLISECONDS); + return unit.convert(delay, TimeUnit.MILLISECONDS); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 5c203757..efc1d91d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -97,10 +97,12 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { CallIdHeader callIdHeader = response.getCallIdHeader(); if (callIdHeader != null) { SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); - if (sipEvent != null && sipEvent.getOkEvent() != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(responseEvent); + if (sipEvent != null) { + if (sipEvent.getOkEvent() != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(responseEvent); + sipEvent.getOkEvent().response(eventResult); + } sipSubscribe.removeSubscribe(callIdHeader.getCallId()); - sipEvent.getOkEvent().response(eventResult); } } } @@ -118,10 +120,12 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); - if (sipEvent != null && sipEvent.getErrorEvent() != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + if (sipEvent != null ) { + if (sipEvent.getErrorEvent() != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(responseEvent); + sipEvent.getErrorEvent().response(eventResult); + } sipSubscribe.removeSubscribe(callIdHeader.getCallId()); - sipEvent.getErrorEvent().response(eventResult); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java index d894bb6f..a11e0fb9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java @@ -72,17 +72,18 @@ public class SIPSender { if (okEvent != null || errorEvent != null) { CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME); SipEvent sipEvent = SipEvent.getInstance(callIdHeader.getCallId(), eventResult -> { - sipSubscribe.removeSubscribe(eventResult.callId); + sipSubscribe.removeSubscribe(callIdHeader.getCallId()); if(okEvent != null) { okEvent.response(eventResult); } }, (eventResult -> { - sipSubscribe.removeSubscribe(eventResult.callId); + sipSubscribe.removeSubscribe(callIdHeader.getCallId()); if (errorEvent != null) { errorEvent.response(eventResult); } }), timeout == null ? sipConfig.getTimeout() : timeout); sipSubscribe.addSubscribe(callIdHeader.getCallId(), sipEvent); + System.out.println("订阅数量" + sipSubscribe.size()); } if ("TCP".equals(transport)) {