优化REDIS消息高并发处理 #1578

pull/1669/head
648540858 2024-10-23 14:46:57 +08:00
parent a316a12187
commit cb39593a79
7 changed files with 301 additions and 252 deletions

View File

@ -78,7 +78,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) { if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
log.info("[收到心跳] 设备{}地址变化, 远程地址为: {}:{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort()); log.info("[收到心跳] 设备{}地址变化, {}:{}->{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort(), request.getLocalAddress().getHostAddress());
device.setPort(remoteAddressInfo.getPort()); device.setPort(remoteAddressInfo.getPort());
device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort()))); device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort())));
device.setIp(remoteAddressInfo.getIp()); device.setIp(remoteAddressInfo.getIp());

View File

@ -15,10 +15,9 @@ import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
@ -26,6 +25,7 @@ import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -55,111 +55,119 @@ public class RedisAlarmMsgListener implements MessageListener {
@Autowired @Autowired
private IPlatformService platformService; private IPlatformService platformService;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
@Override @Override
public void onMessage(@NotNull Message message, byte[] bytes) { public void onMessage(@NotNull Message message, byte[] bytes) {
log.info("收到来自REDIS的ALARM通知 {}", new String(message.getBody())); log.info("[REDIS: ALARM] {}", new String(message.getBody()));
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message); taskQueue.offer(message);
if (isEmpty) { }
// logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize());
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
if (alarmChannelMessage == null) {
log.warn("[REDIS的ALARM通知]消息解析失败");
continue;
}
String gbId = alarmChannelMessage.getGbId();
DeviceAlarm deviceAlarm = new DeviceAlarm(); @Scheduled(fixedDelay = 100)
deviceAlarm.setCreateTime(DateUtil.getNow()); public void executeTaskQueue() {
deviceAlarm.setChannelId(gbId); if (taskQueue.isEmpty()) {
deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); return;
deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); }
deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType()); List<Message> messageDataList = new ArrayList<>();
deviceAlarm.setAlarmPriority("1"); int size = taskQueue.size();
deviceAlarm.setAlarmTime(DateUtil.getNow()); for (int i = 0; i < size; i++) {
deviceAlarm.setLongitude(0); Message msg = taskQueue.poll();
deviceAlarm.setLatitude(0); if (msg != null) {
messageDataList.add(msg);
}
}
if (messageDataList.isEmpty()) {
return;
}
for (Message msg : messageDataList) {
try {
AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class);
if (alarmChannelMessage == null) {
log.warn("[REDIS的ALARM通知]消息解析失败");
continue;
}
String gbId = alarmChannelMessage.getGbId();
if (ObjectUtils.isEmpty(gbId)) { DeviceAlarm deviceAlarm = new DeviceAlarm();
if (userSetting.getSendToPlatformsWhenIdLost()) { deviceAlarm.setCreateTime(DateUtil.getNow());
// 发送给所有的上级 deviceAlarm.setChannelId(gbId);
List<Platform> parentPlatforms = platformService.queryEnablePlatformList(); deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
if (!parentPlatforms.isEmpty()) { deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
for (Platform parentPlatform : parentPlatforms) { deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType());
try { deviceAlarm.setAlarmPriority("1");
deviceAlarm.setChannelId(parentPlatform.getDeviceGBId()); deviceAlarm.setAlarmTime(DateUtil.getNow());
commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); deviceAlarm.setLongitude(0);
} catch (SipException | InvalidArgumentException | ParseException e) { deviceAlarm.setLatitude(0);
log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
}
}
}
}else {
// 获取开启了消息推送的设备和平台
List<Platform> parentPlatforms = mobilePositionService.queryEnablePlatformListWithAsMessageChannel();
if (parentPlatforms.size() > 0) {
for (Platform parentPlatform : parentPlatforms) {
try {
deviceAlarm.setChannelId(parentPlatform.getDeviceGBId());
commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
}
}
}
} if (ObjectUtils.isEmpty(gbId)) {
// 获取开启了消息推送的设备和平台 if (userSetting.getSendToPlatformsWhenIdLost()) {
List<Device> devices = channelService.queryDeviceWithAsMessageChannel(); // 发送给所有的上级
if (devices.size() > 0) { List<Platform> parentPlatforms = platformService.queryEnablePlatformList();
for (Device device : devices) { if (!parentPlatforms.isEmpty()) {
try { for (Platform parentPlatform : parentPlatforms) {
deviceAlarm.setChannelId(device.getDeviceId());
commander.sendAlarmMessage(device, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
}
}
}else {
Device device = deviceService.getDeviceByDeviceId(gbId);
Platform platform = platformService.queryPlatformByServerGBId(gbId);
if (device != null && platform == null) {
try { try {
commander.sendAlarmMessage(device, deviceAlarm); deviceAlarm.setChannelId(parentPlatform.getDeviceGBId());
} catch (InvalidArgumentException | SipException | ParseException e) { commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
log.error("[命令发送失败] 发送报警: {}", e.getMessage()); } catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
} }
}else if (device == null && platform != null){
try {
commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
}else {
log.warn("无法确定" + gbId + "是平台还是设备");
} }
} }
}catch (Exception e) { } else {
log.error("未处理的异常 ", e); // 获取开启了消息推送的设备和平台
log.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); List<Platform> parentPlatforms = mobilePositionService.queryEnablePlatformListWithAsMessageChannel();
if (!parentPlatforms.isEmpty()) {
for (Platform parentPlatform : parentPlatforms) {
try {
deviceAlarm.setChannelId(parentPlatform.getDeviceGBId());
commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage());
}
}
}
}
// 获取开启了消息推送的设备和平台
List<Device> devices = channelService.queryDeviceWithAsMessageChannel();
if (!devices.isEmpty()) {
for (Device device : devices) {
try {
deviceAlarm.setChannelId(device.getDeviceId());
commander.sendAlarmMessage(device, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
}
}
} else {
Device device = deviceService.getDeviceByDeviceId(gbId);
Platform platform = platformService.queryPlatformByServerGBId(gbId);
if (device != null && platform == null) {
try {
commander.sendAlarmMessage(device, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
} else if (device == null && platform != null) {
try {
commanderForPlatform.sendAlarmMessage(platform, deviceAlarm);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 发送报警: {}", e.getMessage());
}
} else {
log.warn("无法确定" + gbId + "是平台还是设备");
} }
} }
}); } catch (Exception e) {
log.error("未处理的异常 ", e);
log.warn("[REDIS的ALARM通知] 发现未处理的异常, {}", e.getMessage());
}
} }
} }
} }

