重构报警SSE推送

pull/1669/head
648540858 2024-10-29 15:57:52 +08:00
parent caf9e99939
commit 5304620861
10 changed files with 94 additions and 92 deletions

View File

@ -52,6 +52,7 @@ public class VideoManagerConstants {
public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
public static final String SSE_TASK_KEY = "SSE_TASK_";

View File

@ -13,18 +13,15 @@ import java.util.Set;
@Data @Data
public class DeviceAlarm { public class DeviceAlarm {
/**
* id
*/
@Schema(description = "数据库id") @Schema(description = "数据库id")
private String id; private String id;
/**
* Id
*/
@Schema(description = "设备的国标编号") @Schema(description = "设备的国标编号")
private String deviceId; private String deviceId;
@Schema(description = "设备名称")
private String deviceName;
/** /**
* Id * Id
*/ */

View File

@ -1,21 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
@Data
public class SSEMessage<T> {
private String event;
private T data;
public static SSEMessage<DeviceAlarm> getInstance(String event, DeviceAlarm data) {
SSEMessage<DeviceAlarm> message = new SSEMessage<>();
message.setEvent(event);
message.setData(data);
return message;
}
public String ecode(){
return String.format("event:%s\ndata:%s\n", event, JSONObject.toJSONString(data));
}
}

View File

@ -1,16 +1,16 @@
package com.genersoft.iot.vmp.gb28181.controller; package com.genersoft.iot.vmp.gb28181.controller;
import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEventListener; import com.genersoft.iot.vmp.gb28181.session.SseSessionManager;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
/** /**
@ -26,22 +26,17 @@ import java.io.PrintWriter;
public class SseController { public class SseController {
@Resource @Resource
private AlarmEventListener alarmEventListener; private SseSessionManager sseSessionManager;
/** /**
* SSE . * SSE .
* *
* @param response
* @param browserId ID * @param browserId ID
* @throws IOException IOEXCEPTION
* @author <a href="mailto:xiaoQQya@126.com">xiaoQQya</a>
* @since 2023/11/06
*/ */
@GetMapping("/emit") @GetMapping("/emit")
public void emit(HttpServletResponse response, @RequestParam String browserId) throws IOException, InterruptedException { public SseEmitter emit(HttpServletResponse response, @RequestParam String browserId) throws IOException, InterruptedException {
response.setContentType("text/event-stream"); // response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8"); // response.setCharacterEncoding("utf-8");
PrintWriter writer = response.getWriter(); return sseSessionManager.conect(browserId);
alarmEventListener.addSseEmitter(browserId, writer);
} }
} }

View File

@ -1,15 +1,12 @@
package com.genersoft.iot.vmp.gb28181.event.alarm; package com.genersoft.iot.vmp.gb28181.event.alarm;
import com.genersoft.iot.vmp.gb28181.bean.SSEMessage; import com.genersoft.iot.vmp.gb28181.session.SseSessionManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.PrintWriter; import javax.annotation.Resource;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* . * .
@ -22,54 +19,14 @@ import java.util.concurrent.ConcurrentHashMap;
@Component @Component
public class AlarmEventListener implements ApplicationListener<AlarmEvent> { public class AlarmEventListener implements ApplicationListener<AlarmEvent> {
private static final Map<String, PrintWriter> sseChannelMap = new ConcurrentHashMap<>(); @Resource
private SseSessionManager sseSessionManager;
public void addSseEmitter(String browserId, PrintWriter writer) throws InterruptedException {
sseChannelMap.put(browserId, writer);
log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, sseChannelMap.size());
while (!writer.checkError()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
writer.write(":keep alive\n\n");
writer.flush();
}
removeSseEmitter(browserId, writer);
}
public void removeSseEmitter(String browserId, PrintWriter writer) {
sseChannelMap.remove(browserId, writer);
log.info("[SSE推送] 连接已断开, 浏览器 ID: {}, 当前在线数: {}", browserId, sseChannelMap.size());
}
@Override @Override
public void onApplicationEvent(@NotNull AlarmEvent event) { public void onApplicationEvent(@NotNull AlarmEvent event) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription()); log.debug("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription());
} }
sseSessionManager.sendForAll("message", event.getAlarmInfo());
log.info("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription());
for (Iterator<Map.Entry<String, PrintWriter>> it = sseChannelMap.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, PrintWriter> response = it.next();
try {
PrintWriter writer = response.getValue();
if (writer.checkError()) {
it.remove();
continue;
}
writer.write(SSEMessage.getInstance("message", event.getAlarmInfo()).ecode());
writer.flush();
} catch (Exception e) {
log.error("[发送SSE] 失败", e);
it.remove();
}
}
} }
} }

View File

