diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java index 92a85fe1..44389f34 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/CommonGBChannelMapper.java @@ -414,4 +414,46 @@ public interface CommonGBChannelMapper { " "}) int updateGroup(@Param("parentId") String parentId, @Param("businessGroup") String businessGroup, List channelList); + + @Update({""}) + int batchUpdate(List commonGBChannels); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index 10d2d295..1afbde0d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -73,4 +73,6 @@ public interface IGbChannelService { void addChannelToGroupByGbDevice(String parentId, String businessGroup, List deviceIds); void deleteChannelToGroupByGbDevice(List deviceIds); + + void batchUpdate(List commonGBChannels); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 2c74cdb9..fb295c0c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -238,6 +238,36 @@ public class GbChannelServiceImpl implements IGbChannelService { log.warn("[新增多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); } + @Override + public void batchUpdate(List commonGBChannels) { + if (commonGBChannels.isEmpty()) { + log.warn("[更新多个通道] 通道数量为0,更新失败"); + return; + } + // 批量保存 + int limitCount = 1000; + int result = 0; + if (commonGBChannels.size() > limitCount) { + for (int i = 0; i < commonGBChannels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > commonGBChannels.size()) { + toIndex = commonGBChannels.size(); + } + result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex)); + } + }else { + result += commonGBChannelMapper.batchUpdate(commonGBChannels); + } + log.warn("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); + // 发送通过更新通知 + try { + // 发送通知 + eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE); + }catch (Exception e) { + log.warn("[更新多个通道] 发送失败,{}个", commonGBChannels.size(), e); + } + } + @Override @Transactional public void updateStatus(List commonGBChannels) { @@ -259,11 +289,17 @@ public class GbChannelServiceImpl implements IGbChannelService { result += commonGBChannelMapper.updateStatus(commonGBChannels); } log.warn("[更新多个通道状态] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); + // 发送通过更新通知 + try { + // 发送通知 + eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE); + }catch (Exception e) { + log.warn("[更新多个通道] 发送失败,{}个", commonGBChannels.size(), e); + } } @Override public List queryByPlatformId(Integer platformId) { - return commonGBChannelMapper.queryByPlatformId(platformId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index ffa8a2b2..f70fd790 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -523,7 +523,7 @@ public class PlayServiceImpl implements IPlayService { streamSession.remove(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 取消订阅消息监听 - subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId())); + subscribe.removeSubscribe(Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream())); } }else { log.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {}", diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index c6c0a050..1bd55dd2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; @@ -24,11 +25,8 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; -import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; -import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; @@ -598,10 +596,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlayType(InviteStreamType.PUSH); if (streamPushItem != null) { // 从redis查询是否正在接收这个推流 - MediaArrivalEvent mediaArrivalEvent = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); - if (mediaArrivalEvent != null) { - sendRtpItem.setServerId(mediaArrivalEvent.getServerId()); - sendRtpItem.setMediaServerId(mediaArrivalEvent.getMediaServer().getId()); + MediaInfo mediaInfo = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream()); + if (mediaInfo != null) { + sendRtpItem.setServerId(mediaInfo.getServerId()); + sendRtpItem.setMediaServerId(mediaInfo.getMediaServer().getId()); redisCatchStorage.updateSendRTPSever(sendRtpItem); // 开始推流 diff --git a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java index dcd4944f..ace46986 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java @@ -3,10 +3,12 @@ package com.genersoft.iot.vmp.media.bean; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.utils.MediaServerUtils; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import java.util.List; +import java.util.Map; /** * 视频信息 @@ -51,10 +53,15 @@ public class MediaInfo { private Long bytesSpeed; @Schema(description = "鉴权参数") private String callId; + @Schema(description = "额外参数") + private Map paramMap; + @Schema(description = "服务ID") + private String serverId; - public static MediaInfo getInstance(JSONObject jsonObject, MediaServer mediaServer) { + public static MediaInfo getInstance(JSONObject jsonObject, MediaServer mediaServer, String serverId) { MediaInfo mediaInfo = new MediaInfo(); mediaInfo.setMediaServer(mediaServer); + mediaInfo.setServerId(serverId); String app = jsonObject.getString("app"); mediaInfo.setApp(app); String stream = jsonObject.getString("stream"); @@ -66,6 +73,7 @@ public class MediaInfo { Integer originType = jsonObject.getInteger("originType"); String originUrl = jsonObject.getString("originUrl"); Long aliveSecond = jsonObject.getLong("aliveSecond"); + String params = jsonObject.getString("params"); Long bytesSpeed = jsonObject.getLong("bytesSpeed"); if (totalReaderCount != null) { mediaInfo.setReaderCount(totalReaderCount); @@ -86,6 +94,12 @@ public class MediaInfo { if (bytesSpeed != null) { mediaInfo.setBytesSpeed(bytesSpeed); } + if (params != null) { + mediaInfo.setParamMap(MediaServerUtils.urlParamToMap(params)); + if(mediaInfo.getCallId() == null) { + mediaInfo.setCallId(mediaInfo.getParamMap().get("callId")); + } + } JSONArray jsonArray = jsonObject.getJSONArray("tracks"); if (jsonArray.isEmpty()) { return null; @@ -137,7 +151,7 @@ public class MediaInfo { return mediaInfo; } - public static MediaInfo getInstance(OnStreamChangedHookParam param, MediaServer mediaServer) { + public static MediaInfo getInstance(OnStreamChangedHookParam param, MediaServer mediaServer, String serverId) { MediaInfo mediaInfo = new MediaInfo(); mediaInfo.setApp(param.getApp()); @@ -150,6 +164,11 @@ public class MediaInfo { mediaInfo.setOriginUrl(param.getOriginUrl()); mediaInfo.setAliveSecond(param.getAliveSecond()); mediaInfo.setBytesSpeed(param.getBytesSpeed()); + mediaInfo.setParamMap(param.getParamMap()); + if(mediaInfo.getCallId() == null) { + mediaInfo.setCallId(param.getParamMap().get("callId")); + } + mediaInfo.setServerId(serverId); List tracks = param.getTracks(); if (tracks == null || tracks.isEmpty()) { return mediaInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java index 341e3c36..0d1c55a9 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java @@ -1,9 +1,12 @@ package com.genersoft.iot.vmp.media.event.hook; +import lombok.Data; + /** * zlm hook事件的参数 * @author lin */ +@Data public class Hook { private HookType hookType; @@ -12,60 +15,21 @@ public class Hook { private String stream; - private String mediaServerId; - private Long expireTime; - public static Hook getInstance(HookType hookType, String app, String stream, String mediaServerId) { + public static Hook getInstance(HookType hookType, String app, String stream) { Hook hookSubscribe = new Hook(); hookSubscribe.setApp(app); hookSubscribe.setStream(stream); hookSubscribe.setHookType(hookType); - hookSubscribe.setMediaServerId(mediaServerId); hookSubscribe.setExpireTime(System.currentTimeMillis() + 5 * 60 * 1000); return hookSubscribe; } - public HookType getHookType() { - return hookType; - } - - public void setHookType(HookType hookType) { - this.hookType = hookType; - } - - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - - public Long getExpireTime() { - return expireTime; - } - - public void setExpireTime(Long expireTime) { - this.expireTime = expireTime; - } - - public String getMediaServerId() { - return mediaServerId; - } - - public void setMediaServerId(String mediaServerId) { - this.mediaServerId = mediaServerId; + public static Hook getInstance(HookType hookType, String app, String stream, String mediaServer) { + // TODO 后续修改所有方法 + return Hook.getInstance(hookType, app, stream); } @Override @@ -74,8 +38,7 @@ public class Hook { Hook param = (Hook) obj; return param.getHookType().equals(this.hookType) && param.getApp().equals(this.app) - && param.getStream().equals(this.stream) - && param.getMediaServerId().equals(this.mediaServerId); + && param.getStream().equals(this.stream); }else { return false; } @@ -83,6 +46,6 @@ public class Hook { @Override public String toString() { - return this.getHookType() + this.getApp() + this.getStream() + this.getMediaServerId(); + return this.getHookType() + this.getApp() + this.getStream(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java index 58e37619..ee2f5677 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java @@ -70,11 +70,13 @@ public class HookSubscribe { private final Map allHook = new ConcurrentHashMap<>(); private void sendNotify(HookType hookType, MediaEvent event) { - Hook paramHook = Hook.getInstance(hookType, event.getApp(), event.getStream(), event.getMediaServer().getId()); + Hook paramHook = Hook.getInstance(hookType, event.getApp(), event.getStream()); Event hookSubscribeEvent = allSubscribes.get(paramHook.toString()); if (hookSubscribeEvent != null) { HookData data = HookData.getInstance(event); hookSubscribeEvent.response(data); + }else { + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java index 78b50384..b91b77b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaArrivalEvent.java @@ -4,7 +4,6 @@ import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; -import lombok.Data; import lombok.Getter; import lombok.Setter; @@ -19,15 +18,14 @@ public class MediaArrivalEvent extends MediaEvent { super(source); } - public static MediaArrivalEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){ + public static MediaArrivalEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer, String serverId){ MediaArrivalEvent mediaArrivalEvent = new MediaArrivalEvent(source); - mediaArrivalEvent.setMediaInfo(MediaInfo.getInstance(hookParam, mediaServer)); + mediaArrivalEvent.setMediaInfo(MediaInfo.getInstance(hookParam, mediaServer, serverId)); mediaArrivalEvent.setApp(hookParam.getApp()); mediaArrivalEvent.setStream(hookParam.getStream()); mediaArrivalEvent.setMediaServer(mediaServer); mediaArrivalEvent.setSchema(hookParam.getSchema()); mediaArrivalEvent.setSchema(hookParam.getSchema()); - mediaArrivalEvent.setHookParam(hookParam); mediaArrivalEvent.setParamMap(hookParam.getParamMap()); return mediaArrivalEvent; } @@ -40,10 +38,6 @@ public class MediaArrivalEvent extends MediaEvent { @Setter private String callId; - @Getter - @Setter - private OnStreamChangedHookParam hookParam; - @Getter @Setter private StreamContent streamInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index f2ecb44f..7829d8ee 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -156,7 +156,7 @@ public class ZLMHttpHookListener { }else { param.setParamMap(new HashMap<>()); } - MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer); + MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer, userSetting.getServerId()); applicationEventPublisher.publishEvent(mediaArrivalEvent); } else { log.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index b0a02a31..5e7f4563 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.bean.MediaInfo; @@ -35,6 +36,9 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { @Autowired private ZLMServerFactory zlmServerFactory; + @Autowired + private UserSetting userSetting; + @Override public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { return zlmServerFactory.createRTPServer(mediaServer, streamId, ssrc, port, onlyAuto, reUsePort, tcpMode); @@ -181,7 +185,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { return null; } JSONObject mediaJSON = data.getJSONObject(0); - MediaInfo mediaInfo = MediaInfo.getInstance(mediaJSON, mediaServer); + MediaInfo mediaInfo = MediaInfo.getInstance(mediaJSON, mediaServer, userSetting.getServerId()); StreamInfo streamInfo = getStreamInfoByAppAndStream(mediaServer, app, stream, mediaInfo, callId, true); if (streamInfo != null) { streamInfoList.add(streamInfo); @@ -234,7 +238,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { if (jsonObject.getInteger("code") != 0) { return null; } - return MediaInfo.getInstance(jsonObject, mediaServer); + return MediaInfo.getInstance(jsonObject, mediaServer, userSetting.getServerId()); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java index 280904d8..c624b1a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java @@ -166,7 +166,7 @@ public class RedisRpcController { /** * 监听流上线 */ - public RedisRpcResponse onPushStreamOnlineEvent(RedisRpcRequest request) { + public RedisRpcResponse onStreamOnlineEvent(RedisRpcRequest request) { StreamInfo streamInfo = JSONObject.parseObject(request.getParam().toString(), StreamInfo.class); log.info("[redis-rpc] 监听流上线: {}/{}", streamInfo.getApp(), streamInfo.getStream()); // 查询本级是否有这个流 diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 46838e1f..0a748e3d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -158,11 +158,11 @@ public class RedisRpcServiceImpl implements IRedisRpcService { log.info("[请求所有WVP监听流上线] {}/{}", app, stream); // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者 - Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, null); + Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream); StreamInfo streamInfoParam = new StreamInfo(); streamInfoParam.setApp(app); streamInfoParam.setStream(stream); - RedisRpcRequest request = buildRequest("onPushStreamOnlineEvent", streamInfoParam); + RedisRpcRequest request = buildRequest("onStreamOnlineEvent", streamInfoParam); hookSubscribe.addSubscribe(hook, (hookData) -> { if (callback != null) { callback.run(mediaServerService.getStreamInfoByAppAndStream(hookData.getMediaServer(), diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 2b70d6ec..2f0f839e 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -5,9 +5,7 @@ import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; @@ -214,9 +212,9 @@ public interface IRedisCatchStorage { void sendPlatformStopPlayMsg(SendRtpItem sendRtpItem, ParentPlatform platform); - void addPushListItem(String app, String stream, MediaArrivalEvent param); + void addPushListItem(String app, String stream, MediaInfo param); - MediaArrivalEvent getPushListItem(String app, String stream); + MediaInfo getPushListItem(String app, String stream); void removePushListItem(String app, String stream, String mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 9f5814bc..a049ef27 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -10,9 +10,7 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -690,22 +688,22 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public void addPushListItem(String app, String stream, MediaArrivalEvent event) { + public void addPushListItem(String app, String stream, MediaInfo mediaInfo) { String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; - redisTemplate.opsForValue().set(key, event); + redisTemplate.opsForValue().set(key, mediaInfo); } @Override - public MediaArrivalEvent getPushListItem(String app, String stream) { + public MediaInfo getPushListItem(String app, String stream) { String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; - return (MediaArrivalEvent)redisTemplate.opsForValue().get(key); + return (MediaInfo)redisTemplate.opsForValue().get(key); } @Override public void removePushListItem(String app, String stream, String mediaServerId) { String key = VideoManagerConstants.PUSH_STREAM_LIST + app + "_" + stream; - OnStreamChangedHookParam param = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(key); - if (param != null && param.getMediaServerId().equalsIgnoreCase(mediaServerId)) { + MediaInfo param = (MediaInfo)redisTemplate.opsForValue().get(key); + if (param != null && param.getMediaServer().getId().equalsIgnoreCase(mediaServerId)) { redisTemplate.delete(key); } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java index ba5f09f1..d5aa4423 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java @@ -13,8 +13,8 @@ import java.util.Set; @Repository public interface StreamPushMapper { - @Insert("INSERT INTO wvp_stream_push (app, stream, media_server_id, server_id, push_time, update_time, create_time, pushing) VALUES" + - "(#{app}, #{stream}, #{mediaServerId} , #{serverId} , #{pushTime} ,#{updateTime}, #{createTime}, #{pushing})") + @Insert("INSERT INTO wvp_stream_push (app, stream, media_server_id, server_id, push_time, update_time, create_time, pushing, start_offline_push) VALUES" + + "(#{app}, #{stream}, #{mediaServerId} , #{serverId} , #{pushTime} ,#{updateTime}, #{createTime}, #{pushing}, #{startOfflinePush})") @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") int add(StreamPush streamPushItem); @@ -28,6 +28,7 @@ public interface StreamPushMapper { ", server_id=#{serverId}" + ", push_time=#{pushTime}" + ", pushing=#{pushing}" + + ", start_offline_push=#{startOfflinePush}" + "WHERE id = #{id}"+ " "}) int update(StreamPush streamPushItem); @@ -61,9 +62,9 @@ public interface StreamPushMapper { @Insert("") @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") @@ -136,4 +137,21 @@ public interface StreamPushMapper { ")") void batchDel(List streamPushList); + + @Update({""}) + int batchUpdate(List streamPushItemForUpdate); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java index c4decef2..23dae58c 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java @@ -90,7 +90,7 @@ public interface IStreamPushService { void deleteByAppAndStream(String app, String stream); - void updatePushStatus(Integer streamPushId, boolean pushIng); + void updatePushStatus(StreamPush streamPush, boolean pushIng); void batchUpdate(List streamPushItemForUpdate); diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java index 03a579f5..7183dc30 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -4,7 +4,7 @@ import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; @@ -53,14 +53,14 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { public void start(Integer id, ErrorCallback callback, String platformDeviceId, String platformName ) { StreamPush streamPush = streamPushMapper.queryOne(id); Assert.notNull(streamPush, "推流信息未找到"); - MediaArrivalEvent pushListItem = redisCatchStorage.getPushListItem(streamPush.getApp(), streamPush.getStream()); - if (pushListItem != null) { + MediaInfo mediaInfo = redisCatchStorage.getPushListItem(streamPush.getApp(), streamPush.getStream()); + if (mediaInfo != null) { String callId = null; StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(streamPush.getApp(), streamPush.getStream()); if (streamAuthorityInfo != null) { callId = streamAuthorityInfo.getCallId(); } - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), mediaServerService.getStreamInfoByAppAndStream(pushListItem.getMediaServer(), + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), mediaServerService.getStreamInfoByAppAndStream(mediaInfo.getMediaServer(), streamPush.getApp(), streamPush.getStream(), null, callId)); return; } @@ -83,7 +83,7 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { long key = redisRpcService.onStreamOnlineEvent(streamPush.getApp(), streamPush.getStream(), (streamInfo) -> { dynamicTask.stop(timeOutTaskKey); if (streamInfo == null) { - log.warn("[级联点播] 等待推流得到结果未空: {}/{}", streamPush.getApp(), streamPush.getStream()); + log.warn("等待推流得到结果未空: {}/{}", streamPush.getApp(), streamPush.getStream()); callback.run(ErrorCode.ERROR100.getCode(), "fail", null); }else { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index 1f388145..c0e84cdc 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -24,7 +24,6 @@ import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; -import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; @@ -99,14 +98,11 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPush.setPushTime(DateUtil.getNow()); add(streamPush); }else { - updatePushStatus(streamPushInDb.getId(), true); + updatePushStatus(streamPushInDb, true); } // 冗余数据,自己系统中自用 if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) { - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream( - event.getMediaServer(), event.getApp(), event.getStream(), event.getMediaInfo(), event.getCallId()); - event.getHookParam().setStreamInfo(new StreamContent(streamInfo)); - redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event); + redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo()); } // 发送流变化redis消息 @@ -148,18 +144,12 @@ public class StreamPushServiceImpl implements IStreamPushService { redisCatchStorage.sendStreamChangeMsg(type, jsonObject); } } - StreamPush push = getPush(event.getApp(), event.getStream()); - if (push == null) { + StreamPush streamPush = getPush(event.getApp(), event.getStream()); + if (streamPush == null) { return; } - push.setPushing(false); - if (push.getGbDeviceId() != null) { - if (userSetting.isUsePushingAsStatus()) { - push.setGbStatus("OFF"); - updateStatus(push); -// streamPushMapper.updatePushStatus(event.getApp(), event.getStream(), false); -// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); - } + if (streamPush.getGbDeviceId() != null) { + updatePushStatus(streamPush, false); }else { deleteByAppAndStream(event.getApp(), event.getStream()); } @@ -524,27 +514,28 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public void updateStatus(StreamPush push) { - if (ObjectUtils.isEmpty(push.getGbDeviceId())) { - return; - } - if ("ON".equalsIgnoreCase(push.getGbStatus())) { - gbChannelService.online(push.buildCommonGBChannel()); - }else { - gbChannelService.offline(push.buildCommonGBChannel()); - } + } @Override - public void updatePushStatus(Integer streamPushId, boolean pushIng) { - StreamPush streamPushInDb = streamPushMapper.queryOne(streamPushId); - streamPushInDb.setPushing(pushIng); + @Transactional + public void updatePushStatus(StreamPush streamPush, boolean pushIng) { + streamPush.setPushing(pushIng); if (userSetting.isUsePushingAsStatus()) { - streamPushInDb.setGbStatus(pushIng?"ON":"OFF"); + streamPush.setGbStatus(pushIng?"ON":"OFF"); + } + streamPush.setPushTime(DateUtil.getNow()); + streamPushMapper.updatePushStatus(streamPush.getId(), pushIng); + if (ObjectUtils.isEmpty(streamPush.getGbDeviceId())) { + return; + } + if ("ON".equalsIgnoreCase(streamPush.getGbStatus())) { + gbChannelService.online(streamPush.buildCommonGBChannel()); + }else { + gbChannelService.offline(streamPush.buildCommonGBChannel()); } - streamPushInDb.setPushTime(DateUtil.getNow()); - updateStatus(streamPushInDb); } private List handleJSON(List streamInfoList) { @@ -570,7 +561,16 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public void batchUpdate(List streamPushItemForUpdate) { - + int result = streamPushMapper.batchUpdate(streamPushItemForUpdate); + if (result > 0) { + List commonGBChannels = new ArrayList<>(); + for (StreamPush streamPush : streamPushItemForUpdate) { + if (!ObjectUtils.isEmpty(streamPush.getGbDeviceId())) { + commonGBChannels.add(streamPush.buildCommonGBChannel()); + } + } + gbChannelService.batchUpdate(commonGBChannels); + } } @Override diff --git a/web_src/src/components/StreamPushEdit.vue b/web_src/src/components/StreamPushEdit.vue index 4fbdb0bd..d61a238f 100644 --- a/web_src/src/components/StreamPushEdit.vue +++ b/web_src/src/components/StreamPushEdit.vue @@ -26,7 +26,7 @@ 策略 - 拉起离线推流 + 拉起离线推流 diff --git a/web_src/src/components/StreamPushList.vue b/web_src/src/components/StreamPushList.vue index cc0830a4..4d54267d 100755 --- a/web_src/src/components/StreamPushList.vue +++ b/web_src/src/components/StreamPushList.vue @@ -52,8 +52,8 @@ diff --git a/数据库/2.7.2-重构/初始化-mysql-2.7.2.sql b/数据库/2.7.2-重构/初始化-mysql-2.7.2.sql index f0958401..89285a67 100644 --- a/数据库/2.7.2-重构/初始化-mysql-2.7.2.sql +++ b/数据库/2.7.2-重构/初始化-mysql-2.7.2.sql @@ -327,7 +327,7 @@ create table wvp_stream_push update_time character varying(50), pushing bool default false, self bool default false, - auto_push_channel bool default true, + start_offline_push bool default true, constraint uk_stream_push_app_stream unique (app, stream) ); create table wvp_cloud_record