修复message和sip消息超时未回复的判断机制

master
lin 2025-02-12 20:54:20 +08:00
parent 5808c7aff5
commit 9618f23af8
13 changed files with 124 additions and 162 deletions

View File

@ -7,17 +7,11 @@
package com.genersoft.iot.vmp.gb28181.controller; package com.genersoft.iot.vmp.gb28181.controller;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.BasicParam; import com.genersoft.iot.vmp.gb28181.bean.BasicParam;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.Parameter;
@ -26,15 +20,9 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.UUID;
@Slf4j @Slf4j
@Tag(name = "国标设备配置") @Tag(name = "国标设备配置")
@RestController @RestController
@ -44,21 +32,12 @@ public class DeviceConfig {
@Autowired @Autowired
private IDeviceService deviceService; private IDeviceService deviceService;
@Autowired
private SIPCommander cmder;
@Autowired
private DeferredResultHolder resultHolder;
/**
*
*/
@GetMapping("/basicParam") @GetMapping("/basicParam")
@Operation(summary = "基本配置设置命令", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Operation(summary = "基本配置设置命令", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "basicParam", description = "基础配置参数", required = true) @Parameter(name = "basicParam", description = "基础配置参数", required = true)
public DeferredResult<WVPResult<String>> homePositionApi(BasicParam basicParam) { public DeferredResult<WVPResult<String>> homePositionApi(BasicParam basicParam) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("报警复位API调用"); log.debug("基本配置设置命令API调用");
} }
Assert.notNull(basicParam.getDeviceId(), "设备ID必须存在"); Assert.notNull(basicParam.getDeviceId(), "设备ID必须存在");
@ -78,36 +57,36 @@ public class DeviceConfig {
} }
/** @Operation(summary = "设备配置查询", security = @SecurityRequirement(name = JwtUtils.HEADER))
* API
* @param deviceId ID
* @param configType
* @param channelId ID
* @return
*/
@Operation(summary = "设备配置查询请求", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true) @Parameter(name = "channelId", description = "通道国标编号", required = true)
@Parameter(name = "configType", description = "配置类型") @Parameter(name = "configType", description = "配置类型, 可选值," +
"基本参数配置:BasicParam," +
"视频参数范围:VideoParamOpt, " +
"SVAC编码配置:SVACEncodeConfig, " +
"SVAC解码配置:SVACDecodeConfig。" +
"可同时查询多个配置类型,各类型以“/”分隔,")
@GetMapping("/query/{deviceId}/{configType}") @GetMapping("/query/{deviceId}/{configType}")
public DeferredResult<WVPResult<String>> configDownloadApi(@PathVariable String deviceId, public DeferredResult<WVPResult<Object>> configDownloadApi(@PathVariable String deviceId,
@PathVariable String configType, @PathVariable String configType,
@RequestParam(required = false) String channelId) { @RequestParam(required = false) String channelId) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("设备状态查询API调用"); log.debug("设备配置查询请求API调用");
} }
String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + (ObjectUtils.isEmpty(channelId) ? deviceId : deviceId + channelId);
String uuid = UUID.randomUUID().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId); Device device = deviceService.getDeviceByDeviceId(deviceId);
Assert.notNull(device, "设备不存在"); Assert.notNull(device, "设备不存在");
DeferredResult<WVPResult<String>> result = deviceService.deviceConfigQuery(device, channelId, configType); DeferredResult<WVPResult<Object>> deferredResult = new DeferredResult<>();
result.onTimeout(() -> { deviceService.deviceConfigQuery(device, channelId, configType, (code, msg, data) -> {
log.warn("[获取设备配置] 超时, {}", device.getDeviceId()); deferredResult.setResult(new WVPResult<>(code, msg, data));
result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时"));
}); });
return result;
deferredResult.onTimeout(() -> {
log.warn("[获取设备配置] 超时, {}", device.getDeviceId());
deferredResult.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时"));
});
return deferredResult;
} }
} }

View File