View File

@ -6,12 +6,13 @@ import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
/** /**
@ -26,31 +27,40 @@ public class RedisCloseStreamMsgListener implements MessageListener {
@Autowired @Autowired
private IStreamPushService pushService; private IStreamPushService pushService;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public void onMessage(@NotNull Message message, byte[] bytes) { public void onMessage(@NotNull Message message, byte[] bytes) {
boolean isEmpty = taskQueue.isEmpty(); log.info("[REDIS: 关闭流] {}", new String(message.getBody()));
taskQueue.offer(message); taskQueue.offer(message);
if (isEmpty) { }
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) { @Scheduled(fixedDelay = 100)
Message msg = taskQueue.poll(); public void executeTaskQueue() {
try { if (taskQueue.isEmpty()) {
JSONObject jsonObject = JSON.parseObject(msg.getBody()); return;
String app = jsonObject.getString("app"); }
String stream = jsonObject.getString("stream"); List<Message> messageDataList = new ArrayList<>();
pushService.stopByAppAndStream(app, stream); int size = taskQueue.size();
}catch (Exception e) { for (int i = 0; i < size; i++) {
log.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); Message msg = taskQueue.poll();
log.error("[REDIS的关闭推流通知] 异常内容: ", e); if (msg != null) {
} messageDataList.add(msg);
} }
}); }
if (messageDataList.isEmpty()) {
return;
}
for (Message msg : messageDataList) {
try {
JSONObject jsonObject = JSON.parseObject(msg.getBody());
String app = jsonObject.getString("app");
String stream = jsonObject.getString("stream");
pushService.stopByAppAndStream(app, stream);
}catch (Exception e) {
log.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(msg));
log.error("[REDIS的关闭推流通知] 异常内容: ", e);
}
} }
} }
} }

View File

@ -33,11 +33,12 @@ public class RedisGpsMsgListener implements MessageListener {
@Autowired @Autowired
private IStreamPushService streamPushService; private IStreamPushService streamPushService;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Override @Override
public void onMessage(@NotNull Message message, byte[] bytes) { public void onMessage(@NotNull Message message, byte[] bytes) {
log.debug("[REDIS: GPS] {}", new String(message.getBody()));
taskQueue.offer(message); taskQueue.offer(message);
} }

View File

@ -8,11 +8,9 @@ import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -38,86 +36,93 @@ public class RedisPushStreamListMsgListener implements MessageListener {
@Resource @Resource
private IStreamPushService streamPushService; private IStreamPushService streamPushService;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(Message message, byte[] bytes) {
log.info("[REDIS消息-推流设备列表更新] {}", new String(message.getBody())); log.info("[REDIS: 流设备列表更新] {}", new String(message.getBody()));
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message); taskQueue.offer(message);
if (isEmpty) { }
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
List<RedisPushStreamMessage> streamPushItems = JSON.parseArray(new String(msg.getBody()), RedisPushStreamMessage.class);
//查询全部的app+stream 用于判断是添加还是修改
Map<String, StreamPush> allAppAndStream = streamPushService.getAllAppAndStreamMap();
Map<String, StreamPush> allGBId = streamPushService.getAllGBId();
/** @Scheduled(fixedDelay = 100)
* APP+Streamstream_pushgb_stream public void executeTaskQueue() {
*/ if (taskQueue.isEmpty()) {
List<StreamPush> streamPushItemForSave = new ArrayList<>(); return;
List<StreamPush> streamPushItemForUpdate = new ArrayList<>(); }
for (RedisPushStreamMessage pushStreamMessage : streamPushItems) { List<Message> messageDataList = new ArrayList<>();
String app = pushStreamMessage.getApp(); int size = taskQueue.size();
String stream = pushStreamMessage.getStream(); for (int i = 0; i < size; i++) {
boolean contains = allAppAndStream.containsKey(app + stream); Message msg = taskQueue.poll();
//不存在就添加 if (msg != null) {
if (!contains) { messageDataList.add(msg);
if (allGBId.containsKey(pushStreamMessage.getGbId())) { }
StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId()); }
log.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}", if (messageDataList.isEmpty()) {
streamPushInDb.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream()); return;
continue; }
} for (Message msg : messageDataList) {
StreamPush streamPush = pushStreamMessage.buildstreamPush(); try {
streamPush.setCreateTime(DateUtil.getNow()); List<RedisPushStreamMessage> streamPushItems = JSON.parseArray(new String(msg.getBody()), RedisPushStreamMessage.class);
streamPush.setUpdateTime(DateUtil.getNow()); //查询全部的app+stream 用于判断是添加还是修改
streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); Map<String, StreamPush> allAppAndStream = streamPushService.getAllAppAndStreamMap();
streamPushItemForSave.add(streamPush); Map<String, StreamPush> allGBId = streamPushService.getAllGBId();
allGBId.put(streamPush.getGbDeviceId(), streamPush);
} else {
StreamPush streamPushForGbDeviceId = allGBId.get(pushStreamMessage.getGbId());
if (streamPushForGbDeviceId != null
&& (!streamPushForGbDeviceId.getApp().equals(pushStreamMessage.getApp())
|| !streamPushForGbDeviceId.getStream().equals(pushStreamMessage.getStream()))) {
StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId());
log.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}",
pushStreamMessage.getGbId(), streamPushInDb.getApp(), streamPushInDb.getStream());
continue;
}
StreamPush streamPush = allAppAndStream.get(app + stream);
streamPush.setUpdateTime(DateUtil.getNow());
streamPush.setGbDeviceId(pushStreamMessage.getGbId());
streamPush.setGbName(pushStreamMessage.getName());
streamPush.setGbStatus(pushStreamMessage.isStatus()?"ON":"OFF");
//存在就只修改 name和gbId
streamPushItemForUpdate.add(streamPush);
}
}
if (!streamPushItemForSave.isEmpty()) {
log.info("添加{}条",streamPushItemForSave.size());
log.info(JSONObject.toJSONString(streamPushItemForSave));
streamPushService.batchAdd(streamPushItemForSave);
// 用于存储更具APP+Stream过滤后的数据可以直接存入stream_push表与gb_stream表
List<StreamPush> streamPushItemForSave = new ArrayList<>();
List<StreamPush> streamPushItemForUpdate = new ArrayList<>();
for (RedisPushStreamMessage pushStreamMessage : streamPushItems) {
String app = pushStreamMessage.getApp();
String stream = pushStreamMessage.getStream();
boolean contains = allAppAndStream.containsKey(app + stream);
//不存在就添加
if (!contains) {
if (allGBId.containsKey(pushStreamMessage.getGbId())) {
StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId());
log.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}",
streamPushInDb.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream());
continue;
} }
if(!streamPushItemForUpdate.isEmpty()){ StreamPush streamPush = pushStreamMessage.buildstreamPush();
log.info("修改{}条",streamPushItemForUpdate.size()); streamPush.setCreateTime(DateUtil.getNow());
log.info(JSONObject.toJSONString(streamPushItemForUpdate)); streamPush.setUpdateTime(DateUtil.getNow());
streamPushService.batchUpdate(streamPushItemForUpdate); streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
streamPushItemForSave.add(streamPush);
allGBId.put(streamPush.getGbDeviceId(), streamPush);
} else {
StreamPush streamPushForGbDeviceId = allGBId.get(pushStreamMessage.getGbId());
if (streamPushForGbDeviceId != null
&& (!streamPushForGbDeviceId.getApp().equals(pushStreamMessage.getApp())
|| !streamPushForGbDeviceId.getStream().equals(pushStreamMessage.getStream()))) {
StreamPush streamPushInDb = allGBId.get(pushStreamMessage.getGbId());
log.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}",
pushStreamMessage.getGbId(), streamPushInDb.getApp(), streamPushInDb.getStream());
continue;
} }
}catch (Exception e) { StreamPush streamPush = allAppAndStream.get(app + stream);
log.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(message.getBody())); streamPush.setUpdateTime(DateUtil.getNow());
log.error("[REDIS消息-推流设备列表更新] 异常内容: ", e); streamPush.setGbDeviceId(pushStreamMessage.getGbId());
streamPush.setGbName(pushStreamMessage.getName());
streamPush.setGbStatus(pushStreamMessage.isStatus() ? "ON" : "OFF");
//存在就只修改 name和gbId
streamPushItemForUpdate.add(streamPush);
} }
} }
}); if (!streamPushItemForSave.isEmpty()) {
log.info("添加{}条", streamPushItemForSave.size());
log.info(JSONObject.toJSONString(streamPushItemForSave));
streamPushService.batchAdd(streamPushItemForSave);
}
if (!streamPushItemForUpdate.isEmpty()) {
log.info("修改{}条", streamPushItemForUpdate.size());
log.info(JSONObject.toJSONString(streamPushItemForUpdate));
streamPushService.batchUpdate(streamPushItemForUpdate);
}
} catch (Exception e) {
log.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(msg.getBody()));
log.error("[REDIS消息-推流设备列表更新] 异常内容: ", e);
}
} }
} }
} }

View File

@ -7,16 +7,20 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
/** /**
* redis * redis
*
* @author lin * @author lin
* PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":0,"msg":"失败","app":"1000","stream":"10000022"}' * PUBLISH VM_MSG_STREAM_PUSH_RESPONSE '{"code":0,"msg":"失败","app":"1000","stream":"10000022"}'
*/ */
@ -30,37 +34,49 @@ public class RedisPushStreamResponseListener implements MessageListener {
@Autowired @Autowired
private ThreadPoolTaskExecutor taskExecutor; private ThreadPoolTaskExecutor taskExecutor;
private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); private final Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
public interface PushStreamResponseEvent{ public interface PushStreamResponseEvent {
void run(MessageForPushChannelResponse response); void run(MessageForPushChannelResponse response);
} }
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(Message message, byte[] bytes) {
log.info("[REDIS消息-请求推流结果] {}", new String(message.getBody())); log.info("[REDIS: 推流结果] {}", new String(message.getBody()));
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message); taskQueue.offer(message);
if (isEmpty) { }
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) { @Scheduled(fixedDelay = 100)
Message msg = taskQueue.poll(); public void executeTaskQueue() {
try { if (taskQueue.isEmpty()) {
MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); return;
if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ }
log.info("[REDIS消息-请求推流结果]:参数不全"); List<Message> messageDataList = new ArrayList<>();
continue; int size = taskQueue.size();
} for (int i = 0; i < size; i++) {
// 查看正在等待的invite消息 Message msg = taskQueue.poll();
if (responseEvents.get(response.getApp() + response.getStream()) != null) { if (msg != null) {
responseEvents.get(response.getApp() + response.getStream()).run(response); messageDataList.add(msg);
} }
}catch (Exception e) { }
log.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); if (messageDataList.isEmpty()) {
log.error("[REDIS消息-请求推流结果] 异常内容: ", e); return;
} }
for (Message msg : messageDataList) {
try {
MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())) {
log.info("[REDIS消息-请求推流结果]:参数不全");
continue;
} }
}); // 查看正在等待的invite消息
if (responseEvents.get(response.getApp() + response.getStream()) != null) {
responseEvents.get(response.getApp() + response.getStream()).run(response);
}
} catch (Exception e) {
log.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(msg));
log.error("[REDIS消息-请求推流结果] 异常内容: ", e);
}
} }
} }

