message消息适配新的回调模式

master
lin 2025-02-18 17:04:13 +08:00
parent 3da6444baa
commit 676e2d7ea9
21 changed files with 356 additions and 548 deletions

View File

@ -67,11 +67,6 @@ public class DeviceQuery {
@Autowired
private IRedisRpcService redisRpcService;
/**
* 使ID
* @param deviceId ID
* @return
*/
@Operation(summary = "查询国标设备", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@GetMapping("/devices/{deviceId}")
@ -80,12 +75,7 @@ public class DeviceQuery {
return deviceService.getDeviceByDeviceId(deviceId);
}
/**
*
* @param page
* @param count
* @return
*/
@Operation(summary = "分页查询国标设备", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页", required = true)
@Parameter(name = "count", description = "每页查询数量", required = true)
@ -100,9 +90,7 @@ public class DeviceQuery {
return deviceService.getAll(page, count, query, status);
}
/**
*
*/
@GetMapping("/devices/{deviceId}/channels")
@Operation(summary = "分页查询通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@ -123,9 +111,7 @@ public class DeviceQuery {
return deviceChannelService.queryChannelsByDeviceId(deviceId, query, channelType, online, page, count);
}
/**
*
*/
@Operation(summary = "同步设备通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@GetMapping("/devices/{deviceId}/sync")
@ -135,19 +121,11 @@ public class DeviceQuery {
log.debug("设备通道信息同步API调用deviceId" + deviceId);
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
if (!userSetting.getServerId().equals(device.getServerId())) {
return redisRpcService.devicesSync(device.getServerId(), deviceId);
}
return deviceService.devicesSync(device);
}
/**
*
* @param deviceId id
* @return
*/
@Operation(summary = "移除设备", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@DeleteMapping("/devices/{deviceId}/delete")
@ -182,17 +160,6 @@ public class DeviceQuery {
}
}
/**
*
* @param deviceId id
* @param channelId id
* @param page
* @param count
* @param query
* @param online 线
* @param channelType
* @return
*/
@Operation(summary = "分页查询子目录通道", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true)
@ -235,12 +202,7 @@ public class DeviceQuery {
deviceChannelService.updateChannelStreamIdentification(channel);
}
/**
*
* @param deviceId id
* @param streamMode
* @return
*/
@Operation(summary = "修改数据流传输模式", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "streamMode", description = "数据流传输模式, 取值:" +
@ -252,11 +214,7 @@ public class DeviceQuery {
deviceService.updateCustomDevice(device);
}
/**
*
* @param device
* @return
*/
@Operation(summary = "添加设备信息", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "device", description = "设备", required = true)
@PostMapping("/device/add/")
@ -274,11 +232,7 @@ public class DeviceQuery {
deviceService.addDevice(device);
}
/**
*
* @param device
* @return
*/
@Operation(summary = "更新设备信息", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "device", description = "设备", required = true)
@PostMapping("/device/update/")
@ -289,11 +243,6 @@ public class DeviceQuery {
deviceService.updateCustomDevice(device);
}
/**
* API
*
* @param deviceId id
*/
@Operation(summary = "设备状态查询", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@GetMapping("/devices/{deviceId}/status")
@ -314,27 +263,17 @@ public class DeviceQuery {
return result;
}
/**
* API
* @param deviceId id
* @param startPriority
* @param endPriority
* @param alarmMethod
* @param alarmType
* @param startTime
* @param endTime
* @return true =
*/
@Operation(summary = "设备报警查询", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "startPriority", description = "报警起始级别")
@Parameter(name = "endPriority", description = "报警终止级别")
@Parameter(name = "alarmMethod", description = "报警方式条件")
@Parameter(name = "startPriority", description = "报警起始级别, 0为全部,1为一级警情,2为二级警情,3为三级警情,4为四级警情")
@Parameter(name = "endPriority", description = "报警终止级别, ,0为全部,1为一级警情,2为二级警情,3为三级警情,4为四级警情")
@Parameter(name = "alarmMethod", description = "报警方式条件,取值0为全部,1为电话报警,2为设备报警,3为短信报警,4为GPS报警," +
"5为视频报警,6为设备故障报警,7其他报警;可以为直接组合如12为电话报警或设备报警")
@Parameter(name = "alarmType", description = "报警类型")
@Parameter(name = "startTime", description = "报警发生起始时间")
@Parameter(name = "endTime", description = "报警发生终止时间")
@GetMapping("/alarm/{deviceId}")
public DeferredResult<WVPResult<String>> alarmApi(@PathVariable String deviceId,
@GetMapping("/alarm")
public DeferredResult<WVPResult<Object>> alarmApi(String deviceId,
@RequestParam(required = false) String startPriority,
@RequestParam(required = false) String endPriority,
@RequestParam(required = false) String alarmMethod,
@ -346,8 +285,8 @@ public class DeviceQuery {
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
Assert.notNull(device, "设备不存在");
DeferredResult<WVPResult<String>> result = new DeferredResult<>();
deviceService.deviceStatus(device, (code, msg, data) -> {
DeferredResult<WVPResult<Object>> result = new DeferredResult<>();
deviceService.alarm(device, startPriority,endPriority ,alarmMethod ,alarmType ,startTime ,endTime, (code, msg, data) -> {
result.setResult(new WVPResult<>(code, msg, data));
});
result.onTimeout(() -> {
@ -355,34 +294,26 @@ public class DeviceQuery {
result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答"));
});
return result;
}
// String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId;
// String uuid = UUID.randomUUID().toString();
// try {
// cmder.alarmInfoQuery(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, event -> {
// RequestMessage msg = new RequestMessage();
// msg.setId(uuid);
// msg.setKey(key);
// msg.setData(String.format("设备报警查询失败,错误码: %s, %s",event.statusCode, event.msg));
// resultHolder.invokeResult(msg);
// });
// } catch (InvalidArgumentException | SipException | ParseException e) {
// log.error("[命令发送失败] 设备报警查询: {}", e.getMessage());
// throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
// }
// DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String >> (3 * 1000L);
// result.onTimeout(()->{
// log.warn(String.format("设备报警查询超时"));
// // 释放rtpserver
// RequestMessage msg = new RequestMessage();
// msg.setId(uuid);
// msg.setKey(key);
// msg.setData("设备报警查询超时");
// resultHolder.invokeResult(msg);
// });
// resultHolder.put(DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId, uuid, result);
// return result;
@Operation(summary = "设备信息查询", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@GetMapping("/info")
public DeferredResult<WVPResult<Object>> deviceInfo(String deviceId) {
if (log.isDebugEnabled()) {
log.debug("设备信息查询API调用");
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
Assert.notNull(device, "设备不存在");
DeferredResult<WVPResult<Object>> result = new DeferredResult<>();
deviceService.deviceInfo(device, (code, msg, data) -> {
result.setResult(new WVPResult<>(code, msg, data));
});
result.onTimeout(() -> {
log.warn("[设备信息查询] 操作超时, 设备未返回应答指令, {}", deviceId);
result.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "操作超时, 设备未应答"));
});
return result;
}

View File

@ -93,28 +93,20 @@ public class PlayController {
DeviceChannel channel = deviceChannelService.getOne(deviceId, channelId);
Assert.notNull(channel, "通道不存在");
RequestMessage requestMessage = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
requestMessage.setKey(key);
String uuid = UUID.randomUUID().toString();
requestMessage.setId(uuid);
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
result.onTimeout(()->{
log.info("[点播等待超时] deviceId{}, channelId{}, ", deviceId, channelId);
// 释放rtpserver
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
WVPResult<StreamContent> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("点播超时");
requestMessage.setData(wvpResult);
resultHolder.invokeAllResult(requestMessage);
result.setResult(wvpResult);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
deviceChannelService.stopPlay(channel.getId());
});
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);
ErrorCallback<StreamInfo> callback = (code, msg, streamInfo) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
@ -145,9 +137,7 @@ public class PlayController {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
requestMessage.setData(wvpResult);
// 此处必须释放所有请求
resultHolder.invokeAllResult(requestMessage);
result.setResult(wvpResult);
};
playService.play(device, channel, callback);
return result;

View File

@ -7,9 +7,9 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IPTZService;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
@ -17,18 +17,12 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.UUID;
@Tag(name = "前端设备控制")
@Slf4j
@RestController
@ -225,40 +219,22 @@ public class PtzController {
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true)
@GetMapping("/preset/query/{deviceId}/{channelId}")
public DeferredResult<String> queryPreset(@PathVariable String deviceId, @PathVariable String channelId) {
public DeferredResult<WVPResult<Object>> queryPreset(@PathVariable String deviceId, @PathVariable String channelId) {
if (log.isDebugEnabled()) {
log.debug("设备预置位查询API调用");
}
Device device = deviceService.getDeviceByDeviceId(deviceId);
String uuid = UUID.randomUUID().toString();
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + (ObjectUtils.isEmpty(channelId) ? deviceId : channelId);
DeferredResult<String> result = new DeferredResult<String> (3 * 1000L);
result.onTimeout(()->{
log.warn(String.format("获取设备预置位超时"));
// 释放rtpserver
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData("获取设备预置位超时");
resultHolder.invokeResult(msg);
Assert.notNull(device, "设备不存在");
DeferredResult<WVPResult<Object>> deferredResult = new DeferredResult<> (3 * 1000L);
deviceService.queryPreset(device, channelId, (code, msg, data) -> {
deferredResult.setResult(new WVPResult<>(code, msg, data));
});
if (resultHolder.exist(key, null)) {
return result;
}
resultHolder.put(key, uuid, result);
try {
cmder.presetQuery(device, channelId, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData(String.format("获取设备预置位失败,错误码: %s, %s", event.statusCode, event.msg));
resultHolder.invokeResult(msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 获取设备预置位: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
return result;
deferredResult.onTimeout(()->{
log.warn("[获取设备预置位] 超时, {}", device.getDeviceId());
deferredResult.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "超时"));
});
return deferredResult;
}
@Operation(summary = "预置位指令-设置预置位", security = @SecurityRequirement(name = JwtUtils.HEADER))

View File

@ -1,10 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.gb28181.bean.BasicParam;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@ -194,4 +191,10 @@ public interface IDeviceService {
void deviceStatus(Device device, ErrorCallback<String> callback);
void updateDeviceHeartInfo(Device device);
void alarm(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime, ErrorCallback<Object> callback);
void deviceInfo(Device device, ErrorCallback<Object> callback);
void queryPreset(Device device, String channelId, ErrorCallback<Object> callback);
}

View File

@ -37,7 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
@ -140,7 +139,7 @@ public class DeviceServiceImpl implements IDeviceService {
deviceMapper.add(device);
redisCatchStorage.updateDevice(device);
try {
commander.deviceInfoQuery(device);
commander.deviceInfoQuery(device, null);
commander.deviceConfigQuery(device, null, "BasicParam", null);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
@ -155,7 +154,7 @@ public class DeviceServiceImpl implements IDeviceService {
if (userSetting.getSyncChannelOnDeviceOnline()) {
log.info("[设备上线,离线状态下重新注册]: {},查询设备信息以及通道信息", device.getDeviceId());
try {
commander.deviceInfoQuery(device);
commander.deviceInfoQuery(device, null);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
@ -623,7 +622,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public WVPResult<SyncStatus> devicesSync(Device device) {
if (!userSetting.getServerId().equals(device.getServerId())) {
return redisRpcService.devicesSync(device.getServerId(), device.getDeviceId());
}
// 已存在则返回进度
if (isSyncRunning(device.getDeviceId())) {
SyncStatus channelSyncStatus = getChannelSyncStatus(device.getDeviceId());
@ -742,7 +743,7 @@ public class DeviceServiceImpl implements IDeviceService {
return;
}
try {
sipCommander.alarmCmd(device, alarmMethod, alarmType, callback);
sipCommander.alarmResetCmd(device, alarmMethod, alarmType, callback);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 布防/撤防操作: {}", e.getMessage());
callback.run(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage(), null);
@ -842,7 +843,6 @@ public class DeviceServiceImpl implements IDeviceService {
return;
}
DeferredResult<WVPResult<String>> result = new DeferredResult<>(2*1000L);
try {
sipCommander.deviceStatusQuery(device, callback);
} catch (InvalidArgumentException | SipException | ParseException e) {
@ -851,4 +851,65 @@ public class DeviceServiceImpl implements IDeviceService {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
}
@Override
public void alarm(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime, ErrorCallback<Object> callback) {
if (!userSetting.getServerId().equals(device.getServerId())) {
WVPResult<String> result = redisRpcService.alarm(device.getServerId(), device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime);
callback.run(result.getCode(), result.getMsg(), result.getData());
return;
}
String startAlarmTime = "";
if (startTime != null) {
startAlarmTime = DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(startTime);
}
String endAlarmTime = "";
if (startTime != null) {
endAlarmTime = DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(endTime);
}
try {
sipCommander.alarmInfoQuery(device, startPriority, endPriority, alarmMethod, alarmType, startAlarmTime, endAlarmTime, callback);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 获取设备状态: {}", e.getMessage());
callback.run(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage(), null);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
}
@Override
public void deviceInfo(Device device, ErrorCallback<Object> callback) {
if (!userSetting.getServerId().equals(device.getServerId())) {
WVPResult<Object> result = redisRpcService.deviceInfo(device.getServerId(), device);
callback.run(result.getCode(), result.getMsg(), result.getData());
return;
}
try {
sipCommander.deviceInfoQuery(device, callback);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 获取设备信息: {}", e.getMessage());
callback.run(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage(), null);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
}
@Override
public void queryPreset(Device device, String channelId, ErrorCallback<Object> callback) {
if (!userSetting.getServerId().equals(device.getServerId())) {
WVPResult<Object> result = redisRpcService.queryPreset(device.getServerId(), device, channelId);
callback.run(result.getCode(), result.getMsg(), result.getData());
return;
}
try {
sipCommander.presetQuery(device, channelId, callback);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 预制位查询: {}", e.getMessage());
callback.run(ErrorCode.ERROR100.getCode(), "命令发送: " + e.getMessage(), null);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
}
}

View File

@ -1,92 +0,0 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEventListener;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author lin
*/
@Component
public class RecordDataCatch {
public static Map<String, RecordInfo> data = new ConcurrentHashMap<>();
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private RecordInfoEventListener recordEndEventListener;
public int put(String deviceId,String channelId, String sn, int sumNum, List<RecordItem> recordItems) {
String key = deviceId + sn;
RecordInfo recordInfo = data.get(key);
if (recordInfo == null) {
recordInfo = new RecordInfo();
recordInfo.setDeviceId(deviceId);
recordInfo.setChannelId(channelId);
recordInfo.setSn(sn.trim());
recordInfo.setSumNum(sumNum);
recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>()));
recordInfo.setLastTime(Instant.now());
recordInfo.getRecordList().addAll(recordItems);
data.put(key, recordInfo);
}else {
// 同一个设备的通道同步请求只考虑一个,其他的直接忽略
if (!Objects.equals(sn.trim(), recordInfo.getSn())) {
return 0;
}
recordInfo.getRecordList().addAll(recordItems);
recordInfo.setLastTime(Instant.now());
}
return recordInfo.getRecordList().size();
}
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
private void timerTask(){
Set<String> keys = data.keySet();
// 获取五秒前的时刻
Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5));
for (String key : keys) {
RecordInfo recordInfo = data.get(key);
// 超过五秒收不到消息任务超时, 只更新这一部分数据
if ( recordInfo.getLastTime().isBefore(instantBefore5S)) {
// 处理录像数据, 返回给前端
String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn();
// 对数据进行排序
Collections.sort(recordInfo.getRecordList());
RequestMessage msg = new RequestMessage();
msg.setKey(msgKey);
msg.setData(recordInfo);
deferredResultHolder.invokeAllResult(msg);
recordEndEventListener.delEndEventHandler(recordInfo.getDeviceId(),recordInfo.getChannelId());
data.remove(key);
}
}
}
public boolean isComplete(String deviceId, String sn) {
RecordInfo recordInfo = data.get(deviceId + sn);
return recordInfo != null && recordInfo.getRecordList().size() == recordInfo.getSumNum();
}
public RecordInfo getRecordInfo(String deviceId, String sn) {
return data.get(deviceId + sn);
}
public void remove(String deviceId, String sn) {
data.remove(deviceId + sn);
}
}

View File

@ -18,20 +18,6 @@ import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@Component
public class DeferredResultHolder {
public static final String CALLBACK_CMD_DEVICESTATUS = "CALLBACK_DEVICESTATUS";
public static final String CALLBACK_CMD_DEVICEINFO = "CALLBACK_DEVICEINFO";
public static final String CALLBACK_CMD_DEVICECONTROL = "CALLBACK_DEVICECONTROL";
public static final String CALLBACK_CMD_DEVICECONFIG = "CALLBACK_DEVICECONFIG";
public static final String CALLBACK_CMD_CONFIGDOWNLOAD = "CALLBACK_CONFIGDOWNLOAD";
public static final String CALLBACK_CMD_CATALOG = "CALLBACK_CATALOG";
public static final String CALLBACK_CMD_RECORDINFO = "CALLBACK_RECORDINFO";
public static final String CALLBACK_CMD_PLAY = "CALLBACK_PLAY";
@ -39,20 +25,11 @@ public class DeferredResultHolder {
public static final String CALLBACK_CMD_DOWNLOAD = "CALLBACK_DOWNLOAD";
public static final String CALLBACK_CMD_PROXY = "CALLBACK_PROXY";
public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP";
public static final String UPLOAD_FILE_CHANNEL = "UPLOAD_FILE_CHANNEL";
public static final String CALLBACK_CMD_MOBILE_POSITION = "CALLBACK_CMD_MOBILE_POSITION";
public static final String CALLBACK_CMD_PRESETQUERY = "CALLBACK_PRESETQUERY";
public static final String CALLBACK_CMD_ALARM = "CALLBACK_ALARM";
public static final String CALLBACK_CMD_BROADCAST = "CALLBACK_BROADCAST";
public static final String CALLBACK_CMD_SNAP= "CALLBACK_SNAP";
private Map<String, Map<String, DeferredResultEx>> map = new ConcurrentHashMap<>();

View File

@ -205,7 +205,7 @@ public interface ISIPCommander {
* @param alarmMethod
* @param alarmType
*/
void alarmCmd(Device device, String alarmMethod, String alarmType, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException;
void alarmResetCmd(Device device, String alarmMethod, String alarmType, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException;
/**
* ,IDR
@ -242,11 +242,12 @@ public interface ISIPCommander {
/**
*
*
* @param device
* @return
*
* @param device
* @param callback
* @return
*/
void deviceInfoQuery(Device device) throws InvalidArgumentException, SipException, ParseException;
void deviceInfoQuery(Device device, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException;
/**
*
@ -278,7 +279,7 @@ public interface ISIPCommander {
* @return true =
*/
void alarmInfoQuery(Device device, String startPriority, String endPriority, String alarmMethod,
String alarmType, String startTime, String endTime, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
String alarmType, String startTime, String endTime, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException;
/**
*
@ -294,7 +295,7 @@ public interface ISIPCommander {
*
* @param device
*/
void presetQuery(Device device, String channelId, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
void presetQuery(Device device, String channelId, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException;
/**
*

View File

@ -771,7 +771,7 @@ public class SIPCommander implements ISIPCommander {
* @param device
*/
@Override
public void alarmCmd(Device device, String alarmMethod, String alarmType, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException {
public void alarmResetCmd(Device device, String alarmMethod, String alarmType, ErrorCallback<String> callback) throws InvalidArgumentException, SipException, ParseException {
String cmdType = "DeviceControl";
int sn = (int) ((Math.random() * 9 + 1) * 100000);
@ -975,25 +975,35 @@ public class SIPCommander implements ISIPCommander {
/**
*
*
* @param device
* @param device
* @param callback
*/
@Override
public void deviceInfoQuery(Device device) throws InvalidArgumentException, SipException, ParseException {
public void deviceInfoQuery(Device device, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException {
String cmdType = "DeviceInfo";
String sn = (int) ((Math.random() * 9 + 1) * 100000) + "";
StringBuffer catalogXml = new StringBuffer(200);
String charset = device.getCharset();
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
catalogXml.append("<Query>\r\n");
catalogXml.append("<CmdType>DeviceInfo</CmdType>\r\n");
catalogXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
catalogXml.append("<CmdType>" + cmdType +"</CmdType>\r\n");
catalogXml.append("<SN>" + sn + "</SN>\r\n");
catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
catalogXml.append("</Query>\r\n");
MessageEvent<Object> messageEvent = MessageEvent.getInstance(cmdType, sn, device.getDeviceId(), 1000L, callback);
messageSubscribe.addSubscribe(messageEvent);
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> {
messageSubscribe.removeSubscribe(messageEvent.getKey());
if (callback != null) {
callback.run(ErrorCode.ERROR100.getCode(), "失败," + eventResult.msg, null);
}
});
}
@ -1079,14 +1089,17 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public void alarmInfoQuery(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType,
String startTime, String endTime, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
String startTime, String endTime, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException {
String cmdType = "Alarm";
String sn = (int) ((Math.random() * 9 + 1) * 100000) + "";
StringBuffer cmdXml = new StringBuffer(200);
String charset = device.getCharset();
cmdXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
cmdXml.append("<Query>\r\n");
cmdXml.append("<CmdType>Alarm</CmdType>\r\n");
cmdXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
cmdXml.append("<CmdType>" + cmdType + "</CmdType>\r\n");
cmdXml.append("<SN>" + sn + "</SN>\r\n");
cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
if (!ObjectUtils.isEmpty(startPriority)) {
cmdXml.append("<StartAlarmPriority>" + startPriority + "</StartAlarmPriority>\r\n");
@ -1108,10 +1121,14 @@ public class SIPCommander implements ISIPCommander {
}
cmdXml.append("</Query>\r\n");
MessageEvent<Object> messageEvent = MessageEvent.getInstance(cmdType, sn, device.getDeviceId(), 1000L, callback);
messageSubscribe.addSubscribe(messageEvent);
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> {
messageSubscribe.removeSubscribe(messageEvent.getKey());
callback.run(ErrorCode.ERROR100.getCode(), "失败," + eventResult.msg, null);
});
}
/**
@ -1156,14 +1173,17 @@ public class SIPCommander implements ISIPCommander {
* @param device
*/
@Override
public void presetQuery(Device device, String channelId, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
public void presetQuery(Device device, String channelId, ErrorCallback<Object> callback) throws InvalidArgumentException, SipException, ParseException {
String cmdType = "PresetQuery";
int sn = (int) ((Math.random() * 9 + 1) * 100000);
StringBuffer cmdXml = new StringBuffer(200);
String charset = device.getCharset();
cmdXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
cmdXml.append("<Query>\r\n");
cmdXml.append("<CmdType>PresetQuery</CmdType>\r\n");
cmdXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
cmdXml.append("<CmdType>" + cmdType + "</CmdType>\r\n");
cmdXml.append("<SN>" + sn + "</SN>\r\n");
if (ObjectUtils.isEmpty(channelId)) {
cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
} else {
@ -1171,9 +1191,14 @@ public class SIPCommander implements ISIPCommander {
}
cmdXml.append("</Query>\r\n");
MessageEvent<Object> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback);
messageSubscribe.addSubscribe(messageEvent);
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> {
messageSubscribe.removeSubscribe(messageEvent.getKey());
callback.run(ErrorCode.ERROR100.getCode(), "失败," + eventResult.msg, null);
});
}
/**

View File

@ -424,7 +424,7 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
}
}
try {
cmder.alarmCmd(device, alarmMethod, alarmType, (code, msg, data) -> {
cmder.alarmResetCmd(device, alarmMethod, alarmType, (code, msg, data) -> {
if (code == ErrorCode.SUCCESS.getCode()) {
onOk(request);
}else {

View File

@ -4,18 +4,22 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
@Slf4j
@Component
@ -36,18 +40,18 @@ public class AlarmResponseMessageHandler extends SIPRequestProcessorParent imple
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getText().toString();
String key = DeferredResultHolder.CALLBACK_CMD_ALARM + device.getDeviceId() + channelId;
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
}
JSONObject json = new JSONObject();
XmlUtil.node2Json(rootElement, json);
if (log.isDebugEnabled()) {
log.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
responseMessageHandler.handMessageEvent(rootElement, null);
}
@Override

View File

@ -1,69 +0,0 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@Slf4j
@Component
public class DeviceConfigResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private final String cmdType = "DeviceConfig";
@Autowired
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DeferredResultHolder deferredResultHolder;
@Override
public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this);
}
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
JSONObject json = new JSONObject();
try {
// 回复200 OK
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 设备配置查询: {}", e.getMessage());
}
XmlUtil.node2Json(element, json);
String channelId = getText(element, "DeviceID");
if (log.isDebugEnabled()) {
log.debug(json.toJSONString());
}
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG + device.getDeviceId() + channelId;
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
}
@Override
public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) {
}
}

View File

@ -1,75 +0,0 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@Slf4j
@Component
public class DeviceControlResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private final String cmdType = "DeviceControl";
@Autowired
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DeferredResultHolder deferredResultHolder;
@Override
public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this);
}
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
// 此处是对本平台发出DeviceControl指令的应答
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 设备控制: {}", e.getMessage());
}
JSONObject json = new JSONObject();
String channelId = getText(element, "DeviceID");
String result = getText(element, "Result");
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + device.getDeviceId() + channelId;
msg.setKey(key);
if ("OK".equalsIgnoreCase(result)) {
msg.setData(WVPResult.success());
}else {
msg.setData(WVPResult.fail(ErrorCode.ERROR100));
}
if (log.isDebugEnabled()) {
log.debug(json.toJSONString());
}
deferredResultHolder.invokeAllResult(msg);
}
@Override
public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) {
}
}

View File

@ -2,12 +2,10 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
@ -37,9 +35,6 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent
@Autowired
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private IDeviceService deviceService;
@ -70,9 +65,6 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent
}
return;
}
Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getTextTrim();
String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + device.getDeviceId() + channelId;
device.setName(getText(rootElement, "DeviceName"));
device.setManufacturer(getText(rootElement, "Manufacturer"));
@ -82,11 +74,8 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent
device.setStreamMode("TCP-PASSIVE");
}
deviceService.updateDevice(device);
responseMessageHandler.handMessageEvent(rootElement, device);
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(device);
deferredResultHolder.invokeAllResult(msg);
} catch (DocumentException e) {
throw new RuntimeException(e);
}

View File

@ -3,14 +3,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
@ -33,15 +30,9 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen
@Autowired
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private IDeviceService deviceService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Override
public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this);
@ -60,9 +51,7 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage());
}
Element deviceIdElement = element.element("DeviceID");
Element onlineElement = element.element("Online");
String channelId = deviceIdElement.getText();
JSONObject json = new JSONObject();
XmlUtil.node2Json(element, json);
if (log.isDebugEnabled()) {
@ -74,10 +63,8 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen
}else {
deviceService.offline(device.getDeviceId(), "设备状态查询结果:" + text.trim());
}
RequestMessage msg = new RequestMessage();
msg.setKey(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + device.getDeviceId());
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
responseMessageHandler.handMessageEvent(element, text);
}
@Override

View File

@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.Preset;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
@ -25,8 +24,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
*
*/
@ -68,8 +65,6 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
Element presetListNumElement = rootElement.element("PresetList");
Element snElement = rootElement.element("SN");
//该字段可能为通道或则设备的id
String deviceId = getText(rootElement, "DeviceID");
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId;
if (snElement == null || presetListNumElement == null) {
try {
responseAck(request, Response.BAD_REQUEST, "xml error");
@ -98,10 +93,7 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
presetQuerySipReqList.add(presetQuerySipReq);
}
}
RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key);
requestMessage.setData(presetQuerySipReqList);
deferredResultHolder.invokeAllResult(requestMessage);
responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
try {
responseAck(request, Response.OK);
} catch (InvalidArgumentException | ParseException | SipException e) {

View File

@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
public interface IRedisRpcService {
@ -59,4 +58,10 @@ public interface IRedisRpcService {
void dragZoomOut(String serverId, Device device, String channelId, int length, int width, int midpointx, int midpointy, int lengthx, int lengthy);
WVPResult<String> deviceStatus(String serverId, Device device);
WVPResult<String> alarm(String serverId, Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime);
WVPResult<Object> deviceInfo(String serverId, Device device);
WVPResult<Object> queryPreset(String serverId, Device device, String channelId);
}

View File

@ -376,6 +376,41 @@ public class RedisRpcDeviceController extends RpcController {
return null;
}
@RedisRpcMapping("alarm")
public RedisRpcResponse alarm(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String startPriority = paramJson.getString("startPriority");
String endPriority = paramJson.getString("endPriority");
String alarmMethod = paramJson.getString("alarmMethod");
String alarmType = paramJson.getString("alarmType");
String startTime = paramJson.getString("startTime");
String endTime = paramJson.getString("endTime");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.alarm(device, startPriority, endPriority, alarmMethod, alarmType, startTime, endTime, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("deviceStatus")
public RedisRpcResponse deviceStatus(RedisRpcRequest request) {
String deviceId = request.getParam().toString();
@ -403,5 +438,61 @@ public class RedisRpcDeviceController extends RpcController {
return null;
}
@RedisRpcMapping("info")
public RedisRpcResponse info(RedisRpcRequest request) {
String deviceId = request.getParam().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.deviceInfo(device, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
@RedisRpcMapping("info")
public RedisRpcResponse queryPreset(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
String deviceId = paramJson.getString("deviceId");
String channelId = paramJson.getString("channelId");
Device device = deviceService.getDeviceByDeviceId(deviceId);
RedisRpcResponse response = request.getResponse();
if (device == null || !userSetting.getServerId().equals(device.getServerId())) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
deviceService.queryPreset(device, channelId, (code, msg, data) -> {
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(new WVPResult<>(code, msg, data));
// 手动发送结果
sendResponse(response);
});
}catch (ControllerException e) {
response.setStatusCode(e.getCode());
response.setBody(WVPResult.fail(ErrorCode.ERROR100.getCode(), e.getMsg()));
sendResponse(response);
}
return null;
}
}

View File

@ -393,4 +393,41 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<Object> deviceInfo(String serverId, Device device) {
RedisRpcRequest request = buildRequest("device/info", device.getDeviceId());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<Object> queryPreset(String serverId, Device device, String channelId) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
jsonObject.put("channelId", channelId);
RedisRpcRequest request = buildRequest("device/queryPreset", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 60000, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
@Override
public WVPResult<String> alarm(String serverId, Device device, String startPriority, String endPriority,
String alarmMethod, String alarmType, String startTime, String endTime) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("device", device.getDeviceId());
// jsonObject.put("channelId", channelId);
jsonObject.put("startPriority", startPriority);
jsonObject.put("endPriority", endPriority);
jsonObject.put("alarmMethod", alarmMethod);
jsonObject.put("alarmType", alarmType);
jsonObject.put("startTime", startTime);
jsonObject.put("endTime", endTime);
RedisRpcRequest request = buildRequest("device/alarm", jsonObject);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 50, TimeUnit.MILLISECONDS);
return JSON.parseObject(response.getBody().toString(), WVPResult.class);
}
}

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.utils;
import org.apache.commons.lang3.ObjectUtils;
import javax.validation.constraints.NotNull;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@ -66,8 +67,7 @@ public class DateUtil {
public static final DateTimeFormatter DateFormatter = DateTimeFormatter.ofPattern(date_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
public static final DateTimeFormatter urlFormatter = DateTimeFormatter.ofPattern(URL_PATTERN, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
public static String yyyy_MM_dd_HH_mm_ssToISO8601(String formatTime) {
public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) {
return formatterISO8601.format(formatter.parse(formatTime));
}

View File

@ -2,20 +2,19 @@ package com.genersoft.iot.vmp.web.gb28181;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Preset;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@ -23,9 +22,6 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
/**
@ -185,7 +181,7 @@ public class ApiDeviceController {
* @return
*/
@GetMapping(value = "/fetchpreset")
private DeferredResult<Object> list(String serial,
private DeferredResult<WVPResult<Object>> list(String serial,
@RequestParam(required = false)Integer channel,
@RequestParam(required = false)String code,
@RequestParam(required = false)Boolean fill,
@ -197,55 +193,34 @@ public class ApiDeviceController {
}
Device device = deviceService.getDeviceByDeviceId(serial);
String uuid = UUID.randomUUID().toString();
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + (ObjectUtils.isEmpty(code) ? serial : code);
DeferredResult<Object> result = new DeferredResult<> (timeout * 1000L);
DeferredResultEx<Object> deferredResultEx = new DeferredResultEx<>(result);
result.onTimeout(()->{
log.warn("<模拟接口> 获取设备预置位超时");
// 释放rtpserver
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData("wait for presetquery timeout["+timeout+"s]");
resultHolder.invokeResult(msg);
});
if (resultHolder.exist(key, null)) {
return result;
}
deferredResultEx.setFilter(filterResult->{
List<Preset> presetQuerySipReqList = (List<Preset>)filterResult;
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("DeviceID", code);
resultMap.put("Result", "OK");
resultMap.put("SumNum", presetQuerySipReqList.size());
ArrayList<Map<String, Object>> presetItemList = new ArrayList<>(presetQuerySipReqList.size());
for (Preset presetQuerySipReq : presetQuerySipReqList) {
Map<String, Object> item = new HashMap<>();
item.put("PresetID", presetQuerySipReq.getPresetId());
item.put("PresetName", presetQuerySipReq.getPresetName());
item.put("PresetEnable", true);
presetItemList.add(item);
Assert.notNull(device, "设备不存在");
DeferredResult<WVPResult<Object>> deferredResult = new DeferredResult<> (timeout * 1000L);
deviceService.queryPreset(device, code, (resultCode, msg, data) -> {
if (resultCode == ErrorCode.SUCCESS.getCode()) {
List<Preset> presetQuerySipReqList = (List<Preset>)data;
HashMap<String, Object> resultMap = new HashMap<>();
resultMap.put("DeviceID", code);
resultMap.put("Result", "OK");
resultMap.put("SumNum", presetQuerySipReqList.size());
ArrayList<Map<String, Object>> presetItemList = new ArrayList<>(presetQuerySipReqList.size());
for (Preset presetQuerySipReq : presetQuerySipReqList) {
Map<String, Object> item = new HashMap<>();
item.put("PresetID", presetQuerySipReq.getPresetId());
item.put("PresetName", presetQuerySipReq.getPresetName());
item.put("PresetEnable", true);
presetItemList.add(item);
}
resultMap.put("PresetItemList",presetItemList );
deferredResult.setResult(new WVPResult<>(resultCode, msg, resultMap));
}else {
deferredResult.setResult(new WVPResult<>(resultCode, msg, null));
}
resultMap.put("PresetItemList",presetItemList );
return resultMap;
});
resultHolder.put(key, uuid, deferredResultEx);
try {
cmder.presetQuery(device, code, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData(String.format("获取设备预置位失败,错误码: %s, %s", event.statusCode, event.msg));
resultHolder.invokeResult(msg);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 获取设备预置位: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
}
return result;
deferredResult.onTimeout(()->{
log.warn("[获取设备预置位] 超时, {}", device.getDeviceId());
deferredResult.setResult(WVPResult.fail(ErrorCode.ERROR100.getCode(), "wait for presetquery timeout["+timeout+"s]"));
});
return deferredResult;
}
}