修复使用队列导致的问题

pull/620/head
648540858 2022-09-22 16:56:20 +08:00
parent 70c20364af
commit a574ff0944
11 changed files with 44 additions and 38 deletions

View File

@ -79,7 +79,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
private boolean taskQueueHandlerRun = false;
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired

View File

@ -72,7 +72,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
private boolean taskQueueHandlerRun = false;
private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@ -92,13 +92,14 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(() -> {
logger.info("[处理报警通知]待处理数量:{}", taskQueue.size() );
while (!taskQueue.isEmpty()) {
SipMsgInfo sipMsgInfo = taskQueue.poll();
// 回复200 OK
try {
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[收到报警通知], 回复200OK失败", e);
logger.error("[处理报警通知], 回复200OK失败", e);
}
Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
@ -112,7 +113,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
if (alarmTime == null) {
return;
continue;
}
deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
@ -182,7 +183,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
}
}
logger.info("[收到报警通知]内容:{}", JSONObject.toJSON(deviceAlarm));
if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
// 发送给平台的报警信息。 发送redis通知
AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
@ -190,7 +191,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
alarmChannelMessage.setGbId(channelId);
redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
return;
continue;
}
logger.debug("存储报警信息、报警分类");
@ -198,7 +199,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
if (sipConfig.isAlarm()) {
deviceAlarmService.add(deviceAlarm);
}
logger.info("[收到报警通知]内容:{}", JSONObject.toJSON(deviceAlarm));
if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
publisher.deviceAlarmEventPublish(deviceAlarm);
}

View File

@ -58,7 +58,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
private boolean taskQueueHandlerRun = false;
private final ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@ -83,7 +83,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
if (rootElementAfterCharset == null) {
logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
return;
continue;
}
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());

View File

@ -131,6 +131,24 @@ public class PlayServiceImpl implements IPlayService {
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
playResult.setDevice(device);
result.onCompletion(()->{
// 点播结束时调用截图接口
taskExecutor.execute(()->{
// TODO 应该在上流时调用更好,结束也可能是错误结束
String path = "snap";
String fileName = deviceId + "_" + channelId + ".jpg";
WVPResult wvpResult = (WVPResult)result.getResult();
if (Objects.requireNonNull(wvpResult).getCode() == 0) {
StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
String streamUrl = streamInfoForSuccess.getFmp4();
// 请求截图
logger.info("[请求截图]: " + fileName);
zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
}
});
});
if (streamInfo != null) {
String streamId = streamInfo.getStream();
if (streamId == null) {
@ -192,21 +210,6 @@ public class PlayServiceImpl implements IPlayService {
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
logger.info(JSONObject.toJSONString(ssrcInfo));
play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
// 点播结束时调用截图接口
taskExecutor.execute(()->{
// TODO 应该在上流时调用更好,结束也可能是错误结束
String path = "snap";
String fileName = deviceId + "_" + channelId + ".jpg";
WVPResult wvpResult = (WVPResult)result.getResult();
if (Objects.requireNonNull(wvpResult).getCode() == 0) {
StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
String streamUrl = streamInfoForSuccess.getFmp4();
// 请求截图
logger.info("[请求截图]: " + fileName);
zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
}
});
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
}

View File

@ -16,6 +16,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -36,19 +37,20 @@ public class RedisAlarmMsgListener implements MessageListener {
private boolean taskQueueHandlerRun = false;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override
public void onMessage(Message message, byte[] bytes) {
public void onMessage(@NotNull Message message, byte[] bytes) {
logger.info("收到来自REDIS的ALARM通知 {}", new String(message.getBody()));
taskQueue.offer(message);
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
@ -56,7 +58,7 @@ public class RedisAlarmMsgListener implements MessageListener {
AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
if (alarmChannelMessage == null) {
logger.warn("[REDIS的ALARM通知]消息解析失败");
return;
continue;
}
String gbId = alarmChannelMessage.getGbId();

View File

@ -90,7 +90,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
private boolean taskQueueHandlerRun = false;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@ -121,7 +121,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class);
WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class);
if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) {
return;
continue;
}
if (WvpRedisMsg.isRequest(wvpRedisMsg)) {
logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody()));

View File

@ -35,7 +35,7 @@ public class RedisGpsMsgListener implements MessageListener {
@Autowired
private IVideoManagerStorage storager;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired

View File

@ -28,7 +28,7 @@ public class RedisPushStreamResponseListener implements MessageListener {
private boolean taskQueueHandlerRun = false;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@ -53,7 +53,7 @@ public class RedisPushStreamResponseListener implements MessageListener {
MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
logger.info("[REDIS消息-请求推流结果]:参数不全");
return;
continue;
}
// 查看正在等待的invite消息
if (responseEvents.get(response.getApp() + response.getStream()) != null) {

View File

@ -40,7 +40,7 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
private boolean taskQueueHandlerRun = false;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired

View File

@ -42,7 +42,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@ -61,7 +61,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
if (statusChangeFromPushStream == null) {
logger.warn("[REDIS消息]推流设备状态变化消息解析失败");
return;
continue;
}
// 取消定时任务
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);

View File

@ -35,7 +35,7 @@ public class RedisStreamMsgListener implements MessageListener {
private boolean taskQueueHandlerRun = false;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
@ -53,13 +53,13 @@ public class RedisStreamMsgListener implements MessageListener {
JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
if (steamMsgJson == null) {
logger.warn("[收到redis 流变化]消息解析失败");
return;
continue;
}
String serverId = steamMsgJson.getString("serverId");
if (userSetting.getServerId().equals(serverId)) {
// 自己发送的消息忽略即可
return;
continue;
}
logger.info("[收到redis 流变化] {}", new String(message.getBody()));
String app = steamMsgJson.getString("app");