View File

@ -9,19 +9,21 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
/** /**
* redis线线 * redis线线
*
* @author lin * @author lin
* PUBLISH VM_MSG_PUSH_STREAM_STATUS_CHANGE '{"setAllOffline":false,"offlineStreams":[{"app":"1000","stream":"10000022","timeStamp":1726729716551}]}' * PUBLISH VM_MSG_PUSH_STREAM_STATUS_CHANGE '{"setAllOffline":false,"offlineStreams":[{"app":"1000","stream":"10000022","timeStamp":1726729716551}]}'
* SUBSCRIBE VM_MSG_PUSH_STREAM_STATUS_CHANGE * SUBSCRIBE VM_MSG_PUSH_STREAM_STATUS_CHANGE
@ -44,48 +46,55 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@Qualifier("taskExecutor")
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(Message message, byte[] bytes) {
boolean isEmpty = taskQueue.isEmpty(); log.info("[REDIS: 流设备状态变化] {}", new String(message.getBody()));
log.warn("[REDIS消息-推流设备状态变化] {}", new String(message.getBody()));
taskQueue.offer(message); taskQueue.offer(message);
}
if (isEmpty) { @Scheduled(fixedDelay = 100)
taskExecutor.execute(() -> { public void executeTaskQueue() {
while (!taskQueue.isEmpty()) { if (taskQueue.isEmpty()) {
Message msg = taskQueue.poll(); return;
try { }
PushStreamStatusChangeFromRedisDto streamStatusMessage = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); List<Message> messageDataList = new ArrayList<>();
if (streamStatusMessage == null) { int size = taskQueue.size();
log.warn("[REDIS消息]推流设备状态变化消息解析失败"); for (int i = 0; i < size; i++) {
continue; Message msg = taskQueue.poll();
} if (msg != null) {
// 取消定时任务 messageDataList.add(msg);
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); }
if (streamStatusMessage.isSetAllOffline()) { }
// 所有设备离线 if (messageDataList.isEmpty()) {
streamPushService.allOffline(); return;
} }
if (streamStatusMessage.getOfflineStreams() != null for (Message msg : messageDataList) {
&& !streamStatusMessage.getOfflineStreams().isEmpty()) { try {
// 更新部分设备离线 PushStreamStatusChangeFromRedisDto streamStatusMessage = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
streamPushService.offline(streamStatusMessage.getOfflineStreams()); if (streamStatusMessage == null) {
} log.warn("[REDIS消息]推流设备状态变化消息解析失败");
if (streamStatusMessage.getOnlineStreams() != null && continue;
!streamStatusMessage.getOnlineStreams().isEmpty()) {
// 更新部分设备上线
streamPushService.online(streamStatusMessage.getOnlineStreams());
}
}catch (Exception e) {
log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
log.error("[REDIS消息-推流设备状态变化] 异常内容: ", e);
}
} }
}); // 取消定时任务
dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
if (streamStatusMessage.isSetAllOffline()) {
// 所有设备离线
streamPushService.allOffline();
}
if (streamStatusMessage.getOfflineStreams() != null
&& !streamStatusMessage.getOfflineStreams().isEmpty()) {
// 更新部分设备离线
streamPushService.offline(streamStatusMessage.getOfflineStreams());
}
if (streamStatusMessage.getOnlineStreams() != null &&
!streamStatusMessage.getOnlineStreams().isEmpty()) {
// 更新部分设备上线
streamPushService.online(streamStatusMessage.getOnlineStreams());
}
} catch (Exception e) {
log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(msg));
log.error("[REDIS消息-推流设备状态变化] 异常内容: ", e);
}
} }
} }
@ -94,7 +103,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
if (!userSetting.isUsePushingAsStatus()) { if (!userSetting.isUsePushingAsStatus()) {
// 启动时设置所有推流通道离线,发起查询请求 // 启动时设置所有推流通道离线,发起查询请求
redisCatchStorage.sendStreamPushRequestedMsgForStatus(); redisCatchStorage.sendStreamPushRequestedMsgForStatus();
dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{ dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, () -> {
log.info("[REDIS消息]未收到redis回复推流设备状态执行推流设备离线"); log.info("[REDIS消息]未收到redis回复推流设备状态执行推流设备离线");
// 五秒收不到请求就设置通道离线,然后通知上级离线 // 五秒收不到请求就设置通道离线,然后通知上级离线
streamPushService.allOffline(); streamPushService.allOffline();