[集群] 重构message消息接受返回值的方式

master
lin 2025-02-12 17:20:12 +08:00
parent b6c3f42a1f
commit 5808c7aff5
19 changed files with 359 additions and 167 deletions

View File

@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.gb28181.bean;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
*
*/
@Data
@Schema(description = "基础配置")
public class BasicParam {
@Schema(description = "设备ID")
private String deviceId;
@Schema(description = "通道ID如果时对设备配置直接设置同设备ID一样即可")
private String channelId;
@Schema(description = "名称")
private String name;
@Schema(description = "注册过期时间")
private String expiration;
@Schema(description = "心跳间隔时间")
private Integer heartBeatInterval;
@Schema(description = "心跳超时次数")
private Integer heartBeatCount;
@Schema(description = "定位功能支持情况。取值:0-不支持;1-支持 GPS定位;2-支持北斗定位(可选,默认取值为0)" +
"用于接受配置查询结果, 基础配置时无效")
private Integer positionCapability;
@Schema(description = "经度(可选),用于接受配置查询结果, 基础配置时无效")
private Double longitude;
@Schema(description = "纬度(可选),用于接受配置查询结果, 基础配置时无效")
private Double latitude;
public static BasicParam getInstance(String name, String expiration, Integer heartBeatInterval, Integer heartBeatCount) {
BasicParam basicParam = new BasicParam();
basicParam.setName(name);
basicParam.setExpiration(expiration);
basicParam.setHeartBeatInterval(heartBeatInterval);
basicParam.setHeartBeatCount(heartBeatCount);
return basicParam;
}
}

View File