@ -1,21 +1,11 @@
package com.genersoft.iot.vmp.gb28181.event; package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.DialogTerminatedEvent;
import javax.sip.ResponseEvent;
import javax.sip.TimeoutEvent;
import javax.sip.TransactionTerminatedEvent;
import javax.sip.header.WarningHeader;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
@ -33,9 +23,7 @@ public class MessageSubscribe {
@Scheduled(fixedDelay = 200) //每200毫秒执行 @Scheduled(fixedDelay = 200) //每200毫秒执行
public void execute(){ public void execute(){
if (delayQueue.isEmpty()) { while (!delayQueue.isEmpty()) {
return;
}
try { try {
MessageEvent<?> take = delayQueue.take(); MessageEvent<?> take = delayQueue.take();
// 出现超时异常 // 出现超时异常
@ -47,6 +35,7 @@ public class MessageSubscribe {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
}
public void addSubscribe(MessageEvent<?> event) { public void addSubscribe(MessageEvent<?> event) {

View File

@ -29,11 +29,10 @@ public class SipSubscribe {
private final DelayQueue<SipEvent> delayQueue = new DelayQueue<>(); private final DelayQueue<SipEvent> delayQueue = new DelayQueue<>();
@Scheduled(fixedDelay = 200) //每200毫秒执行 @Scheduled(fixedDelay = 200) //每200毫秒执行
public void execute(){ public void execute(){
if (delayQueue.isEmpty()) { while (!delayQueue.isEmpty()) {
return;
}
try { try {
SipEvent take = delayQueue.take(); SipEvent take = delayQueue.take();
// 出现超时异常 // 出现超时异常
@ -49,6 +48,7 @@ public class SipSubscribe {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
}
public void updateTimeout(String callId) { public void updateTimeout(String callId) {
SipEvent sipEvent = subscribes.get(callId); SipEvent sipEvent = subscribes.get(callId);

View File

@ -28,7 +28,7 @@ public class MessageEvent<T> implements Delayed {
@Override @Override
public long getDelay(@NotNull TimeUnit unit) { public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delay, TimeUnit.MILLISECONDS); return unit.convert(delay - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} }
@Override @Override
@ -47,9 +47,9 @@ public class MessageEvent<T> implements Delayed {
messageEvent.deviceId = deviceId; messageEvent.deviceId = deviceId;
messageEvent.callback = callback; messageEvent.callback = callback;
if (delay == null) { if (delay == null) {
messageEvent.delay = 1000; messageEvent.delay = System.currentTimeMillis() + 1000;
}else { }else {
messageEvent.delay = delay; messageEvent.delay = System.currentTimeMillis() + delay;
} }
return messageEvent; return messageEvent;
} }

View File

@ -32,13 +32,13 @@ public class SipEvent implements Delayed {
sipEvent.setKey(key); sipEvent.setKey(key);
sipEvent.setOkEvent(okEvent); sipEvent.setOkEvent(okEvent);
sipEvent.setErrorEvent(errorEvent); sipEvent.setErrorEvent(errorEvent);
sipEvent.setDelay(delay); sipEvent.setDelay(System.currentTimeMillis() + delay);
return sipEvent; return sipEvent;
} }
@Override @Override
public long getDelay(@NotNull TimeUnit unit) { public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delay, TimeUnit.MILLISECONDS); return unit.convert(delay - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} }
@Override @Override

View File

@ -174,7 +174,7 @@ public interface IDeviceService {
void deviceBasicConfig(Device device, BasicParam basicParam, ErrorCallback<String> callback); void deviceBasicConfig(Device device, BasicParam basicParam, ErrorCallback<String> callback);
DeferredResult<WVPResult<String>> deviceConfigQuery(Device device, String channelId, String configType); void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback<Object> callback);
void teleboot(Device device); void teleboot(Device device);

View File

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.gb28181.service.impl; package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.common.enums.ChannelDataType;
@ -12,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
@ -20,8 +18,6 @@ import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -38,12 +34,9 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
@ -51,7 +44,6 @@ import javax.sip.SipException;
import java.text.ParseException; import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -667,9 +659,7 @@ public class DeviceServiceImpl implements IDeviceService {
} }
try { try {
sipCommander.deviceBasicConfigCmd(device, basicParam, event -> { sipCommander.deviceBasicConfigCmd(device, basicParam, callback);
callback.run(ErrorCode.ERROR100.getCode(), "操作超时", null);
});
} catch (InvalidArgumentException | SipException | ParseException e) { } catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 设备配置: {}", e.getMessage()); log.error("[命令发送失败] 设备配置: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage());
@ -677,25 +667,20 @@ public class DeviceServiceImpl implements IDeviceService {
} }
@Override @Override
public DeferredResult<WVPResult<String>> deviceConfigQuery(Device device, String channelId, String configType) { public void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback<Object> callback) {
if (!userSetting.getServerId().equals(device.getServerId())) { if (!userSetting.getServerId().equals(device.getServerId())) {
WVPResult<String> result = redisRpcService.deviceConfigQuery(device.getServerId(), device, channelId, configType); WVPResult<String> result = redisRpcService.deviceConfigQuery(device.getServerId(), device, channelId, configType);
DeferredResult<WVPResult<String>> deferredResult = new DeferredResult<>(3 * 1000L); callback.run(result.getCode(), result.getMsg(), result.getData());
deferredResult.setResult(result); return;
return deferredResult;
} }
DeferredResult<WVPResult<String>> result = new DeferredResult<>(3 * 1000L);
try { try {
sipCommander.deviceConfigQuery(device, channelId, configType, event -> { sipCommander.deviceConfigQuery(device, channelId, configType, callback);
result.setResult(WVPResult.fail(ErrorCode.ERROR100));
});
} catch (InvalidArgumentException | SipException | ParseException e) { } catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 获取设备配置: {}", e.getMessage()); log.error("[命令发送失败] 获取设备配置: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage());
} }
return result;
} }
@Override @Override

View File

@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
@ -230,7 +231,7 @@ public interface ISIPCommander {
/** /**
* basicParam * basicParam
*/ */
void deviceBasicConfigCmd(Device device, BasicParam basicParam, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; void deviceBasicConfigCmd(Device device, BasicParam basicParam, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* *
@ -286,7 +287,7 @@ public interface ISIPCommander {
* @param channelId * @param channelId
* @param configType * @param configType
*/ */
void deviceConfigQuery(Device device, String channelId, String configType, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException;
/** /**
* *

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
@ -871,7 +870,7 @@ public class SIPCommander implements ISIPCommander {
* basicParam * basicParam
*/ */
@Override @Override
public void deviceBasicConfigCmd(Device device, BasicParam basicParam, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { public void deviceBasicConfigCmd(Device device, BasicParam basicParam, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException {
int sn = (int) ((Math.random() * 9 + 1) * 100000); int sn = (int) ((Math.random() * 9 + 1) * 100000);
String cmdType = "DeviceConfig"; String cmdType = "DeviceConfig";
@ -904,21 +903,14 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("</BasicParam>\r\n"); cmdXml.append("</BasicParam>\r\n");
cmdXml.append("</Control>\r\n"); cmdXml.append("</Control>\r\n");
ErrorCallback<String> errorCallback = (code, msg, data) -> {
if (code != ErrorCode.SUCCESS.getCode()) {
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
eventResult.type = SipSubscribe.EventResultType.failedResult;
eventResult.msg = msg;
eventResult.statusCode = code;
errorEvent.response(eventResult);
}
};
MessageEvent<String> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, errorCallback); MessageEvent<String> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback);
messageSubscribe.addSubscribe(messageEvent); messageSubscribe.addSubscribe(messageEvent);
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> {
callback.run(ErrorCode.ERROR100.getCode(), "消息发送失败", null);
});
} }
/** /**
@ -1095,14 +1087,16 @@ public class SIPCommander implements ISIPCommander {
* @param configType * @param configType
*/ */
@Override @Override
public void deviceConfigQuery(Device device, String channelId, String configType, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { public void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException {
String cmdType = "ConfigDownload";
int sn = (int) ((Math.random() * 9 + 1) * 100000);
StringBuffer cmdXml = new StringBuffer(200); StringBuffer cmdXml = new StringBuffer(200);
String charset = device.getCharset(); String charset = device.getCharset();
cmdXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n"); cmdXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
cmdXml.append("<Query>\r\n"); cmdXml.append("<Query>\r\n");
cmdXml.append("<CmdType>ConfigDownload</CmdType>\r\n"); cmdXml.append("<CmdType>" + cmdType + "</CmdType>\r\n");
cmdXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n"); cmdXml.append("<SN>" + sn + "</SN>\r\n");
if (ObjectUtils.isEmpty(channelId)) { if (ObjectUtils.isEmpty(channelId)) {
cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n"); cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
} else { } else {
@ -1111,10 +1105,13 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("<ConfigType>" + configType + "</ConfigType>\r\n"); cmdXml.append("<ConfigType>" + configType + "</ConfigType>\r\n");
cmdXml.append("</Query>\r\n"); cmdXml.append("</Query>\r\n");
MessageEvent<Object> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback);
messageSubscribe.addSubscribe(messageEvent);
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> {
callback.run(ErrorCode.ERROR100.getCode(), "消息发送失败", null);
});
} }
/** /**

View File

@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe; import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -44,18 +43,16 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In
public void handMessageEvent(Element element, Object data) { public void handMessageEvent(Element element, Object data) {
String cmd = getText(element, "CmdType"); String cmd = getText(element, "CmdType");
IMessageHandler messageHandler = messageHandlerMap.get(cmd);
if (messageHandler == null) {
String sn = getText(element, "SN"); String sn = getText(element, "SN");
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn); MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);
if (subscribe != null) { if (subscribe != null) {
String result = getText(element, "Result"); String result = getText(element, "Result");
if ("OK".equalsIgnoreCase(result)) { if (result == null || "OK".equalsIgnoreCase(result) || data != null) {
subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
}else { }else {
subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result); subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result);
} }
} messageSubscribe.removeSubscribe(cmd + sn);
} }
} }
} }

View File

@ -5,12 +5,10 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element; import org.dom4j.Element;
@ -24,8 +22,6 @@ import javax.sip.SipException;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@Slf4j @Slf4j
@Component @Component
public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
@ -49,8 +45,6 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
@Override @Override
public void handForDevice(RequestEvent evt, Device device, Element element) { public void handForDevice(RequestEvent evt, Device device, Element element) {
String channelId = getText(element, "DeviceID");
try { try {
// 回复200 OK // 回复200 OK
responseAck((SIPRequest) evt.getRequest(), Response.OK); responseAck((SIPRequest) evt.getRequest(), Response.OK);
@ -63,7 +57,24 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug(json.toJSONString()); log.debug(json.toJSONString());
} }
JSONObject jsonObject = new JSONObject();
if (json.get("BasicParam") != null) {
jsonObject.put("BasicParam", json.getJSONObject("BasicParam"));
}
if (json.get("VideoParamOpt") != null) {
jsonObject.put("VideoParamOpt", json.getJSONObject("VideoParamOpt"));
}
if (json.get("SVACEncodeConfig") != null) {
jsonObject.put("SVACEncodeConfig", json.getJSONObject("SVACEncodeConfig"));
}
if (json.get("SVACDecodeConfig") != null) {
jsonObject.put("SVACDecodeConfig", json.getJSONObject("SVACDecodeConfig"));
}
responseMessageHandler.handMessageEvent(element, jsonObject);
JSONObject basicParam = json.getJSONObject("BasicParam"); JSONObject basicParam = json.getJSONObject("BasicParam");
if (basicParam != null) {
Integer heartBeatInterval = basicParam.getInteger("HeartBeatInterval"); Integer heartBeatInterval = basicParam.getInteger("HeartBeatInterval");
Integer heartBeatCount = basicParam.getInteger("HeartBeatCount"); Integer heartBeatCount = basicParam.getInteger("HeartBeatCount");
Integer positionCapability = basicParam.getInteger("PositionCapability"); Integer positionCapability = basicParam.getInteger("PositionCapability");
@ -72,8 +83,7 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
device.setPositionCapability(positionCapability); device.setPositionCapability(positionCapability);
deviceService.updateDeviceHeartInfo(device); deviceService.updateDeviceHeartInfo(device);
responseMessageHandler.handMessageEvent(element, basicParam); }
} }

View File

@ -9,13 +9,19 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element; import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@ -39,6 +45,12 @@ public class DeviceConfigResponseMessageHandler extends SIPRequestProcessorParen
@Override @Override
public void handForDevice(RequestEvent evt, Device device, Element element) { public void handForDevice(RequestEvent evt, Device device, Element element) {
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
try {
// 回复200 OK
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 设备配置查询: {}", e.getMessage());
}
XmlUtil.node2Json(element, json); XmlUtil.node2Json(element, json);
String channelId = getText(element, "DeviceID"); String channelId = getText(element, "DeviceID");
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {

View File

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.service.redisMsg.control; package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
@ -114,6 +113,7 @@ public class RedisRpcDeviceController extends RpcController {
@RedisRpcMapping("deviceConfigQuery") @RedisRpcMapping("deviceConfigQuery")
public RedisRpcResponse deviceConfigQuery(RedisRpcRequest request) { public RedisRpcResponse deviceConfigQuery(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString()); JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId"); String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId"); String channelId = paramJson.getString("channelId");
@ -127,21 +127,13 @@ public class RedisRpcDeviceController extends RpcController {
response.setBody("param error"); response.setBody("param error");
return response; return response;
} }
DeferredResult<WVPResult<String>> deferredResult = deviceService.deviceConfigQuery(device, channelId, configType); deviceService.deviceConfigQuery(device, channelId, configType, (code, msg, data) -> {
deferredResult.onCompletion(() ->{ response.setStatusCode(code);
response.setStatusCode(ErrorCode.SUCCESS.getCode()); response.setBody(new WVPResult<>(code, msg, data));
response.setBody(deferredResult.getResult());
// 手动发送结果 // 手动发送结果
sendResponse(response); sendResponse(response);
}); });
deferredResult.onTimeout(() -> { return null;
log.warn("[设备配置]操作超时, 设备未返回应答指令, {}", deviceId);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时"));
// 手动发送结果
sendResponse(response);
});
return response;
} }
@RedisRpcMapping("teleboot") @RedisRpcMapping("teleboot")