From f78657473e85487c09cd3c10112db48a0c4a7c4e Mon Sep 17 00:00:00 2001 From: xiaoQQya Date: Mon, 6 Nov 2023 20:34:58 +0800 Subject: [PATCH] =?UTF-8?q?fix(=E6=8A=A5=E8=AD=A6=E6=8E=A8=E9=80=81):=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=8A=A5=E8=AD=A6=E6=8E=A8=E9=80=81=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E6=97=A0=E6=95=88=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 该问题原因为 com.genersoft.iot.vmp.conf.GlobalResponseAdvice 类改变了 sse 响应体的数据结构,导致前端无法正确解析 sse 数据,调试后未发现 GlobalResponseAdvice 如何修改的 sse 数据结构,故根据 sse 消息体结构自定义实现了 sse 连接 --- .../vmp/conf/security/WebSecurityConfig.java | 11 ++- .../event/alarm/AlarmEventListener.java | 77 +++++++++++-------- .../gb28181/SseController/SseController.java | 37 --------- .../vmanager/gb28181/sse/SseController.java | 55 +++++++++++++ web_src/src/layout/UiHeader.vue | 16 ++-- web_src/src/main.js | 13 ++-- 6 files changed, 121 insertions(+), 88 deletions(-) delete mode 100755 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/SseController/SseController.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/sse/SseController.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java index 6a247359..9cb3a1ff 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/security/WebSecurityConfig.java @@ -1,12 +1,12 @@ package com.genersoft.iot.vmp.conf.security; import com.genersoft.iot.vmp.conf.UserSetting; -import org.springframework.core.annotation.Order; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; import org.springframework.security.authentication.AuthenticationManager; import org.springframework.security.authentication.dao.DaoAuthenticationProvider; import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder; @@ -28,6 +28,7 @@ import java.util.Arrays; /** * 配置Spring Security + * * @author lin */ @Configuration @@ -75,6 +76,7 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter { matchers.add("/js/**"); matchers.add("/api/device/query/snap/**"); matchers.add("/record_proxy/*/**"); + matchers.add("/api/emit"); matchers.addAll(userSetting.getInterfaceAuthenticationExcludes()); // 可以直接访问的静态数据 web.ignoring().antMatchers(matchers.toArray(new String[0])); @@ -83,6 +85,7 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter { /** * 配置认证方式 + * * @param auth * @throws Exception */ @@ -111,7 +114,7 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter { .authorizeRequests() .requestMatchers(CorsUtils::isPreFlightRequest).permitAll() .antMatchers(userSetting.getInterfaceAuthenticationExcludes().toArray(new String[0])).permitAll() - .antMatchers("/api/user/login","/index/hook/**","/zlm_Proxy/FhTuMYqB2HeCuNOb/record/t/1/2023-03-25/16:35:07-16:35:16-9353.mp4").permitAll() + .antMatchers("/api/user/login", "/index/hook/**").permitAll() .anyRequest().authenticated() // 异常处理器 .and() @@ -124,7 +127,7 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter { } - CorsConfigurationSource configurationSource(){ + CorsConfigurationSource configurationSource() { // 配置跨域 CorsConfiguration corsConfiguration = new CorsConfiguration(); corsConfiguration.setAllowedHeaders(Arrays.asList("*")); @@ -135,7 +138,7 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter { corsConfiguration.setExposedHeaders(Arrays.asList(JwtUtils.getHeader())); UrlBasedCorsConfigurationSource url = new UrlBasedCorsConfigurationSource(); - url.registerCorsConfiguration("/**",corsConfiguration); + url.registerCorsConfiguration("/**", corsConfiguration); return url; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java index 9ee64773..aef59076 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/alarm/AlarmEventListener.java @@ -1,55 +1,68 @@ package com.genersoft.iot.vmp.gb28181.event.alarm; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import java.io.IOException; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; - +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import java.io.PrintWriter; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** - * @description: 报警事件监听 - * @author: lawrencehj - * @data: 2021-01-20 + * 报警事件监听器. + * + * @author lawrencehj + * @author xiaoQQya + * @since 2021/01/20 */ - @Component public class AlarmEventListener implements ApplicationListener { - private final static Logger logger = LoggerFactory.getLogger(AlarmEventListener.class); + private static final Logger logger = LoggerFactory.getLogger(AlarmEventListener.class); - private static Map sseEmitters = new Hashtable<>(); + private static final Map SSE_CACHE = new ConcurrentHashMap<>(); - public void addSseEmitters(String browserId, SseEmitter sseEmitter) { - sseEmitters.put(browserId, sseEmitter); + public void addSseEmitter(String browserId, PrintWriter writer) { + SSE_CACHE.put(browserId, writer); + logger.info("SSE 在线数量: {}", SSE_CACHE.size()); + } + + public void removeSseEmitter(String browserId, PrintWriter writer) { + SSE_CACHE.remove(browserId, writer); + logger.info("SSE 在线数量: {}", SSE_CACHE.size()); } @Override - public void onApplicationEvent(AlarmEvent event) { + public void onApplicationEvent(@NotNull AlarmEvent event) { if (logger.isDebugEnabled()) { - logger.debug("设备报警事件触发,deviceId:" + event.getAlarmInfo().getDeviceId() + ", " - + event.getAlarmInfo().getAlarmDescription()); + logger.debug("设备报警事件触发, deviceId: {}, {}", event.getAlarmInfo().getDeviceId(), event.getAlarmInfo().getAlarmDescription()); } - String msg = "设备编码: " + event.getAlarmInfo().getDeviceId() + "" - + "
报警描述: " + event.getAlarmInfo().getAlarmDescription() + "" - + "
报警时间: " + event.getAlarmInfo().getAlarmTime() + "" - + "
报警位置: " + event.getAlarmInfo().getLongitude() + "" - + ", " + event.getAlarmInfo().getLatitude() + ""; - for (Iterator> it = sseEmitters.entrySet().iterator(); it.hasNext();) { - Map.Entry emitter = it.next(); - logger.info("推送到SSE连接,浏览器ID: " + emitter.getKey()); + String msg = "设备编号: " + event.getAlarmInfo().getDeviceId() + "" + + "
通道编号: " + event.getAlarmInfo().getChannelId() + "" + + "
报警描述: " + event.getAlarmInfo().getAlarmDescription() + "" + + "
报警时间: " + event.getAlarmInfo().getAlarmTime() + ""; + + for (Iterator> it = SSE_CACHE.entrySet().iterator(); it.hasNext(); ) { + Map.Entry response = it.next(); + logger.info("推送到 SSE 连接, 浏览器 ID: {}", response.getKey()); try { - emitter.getValue().send(msg); - } catch (IOException | IllegalStateException e) { - if (logger.isDebugEnabled()) { - logger.debug("SSE连接已关闭"); + PrintWriter writer = response.getValue(); + + if (writer.checkError()) { + it.remove(); + continue; } - // 移除已关闭的连接 + + String sseMsg = "event:message\n" + + "data:" + msg + "\n" + + "\n"; + writer.write(sseMsg); + writer.flush(); + } catch (Exception e) { it.remove(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/SseController/SseController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/SseController/SseController.java deleted file mode 100755 index b1ad3b9f..00000000 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/SseController/SseController.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.genersoft.iot.vmp.vmanager.gb28181.SseController; - -import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEventListener; - -import io.swagger.v3.oas.annotations.tags.Tag; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.CrossOrigin; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -/** - * @description: SSE推送 - * @author: lawrencehj - * @data: 2021-01-20 - */ -@Tag(name = "SSE推送") - -@Controller -@RequestMapping("/api") -public class SseController { - @Autowired - AlarmEventListener alarmEventListener; - - @GetMapping("/emit") - public SseEmitter emit(@RequestParam String browserId) { - final SseEmitter sseEmitter = new SseEmitter(0L); - try { - alarmEventListener.addSseEmitters(browserId, sseEmitter); - }catch (Exception e){ - sseEmitter.completeWithError(e); - } - return sseEmitter; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/sse/SseController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/sse/SseController.java new file mode 100644 index 00000000..575f22b3 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/sse/SseController.java @@ -0,0 +1,55 @@ +package com.genersoft.iot.vmp.vmanager.gb28181.sse; + +import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEventListener; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; + + +/** + * SSE 推送. + * + * @author lawrencehj + * @author xiaoQQya + * @since 2021/01/20 + */ +@Tag(name = "SSE 推送") +@RestController +@RequestMapping("/api") +public class SseController { + + @Resource + private AlarmEventListener alarmEventListener; + + /** + * SSE 推送. + * + * @param response 响应 + * @param browserId 浏览器ID + * @throws IOException IOEXCEPTION + * @author xiaoQQya + * @since 2023/11/06 + */ + @GetMapping("/emit") + public void emit(HttpServletResponse response, @RequestParam String browserId) throws IOException, InterruptedException { + response.setContentType("text/event-stream"); + response.setCharacterEncoding("utf-8"); + + PrintWriter writer = response.getWriter(); + alarmEventListener.addSseEmitter(browserId, writer); + + while (!writer.checkError()) { + Thread.sleep(1000); + writer.write(":keep alive\n\n"); + writer.flush(); + } + alarmEventListener.removeSseEmitter(browserId, writer); + } +} diff --git a/web_src/src/layout/UiHeader.vue b/web_src/src/layout/UiHeader.vue index 2cdca02c..fdfcb9f9 100755 --- a/web_src/src/layout/UiHeader.vue +++ b/web_src/src/layout/UiHeader.vue @@ -37,7 +37,6 @@