diff --git a/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java b/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java index 716b600b..a6989ab6 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java @@ -31,6 +31,8 @@ public class InviteInfo { private String mediaServerId; + private Long expirationTime; + public static InviteInfo getInviteInfo(String deviceId, Integer channelId, String stream, SSRCInfo ssrcInfo, String mediaServerId, String receiveIp, Integer receivePort, String streamMode, diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java index 9903d59b..e8e4d3c6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java @@ -39,7 +39,7 @@ public interface IInviteStreamService { */ void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, Integer channelId); - List getAllInviteInfo(InviteSessionType type, Integer channelId, String stream); + List getAllInviteInfo(); /** * 获取点播的状态信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index c74a630c..925788f7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -10,11 +10,12 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.service.bean.ErrorCallback; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; +import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -23,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; @Slf4j @Service @@ -61,11 +61,12 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } } } + @Override public void updateInviteInfo(InviteInfo inviteInfo) { if (InviteSessionStatus.ready == inviteInfo.getStatus()) { updateInviteInfo(inviteInfo, Long.valueOf(userSetting.getPlayTimeout()) * 2); - }else { + } else { updateInviteInfo(inviteInfo, null); } } @@ -114,16 +115,15 @@ public class InviteStreamServiceImpl implements IInviteStreamService { inviteInfoForUpdate = inviteInfoInRedis; } - String key = VideoManagerConstants.INVITE_PREFIX + - ":" + inviteInfoForUpdate.getType() + + String key = VideoManagerConstants.INVITE_PREFIX; + String objectKey = inviteInfoForUpdate.getType() + ":" + inviteInfoForUpdate.getChannelId() + - ":" + inviteInfoForUpdate.getStream()+ + ":" + inviteInfoForUpdate.getStream() + ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc(); if (time != null && time > 0) { - redisTemplate.opsForValue().set(key, inviteInfoForUpdate, time, TimeUnit.SECONDS); - }else { - redisTemplate.opsForValue().set(key, inviteInfoForUpdate); + inviteInfoForUpdate.setExpirationTime(time); } + redisTemplate.opsForHash().put(key, objectKey, inviteInfoForUpdate); } @Override @@ -134,8 +134,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService { return null; } removeInviteInfo(inviteInfoInDb); - String key = VideoManagerConstants.INVITE_PREFIX + - ":" + inviteInfo.getType() + + String key = VideoManagerConstants.INVITE_PREFIX; + String objectKey = inviteInfo.getType() + ":" + inviteInfo.getChannelId() + ":" + stream + ":" + inviteInfo.getSsrcInfo().getSsrc(); @@ -144,46 +144,43 @@ public class InviteStreamServiceImpl implements IInviteStreamService { inviteInfoInDb.getSsrcInfo().setStream(stream); } if (InviteSessionStatus.ready == inviteInfo.getStatus()) { - redisTemplate.opsForValue().set(key, inviteInfoInDb, userSetting.getPlayTimeout() * 2, TimeUnit.SECONDS); - }else { - redisTemplate.opsForValue().set(key, inviteInfoInDb); + inviteInfoInDb.setExpirationTime((long) (userSetting.getPlayTimeout() * 2)); } - + redisTemplate.opsForHash().put(key, objectKey, inviteInfoInDb); return inviteInfoInDb; } @Override public InviteInfo getInviteInfo(InviteSessionType type, Integer channelId, String stream) { - String key = VideoManagerConstants.INVITE_PREFIX + - ":" + (type != null ? type : "*") + + String key = VideoManagerConstants.INVITE_PREFIX; + String keyPattern = (type != null ? type : "*") + ":" + (channelId != null ? channelId : "*") + ":" + (stream != null ? stream : "*") + ":*"; - List scanResult = RedisUtil.scan(redisTemplate, key); - if (scanResult.isEmpty()) { - return null; - } - if (scanResult.size() != 1) { - log.warn("[获取InviteInfo] 发现 key: {}存在多条", key); - } + ScanOptions options = ScanOptions.scanOptions().match(keyPattern).count(20).build(); + try (Cursor> cursor = redisTemplate.opsForHash().scan(key, options)) { + if (cursor.hasNext()) { + InviteInfo inviteInfo = (InviteInfo) cursor.next().getValue(); + cursor.close(); + return inviteInfo; - return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); + } + } catch (Exception e) { + log.error("[Redis-InviteInfo] 查询异常: ", e); + } + return null; } @Override - public List getAllInviteInfo(InviteSessionType type, Integer channelId, String stream) { - String key = VideoManagerConstants.INVITE_PREFIX + - ":" + (type != null ? type : "*") + - ":" + (channelId != null ? channelId : "*") + - ":" + (stream != null ? stream : "*") - + ":*"; - List scanResult = RedisUtil.scan(redisTemplate, key); - if (scanResult.isEmpty()) { - return new ArrayList<>(); - } + public List getAllInviteInfo() { List result = new ArrayList<>(); - for (Object keyObj : scanResult) { - result.add((InviteInfo) redisTemplate.opsForValue().get(keyObj)); + String key = VideoManagerConstants.INVITE_PREFIX; + List values = redisTemplate.opsForHash().values(key); + if(values.isEmpty()) { + return result; + } + for (Object value : values) { + result.add((InviteInfo)value); } return result; } @@ -199,23 +196,22 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } @Override - public void removeInviteInfo(InviteSessionType type, Integer channelId, String stream) { - String scanKey = VideoManagerConstants.INVITE_PREFIX + - ":" + (type != null ? type : "*") + - ":" + (channelId != null ? channelId : "*") + - ":" + (stream != null ? stream : "*") + - ":*"; - List scanResult = RedisUtil.scan(redisTemplate, scanKey); - if (!scanResult.isEmpty()) { - for (Object keyObj : scanResult) { - String key = (String) keyObj; - InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key); - if (inviteInfo == null) { - continue; - } - redisTemplate.delete(key); - inviteErrorCallbackMap.remove(buildKey(type,channelId, inviteInfo.getStream())); - } + public void removeInviteInfo(InviteSessionType type, Integer channelId, String stream) { + String key = VideoManagerConstants.INVITE_PREFIX; + if (type == null && channelId == null && stream == null) { + redisTemplate.opsForHash().delete(key); + return; + } + InviteInfo inviteInfo = getInviteInfo(type, channelId, stream); + if (inviteInfo != null) { + String objectKey = inviteInfo.getType() + + ":" + inviteInfo.getChannelId() + + ":" + stream + + ":" + inviteInfo.getSsrcInfo().getSsrc(); + redisTemplate.opsForHash().delete(key, objectKey); + } + if (redisTemplate.opsForHash().size(key) == 0) { + redisTemplate.opsForHash().delete(key); } } @@ -230,14 +226,14 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } @Override - public void once(InviteSessionType type, Integer channelId, String stream, ErrorCallback callback) { + public void once(InviteSessionType type, Integer channelId, String stream, ErrorCallback callback) { String key = buildKey(type, channelId, stream); List> callbacks = inviteErrorCallbackMap.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()); callbacks.add(callback); } - private String buildKey(InviteSessionType type, Integer channelId, String stream) { + private String buildKey(InviteSessionType type, Integer channelId, String stream) { String key = type + ":" + channelId; // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite if (stream != null) { @@ -249,7 +245,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Override public void clearInviteInfo(String deviceId) { - List inviteInfoList = getAllInviteInfo(null, null, null); + List inviteInfoList = getAllInviteInfo(); for (InviteInfo inviteInfo : inviteInfoList) { if (inviteInfo.getDeviceId().equals(deviceId)) { removeInviteInfo(inviteInfo); @@ -260,23 +256,21 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Override public int getStreamInfoCount(String mediaServerId) { int count = 0; - String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*"; - List scanResult = RedisUtil.scan(redisTemplate, key); - if (scanResult.isEmpty()) { - return 0; - }else { - for (Object keyObj : scanResult) { - String keyStr = (String) keyObj; - InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr); - if (inviteInfo != null - && inviteInfo.getStreamInfo() != null - && inviteInfo.getStreamInfo().getMediaServer() != null - && inviteInfo.getStreamInfo().getMediaServer().getId().equals(mediaServerId)) { - if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) { - continue; - } - count++; + String key = VideoManagerConstants.INVITE_PREFIX; + List values = redisTemplate.opsForHash().values(key); + if (values.isEmpty()) { + return count; + } + for (Object value : values) { + InviteInfo inviteInfo = (InviteInfo)value; + if (inviteInfo != null + && inviteInfo.getStreamInfo() != null + && inviteInfo.getStreamInfo().getMediaServer() != null + && inviteInfo.getStreamInfo().getMediaServer().getId().equals(mediaServerId)) { + if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) { + continue; } + count++; } } return count; @@ -309,13 +303,16 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Override public InviteInfo getInviteInfoBySSRC(String ssrc) { - String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:" + ssrc; - List scanResult = RedisUtil.scan(redisTemplate, key); - if (scanResult.size() != 1) { + List inviteInfoList = getAllInviteInfo(); + if (inviteInfoList.isEmpty()) { return null; } - - return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0)); + for (InviteInfo inviteInfo : inviteInfoList) { + if (inviteInfo.getSsrcInfo() != null && ssrc.equals(inviteInfo.getSsrcInfo().getSsrc())) { + return inviteInfo; + } + } + return null; } @Override @@ -325,15 +322,15 @@ public class InviteStreamServiceImpl implements IInviteStreamService { return null; } removeInviteInfo(inviteInfoInDb); - String key = VideoManagerConstants.INVITE_PREFIX + - ":" + inviteInfo.getType() + + String key = VideoManagerConstants.INVITE_PREFIX; + String objectKey = inviteInfo.getType() + ":" + inviteInfo.getChannelId() + ":" + inviteInfo.getStream() + ":" + ssrc; if (inviteInfoInDb.getSsrcInfo() != null) { inviteInfoInDb.getSsrcInfo().setSsrc(ssrc); } - redisTemplate.opsForValue().set(key, inviteInfoInDb); + redisTemplate.opsForHash().put(key, objectKey, inviteInfoInDb); return inviteInfoInDb; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 946ab021..49be0a70 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -1566,10 +1566,11 @@ public class PlayServiceImpl implements IPlayService { @Override public void stop(InviteSessionType type, Device device, DeviceChannel channel, String stream) { - InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(type, stream); + InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream); if (inviteInfo == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "点播未找到"); } + inviteStreamService.removeInviteInfoByDeviceAndChannel(inviteInfo.getType(), channel.getId()); if (InviteSessionStatus.ok == inviteInfo.getStatus()) { try { log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId()); @@ -1579,7 +1580,7 @@ public class PlayServiceImpl implements IPlayService { throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); } } - inviteStreamService.removeInviteInfoByDeviceAndChannel(inviteInfo.getType(), channel.getId()); + if (inviteInfo.getType() == InviteSessionType.PLAY) { deviceChannelService.stopPlay(channel.getId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java index ec4f126b..6df13ba7 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java @@ -1,16 +1,21 @@ package com.genersoft.iot.vmp.vmanager; -import com.genersoft.iot.vmp.conf.security.JwtUtils; +import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; -import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.security.SecurityRequirement; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.util.ArrayList; import java.util.List; +import java.util.Map; @RestController @RequestMapping("/api/test") @@ -19,13 +24,42 @@ public class TestController { @Autowired private HookSubscribe subscribe; + @Autowired + private RedisTemplate redisTemplate; @GetMapping("/hook/list") - @Operation(summary = "查询角色", security = @SecurityRequirement(name = JwtUtils.HEADER)) public List all(){ return subscribe.getAll(); } + + @GetMapping("/redis") + public List redis(){ + InviteSessionType type = InviteSessionType.PLAY; + String channelId = null; + String stream = null; + + String key = VideoManagerConstants.INVITE_PREFIX; + String keyPattern = (type != null ? type : "*") + + ":" + (channelId != null ? channelId : "*") + + ":" + (stream != null ? stream : "*") + + ":*"; + ScanOptions options = ScanOptions.scanOptions().match(keyPattern).count(20).build(); + Cursor> cursor = redisTemplate.opsForHash().scan(key, options); + List result = new ArrayList<>(); + try { + while (cursor.hasNext()) { + System.out.println(cursor.next().getKey()); + result.add((InviteInfo) cursor.next().getValue()); + } + }catch (Exception e) { + + }finally { + cursor.close(); + } + return result; + } + // @Bean // public ServletRegistrationBean druidStatViewServlet() { // ServletRegistrationBean registrationBean = new ServletRegistrationBean<>(new StatViewServlet(), "/druid/*");