From 2492b0d63887ad80854dc8883c9d84f5dc6727d3 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 16 Oct 2024 17:17:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20SiP=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=9C=AA=E6=9C=AA=E5=9B=9E=E5=A4=8D=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E8=AF=86=E5=88=AB=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/genersoft/iot/vmp/conf/SipConfig.java | 83 +---------- .../genersoft/iot/vmp/gb28181/SipLayer.java | 2 +- .../iot/vmp/gb28181/event/SipSubscribe.java | 105 ++++++-------- .../iot/vmp/gb28181/event/sip/SipEvent.java | 48 +++++++ .../service/impl/PlatformServiceImpl.java | 4 +- .../gb28181/service/impl/PlayServiceImpl.java | 10 +- .../transmit/SIPProcessorObserver.java | 86 ++++++------ .../iot/vmp/gb28181/transmit/SIPSender.java | 132 ++++++++++-------- .../gb28181/transmit/cmd/ISIPCommander.java | 8 +- .../transmit/cmd/impl/SIPCommander.java | 20 +-- .../cmd/impl/SIPCommanderForPlatform.java | 42 +++--- .../impl/message/MessageRequestProcessor.java | 6 +- .../event/response/ISIPResponseProcessor.java | 3 + .../impl/InviteResponseProcessor.java | 2 - .../timeout/impl/TimeoutProcessorImpl.java | 12 +- src/main/resources/配置详情.yml | 2 + 16 files changed, 272 insertions(+), 293 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java index 4b85e0da..4742daa5 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java @@ -1,13 +1,15 @@ package com.genersoft.iot.vmp.conf; -import org.springframework.core.annotation.Order; +import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; @Component @ConfigurationProperties(prefix = "sip", ignoreInvalidFields = true) @Order(0) +@Data public class SipConfig { private String ip; @@ -26,82 +28,7 @@ public class SipConfig { Integer registerTimeInterval = 120; - private boolean alarm; + private boolean alarm = false; - public void setIp(String ip) { - this.ip = ip; - } - - public void setPort(Integer port) { - this.port = port; - } - - public void setDomain(String domain) { - this.domain = domain; - } - - public void setId(String id) { - this.id = id; - } - - public void setPassword(String password) { - this.password = password; - } - - public void setPtzSpeed(Integer ptzSpeed) { - this.ptzSpeed = ptzSpeed; - } - - - public void setRegisterTimeInterval(Integer registerTimeInterval) { - this.registerTimeInterval = registerTimeInterval; - } - - public String getIp() { - return ip; - } - - - public Integer getPort() { - return port; - } - - - public String getDomain() { - return domain; - } - - - public String getId() { - return id; - } - - public String getPassword() { - return password; - } - - - public Integer getPtzSpeed() { - return ptzSpeed; - } - - public Integer getRegisterTimeInterval() { - return registerTimeInterval; - } - - public boolean isAlarm() { - return alarm; - } - - public void setAlarm(boolean alarm) { - this.alarm = alarm; - } - - public String getShowIp() { - return showIp; - } - - public void setShowIp(String showIp) { - this.showIp = showIp; - } + private long timeout = 15; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 3331baca..cbbb2959 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -125,7 +125,7 @@ public class SipLayer implements CommandLineRunner { SipProviderImpl udpSipProvider = (SipProviderImpl)sipStack.createSipProvider(udpListeningPoint); udpSipProvider.addSipListener(sipProcessorObserver); - + udpSipProvider.setDialogErrorsAutomaticallyHandled(); udpSipProviderMap.put(monitorIp, udpSipProvider); log.info("[SIP SERVER] udp://{}:{} 启动成功", monitorIp, port); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index a1a1a153..455be8df 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; +import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; @@ -13,10 +14,9 @@ import javax.sip.ResponseEvent; import javax.sip.TimeoutEvent; import javax.sip.TransactionTerminatedEvent; import javax.sip.header.WarningHeader; -import java.time.Instant; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.DelayQueue; /** * @author lin @@ -25,41 +25,36 @@ import java.util.concurrent.TimeUnit; @Component public class SipSubscribe { - private final Map errorSubscribes = new ConcurrentHashMap<>(); + private final Map subscribes = new ConcurrentHashMap<>(); - private final Map okSubscribes = new ConcurrentHashMap<>(); + private final DelayQueue delayQueue = new DelayQueue<>(); - private final Map okTimeSubscribes = new ConcurrentHashMap<>(); - - private final Map errorTimeSubscribes = new ConcurrentHashMap<>(); - - // @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次 - // @Scheduled(fixedRate= 100 * 60 * 60 ) - @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 + @Scheduled(fixedRate = 200) //每200毫秒执行 public void execute(){ - if(log.isDebugEnabled()){ - log.info("[定时任务] 清理过期的SIP订阅信息"); + if (delayQueue.isEmpty()) { + return; } - - Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5)); - - for (String key : okTimeSubscribes.keySet()) { - if (okTimeSubscribes.get(key).isBefore(instant)){ - okSubscribes.remove(key); - okTimeSubscribes.remove(key); + try { + SipEvent take = delayQueue.take(); + // 出现超时异常 + if(take.getErrorEvent() != null) { + EventResult eventResult = new EventResult<>(); + eventResult.type = EventResultType.timeout; + eventResult.msg = "消息超时未回复"; + eventResult.statusCode = -1024; + take.getErrorEvent().response(eventResult); } + subscribes.remove(take.getKey()); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - for (String key : errorTimeSubscribes.keySet()) { - if (errorTimeSubscribes.get(key).isBefore(instant)){ - errorSubscribes.remove(key); - errorTimeSubscribes.remove(key); - } - } - if(log.isDebugEnabled()){ - log.debug("okTimeSubscribes.size:{}",okTimeSubscribes.size()); - log.debug("okSubscribes.size:{}",okSubscribes.size()); - log.debug("errorTimeSubscribes.size:{}",errorTimeSubscribes.size()); - log.debug("errorSubscribes.size:{}",errorSubscribes.size()); + } + + public void updateTimeout(String callId) { + SipEvent sipEvent = subscribes.get(callId); + if (sipEvent != null) { + delayQueue.remove(sipEvent); + delayQueue.offer(sipEvent); } } @@ -156,43 +151,33 @@ public class SipSubscribe { } } - public void addErrorSubscribe(String key, SipSubscribe.Event event) { - errorSubscribes.put(key, event); - errorTimeSubscribes.put(key, Instant.now()); + + public void addSubscribe(String key, SipEvent event) { + SipEvent sipEvent = subscribes.get(key); + if (sipEvent != null) { + subscribes.remove(key); + delayQueue.remove(sipEvent); + } + subscribes.put(key, event); + delayQueue.offer(event); } - public void addOkSubscribe(String key, SipSubscribe.Event event) { - okSubscribes.put(key, event); - okTimeSubscribes.put(key, Instant.now()); + public SipEvent getSubscribe(String key) { + return subscribes.get(key); } - public SipSubscribe.Event getErrorSubscribe(String key) { - return errorSubscribes.get(key); - } - - public void removeErrorSubscribe(String key) { + public void removeSubscribe(String key) { if(key == null){ return; } - errorSubscribes.remove(key); - errorTimeSubscribes.remove(key); - } - - public SipSubscribe.Event getOkSubscribe(String key) { - return okSubscribes.get(key); - } - - public void removeOkSubscribe(String key) { - if(key == null){ - return; + SipEvent sipEvent = subscribes.get(key); + if (sipEvent != null) { + subscribes.remove(key); + delayQueue.remove(sipEvent); } - okSubscribes.remove(key); - okTimeSubscribes.remove(key); } - public int getErrorSubscribesSize(){ - return errorSubscribes.size(); - } - public int getOkSubscribesSize(){ - return okSubscribes.size(); + + public boolean isEmpty(){ + return subscribes.isEmpty(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java new file mode 100644 index 00000000..45b2626f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/sip/SipEvent.java @@ -0,0 +1,48 @@ +package com.genersoft.iot.vmp.gb28181.event.sip; + +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import lombok.Data; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +@Data +public class SipEvent implements Delayed { + + private String key; + + /** + * 成功的回调 + */ + private SipSubscribe.Event okEvent; + + /** + * 错误的回调,包括超时 + */ + private SipSubscribe.Event errorEvent; + + /** + * 超时时间 + */ + private long delay; + + public static SipEvent getInstance(String key, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, long delay) { + SipEvent sipEvent = new SipEvent(); + sipEvent.setKey(key); + sipEvent.setOkEvent(okEvent); + sipEvent.setErrorEvent(errorEvent); + sipEvent.setDelay(delay); + return sipEvent; + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(delay - System.currentTimeMillis(),TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 6f6730d4..101712bb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -380,7 +380,9 @@ public class PlatformServiceImpl implements IPlatformService { commanderForPlatform.register(platform, sipTransactionInfo, eventResult -> { log.info("[国标级联] 平台:{}注册失败,{}:{}", platform.getServerGBId(), eventResult.statusCode, eventResult.msg); - offline(platform, false); + if (platform.isStatus()) { + offline(platform, false); + } }, null); } catch (Exception e) { log.error("[命令发送失败] 国标级联定时注册: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 666c4fa2..f5076be9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -436,7 +436,7 @@ public class PlayServiceImpl implements IPlayService { // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, callback, inviteInfo, InviteSessionType.PLAY); }, (event) -> { - log.info("[点播失败] deviceId: {}, channelId:{}, {}: {}", device.getDeviceId(), channel.getDeviceId(), event.statusCode, event.msg); + log.info("[点播失败]{}:{} deviceId: {}, channelId:{}",event.statusCode, event.msg, device.getDeviceId(), channel.getDeviceId()); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getStream()); @@ -447,7 +447,7 @@ public class PlayServiceImpl implements IPlayService { event.statusCode, event.msg, null); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); - }); + }, userSetting.getPlayTimeout().longValue()); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 点播消息: {}", e.getMessage()); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); @@ -565,7 +565,7 @@ public class PlayServiceImpl implements IPlayService { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); sessionManager.removeByStream(sendRtpInfo.getStream()); errorEvent.response(event); - }); + }, userSetting.getPlayTimeout().longValue()); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 对讲消息: {}", e.getMessage()); @@ -820,7 +820,7 @@ public class PlayServiceImpl implements IPlayService { receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getStream()); inviteStreamService.removeInviteInfo(inviteInfo); - }); + }, userSetting.getPlayTimeout().longValue()); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 录像回放: {}", e.getMessage()); if (callback != null) { @@ -1044,7 +1044,7 @@ public class PlayServiceImpl implements IPlayService { // 设置过期时间,下载失败时自动处理订阅数据 hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000); subscribe.addSubscribe(hook, hookEventForRecord); - }); + }, userSetting.getPlayTimeout().longValue()); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 录像下载: {}", e.getMessage()); callback.run(InviteErrorCode.FAIL.getCode(),e.getMessage(), null); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index faf24232..5c203757 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -2,18 +2,18 @@ package com.genersoft.iot.vmp.gb28181.transmit; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.response.ISIPResponseProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor; +import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.sip.*; -import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; -import javax.sip.message.Request; import javax.sip.message.Response; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -88,40 +88,40 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { @Override @Async("taskExecutor") public void processResponse(ResponseEvent responseEvent) { - Response response = responseEvent.getResponse(); + SIPResponse response = (SIPResponse)responseEvent.getResponse(); int status = response.getStatusCode(); // Success if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { - CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); - String method = cseqHeader.getMethod(); - ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); - if (sipRequestProcessor != null) { - sipRequestProcessor.process(responseEvent); - } - if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { - CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + if (status != Response.UNAUTHORIZED && responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) { + CallIdHeader callIdHeader = response.getCallIdHeader(); if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); - if (subscribe != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); - subscribe.response(eventResult); + SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); + if (sipEvent != null && sipEvent.getOkEvent() != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(responseEvent); + sipSubscribe.removeSubscribe(callIdHeader.getCallId()); + sipEvent.getOkEvent().response(eventResult); } } } + ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(response.getCSeqHeader().getMethod()); + if (sipRequestProcessor != null) { + sipRequestProcessor.process(responseEvent); + } } else if ((status >= Response.TRYING) && (status < Response.OK)) { // 增加其它无需回复的响应,如101、180等 + // 更新sip订阅的时间 +// sipSubscribe.updateTimeout(response.getCallIdHeader().getCallId()); } else { log.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()); - if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { + if (responseEvent.getResponse() != null && !sipSubscribe.isEmpty() ) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); - if (subscribe != null) { + SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); + if (sipEvent != null && sipEvent.getErrorEvent() != null) { SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - subscribe.response(eventResult); - sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); + sipSubscribe.removeSubscribe(callIdHeader.getCallId()); + sipEvent.getErrorEvent().response(eventResult); } } } @@ -140,27 +140,27 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { @Override public void processTimeout(TimeoutEvent timeoutEvent) { log.info("[消息发送超时]"); - ClientTransaction clientTransaction = timeoutEvent.getClientTransaction(); - - if (clientTransaction != null) { - log.info("[发送错误订阅] clientTransaction != null"); - Request request = clientTransaction.getRequest(); - if (request != null) { - log.info("[发送错误订阅] request != null"); - CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); - if (callIdHeader != null) { - log.info("[发送错误订阅]"); - SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent); - if (subscribe != null){ - subscribe.response(eventResult); - } - sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); - sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); - } - } - } - eventPublisher.requestTimeOut(timeoutEvent); +// ClientTransaction clientTransaction = timeoutEvent.getClientTransaction(); +// +// if (clientTransaction != null) { +// log.info("[发送错误订阅] clientTransaction != null"); +// Request request = clientTransaction.getRequest(); +// if (request != null) { +// log.info("[发送错误订阅] request != null"); +// CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); +// if (callIdHeader != null) { +// log.info("[发送错误订阅]"); +// SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); +// SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent); +// if (subscribe != null){ +// subscribe.response(eventResult); +// } +// sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); +// sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); +// } +// } +// } +// eventPublisher.requestTimeOut(timeoutEvent); } @Override @@ -199,4 +199,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java index 1dea8882..d894bb6f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java @@ -1,7 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.SipProviderImpl; @@ -21,6 +23,7 @@ import java.text.ParseException; /** * 发送SIP消息 + * * @author lin */ @Slf4j @@ -35,75 +38,80 @@ public class SIPSender { @Autowired private SipSubscribe sipSubscribe; + @Autowired + private SipConfig sipConfig; public void transmitRequest(String ip, Message message) throws SipException, ParseException { - transmitRequest(ip, message, null, null); + transmitRequest(ip, message, null, null, null); } public void transmitRequest(String ip, Message message, SipSubscribe.Event errorEvent) throws SipException, ParseException { - transmitRequest(ip, message, errorEvent, null); + transmitRequest(ip, message, errorEvent, null, null); } public void transmitRequest(String ip, Message message, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws SipException { - ViaHeader viaHeader = (ViaHeader)message.getHeader(ViaHeader.NAME); - String transport = "UDP"; - if (viaHeader == null) { - log.warn("[消息头缺失]: ViaHeader, 使用默认的UDP方式处理数据"); - }else { - transport = viaHeader.getTransport(); - } - if (message.getHeader(UserAgentHeader.NAME) == null) { - try { - message.addHeader(SipUtils.createUserAgentHeader(gitUtil)); - } catch (ParseException e) { - log.error("添加UserAgentHeader失败", e); - } - } - - CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME); - // 添加错误订阅 - if (errorEvent != null) { - sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> { - sipSubscribe.removeErrorSubscribe(eventResult.callId); - sipSubscribe.removeOkSubscribe(eventResult.callId); - errorEvent.response(eventResult); - })); - } - // 添加订阅 - if (okEvent != null) { - sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult -> { - sipSubscribe.removeOkSubscribe(eventResult.callId); - sipSubscribe.removeErrorSubscribe(eventResult.callId); - okEvent.response(eventResult); - }); - } - if ("TCP".equals(transport)) { - SipProviderImpl tcpSipProvider = sipLayer.getTcpSipProvider(ip); - if (tcpSipProvider == null) { - log.error("[发送信息失败] 未找到tcp://{}的监听信息", ip); - return; - } - if (message instanceof Request) { - tcpSipProvider.sendRequest((Request)message); - }else if (message instanceof Response) { - tcpSipProvider.sendResponse((Response)message); - } - - } else if ("UDP".equals(transport)) { - SipProviderImpl sipProvider = sipLayer.getUdpSipProvider(ip); - if (sipProvider == null) { - log.error("[发送信息失败] 未找到udp://{}的监听信息", ip); - return; - } - if (message instanceof Request) { - sipProvider.sendRequest((Request)message); - }else if (message instanceof Response) { - sipProvider.sendResponse((Response)message); - } - } + transmitRequest(ip, message, errorEvent, okEvent, null); } - public CallIdHeader getNewCallIdHeader(String ip, String transport){ + public void transmitRequest(String ip, Message message, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws SipException { + ViaHeader viaHeader = (ViaHeader) message.getHeader(ViaHeader.NAME); + String transport = "UDP"; + if (viaHeader == null) { + log.warn("[消息头缺失]: ViaHeader, 使用默认的UDP方式处理数据"); + } else { + transport = viaHeader.getTransport(); + } + if (message.getHeader(UserAgentHeader.NAME) == null) { + try { + message.addHeader(SipUtils.createUserAgentHeader(gitUtil)); + } catch (ParseException e) { + log.error("添加UserAgentHeader失败", e); + } + } + + if (okEvent != null || errorEvent != null) { + CallIdHeader callIdHeader = (CallIdHeader) message.getHeader(CallIdHeader.NAME); + SipEvent sipEvent = SipEvent.getInstance(callIdHeader.getCallId(), eventResult -> { + sipSubscribe.removeSubscribe(eventResult.callId); + if(okEvent != null) { + okEvent.response(eventResult); + } + }, (eventResult -> { + sipSubscribe.removeSubscribe(eventResult.callId); + if (errorEvent != null) { + errorEvent.response(eventResult); + } + }), timeout == null ? sipConfig.getTimeout() : timeout); + sipSubscribe.addSubscribe(callIdHeader.getCallId(), sipEvent); + } + + if ("TCP".equals(transport)) { + SipProviderImpl tcpSipProvider = sipLayer.getTcpSipProvider(ip); + if (tcpSipProvider == null) { + log.error("[发送信息失败] 未找到tcp://{}的监听信息", ip); + return; + } + if (message instanceof Request) { + tcpSipProvider.sendRequest((Request) message); + } else if (message instanceof Response) { + tcpSipProvider.sendResponse((Response) message); + } + + } else if ("UDP".equals(transport)) { + SipProviderImpl sipProvider = sipLayer.getUdpSipProvider(ip); + if (sipProvider == null) { + log.error("[发送信息失败] 未找到udp://{}的监听信息", ip); + return; + } + if (message instanceof Request) { + sipProvider.sendRequest((Request) message); + } else if (message instanceof Response) { + sipProvider.sendResponse((Response) message); + } + } + } + + public CallIdHeader getNewCallIdHeader(String ip, String transport) { if (ObjectUtils.isEmpty(transport)) { return sipLayer.getUdpSipProvider().getNewCallId(); } @@ -111,7 +119,7 @@ public class SIPSender { if (ObjectUtils.isEmpty(ip)) { sipProvider = transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider() : sipLayer.getUdpSipProvider(); - }else { + } else { sipProvider = transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider(ip) : sipLayer.getUdpSipProvider(ip); } @@ -122,9 +130,11 @@ public class SIPSender { if (sipProvider != null) { return sipProvider.getNewCallId(); - }else { + } else { log.warn("[新建CallIdHeader失败], ip={}, transport={}", ip, transport); return null; } } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 70d5ae21..dca16ba8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -100,7 +100,7 @@ public interface ISIPCommander { * @param device 视频设备 * @param channel 预览通道 */ - void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException; /** * 请求回放视频流 @@ -110,7 +110,7 @@ public interface ISIPCommander { * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ - void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInf, Device device, DeviceChannel channel, String startTime, String endTime, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInf, Device device, DeviceChannel channel, String startTime, String endTime, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException; /** * 请求历史媒体下载 @@ -123,7 +123,7 @@ public interface ISIPCommander { */ void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, - SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; + SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException; /** @@ -131,7 +131,7 @@ public interface ISIPCommander { */ void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; - void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException; void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 52a43156..8cb38a10 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -262,7 +262,7 @@ public class SIPCommander implements ISIPCommander { */ @Override public void playStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, - SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException { String stream = ssrcInfo.getStream(); if (device == null) { @@ -335,8 +335,6 @@ public class SIPCommander implements ISIPCommander { // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率 // content.append("f=v/2/5/25/1/4000a/1/8/1" + "\r\n"); // 未发现支持此特性的设备 - - Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { sessionManager.removeByStream(ssrcInfo.getStream()); @@ -350,7 +348,7 @@ public class SIPCommander implements ISIPCommander { InviteSessionType.PLAY); sessionManager.put(ssrcTransaction); okEvent.response(e); - }); + }, timeout); } /** @@ -364,7 +362,7 @@ public class SIPCommander implements ISIPCommander { @Override public void playbackStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, - SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException { log.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort()); @@ -445,7 +443,7 @@ public class SIPCommander implements ISIPCommander { channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK); sessionManager.put(ssrcTransaction); okEvent.response(event); - }); + }, timeout); } /** @@ -454,7 +452,7 @@ public class SIPCommander implements ISIPCommander { @Override public void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, - SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException { + SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException { log.info("[发送-请求历史媒体下载-命令] 流ID: {},节点为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort()); String sdpIp; @@ -536,11 +534,13 @@ public class SIPCommander implements ISIPCommander { SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD); sessionManager.put(ssrcTransaction); okEvent.response(event); - }); + }, timeout); } @Override - public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channel, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channel, + String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, + SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException { String stream = sendRtpItem.getStream(); @@ -601,7 +601,7 @@ public class SIPCommander implements ISIPCommander { SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), "talk", stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK); sessionManager.put(ssrcTransaction); okEvent.response(e); - }); + }, timeout); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java index 2608769e..87821ab0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java @@ -127,23 +127,19 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { // 将 callid 写入缓存, 等注册成功可以更新状态 String callIdFromHeader = callIdHeader.getCallId(); redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister)); - - sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{ - if (event != null) { - log.info("向上级平台 [ {} ] 注册发生错误: {} ", - parentPlatform.getServerGBId(), - event.msg); - } - redisCatchStorage.delPlatformRegisterInfo(callIdFromHeader); - if (errorEvent != null ) { - errorEvent.response(event); - } - }); }else { request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0); } - sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, null, okEvent); + sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, (event)->{ + if (event != null) { + log.info("[国标级联]:{}, 注册失败: {} ", parentPlatform.getServerGBId(), event.msg); + } + redisCatchStorage.delPlatformRegisterInfo(callIdHeader.getCallId()); + if (errorEvent != null ) { + errorEvent.response(event); + } + }, okEvent, 5L); } @Override @@ -167,6 +163,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { SipUtils.getNewFromTag(), SipUtils.getNewViaTag(), callIdHeader); + sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, errorEvent, okEvent); return callIdHeader.getCallId(); } @@ -249,16 +246,17 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { log.debug(catalogXml); if (sendAfterResponse) { // 默认按照收到200回复后发送下一条, 如果超时收不到回复,就以30毫秒的间隔直接发送。 - dynamicTask.startDelay(timeoutTaskKey, ()->{ - sipSubscribe.removeOkSubscribe(callId); - int indexNext = index + parentPlatform.getCatalogGroup(); - try { - sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, false); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage()); - } - }, 3000); sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, eventResult -> { + if (eventResult.type.equals(SipSubscribe.EventResultType.timeout)) { + // 消息发送超时, 以30毫秒的间隔直接发送 + int indexNext = index + parentPlatform.getCatalogGroup(); + try { + sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, false); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage()); + } + return; + } log.error("[目录推送失败] 国标级联 platform : {}, code: {}, msg: {}, 停止发送", parentPlatform.getServerGBId(), eventResult.statusCode, eventResult.msg); dynamicTask.stop(timeoutTaskKey); }, eventResult -> { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java index 44cb43cb..237eab16 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -93,11 +94,12 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement // 不存在则回复404 responseAck(request, Response.NOT_FOUND, "device "+ deviceId +" not found"); log.warn("[设备未找到 ]deviceId: {}, callId: {}", deviceId, callIdHeader.getCallId()); - if (sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()) != null){ + SipEvent sipEvent = sipSubscribe.getSubscribe(callIdHeader.getCallId()); + if (sipEvent != null && sipEvent.getErrorEvent() != null){ DeviceNotFoundEvent deviceNotFoundEvent = new DeviceNotFoundEvent(evt.getDialog()); deviceNotFoundEvent.setCallId(callIdHeader.getCallId()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(deviceNotFoundEvent); - sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).response(eventResult); + sipEvent.getErrorEvent().response(eventResult); } }else { Element rootElement; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java index 50fb2021..ffa93f21 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response; +import org.springframework.scheduling.annotation.Async; + import javax.sip.ResponseEvent; /** @@ -9,6 +11,7 @@ import javax.sip.ResponseEvent; */ public interface ISIPResponseProcessor { + void process(ResponseEvent evt); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index 86a62443..13f97a4e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -38,14 +38,12 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { @Autowired private SIPProcessorObserver sipProcessorObserver; - @Autowired private SIPSender sipSender; @Autowired private SIPRequestHeaderProvider headerProvider; - @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/impl/TimeoutProcessorImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/impl/TimeoutProcessorImpl.java index f1e5352f..c36a2e54 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/impl/TimeoutProcessorImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/timeout/impl/TimeoutProcessorImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.timeout.impl; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor; import lombok.extern.slf4j.Slf4j; @@ -32,11 +33,12 @@ public class TimeoutProcessorImpl implements InitializingBean, ITimeoutProcessor // TODO Auto-generated method stub CallIdHeader callIdHeader = event.getClientTransaction().getDialog().getCallId(); String callId = callIdHeader.getCallId(); - SipSubscribe.Event errorSubscribe = sipSubscribe.getErrorSubscribe(callId); - SipSubscribe.EventResult timeoutEventEventResult = new SipSubscribe.EventResult<>(event); - errorSubscribe.response(timeoutEventEventResult); - sipSubscribe.removeErrorSubscribe(callId); - sipSubscribe.removeOkSubscribe(callId); + SipEvent sipEvent = sipSubscribe.getSubscribe(callId); + if (sipEvent != null && sipEvent.getErrorEvent() != null) { + SipSubscribe.EventResult timeoutEventEventResult = new SipSubscribe.EventResult<>(event); + sipEvent.getErrorEvent().response(timeoutEventEventResult); + sipSubscribe.removeSubscribe(callId); + } } catch (Exception e) { log.error("[超时事件失败]: {}", e.getMessage()); } diff --git a/src/main/resources/配置详情.yml b/src/main/resources/配置详情.yml index 31db6807..2f8e8da1 100644 --- a/src/main/resources/配置详情.yml +++ b/src/main/resources/配置详情.yml @@ -123,6 +123,8 @@ sip: # keepalliveToOnline: false # 是否存储alarm信息 alarm: false + # 命令发送等待回复的超时时间, 单位:秒 + timeout: 15 # 做为JT1078服务器的配置 jt1078: