diff --git a/sql/common.sql b/sql/common.sql index 97f176c4..5e364234 100644 --- a/sql/common.sql +++ b/sql/common.sql @@ -98,12 +98,36 @@ alter table wvp_platform alter table wvp_device_channel add common_gb_channel_id integer; +alter table wvp_stream_push + add name varchar(255) default NULL; + alter table wvp_stream_push add common_gb_channel_id integer; alter table wvp_stream_proxy add common_gb_channel_id integer; +alter table wvp_stream_push + drop column origin_type; + +alter table wvp_stream_push + drop column origin_type_str; + +alter table wvp_stream_push + add gb_id varchar(50) default NULL; + +alter table wvp_stream_push + add longitude double precision; + +alter table wvp_stream_push + add latitude double precision; + +alter table wvp_stream_push + add status bool default false; + + + + diff --git a/sql/初始化.sql b/sql/初始化.sql index 48ac612e..608d5d0d 100644 --- a/sql/初始化.sql +++ b/sql/初始化.sql @@ -254,11 +254,10 @@ create table wvp_stream_proxy ( create table wvp_stream_push ( id serial primary key, + name character varying(255) default NULL, app character varying(255), stream character varying(255), total_reader_count character varying(50), - origin_type integer, - origin_type_str character varying(50), create_time character varying(50), alive_second integer, media_server_id character varying(50), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index a4f3b5e7..29ea3dd0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -160,7 +160,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements PlatformCatalog catalog = storager.getCatalog(requesterId, channelId); MediaServerItem mediaServerItem = null; - StreamPushItem streamPushItem = null; + StreamPush streamPushItem = null; StreamProxyItem proxyByAppAndStream =null; // 不是通道可能是直播流 if (channel != null && gbStream == null) { @@ -624,7 +624,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) { @@ -676,7 +676,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements /** * 通知流上线 */ - private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) { @@ -804,7 +804,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements /** * 来自其他wvp的推流 */ - private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, String channelId, String addressStr, String ssrc, String requesterId) { @@ -848,7 +848,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { // 离线 // 查询是否在本机上线了 - StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); + StreamPush currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); if (currentStreamPushItem.isPushIng()) { // 在线状态 pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index cbc5fde6..2b93e637 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -68,9 +68,9 @@ public class ZLMMediaListManager { private Map channelOnPublishEvents = new ConcurrentHashMap<>(); - public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) { - StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam); - StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); + public StreamPush addPush(OnStreamChangedHookParam onStreamChangedHookParam) { + StreamPush transform = streamPushService.transform(onStreamChangedHookParam); + StreamPush pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream()); transform.setPushIng(onStreamChangedHookParam.isRegist()); transform.setUpdateTime(DateUtil.getNow()); transform.setPushTime(DateUtil.getNow()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java similarity index 60% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java rename to src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java index 6af06b59..e4e4d99c 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPush.java @@ -1,15 +1,12 @@ package com.genersoft.iot.vmp.media.zlm.dto; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.utils.DateUtil; import io.swagger.v3.oas.annotations.media.Schema; import org.jetbrains.annotations.NotNull; -import java.util.List; @Schema(description = "推流信息") -public class StreamPushItem implements Comparable{ +public class StreamPush implements Comparable{ /** * id @@ -17,6 +14,9 @@ public class StreamPushItem implements Comparable{ @Schema(description = "id") private Integer id; + @Schema(description = "名称") + private String name; + /** * 应用名 */ @@ -35,43 +35,6 @@ public class StreamPushItem implements Comparable{ @Schema(description = "观看总人数") private String totalReaderCount; - /** - * 协议 包括hls/rtsp/rtmp/http-flv/ws-flv - */ - @Schema(description = "协议 包括hls/rtsp/rtmp/http-flv/ws-flv") - private List schemas; - - /** - * 产生源类型, - * unknown = 0, - * rtmp_push=1, - * rtsp_push=2, - * rtp_push=3, - * pull=4, - * ffmpeg_pull=5, - * mp4_vod=6, - * device_chn=7 - */ - @Schema(description = "产生源类型") - private int originType; - - /** - * 客户端和服务器网络信息,可能为null类型 - */ - @Schema(description = "客户端和服务器网络信息,可能为null类型") - private OnStreamChangedHookParam.OriginSock originSock; - - /** - * 产生源类型的字符串描述 - */ - @Schema(description = "产生源类型的字符串描述") - private String originTypeStr; - - /** - * 产生源的url - */ - @Schema(description = "产生源的url") - private String originUrl; /** * 存活时间,单位秒 @@ -79,12 +42,6 @@ public class StreamPushItem implements Comparable{ @Schema(description = "存活时间,单位秒") private Long aliveSecond; - /** - * 音视频轨道 - */ - @Schema(description = "音视频轨道") - private List tracks; - /** * 音视频轨道 */ @@ -139,35 +96,25 @@ public class StreamPushItem implements Comparable{ @Schema(description = "国标通用信息ID") private int commonGbChannelId; + @Schema(description = "国标ID") + private String gbId; + + @Schema(description = "经度") + private double longitude; + + @Schema(description = "纬度") + private double latitude; + + @Schema(description = "状态") + private boolean status; @Override - public int compareTo(@NotNull StreamPushItem streamPushItem) { + public int compareTo(@NotNull StreamPush streamPushItem) { return Long.valueOf(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(this.createTime) - DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(streamPushItem.getCreateTime())).intValue(); } - public static class MediaSchema { - private String schema; - private Long bytesSpeed; - - public String getSchema() { - return schema; - } - - public void setSchema(String schema) { - this.schema = schema; - } - - public Long getBytesSpeed() { - return bytesSpeed; - } - - public void setBytesSpeed(Long bytesSpeed) { - this.bytesSpeed = bytesSpeed; - } - } - public Integer getId() { return id; } @@ -176,6 +123,14 @@ public class StreamPushItem implements Comparable{ this.id = id; } + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + public String getApp() { return app; } @@ -200,46 +155,6 @@ public class StreamPushItem implements Comparable{ this.totalReaderCount = totalReaderCount; } - public List getSchemas() { - return schemas; - } - - public void setSchemas(List schemas) { - this.schemas = schemas; - } - - public int getOriginType() { - return originType; - } - - public void setOriginType(int originType) { - this.originType = originType; - } - - public OnStreamChangedHookParam.OriginSock getOriginSock() { - return originSock; - } - - public void setOriginSock(OnStreamChangedHookParam.OriginSock originSock) { - this.originSock = originSock; - } - - public String getOriginTypeStr() { - return originTypeStr; - } - - public void setOriginTypeStr(String originTypeStr) { - this.originTypeStr = originTypeStr; - } - - public String getOriginUrl() { - return originUrl; - } - - public void setOriginUrl(String originUrl) { - this.originUrl = originUrl; - } - public Long getAliveSecond() { return aliveSecond; } @@ -248,14 +163,6 @@ public class StreamPushItem implements Comparable{ this.aliveSecond = aliveSecond; } - public List getTracks() { - return tracks; - } - - public void setTracks(List tracks) { - this.tracks = tracks; - } - public String getVhost() { return vhost; } @@ -327,5 +234,37 @@ public class StreamPushItem implements Comparable{ public void setCommonGbChannelId(int commonGbChannelId) { this.commonGbChannelId = commonGbChannelId; } + + public String getGbId() { + return gbId; + } + + public void setGbId(String gbId) { + this.gbId = gbId; + } + + public double getLongitude() { + return longitude; + } + + public void setLongitude(double longitude) { + this.longitude = longitude; + } + + public double getLatitude() { + return latitude; + } + + public void setLatitude(double latitude) { + this.latitude = latitude; + } + + public boolean isStatus() { + return status; + } + + public void setStatus(boolean status) { + this.status = status; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 72ff8514..329c6512 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -4,7 +4,7 @@ import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.github.pagehelper.PageInfo; @@ -17,32 +17,18 @@ import java.util.Map; */ public interface IStreamPushService { - List handleJSON(String json, MediaServerItem mediaServerItem); - - /** - * 将应用名和流ID加入国标关联 - * @param stream - * @return - */ - boolean saveToGB(GbStream stream); - - /** - * 将应用名和流ID移出国标关联 - * @param stream - * @return - */ - boolean removeFromGB(GbStream stream); + List handleJSON(String json, MediaServerItem mediaServerItem); /** * 获取 */ - PageInfo getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId); + PageInfo getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId); - List getPushList(String mediaSererId); + List getPushList(String mediaSererId); - StreamPushItem transform(OnStreamChangedHookParam item); + StreamPush transform(OnStreamChangedHookParam item); - StreamPushItem getPush(String app, String streamId); + StreamPush getPush(String app, String streamId); /** * 停止一路推流 @@ -66,13 +52,10 @@ public interface IStreamPushService { */ void clean(); - - boolean saveToRandomGB(); - /** * 批量添加 */ - void batchAdd(List streamPushExcelDtoList); + void batchAdd(List streamPushExcelDtoList); /** * 中止多个推流 @@ -82,7 +65,7 @@ public interface IStreamPushService { /** * 导入时批量增加 */ - void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll); + void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll); /** * 全部离线 @@ -102,17 +85,19 @@ public interface IStreamPushService { /** * 增加推流 */ - boolean add(StreamPushItem stream, CommonGbChannel commonGbChannel); + boolean add(StreamPush stream, CommonGbChannel commonGbChannel); /** * 获取全部的app+Streanm 用于判断推流列表是新增还是修改 * @return */ - List getAllAppAndStream(); + Map getAllAppAndStream(); /** * 获取统计信息 * @return */ ResourceBaseInfo getOverview(); + + void batchUpdate(List streamPushItemForUpdate); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 5e42ea04..96fa16c9 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.genersoft.iot.vmp.common.CommonGbChannel; @@ -31,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -78,12 +76,12 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override - public List handleJSON(String jsonData, MediaServerItem mediaServerItem) { + public List handleJSON(String jsonData, MediaServerItem mediaServerItem) { if (jsonData == null) { return null; } - Map result = new HashMap<>(); + Map result = new HashMap<>(); List onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference>() {}); for (OnStreamChangedHookParam item : onStreamChangedHookParams) { @@ -93,7 +91,7 @@ public class StreamPushServiceImpl implements IStreamPushService { || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { String key = item.getApp() + "_" + item.getStream(); - StreamPushItem streamPushItem = result.get(key); + StreamPush streamPushItem = result.get(key); if (streamPushItem == null) { streamPushItem = transform(item); result.put(key, streamPushItem); @@ -104,17 +102,13 @@ public class StreamPushServiceImpl implements IStreamPushService { return new ArrayList<>(result.values()); } @Override - public StreamPushItem transform(OnStreamChangedHookParam item) { - StreamPushItem streamPushItem = new StreamPushItem(); + public StreamPush transform(OnStreamChangedHookParam item) { + StreamPush streamPushItem = new StreamPush(); streamPushItem.setApp(item.getApp()); streamPushItem.setMediaServerId(item.getMediaServerId()); streamPushItem.setStream(item.getStream()); streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setOriginSock(item.getOriginSock()); streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); - streamPushItem.setOriginType(item.getOriginType()); - streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); - streamPushItem.setOriginUrl(item.getOriginUrl()); streamPushItem.setCreateTime(DateUtil.getNow()); streamPushItem.setAliveSecond(item.getAliveSecond()); streamPushItem.setVhost(item.getVhost()); @@ -123,37 +117,36 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public PageInfo getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) { + public PageInfo getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) { PageHelper.startPage(page, count); - List all = streamPushMapper.selectAllForList(query, pushing, mediaServerId); + List all = streamPushMapper.selectAllForList(query, pushing, mediaServerId); return new PageInfo<>(all); } @Override - public List getPushList(String mediaServerId) { + public List getPushList(String mediaServerId) { return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); } @Override - public StreamPushItem getPush(String app, String streamId) { + public StreamPush getPush(String app, String streamId) { return streamPushMapper.selectOne(app, streamId); } @Override public boolean stop(String app, String streamId) { - logger.info("[推流 ] 停止流: {}/{}", app, streamId); - StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); - if (streamPushItem != null) { - gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); - } + logger.info("[停止推流 ] {}/{}", app, streamId); - platformGbStreamMapper.delByAppAndStream(app, streamId); - gbStreamMapper.del(app, streamId); - int delStream = streamPushMapper.del(app, streamId); - if (delStream > 0) { - MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); - zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); + StreamPush streamPushItem = streamPushMapper.selectOne(app, streamId); + if (streamPushItem == null) { + logger.info("[停止推流] 不存在 {}/{} ", app, streamId); + return false; } + if (streamPushItem.getCommonGbChannelId() == 0) { + streamPushMapper.del(app, streamId); + } + MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); + zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); return true; } @@ -165,14 +158,14 @@ public class StreamPushServiceImpl implements IStreamPushService { return; } // 数据库记录 - List pushList = getPushList(mediaServerId); - Map pushItemMap = new HashMap<>(); + List pushList = getPushList(mediaServerId); + Map pushItemMap = new HashMap<>(); // redis记录 List onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH"); Map streamInfoPushItemMap = new HashMap<>(); if (pushList.size() > 0) { - for (StreamPushItem streamPushItem : pushList) { - if (ObjectUtils.isEmpty(streamPushItem.getGbId())) { + for (StreamPush streamPushItem : pushList) { + if (streamPushItem.getCommonGbChannelId() > 0) { pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem); } } @@ -195,7 +188,7 @@ public class StreamPushServiceImpl implements IStreamPushService { String dataStr = mediaList.getString("data"); Integer code = mediaList.getInteger("code"); - List streamPushItems = null; + List streamPushItems = null; if (code == 0 ) { if (dataStr != null) { streamPushItems = handleJSON(dataStr, mediaServerItem); @@ -203,13 +196,13 @@ public class StreamPushServiceImpl implements IStreamPushService { } if (streamPushItems != null) { - for (StreamPushItem streamPushItem : streamPushItems) { + for (StreamPush streamPushItem : streamPushItems) { pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream()); } } - List offlinePushItems = new ArrayList<>(pushItemMap.values()); + List offlinePushItems = new ArrayList<>(pushItemMap.values()); if (offlinePushItems.size() > 0) { String type = "PUSH"; int runLimit = 300; @@ -219,7 +212,7 @@ public class StreamPushServiceImpl implements IStreamPushService { if (i + runLimit > offlinePushItems.size()) { toIndex = offlinePushItems.size(); } - List streamPushItemsSub = offlinePushItems.subList(i, toIndex); + List streamPushItemsSub = offlinePushItems.subList(i, toIndex); streamPushMapper.delAll(streamPushItemsSub); } }else { @@ -255,10 +248,9 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public void zlmServerOffline(String mediaServerId) { - List streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); + List streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId); // 移除没有GBId的推流 streamPushMapper.deleteWithoutGBId(mediaServerId); - gbStreamMapper.deleteWithoutGBId("push", mediaServerId); // 其他的流设置未启用 streamPushMapper.updateStatusByMediaServerId(mediaServerId, false); streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); @@ -287,44 +279,25 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public boolean saveToRandomGB() { - List streamPushItems = streamPushMapper.selectAll(); - long gbId = 100001; - for (StreamPushItem streamPushItem : streamPushItems) { - streamPushItem.setStreamType("push"); - streamPushItem.setStatus(true); - streamPushItem.setGbId("34020000004111" + gbId); - streamPushItem.setCreateTime(DateUtil.getNow()); - gbId ++; - } - int limitCount = 30; + @Transactional + public void batchAdd(List streamPushItems) { + // 把存在国标Id的写入同步资源库 - if (streamPushItems.size() > limitCount) { - for (int i = 0; i < streamPushItems.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > streamPushItems.size()) { - toIndex = streamPushItems.size(); - } - gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); - } - }else { - gbStreamMapper.batchAdd(streamPushItems); - } - return true; - } - - @Override - public void batchAdd(List streamPushItems) { streamPushMapper.addAll(streamPushItems); - gbStreamMapper.batchAdd(streamPushItems); + + } + @Override + public void batchUpdate(List streamPushItemForUpdate) { + + } @Override - public void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll ) { + public void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll ) { // 存储数据到stream_push表 streamPushMapper.addAll(streamPushItems); - List streamPushItemForGbStream = streamPushItems.stream() + List streamPushItemForGbStream = streamPushItems.stream() .filter(streamPushItem-> streamPushItem.getGbId() != null) .collect(Collectors.toList()); // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里 @@ -332,7 +305,7 @@ public class StreamPushServiceImpl implements IStreamPushService { gbStreamMapper.batchAdd(streamPushItemForGbStream); } // 去除没有ID也就是没有存储到数据库的数据 - List streamPushItemsForPlatform = streamPushItemForGbStream.stream() + List streamPushItemsForPlatform = streamPushItemForGbStream.stream() .filter(streamPushItem-> streamPushItem.getGbStreamId() != null) .collect(Collectors.toList()); @@ -360,14 +333,14 @@ public class StreamPushServiceImpl implements IStreamPushService { } platformInfoMap.put(platform.getServerGBId(), catalogMap); } - List streamPushItemListFroPlatform = new ArrayList<>(); + List streamPushItemListFroPlatform = new ArrayList<>(); Map> platformForEvent = new HashMap<>(); // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入 - for (StreamPushItem streamPushItem : streamPushItemsForPlatform) { + for (StreamPush streamPushItem : streamPushItemsForPlatform) { List platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream()); if (platFormInfoList != null && platFormInfoList.size() > 0) { for (String[] platFormInfoArray : platFormInfoList) { - StreamPushItem streamPushItemForPlatform = new StreamPushItem(); + StreamPush streamPushItemForPlatform = new StreamPush(); streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); if (platFormInfoArray.length > 0) { // 数组 platFormInfoArray 0 为平台ID。 1为目录ID @@ -420,11 +393,12 @@ public class StreamPushServiceImpl implements IStreamPushService { MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream()); } - } return true; } + + @Override public void allStreamOffline() { List onlinePushers = streamPushMapper.getOnlinePusherForGb(); @@ -457,7 +431,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override @Transactional - public boolean add(StreamPushItem stream, CommonGbChannel commonGbChannel) { + public boolean add(StreamPush stream, CommonGbChannel commonGbChannel) { assert !ObjectUtils.isEmpty(commonGbChannel.getCommonGbDeviceID()); assert !ObjectUtils.isEmpty(commonGbChannel.getCommonGbName()); String now = DateUtil.getNow(); @@ -478,8 +452,7 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public List getAllAppAndStream() { - + public Map getAllAppAndStream() { return streamPushMapper.getAllAppAndStream(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java index a1271593..5d9a219c 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushUploadFileHandler.java @@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.event.AnalysisEventListener; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; @@ -32,12 +32,12 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener streamPushItems = new ArrayList<>(); + private final List streamPushItems = new ArrayList<>(); /** * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 */ - private final Map streamPushItemForSave = new HashMap<>(); + private final Map streamPushItemForSave = new HashMap<>(); /** * 用于存储按照APP+Stream为KEY, 平台ID+目录Id 为value的数据,用于存储到gb_stream表后获取app+Stream对应的平台与目录信息,然后存入关联表 @@ -80,9 +80,9 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener allAppAndStreams = pushService.getAllAppAndStream(); + Map allAppAndStreams = pushService.getAllAppAndStream(); if (allAppAndStreams.size() > 0) { - for (String allAppAndStream : allAppAndStreams) { + for (String allAppAndStream : allAppAndStreams.keySet()) { pushMapInDb.put(allAppAndStream, allAppAndStream); } } @@ -126,18 +126,16 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java index cb34ff59..3209bb74 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -2,10 +2,11 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.dto.StreamPushDto; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,6 +20,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -35,8 +37,6 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { @Resource private IStreamPushService streamPushService; - @Resource - private IGbStreamService gbStreamService; private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @@ -55,44 +55,43 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); try { - List streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); + List streamPushList = JSON.parseArray(new String(msg.getBody()), StreamPush.class); //查询全部的app+stream 用于判断是添加还是修改 - List allAppAndStream = streamPushService.getAllAppAndStream(); + Map allAppAndStream = streamPushService.getAllAppAndStream(); /** * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 */ - List streamPushItemForSave = new ArrayList<>(); - List streamPushItemForUpdate = new ArrayList<>(); - for (StreamPushItem streamPushItem : streamPushItems) { - String app = streamPushItem.getApp(); - String stream = streamPushItem.getStream(); - boolean contains = allAppAndStream.contains(app + stream); + List streamPushItemForSave = new ArrayList<>(); + List streamPushItemForUpdate = new ArrayList<>(); + for (StreamPush streamPush : streamPushList) { + + streamPush.setUpdateTime(DateUtil.getNow()); //不存在就添加 - if (!contains) { - streamPushItem.setStreamType("push"); - streamPushItem.setCreateTime(DateUtil.getNow()); - streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); - streamPushItem.setOriginType(2); - streamPushItem.setOriginTypeStr("rtsp_push"); - streamPushItem.setTotalReaderCount("0"); - streamPushItemForSave.add(streamPushItem); + if (!allAppAndStream.containsKey(streamPush.getApp() + streamPush.getStream())) { + streamPush.setCreateTime(DateUtil.getNow()); + streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); + streamPushItemForSave.add(streamPush); } else { - //存在就只修改 name和gbId - streamPushItemForUpdate.add(streamPushItem); + StreamPush streamPushBoInDB = allAppAndStream.get(streamPush.getApp() + streamPush.getStream()); + // 涉及可以变化的内容为名称,国标Id, 状态 + if (!streamPush.getName().equals(streamPushBoInDB.getName()) + || !streamPush.getGbId().equals(streamPushBoInDB.getGbId()) + || !streamPush.isStatus() == streamPushBoInDB.isStatus()) { + streamPushItemForUpdate.add(streamPush); + } } } - if (streamPushItemForSave.size() > 0) { - + if (!streamPushItemForSave.isEmpty()) { logger.info("添加{}条",streamPushItemForSave.size()); logger.info(JSONObject.toJSONString(streamPushItemForSave)); streamPushService.batchAdd(streamPushItemForSave); } - if(streamPushItemForUpdate.size()>0){ + if(!streamPushItemForUpdate.isEmpty()){ logger.info("修改{}条",streamPushItemForUpdate.size()); logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); - gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + streamPushService.batchUpdate(streamPushItemForUpdate); } }catch (Exception e) { logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index be0924ab..eac48fb4 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -1,12 +1,13 @@ package com.genersoft.iot.vmp.storager.dao; import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; @Mapper @Repository @@ -17,7 +18,7 @@ public interface StreamPushMapper { "(#{app}, #{stream}, #{totalReaderCount}, #{originType}, #{originTypeStr}, " + "#{pushTime}, #{aliveSecond}, #{mediaServerId} , #{updateTime} , #{createTime}, " + "#{pushIng}, #{self} )") - int add(StreamPushItem streamPushItem); + int add(StreamPush streamPushItem); @Update(value = {" "}) - int update(StreamPushItem streamPushItem); + int update(StreamPush streamPushItem); @Delete("DELETE FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}") int del(String app, String stream); @@ -44,7 +45,7 @@ public interface StreamPushMapper { "(sp.app=#{item.app} and sp.stream=#{item.stream} and gs.gb_id is null) " + "" + "") - int delAllWithoutGBId(List streamPushItems); + int delAllWithoutGBId(List streamPushItems); @Delete("") - int delAll(List streamPushItems); + int delAll(List streamPushItems); @Delete(""}) - List selectAllForList(@Param("query") String query, @Param("pushing") Boolean pushing, @Param("mediaServerId") String mediaServerId); + List selectAllForList(@Param("query") String query, @Param("pushing") Boolean pushing, @Param("mediaServerId") String mediaServerId); @Select("SELECT st.*, gs.gb_id, gs.name, gs.longitude, gs.latitude FROM wvp_stream_push st LEFT join wvp_gb_stream gs on st.app = gs.app AND st.stream = gs.stream order by st.create_time desc") - List selectAll(); + List selectAll(); @Select("SELECT st.*, gs.gb_id, gs.name, gs.longitude, gs.latitude FROM wvp_stream_push st LEFT join wvp_gb_stream gs on st.app = gs.app AND st.stream = gs.stream WHERE st.app=#{app} AND st.stream=#{stream}") - StreamPushItem selectOne(@Param("app") String app, @Param("stream") String stream); + StreamPush selectOne(@Param("app") String app, @Param("stream") String stream); @Insert("") @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") - int addAll(List streamPushItems); + int addAll(List streamPushItems); @Delete("DELETE FROM wvp_stream_push") void clear(); - @Delete("delete" + + @Delete( + "delete" + " from wvp_stream_push " + - " where id in " + - " (select temp.id from " + - " (select wgs.gb_stream_id as id " + - " from wvp_gb_stream wgs" + - " left join wvp_stream_push sp on sp.id = wgs.gb_stream_id" + - " where wgs.gb_id is null and wgs.media_server_id = #{mediaServerId}) temp)" + " where media_server_id = #{mediaServerId} and common_gb_channel_id = 0" ) void deleteWithoutGBId(String mediaServerId); @Select("SELECT * FROM wvp_stream_push WHERE media_server_id=#{mediaServerId}") - List selectAllByMediaServerId(String mediaServerId); + List selectAllByMediaServerId(String mediaServerId); @Select("SELECT sp.* FROM wvp_stream_push sp left join wvp_gb_stream gs on gs.app = sp.app and gs.stream= sp.stream WHERE sp.media_server_id=#{mediaServerId} and gs.gb_id is null") - List selectAllByMediaServerIdWithOutGbID(String mediaServerId); + List selectAllByMediaServerIdWithOutGbID(String mediaServerId); @Update("UPDATE wvp_stream_push " + "SET status=#{status} " + @@ -175,8 +172,9 @@ public interface StreamPushMapper { @Update("UPDATE wvp_stream_push SET status=0") void setAllStreamOffline(); - @Select("SELECT CONCAT(app,stream) from wvp_gb_stream") - List getAllAppAndStream(); + @MapKey("key") + @Select("SELECT CONCAT(wgs.app,wgs.stream) as keyId, wgs.* from wvp_gb_stream as wgs") + Map getAllAppAndStream(); @Select("select count(1) from wvp_stream_push ") int getAllCount(); @@ -193,5 +191,5 @@ public interface StreamPushMapper { "(#{item.app}, #{item.stream}) " + "" + "") - List getListIn(List streamPushItems); + List getListIn(List streamPushItems); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java index 7423eb60..bbe345c5 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java @@ -3,16 +3,13 @@ package com.genersoft.iot.vmp.vmanager.streamPush; import com.alibaba.excel.EasyExcel; import com.alibaba.excel.ExcelReader; import com.alibaba.excel.read.metadata.ReadSheet; -import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.SecurityUtils; import com.genersoft.iot.vmp.conf.security.dto.LoginUser; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPush; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IStreamPushService; @@ -70,11 +67,11 @@ public class StreamPushController { @Parameter(name = "query", description = "查询内容") @Parameter(name = "pushing", description = "是否正在推流") @Parameter(name = "mediaServerId", description = "流媒体ID") - public PageInfo list(@RequestParam(required = false)Integer page, - @RequestParam(required = false)Integer count, - @RequestParam(required = false)String query, - @RequestParam(required = false)Boolean pushing, - @RequestParam(required = false)String mediaServerId ){ + public PageInfo list(@RequestParam(required = false)Integer page, + @RequestParam(required = false)Integer count, + @RequestParam(required = false)String query, + @RequestParam(required = false)Boolean pushing, + @RequestParam(required = false)String mediaServerId ){ if (ObjectUtils.isEmpty(query)) { query = null; @@ -82,7 +79,7 @@ public class StreamPushController { if (ObjectUtils.isEmpty(mediaServerId)) { mediaServerId = null; } - PageInfo pushList = streamPushService.getPushList(page, count, query, pushing, mediaServerId); + PageInfo pushList = streamPushService.getPushList(page, count, query, pushing, mediaServerId); return pushList; } @@ -231,7 +228,7 @@ public class StreamPushController { if (userInfo!= null) { authority = true; } - StreamPushItem push = streamPushService.getPush(app, stream); + StreamPush push = streamPushService.getPush(app, stream); if (push != null && !push.isSelf()) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "来自其他平台的推流信息"); } @@ -252,7 +249,7 @@ public class StreamPushController { if (ObjectUtils.isEmpty(param.getApp()) && ObjectUtils.isEmpty(param.getStream())) { throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空"); } - StreamPushItem streamPushItem = new StreamPushItem(); + StreamPush streamPushItem = new StreamPush(); streamPushItem.setApp(param.getApp()); streamPushItem.setStream(param.getStream());