From 53046208616d2e9e6cc1c81027ce75373b83cc44 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 29 Oct 2024 15:57:52 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=8A=A5=E8=AD=A6SSE?= =?UTF-8?q?=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 1 + .../iot/vmp/gb28181/bean/DeviceAlarm.java | 9 +-- .../iot/vmp/gb28181/bean/SSEMessage.java | 21 ------ .../vmp/gb28181/controller/SseController.java | 19 ++--- .../event/alarm/AlarmEventListener.java | 53 ++------------ .../gb28181/session/SseSessionManager.java | 72 +++++++++++++++++++ .../request/impl/NotifyRequestProcessor.java | 1 + .../notify/cmd/AlarmNotifyMessageHandler.java | 3 +- .../media/zlm/ZLMMediaNodeServerService.java | 1 - web_src/src/layout/UiHeader.vue | 6 +- 10 files changed, 94 insertions(+), 92 deletions(-) delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SSEMessage.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/session/SseSessionManager.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 53f871de..a639531d 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -52,6 +52,7 @@ public class VideoManagerConstants { public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; + public static final String SSE_TASK_KEY = "SSE_TASK_"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceAlarm.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceAlarm.java index 10991aed..22d3dd2a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceAlarm.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceAlarm.java @@ -13,18 +13,15 @@ import java.util.Set; @Data public class DeviceAlarm { - /** - * 数据库id - */ @Schema(description = "数据库id") private String id; - /** - * 设备Id - */ @Schema(description = "设备的国标编号") private String deviceId; + @Schema(description = "设备名称") + private String deviceName; + /** * 通道Id */ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SSEMessage.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SSEMessage.java deleted file mode 100644 index eec5d2d0..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SSEMessage.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.bean; - -import com.alibaba.fastjson2.JSONObject; -import lombok.Data; - -@Data -public class SSEMessage { - private String event; - private T data; - - public static SSEMessage getInstance(String event, DeviceAlarm data) { - SSEMessage message = new SSEMessage<>(); - message.setEvent(event); - message.setData(data); - return message; - } - - public String ecode(){ - return String.format("event:%s\ndata:%s\n", event, JSONObject.toJSONString(data)); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java index 16e5a4d7..08cb8857 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/SseController.java @@ -1,16 +1,16 @@ package com.genersoft.iot.vmp.gb28181.controller; -import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEventListener; +import com.genersoft.iot.vmp.gb28181.session.SseSessionManager; import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.annotation.Resource; import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.io.PrintWriter; /** @@ -26,22 +26,17 @@ import java.io.PrintWriter; public class SseController { @Resource - private AlarmEventListener alarmEventListener; + private SseSessionManager sseSessionManager; /** * SSE 推送. * - * @param response 响应 * @param browserId 浏览器ID - * @throws IOException IOEXCEPTION - * @author xiaoQQya - * @since 2023/11/06 */ @GetMapping("/emit") - public void emit(HttpServletResponse response, @RequestParam String browserId) throws IOException, InterruptedException { - response.setContentType("text/event-stream"); - response.setCharacterEncoding("utf-8"); - PrintWriter writer = response.getWriter(); - alarmEventListener.addSseEmitter(browserId, writer); + public SseEmitter emit(HttpServletResponse response, @RequestParam String browserId) throws IOException, InterruptedException { +// response.setContentType("text/event-stream"); +// response.setCharacterEncoding("utf-8"); + return sseSessionManager.conect(browserId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java index e20e9353..a5ffc1a1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java @@ -1,15 +1,12 @@ package com.genersoft.iot.vmp.gb28181.event.alarm; -import com.genersoft.iot.vmp.gb28181.bean.SSEMessage; +import com.genersoft.iot.vmp.gb28181.session.SseSessionManager; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; -import java.io.PrintWriter; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Resource; /** * 报警事件监听器. @@ -22,54 +19,14 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class AlarmEventListener implements ApplicationListener { - private static final Map sseChannelMap = new ConcurrentHashMap<>(); - - public void addSseEmitter(String browserId, PrintWriter writer) throws InterruptedException { - sseChannelMap.put(browserId, writer); - log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, sseChannelMap.size()); - while (!writer.checkError()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - writer.write(":keep alive\n\n"); - writer.flush(); - } - removeSseEmitter(browserId, writer); - - } - - public void removeSseEmitter(String browserId, PrintWriter writer) { - sseChannelMap.remove(browserId, writer); - log.info("[SSE推送] 连接已断开, 浏览器 ID: {}, 当前在线数: {}", browserId, sseChannelMap.size()); - } + @Resource + private SseSessionManager sseSessionManager; @Override public void onApplicationEvent(@NotNull AlarmEvent event) { if (log.isDebugEnabled()) { log.debug("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription()); } - - log.info("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription()); - - for (Iterator> it = sseChannelMap.entrySet().iterator(); it.hasNext(); ) { - Map.Entry response = it.next(); - - try { - PrintWriter writer = response.getValue(); - - if (writer.checkError()) { - it.remove(); - continue; - } - - writer.write(SSEMessage.getInstance("message", event.getAlarmInfo()).ecode()); - writer.flush(); - } catch (Exception e) { - log.error("[发送SSE] 失败", e); - it.remove(); - } - } + sseSessionManager.sendForAll("message", event.getAlarmInfo()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SseSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SseSessionManager.java new file mode 100644 index 00000000..6c88f7df --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SseSessionManager.java @@ -0,0 +1,72 @@ +package com.genersoft.iot.vmp.gb28181.session; + +import com.genersoft.iot.vmp.conf.DynamicTask; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +@Slf4j +public class SseSessionManager { + + private static final Map sseSessionMap = new ConcurrentHashMap<>(); + + @Autowired + private DynamicTask dynamicTask; + + public SseEmitter conect(String browserId){ + SseEmitter sseEmitter = new SseEmitter(0L); + sseEmitter.onError((err)-> { + log.error("[SSE推送] 连接错误, 浏览器 ID: {}, {}", browserId, err.getMessage()); + sseSessionMap.remove(browserId); + sseEmitter.completeWithError(err); + }); + +// sseEmitter.onTimeout(() -> { +// log.info("[SSE推送] 连接超时, 浏览器 ID: {}", browserId); +// sseSessionMap.remove(browserId); +// sseEmitter.complete(); +// dynamicTask.stop(key); +// }); + + sseEmitter.onCompletion(() -> { + log.info("[SSE推送] 连接结束, 浏览器 ID: {}", browserId); + sseSessionMap.remove(browserId); + }); + + sseSessionMap.put(browserId, sseEmitter); + + log.info("[SSE推送] 连接已建立, 浏览器 ID: {}, 当前在线数: {}", browserId, sseSessionMap.size()); + return sseEmitter; + } + + @Scheduled(fixedRate = 1000) //每1秒执行一次 + public void execute(){ + if (sseSessionMap.isEmpty()){ + return; + } + sendForAll("keepalive", "alive"); + } + + + public void sendForAll(String event, Object data) { + for (String browserId : sseSessionMap.keySet()) { + SseEmitter sseEmitter = sseSessionMap.get(browserId); + if (sseEmitter == null) { + continue; + }; + try { + sseEmitter.send(SseEmitter.event().name(event).data(data)); + } catch (Exception e) { + log.error("[SSE推送] 发送失败: {}", e.getMessage()); + sseSessionMap.remove(browserId); + sseEmitter.completeWithError(e); + } + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index ba1cd623..b7ed168b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -122,6 +122,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setDeviceName(device.getName()); deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index 230f9039..999a5e94 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -102,7 +102,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme continue; } RequestEvent evt = sipMsgInfo.getEvt(); - System.out.println(evt.getRequest()); // 回复200 OK try { responseAck((SIPRequest) evt.getRequest(), Response.OK); @@ -117,6 +116,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setCreateTime(DateUtil.getNow()); deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); + deviceAlarm.setDeviceName(sipMsgInfo.getDevice().getName()); deviceAlarm.setChannelId(channelId); deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority")); deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); @@ -223,6 +223,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setCreateTime(DateUtil.getNow()); deviceAlarm.setDeviceId(parentPlatform.getServerGBId()); + deviceAlarm.setDeviceName(parentPlatform.getName()); deviceAlarm.setChannelId(channelId); deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority")); deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod")); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 9e8e6c50..223365cb 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -133,7 +133,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { param.put("ssrc", ssrc); } JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param); - System.out.println(jsonObject); if (jsonObject.getInteger("code") != null && jsonObject.getInteger("code") == 0) { log.info("[停止发流] 成功: 参数:{}", JSON.toJSONString(param)); return true; diff --git a/web_src/src/layout/UiHeader.vue b/web_src/src/layout/UiHeader.vue index 1b753aec..9406509f 100755 --- a/web_src/src/layout/UiHeader.vue +++ b/web_src/src/layout/UiHeader.vue @@ -120,7 +120,8 @@ export default { that.$notify({ title: '报警信息', dangerouslyUseHTMLString: true, - message: `设备编号: ${data.deviceId}` + + message: `设备名称: ${data.deviceName}` + + `
设备编号: ${ data.deviceId}` + `
通道编号: ${ data.channelId}` + `
报警级别: ${ data.alarmPriorityDescription}` + `
报警方式: ${ data.alarmMethodDescription}` + @@ -128,9 +129,8 @@ export default { `
报警时间: ${ data.alarmTime}`, type: 'warning', position: 'bottom-right', - duration: 3000 + duration: 5000 }); - }); this.sseSource.addEventListener('open', function (e) { console.log("SSE连接打开.");