修复多个wvp执行推流转发失败的BUG

pull/1414/head
648540858 2024-04-10 18:22:42 +08:00
parent de5d025cee
commit 9bb5ef6522
1 changed files with 56 additions and 47 deletions

View File

@ -1,7 +1,11 @@
package com.genersoft.iot.vmp.service.redisMsg; package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -11,6 +15,7 @@ import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -37,52 +42,56 @@ public class RedisStreamMsgListener implements MessageListener {
@Override @Override
public void onMessage(Message message, byte[] bytes) { public void onMessage(Message message, byte[] bytes) {
// boolean isEmpty = taskQueue.isEmpty(); boolean isEmpty = taskQueue.isEmpty();
// taskQueue.offer(message); taskQueue.offer(message);
// if (isEmpty) { if (isEmpty) {
// taskExecutor.execute(() -> { taskExecutor.execute(() -> {
// while (!taskQueue.isEmpty()) { while (!taskQueue.isEmpty()) {
// Message msg = taskQueue.poll(); Message msg = taskQueue.poll();
// try { try {
// JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
// if (steamMsgJson == null) { if (steamMsgJson == null) {
// logger.warn("[收到redis 流变化]消息解析失败"); logger.warn("[收到redis 流变化]消息解析失败");
// continue; continue;
// } }
// String serverId = steamMsgJson.getString("serverId"); String serverId = steamMsgJson.getString("serverId");
//
// if (userSetting.getServerId().equals(serverId)) { if (userSetting.getServerId().equals(serverId)) {
// // 自己发送的消息忽略即可 // 自己发送的消息忽略即可
// continue; continue;
// } }
// logger.info("[收到redis 流变化] {}", new String(message.getBody())); logger.info("[收到redis 流变化] {}", new String(message.getBody()));
// String app = steamMsgJson.getString("app"); String app = steamMsgJson.getString("app");
// String stream = steamMsgJson.getString("stream"); String stream = steamMsgJson.getString("stream");
// boolean register = steamMsgJson.getBoolean("register"); boolean register = steamMsgJson.getBoolean("register");
// String mediaServerId = steamMsgJson.getString("mediaServerId"); String mediaServerId = steamMsgJson.getString("mediaServerId");
// OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
// onStreamChangedHookParam.setSeverId(serverId); onStreamChangedHookParam.setSeverId(serverId);
// onStreamChangedHookParam.setApp(app); onStreamChangedHookParam.setApp(app);
// onStreamChangedHookParam.setStream(stream); onStreamChangedHookParam.setStream(stream);
// onStreamChangedHookParam.setRegist(register); onStreamChangedHookParam.setRegist(register);
// onStreamChangedHookParam.setMediaServerId(mediaServerId); onStreamChangedHookParam.setMediaServerId(mediaServerId);
// onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
// onStreamChangedHookParam.setAliveSecond(0L); onStreamChangedHookParam.setAliveSecond(0L);
// onStreamChangedHookParam.setTotalReaderCount("0"); onStreamChangedHookParam.setTotalReaderCount("0");
// onStreamChangedHookParam.setOriginType(0); onStreamChangedHookParam.setOriginType(0);
// onStreamChangedHookParam.setOriginTypeStr("0"); onStreamChangedHookParam.setOriginTypeStr("0");
// onStreamChangedHookParam.setOriginTypeStr("unknown"); onStreamChangedHookParam.setOriginTypeStr("unknown");
// if (register) { ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream);
// zlmMediaListManager.addPush(onStreamChangedHookParam); if ( channelOnlineEventLister != null) {
// }else { try {
// zlmMediaListManager.removeMedia(app, stream); channelOnlineEventLister.run(app, stream, serverId);;
// } } catch (ParseException e) {
// }catch (Exception e) { logger.error("addPush: ", e);
// logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); }
// logger.error("[REDIS消息-流变化] 异常内容: ", e); zlmMediaListManager.removedChannelOnlineEventLister(app, stream);
// } }
// } }catch (Exception e) {
// }); logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
// } logger.error("[REDIS消息-流变化] 异常内容: ", e);
}
}
});
}
} }
} }