@ -0,0 +1,72 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.conf.DynamicTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
public class SseSessionManager {
private static final Map<String, SseEmitter> sseSessionMap = new ConcurrentHashMap<>();
@Autowired
private DynamicTask dynamicTask;
public SseEmitter conect(String browserId){
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onError((err)-> {
log.error("[SSE推送] 连接错误, 浏览器 ID: {}, {}", browserId, err.getMessage());
sseSessionMap.remove(browserId);
sseEmitter.completeWithError(err);
});
// sseEmitter.onTimeout(() -> {
// log.info("[SSE推送] 连接超时, 浏览器 ID: {}", browserId);
// sseSessionMap.remove(browserId);
// sseEmitter.complete();
// dynamicTask.stop(key);
// });
sseEmitter.onCompletion(() -> {
log.info("[SSE推送] 连接结束, 浏览器 ID: {}", browserId);
sseSessionMap.remove(browserId);
});
sseSessionMap.put(browserId, sseEmitter);
log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, sseSessionMap.size());
return sseEmitter;
}
@Scheduled(fixedRate = 1000) //每1秒执行一次
public void execute(){
if (sseSessionMap.isEmpty()){
return;
}
sendForAll("keepalive", "alive");
}
public void sendForAll(String event, Object data) {
for (String browserId : sseSessionMap.keySet()) {
SseEmitter sseEmitter = sseSessionMap.get(browserId);
if (sseEmitter == null) {
continue;
};
try {
sseEmitter.send(SseEmitter.event().name(event).data(data));
} catch (Exception e) {
log.error("[SSE推送] 发送失败: {}", e.getMessage());
sseSessionMap.remove(browserId);
sseEmitter.completeWithError(e);
}
}
}
}

View File

@ -122,6 +122,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
} }
DeviceAlarm deviceAlarm = new DeviceAlarm(); DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setDeviceId(deviceId); deviceAlarm.setDeviceId(deviceId);
deviceAlarm.setDeviceName(device.getName());
deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");

View File

@ -102,7 +102,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
continue; continue;
} }
RequestEvent evt = sipMsgInfo.getEvt(); RequestEvent evt = sipMsgInfo.getEvt();
System.out.println(evt.getRequest());
// 回复200 OK // 回复200 OK
try { try {
responseAck((SIPRequest) evt.getRequest(), Response.OK); responseAck((SIPRequest) evt.getRequest(), Response.OK);
@ -117,6 +116,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
DeviceAlarm deviceAlarm = new DeviceAlarm(); DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setCreateTime(DateUtil.getNow()); deviceAlarm.setCreateTime(DateUtil.getNow());
deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
deviceAlarm.setDeviceName(sipMsgInfo.getDevice().getName());
deviceAlarm.setChannelId(channelId); deviceAlarm.setChannelId(channelId);
deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority")); deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority"));
deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
@ -223,6 +223,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
DeviceAlarm deviceAlarm = new DeviceAlarm(); DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setCreateTime(DateUtil.getNow()); deviceAlarm.setCreateTime(DateUtil.getNow());
deviceAlarm.setDeviceId(parentPlatform.getServerGBId()); deviceAlarm.setDeviceId(parentPlatform.getServerGBId());
deviceAlarm.setDeviceName(parentPlatform.getName());
deviceAlarm.setChannelId(channelId); deviceAlarm.setChannelId(channelId);
deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority"));
deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod")); deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod"));

View File

@ -133,7 +133,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
param.put("ssrc", ssrc); param.put("ssrc", ssrc);
} }
JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param); JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param);
System.out.println(jsonObject);
if (jsonObject.getInteger("code") != null && jsonObject.getInteger("code") == 0) { if (jsonObject.getInteger("code") != null && jsonObject.getInteger("code") == 0) {
log.info("[停止发流] 成功: 参数:{}", JSON.toJSONString(param)); log.info("[停止发流] 成功: 参数:{}", JSON.toJSONString(param));
return true; return true;

View File

@ -120,7 +120,8 @@ export default {
that.$notify({ that.$notify({
title: '报警信息', title: '报警信息',
dangerouslyUseHTMLString: true, dangerouslyUseHTMLString: true,
message: `<strong>设备编号:</strong> <i> ${data.deviceId}</i>` + message: `<strong>设备名称:</strong> <i> ${data.deviceName}</i>` +
`<br><strong>设备编号:</strong> <i>${ data.deviceId}</i>` +
`<br><strong>通道编号:</strong> <i>${ data.channelId}</i>` + `<br><strong>通道编号:</strong> <i>${ data.channelId}</i>` +
`<br><strong>报警级别:</strong> <i>${ data.alarmPriorityDescription}</i>` + `<br><strong>报警级别:</strong> <i>${ data.alarmPriorityDescription}</i>` +
`<br><strong>报警方式:</strong> <i>${ data.alarmMethodDescription}</i>` + `<br><strong>报警方式:</strong> <i>${ data.alarmMethodDescription}</i>` +
@ -128,9 +129,8 @@ export default {
`<br><strong>报警时间:</strong> <i>${ data.alarmTime}</i>`, `<br><strong>报警时间:</strong> <i>${ data.alarmTime}</i>`,
type: 'warning', type: 'warning',
position: 'bottom-right', position: 'bottom-right',
duration: 3000 duration: 5000
}); });
}); });
this.sseSource.addEventListener('open', function (e) { this.sseSource.addEventListener('open', function (e) {
console.log("SSE连接打开."); console.log("SSE连接打开.");