@ -10,12 +10,14 @@ package com.genersoft.iot.vmp.gb28181.controller;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.BasicParam;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@ -49,45 +51,30 @@ public class DeviceConfig {
private DeferredResultHolder resultHolder;
/**
* API
* @param deviceId ID
* @param channelId ID
* @param name
* @param expiration
* @param heartBeatInterval
* @param heartBeatCount
* @return
*
*/
@GetMapping("/basicParam/{deviceId}")
@GetMapping("/basicParam")
@Operation(summary = "基本配置设置命令", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true)
@Parameter(name = "name", description = "名称")
@Parameter(name = "expiration", description = "到期时间")
@Parameter(name = "heartBeatInterval", description = "心跳间隔")
@Parameter(name = "heartBeatCount", description = "心跳计数")
public DeferredResult<String> homePositionApi(@PathVariable String deviceId,
@RequestParam(required = false) String channelId,
@RequestParam(required = false) String name,
@RequestParam(required = false) String expiration,
@RequestParam(required = false) String heartBeatInterval,
@RequestParam(required = false) String heartBeatCount) {
@Parameter(name = "basicParam", description = "基础配置参数", required = true)
public DeferredResult<WVPResult<String>> homePositionApi(BasicParam basicParam) {
if (log.isDebugEnabled()) {
log.debug("报警复位API调用");
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
Assert.notNull(device, "设备不存在");
DeferredResult<String> result = deviceService.deviceBasicConfig(device, channelId, name, expiration, heartBeatInterval, heartBeatCount);
Assert.notNull(basicParam.getDeviceId(), "设备ID必须存在");
result.onTimeout(() -> {
log.warn(String.format("设备配置操作超时, 设备未返回应答指令"));
JSONObject json = new JSONObject();
json.put("DeviceID", device.getDeviceId());
json.put("Status", "Timeout");
json.put("Description", "设备配置操作超时, 设备未返回应答指令");
result.setResult(json.toString());
Device device = deviceService.getDeviceByDeviceId(basicParam.getDeviceId());
Assert.notNull(device, "设备不存在");
DeferredResult<WVPResult<String>> deferredResult = new DeferredResult<>();
deviceService.deviceBasicConfig(device, basicParam, (code, msg, data) -> {
deferredResult.setResult(new WVPResult<>(code, msg, data));
});
return result;
deferredResult.onTimeout(() -> {
log.warn("[设备配置] 超时, {}", device.getDeviceId());
deferredResult.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时"));
});
return deferredResult;
}

View File

@ -368,32 +368,32 @@ public class DeviceQuery {
return result;
String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId;
String uuid = UUID.randomUUID().toString();
try {
cmder.alarmInfoQuery(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData(String.format("设备报警查询失败,错误码: %s, %s",event.statusCode, event.msg));
resultHolder.invokeResult(msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 设备报警查询: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String >> (3 * 1000L);
result.onTimeout(()->{
log.warn(String.format("设备报警查询超时"));
// 释放rtpserver
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData("设备报警查询超时");
resultHolder.invokeResult(msg);
});
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId, uuid, result);
return result;
// String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId;
// String uuid = UUID.randomUUID().toString();
// try {
// cmder.alarmInfoQuery(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, event -> {
// RequestMessage msg = new RequestMessage();
// msg.setId(uuid);
// msg.setKey(key);
// msg.setData(String.format("设备报警查询失败,错误码: %s, %s",event.statusCode, event.msg));
// resultHolder.invokeResult(msg);
// });
// } catch (InvalidArgumentException | SipException | ParseException e) {
// log.error("[命令发送失败] 设备报警查询: {}", e.getMessage());
// throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
// }
// DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String >> (3 * 1000L);
// result.onTimeout(()->{
// log.warn(String.format("设备报警查询超时"));
// // 释放rtpserver
// RequestMessage msg = new RequestMessage();
// msg.setId(uuid);
// msg.setKey(key);
// msg.setData("设备报警查询超时");
// resultHolder.invokeResult(msg);
// });
// resultHolder.put(DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId, uuid, result);
// return result;
}

View File

@ -0,0 +1,84 @@
package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.gb28181.bean.DeviceNotFoundEvent;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.sip.DialogTerminatedEvent;
import javax.sip.ResponseEvent;
import javax.sip.TimeoutEvent;
import javax.sip.TransactionTerminatedEvent;
import javax.sip.header.WarningHeader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
/**
* @author lin
*/
@Slf4j
@Component
public class MessageSubscribe {
private final Map<String, MessageEvent<?>> subscribes = new ConcurrentHashMap<>();
private final DelayQueue<MessageEvent<?>> delayQueue = new DelayQueue<>();
@Scheduled(fixedDelay = 200) //每200毫秒执行
public void execute(){
if (delayQueue.isEmpty()) {
return;
}
try {
MessageEvent<?> take = delayQueue.take();
// 出现超时异常
if(take.getCallback() != null) {
take.getCallback().run(ErrorCode.ERROR486.getCode(), "消息超时未回复", null);
}
subscribes.remove(take.getKey());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void addSubscribe(MessageEvent<?> event) {
MessageEvent<?> messageEvent = subscribes.get(event.getKey());
if (messageEvent != null) {
subscribes.remove(event.getKey());
delayQueue.remove(messageEvent);
}
subscribes.put(event.getKey(), event);
delayQueue.offer(event);
}
public MessageEvent<?> getSubscribe(String key) {
return subscribes.get(key);
}
public void removeSubscribe(String key) {
if(key == null){
return;
}
MessageEvent<?> messageEvent = subscribes.get(key);
if (messageEvent != null) {
subscribes.remove(key);
delayQueue.remove(messageEvent);
}
}
public boolean isEmpty(){
return subscribes.isEmpty();
}
public Integer size() {
return subscribes.size();
}
}

View File

@ -78,7 +78,9 @@ public class SipSubscribe {
// 消息发送失败
cmdSendFailEvent,
// 消息发送失败
failedToGetPort
failedToGetPort,
// 收到失败的回复
failedResult
}
public static class EventResult<EventObject>{

View File

@ -0,0 +1,56 @@
package com.genersoft.iot.vmp.gb28181.event.sip;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import lombok.Data;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
public class MessageEvent<T> implements Delayed {
/**
* ( )
*/
private long delay;
private String cmdType;
private String sn;
private String deviceId;
private String result;
private T t;
private ErrorCallback<T> callback;
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delay, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
public String getKey(){
return cmdType + sn;
}
public static <T> MessageEvent<T> getInstance(String cmdType, String sn, String deviceId, Long delay, ErrorCallback<T> callback){
MessageEvent<T> messageEvent = new MessageEvent<>();
messageEvent.cmdType = cmdType;
messageEvent.sn = sn;
messageEvent.deviceId = deviceId;
messageEvent.callback = callback;
if (delay == null) {
messageEvent.delay = 1000;
}else {
messageEvent.delay = delay;
}
return messageEvent;
}
}

View File

@ -1,9 +1,11 @@
package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.BasicParam;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
@ -170,7 +172,7 @@ public interface IDeviceService {
WVPResult<SyncStatus> devicesSync(Device device);
DeferredResult<WVPResult<String>> deviceBasicConfig(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount);
void deviceBasicConfig(Device device, BasicParam basicParam, ErrorCallback<String> callback);
DeferredResult<WVPResult<String>> deviceConfigQuery(Device device, String channelId, String configType);

View File

@ -13,14 +13,12 @@ public interface IGbChannelPlayService {
void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream);
void play(CommonGBChannel channel, Platform platform, ErrorCallback<StreamInfo> callback);
void play(CommonGBChannel channel, Platform platform, Boolean record, ErrorCallback<StreamInfo> callback);
void playGbDeviceChannel(CommonGBChannel channel, Boolean record, ErrorCallback<StreamInfo> callback);
void stopPlayDeviceChannel(InviteSessionType type, CommonGBChannel channel, String stream);
void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
void playProxy(CommonGBChannel channel, Boolean record, ErrorCallback<StreamInfo> callback);
void stopPlayProxy(CommonGBChannel channel);

View File

@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
@ -26,6 +27,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respons
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
@ -655,26 +657,23 @@ public class DeviceServiceImpl implements IDeviceService {
}
@Override
public DeferredResult<WVPResult<String>> deviceBasicConfig(Device device, String channelId, String name, String expiration,
String heartBeatInterval, String heartBeatCount) {
public void deviceBasicConfig(Device device, BasicParam basicParam, ErrorCallback<String> callback) {
if (!userSetting.getServerId().equals(device.getServerId())) {
WVPResult<String> result = redisRpcService.deviceBasicConfig(device.getServerId(), device, channelId, name, expiration,
heartBeatInterval, heartBeatCount);
DeferredResult<WVPResult<String>> deferredResult = new DeferredResult<>(3 * 1000L);
deferredResult.setResult(result);
return deferredResult;
WVPResult<String> result = redisRpcService.deviceBasicConfig(device.getServerId(), device, basicParam);
if (result.getCode() == ErrorCode.SUCCESS.getCode()) {
callback.run(result.getCode(), result.getMsg(), result.getData());
}
return;
}
DeferredResult<WVPResult<String>> result = new DeferredResult<>(3 * 1000L);
try {
sipCommander.deviceBasicConfigCmd(device, channelId, name, expiration, heartBeatInterval, heartBeatCount, event -> {
result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时"));
sipCommander.deviceBasicConfigCmd(device, basicParam, event -> {
callback.run(ErrorCode.ERROR100.getCode(), "操作超时", null);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 设备配置: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage());
}
return result;
}
@Override

View File

@ -302,7 +302,7 @@ public class PlayServiceImpl implements IPlayService {
log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
}
play(mediaServerItem, device, channel, null, callback);
play(mediaServerItem, device, channel, null, userSetting.getRecordSip(), callback);
}
@Override
@ -1661,11 +1661,11 @@ public class PlayServiceImpl implements IPlayService {
}
return;
}
inviteStreamService.removeInviteInfo( "rtp", inviteInfo);
inviteStreamService.removeInviteInfo(inviteInfo);
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
cmder.streamByeCmd(device, channel.getDeviceId(), inviteInfo.getStream(), null, null);
cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());

View File

@ -229,15 +229,8 @@ public interface ISIPCommander {
/**
* basicParam
*
* @param device
* @param channelId
* @param name /
* @param expiration
* @param heartBeatInterval
* @param heartBeatCount
*/
void deviceBasicConfigCmd(Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
*/
void deviceBasicConfigCmd(Device device, BasicParam basicParam, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
/**
*

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
@ -7,7 +8,9 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
@ -19,8 +22,10 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.extern.slf4j.Slf4j;
@ -71,6 +76,9 @@ public class SIPCommander implements ISIPCommander {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private MessageSubscribe messageSubscribe;
/**
@ -861,52 +869,53 @@ public class SIPCommander implements ISIPCommander {
/**
* basicParam
*
* @param device
* @param channelId
* @param name /
* @param expiration
* @param heartBeatInterval
* @param heartBeatCount
*/
@Override
public void deviceBasicConfigCmd(Device device, String channelId, String name, String expiration,
String heartBeatInterval, String heartBeatCount, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
public void deviceBasicConfigCmd(Device device, BasicParam basicParam, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
int sn = (int) ((Math.random() * 9 + 1) * 100000);
String cmdType = "DeviceConfig";
StringBuffer cmdXml = new StringBuffer(200);
String charset = device.getCharset();
cmdXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
cmdXml.append("<Control>\r\n");
cmdXml.append("<CmdType>DeviceConfig</CmdType>\r\n");
cmdXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
cmdXml.append("<CmdType>" + cmdType + "</CmdType>\r\n");
cmdXml.append("<SN>" + sn + "</SN>\r\n");
String channelId = basicParam.getChannelId();
if (ObjectUtils.isEmpty(channelId)) {
cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
} else {
cmdXml.append("<DeviceID>" + channelId + "</DeviceID>\r\n");
channelId = device.getDeviceId();
}
cmdXml.append("<DeviceID>" + channelId + "</DeviceID>\r\n");
cmdXml.append("<BasicParam>\r\n");
if (!ObjectUtils.isEmpty(name)) {
cmdXml.append("<Name>" + name + "</Name>\r\n");
if (!ObjectUtils.isEmpty(basicParam.getName())) {
cmdXml.append("<Name>" + basicParam.getName() + "</Name>\r\n");
}
if (NumericUtil.isInteger(expiration)) {
if (Integer.valueOf(expiration) > 0) {
cmdXml.append("<Expiration>" + expiration + "</Expiration>\r\n");
if (NumericUtil.isInteger(basicParam.getExpiration())) {
if (Integer.parseInt(basicParam.getExpiration()) > 0) {
cmdXml.append("<Expiration>" + basicParam.getExpiration() + "</Expiration>\r\n");
}
}
if (NumericUtil.isInteger(heartBeatInterval)) {
if (Integer.valueOf(heartBeatInterval) > 0) {
cmdXml.append("<HeartBeatInterval>" + heartBeatInterval + "</HeartBeatInterval>\r\n");
}
if (basicParam.getHeartBeatInterval() != null && basicParam.getHeartBeatInterval() > 0) {
cmdXml.append("<HeartBeatInterval>" + basicParam.getHeartBeatInterval() + "</HeartBeatInterval>\r\n");
}
if (NumericUtil.isInteger(heartBeatCount)) {
if (Integer.valueOf(heartBeatCount) > 0) {
cmdXml.append("<HeartBeatCount>" + heartBeatCount + "</HeartBeatCount>\r\n");
}
if (basicParam.getHeartBeatCount() != null && basicParam.getHeartBeatCount() > 0) {
cmdXml.append("<HeartBeatCount>" + basicParam.getHeartBeatCount() + "</HeartBeatCount>\r\n");
}
cmdXml.append("</BasicParam>\r\n");
cmdXml.append("</Control>\r\n");
ErrorCallback<String> errorCallback = (code, msg, data) -> {
if (code != ErrorCode.SUCCESS.getCode()) {
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
eventResult.type = SipSubscribe.EventResultType.failedResult;
eventResult.msg = msg;
eventResult.statusCode = code;
errorEvent.response(eventResult);
}
};
MessageEvent<String> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, errorCallback);
messageSubscribe.addSubscribe(messageEvent);
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent);

View File

@ -1,11 +1,21 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
*
* , , , , , , ......
@ -18,8 +28,34 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In
@Autowired
private MessageRequestProcessor messageRequestProcessor;
@Autowired
private MessageSubscribe messageSubscribe;
@Override
public void afterPropertiesSet() throws Exception {
messageRequestProcessor.addHandler(messageType, this);
}
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
super.handForDevice(evt, device, element);
handMessageEvent(element, null);
}
public void handMessageEvent(Element element, Object data) {
String cmd = getText(element, "CmdType");
IMessageHandler messageHandler = messageHandlerMap.get(cmd);
if (messageHandler == null) {
String sn = getText(element, "SN");
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);
if (subscribe != null) {
String result = getText(element, "Result");
if ("OK".equalsIgnoreCase(result)) {
subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
}else {
subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result);
}
}
}
}
}

View File

@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
@ -49,12 +50,7 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
String channelId = getText(element, "DeviceID");
String key;
if (device.getDeviceId().equals(channelId)) {
key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + device.getDeviceId();
}else {
key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + device.getDeviceId() + channelId;
}
try {
// 回复200 OK
responseAck((SIPRequest) evt.getRequest(), Response.OK);
@ -76,13 +72,7 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
device.setPositionCapability(positionCapability);
deviceService.updateDeviceHeartInfo(device);
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
responseMessageHandler.handMessageEvent(element, basicParam);
}

View File

@ -2,11 +2,9 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
public interface IRedisRpcService {
@ -40,7 +38,7 @@ public interface IRedisRpcService {
SyncStatus getChannelSyncStatus(String serverId, String deviceId);
WVPResult<String> deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration, String heartBeatInterval, String heartBeatCount);
WVPResult<String> deviceBasicConfig(String serverId, Device device, BasicParam basicParam);
WVPResult<String> deviceConfigQuery(String serverId, Device device, String channelId, String configType);

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.BasicParam;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
@ -92,14 +93,9 @@ public class RedisRpcDeviceController extends RpcController {
@RedisRpcMapping("deviceBasicConfig")
public RedisRpcResponse deviceBasicConfig(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
String name = paramJson.getString("configType");
String expiration = paramJson.getString("expiration");
String heartBeatInterval = paramJson.getString("heartBeatInterval");
BasicParam basicParam = JSONObject.parseObject(request.getParam().toString(), BasicParam.class);
Device device = deviceService.getDeviceByDeviceId(deviceId);
Device device = deviceService.getDeviceByDeviceId(basicParam.getDeviceId());
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
@ -107,22 +103,13 @@ public class RedisRpcDeviceController extends RpcController {
response.setBody("param error");
return response;
}
DeferredResult<WVPResult<String>> deferredResult = deviceService.deviceBasicConfig(device, channelId, name,
expiration, heartBeatInterval, heartBeatInterval);
deferredResult.onCompletion(() ->{
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(deferredResult.getResult());
// 手动发送结果
sendResponse(response);
});
deferredResult.onTimeout(() -> {
log.warn(String.format("设备配置操作超时, 设备未返回应答指令"));
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答"));
// 手动发送结果
sendResponse(response);
});
return response;
deviceService.deviceBasicConfig(device, basicParam, (code, msg, data) -> {
response.setStatusCode(code);
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
return null;
}
@RedisRpcMapping("deviceConfigQuery")

View File

@ -9,10 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.event.hook.Hook;
@ -267,16 +264,8 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
}
@Override
public WVPResult<String> deviceBasicConfig(String serverId, Device device, String channelId, String name, String expiration,
String heartBeatInterval, String heartBeatCount) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
jsonObject.put("name", name);
jsonObject.put("expiration", expiration);
jsonObject.put("heartBeatInterval", heartBeatInterval);
jsonObject.put("heartBeatCount", heartBeatCount);
RedisRpcRequest request = buildRequest("device/deviceBasicConfig", jsonObject);
public WVPResult<String> deviceBasicConfig(String serverId, Device device, BasicParam basicParam) {
RedisRpcRequest request = buildRequest("device/deviceBasicConfig", JSONObject.toJSONString(basicParam));
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);

View File

@ -8,4 +8,5 @@ public interface IStreamPushPlayService {
void stop(String app, String stream);
void stop(Integer id);
}

View File

@ -127,4 +127,16 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService {
Assert.notNull(mediaServer, "未找到使用的节点");
mediaServerService.closeStreams(mediaServer, app, stream);
}
@Override
public void stop(Integer id) {
StreamPush streamPush = streamPushMapper.queryOne(id);
if (streamPush == null || !streamPush.isPushing()) {
return;
}
String mediaServerId = streamPush.getMediaServerId();
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
Assert.notNull(mediaServer, "未找到使用的节点");
mediaServerService.closeStreams(mediaServer, streamPush.getApp(), streamPush.getStream());
}
}