临时提交

结构优化
648540858 2023-12-08 18:05:14 +08:00
parent dc4385da74
commit 6678ac8605
12 changed files with 209 additions and 297 deletions

View File

@ -98,12 +98,36 @@ alter table wvp_platform
alter table wvp_device_channel
add common_gb_channel_id integer;
alter table wvp_stream_push
add name varchar(255) default NULL;
alter table wvp_stream_push
add common_gb_channel_id integer;
alter table wvp_stream_proxy
add common_gb_channel_id integer;
alter table wvp_stream_push
drop column origin_type;
alter table wvp_stream_push
drop column origin_type_str;
alter table wvp_stream_push
add gb_id varchar(50) default NULL;
alter table wvp_stream_push
add longitude double precision;
alter table wvp_stream_push
add latitude double precision;
alter table wvp_stream_push
add status bool default false;

View File

@ -254,11 +254,10 @@ create table wvp_stream_proxy (
create table wvp_stream_push (
id serial primary key,
name character varying(255) default NULL,
app character varying(255),
stream character varying(255),
total_reader_count character varying(50),
origin_type integer,
origin_type_str character varying(50),
create_time character varying(50),
alive_second integer,
media_server_id character varying(50),

View File

@ -160,7 +160,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
PlatformCatalog catalog = storager.getCatalog(requesterId, channelId);
MediaServerItem mediaServerItem = null;
StreamPushItem streamPushItem = null;
StreamPush streamPushItem = null;
StreamProxyItem proxyByAppAndStream =null;
// 不是通道可能是直播流
if (channel != null && gbStream == null) {
@ -624,7 +624,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) {
@ -676,7 +676,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
/**
* 线
*/
private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
private void notifyStreamOnline(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) {
@ -804,7 +804,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
/**
* wvp
*/
private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
private void otherWvpPushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) {
@ -848,7 +848,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
// 离线
// 查询是否在本机上线了
StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
StreamPush currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
if (currentStreamPushItem.isPushIng()) {
// 在线状态
pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,

View File

@ -68,9 +68,9 @@ public class ZLMMediaListManager {
private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) {
StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam);
StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
public StreamPush addPush(OnStreamChangedHookParam onStreamChangedHookParam) {
StreamPush transform = streamPushService.transform(onStreamChangedHookParam);
StreamPush pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
transform.setPushIng(onStreamChangedHookParam.isRegist());
transform.setUpdateTime(DateUtil.getNow());
transform.setPushTime(DateUtil.getNow());

View File

@ -1,15 +1,12 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.utils.DateUtil;
import io.swagger.v3.oas.annotations.media.Schema;
import org.jetbrains.annotations.NotNull;
import java.util.List;
@Schema(description = "推流信息")
public class StreamPushItem implements Comparable<StreamPushItem>{
public class StreamPush implements Comparable<StreamPush>{
/**
* id
@ -17,6 +14,9 @@ public class StreamPushItem implements Comparable<StreamPushItem>{
@Schema(description = "id")
private Integer id;
@Schema(description = "名称")
private String name;
/**
*
*/
@ -35,43 +35,6 @@ public class StreamPushItem implements Comparable<StreamPushItem>{
@Schema(description = "观看总人数")
private String totalReaderCount;
/**
* hls/rtsp/rtmp/http-flv/ws-flv
*/
@Schema(description = "协议 包括hls/rtsp/rtmp/http-flv/ws-flv")
private List<MediaSchema> schemas;
/**
*
* unknown = 0,
* rtmp_push=1,
* rtsp_push=2,
* rtp_push=3,
* pull=4,
* ffmpeg_pull=5,
* mp4_vod=6,
* device_chn=7
*/
@Schema(description = "产生源类型")
private int originType;
/**
* null
*/
@Schema(description = "客户端和服务器网络信息可能为null类型")
private OnStreamChangedHookParam.OriginSock originSock;
/**
*
*/
@Schema(description = "产生源类型的字符串描述")
private String originTypeStr;
/**
* url
*/
@Schema(description = "产生源的url")
private String originUrl;
/**
*
@ -79,12 +42,6 @@ public class StreamPushItem implements Comparable<StreamPushItem>{
@Schema(description = "存活时间,单位秒")
private Long aliveSecond;
/**
*
*/
@Schema(description = "音视频轨道")
private List<OnStreamChangedHookParam.MediaTrack> tracks;
/**
*
*/
@ -139,35 +96,25 @@ public class StreamPushItem implements Comparable<StreamPushItem>{
@Schema(description = "国标通用信息ID")
private int commonGbChannelId;
@Schema(description = "国标ID")
private String gbId;
@Schema(description = "经度")
private double longitude;
@Schema(description = "纬度")
private double latitude;
@Schema(description = "状态")
private boolean status;
@Override
public int compareTo(@NotNull StreamPushItem streamPushItem) {
public int compareTo(@NotNull StreamPush streamPushItem) {
return Long.valueOf(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(this.createTime)
- DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(streamPushItem.getCreateTime())).intValue();
}
public static class MediaSchema {
private String schema;
private Long bytesSpeed;
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public Long getBytesSpeed() {
return bytesSpeed;
}
public void setBytesSpeed(Long bytesSpeed) {
this.bytesSpeed = bytesSpeed;
}
}
public Integer getId() {
return id;
}
@ -176,6 +123,14 @@ public class StreamPushItem implements Comparable<StreamPushItem>{
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getApp() {
return app;
}
@ -200,46 +155,6 @@ public class StreamPushItem implements Comparable<StreamPushItem>{
this.totalReaderCount = totalReaderCount;
}
public List<MediaSchema> getSchemas() {
return schemas;
}
public void setSchemas(List<MediaSchema> schemas) {
this.schemas = schemas;
}
public int getOriginType() {
return originType;
}
public void setOriginType(int originType) {
this.originType = originType;
}
public OnStreamChangedHookParam.OriginSock getOriginSock() {
return originSock;
}
public void setOriginSock(OnStreamChangedHookParam.OriginSock originSock) {
this.originSock = originSock;
}
public String getOriginTypeStr() {
return originTypeStr;
}
public void setOriginTypeStr(String originTypeStr) {
this.originTypeStr = originTypeStr;
}
public String getOriginUrl() {
return originUrl;
}
public void setOriginUrl(String originUrl) {
this.originUrl = originUrl;
}
public Long getAliveSecond() {
return aliveSecond;
}
@ -248,14 +163,6 @@ public class StreamPushItem implements Comparable<StreamPushItem>{
this.aliveSecond = aliveSecond;
}
public List<OnStreamChangedHookParam.MediaTrack> getTracks() {
return tracks;
}
public void setTracks(List<OnStreamChangedHookParam.MediaTrack> tracks) {
this.tracks = tracks;
}
public String getVhost() {
return vhost;
}
@ -327,5 +234,37 @@ public class StreamPushItem implements Comparable<StreamPushItem>{
public void setCommonGbChannelId(int commonGbChannelId) {
this.commonGbChannelId = commonGbChannelId;
}
public String getGbId() {
return gbId;
}
public void setGbId(String gbId) {
this.gbId = gbId;
}
public double getLongitude() {
return longitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}
public double getLatitude() {
return latitude;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public boolean isStatus() {
return status;
}
public void setStatus(boolean status) {
this.status = status;
}
}

View File

@ -4,7 +4,7 @@ import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo;
@ -17,32 +17,18 @@ import java.util.Map;
*/
public interface IStreamPushService {
List<StreamPushItem> handleJSON(String json, MediaServerItem mediaServerItem);
/**
* ID
* @param stream
* @return
*/
boolean saveToGB(GbStream stream);
/**
* ID
* @param stream
* @return
*/
boolean removeFromGB(GbStream stream);
List<StreamPush> handleJSON(String json, MediaServerItem mediaServerItem);
/**
*
*/
PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId);
PageInfo<StreamPush> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId);
List<StreamPushItem> getPushList(String mediaSererId);
List<StreamPush> getPushList(String mediaSererId);
StreamPushItem transform(OnStreamChangedHookParam item);
StreamPush transform(OnStreamChangedHookParam item);
StreamPushItem getPush(String app, String streamId);
StreamPush getPush(String app, String streamId);
/**
*
@ -66,13 +52,10 @@ public interface IStreamPushService {
*/
void clean();
boolean saveToRandomGB();
/**
*
*/
void batchAdd(List<StreamPushItem> streamPushExcelDtoList);
void batchAdd(List<StreamPush> streamPushExcelDtoList);
/**
*
@ -82,7 +65,7 @@ public interface IStreamPushService {
/**
*
*/
void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
void batchAddForUpload(List<StreamPush> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
/**
* 线
@ -102,17 +85,19 @@ public interface IStreamPushService {
/**
*
*/
boolean add(StreamPushItem stream, CommonGbChannel commonGbChannel);
boolean add(StreamPush stream, CommonGbChannel commonGbChannel);
/**
* app+Streanm
* @return
*/
List<String> getAllAppAndStream();
Map<String, StreamPush> getAllAppAndStream();
/**
*
* @return
*/
ResourceBaseInfo getOverview();
void batchUpdate(List<StreamPush> streamPushItemForUpdate);
}

View File

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.genersoft.iot.vmp.common.CommonGbChannel;
@ -31,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
@ -78,12 +76,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
public List<StreamPush> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
if (jsonData == null) {
return null;
}
Map<String, StreamPushItem> result = new HashMap<>();
Map<String, StreamPush> result = new HashMap<>();
List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {});
for (OnStreamChangedHookParam item : onStreamChangedHookParams) {
@ -93,7 +91,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
String key = item.getApp() + "_" + item.getStream();
StreamPushItem streamPushItem = result.get(key);
StreamPush streamPushItem = result.get(key);
if (streamPushItem == null) {
streamPushItem = transform(item);
result.put(key, streamPushItem);
@ -104,17 +102,13 @@ public class StreamPushServiceImpl implements IStreamPushService {
return new ArrayList<>(result.values());
}
@Override
public StreamPushItem transform(OnStreamChangedHookParam item) {
StreamPushItem streamPushItem = new StreamPushItem();
public StreamPush transform(OnStreamChangedHookParam item) {
StreamPush streamPushItem = new StreamPush();
streamPushItem.setApp(item.getApp());
streamPushItem.setMediaServerId(item.getMediaServerId());
streamPushItem.setStream(item.getStream());
streamPushItem.setAliveSecond(item.getAliveSecond());
streamPushItem.setOriginSock(item.getOriginSock());
streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
streamPushItem.setOriginType(item.getOriginType());
streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
streamPushItem.setOriginUrl(item.getOriginUrl());
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setAliveSecond(item.getAliveSecond());
streamPushItem.setVhost(item.getVhost());
@ -123,37 +117,36 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
@Override
public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
public PageInfo<StreamPush> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
PageHelper.startPage(page, count);
List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
List<StreamPush> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
return new PageInfo<>(all);
}
@Override
public List<StreamPushItem> getPushList(String mediaServerId) {
public List<StreamPush> getPushList(String mediaServerId) {
return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
}
@Override
public StreamPushItem getPush(String app, String streamId) {
public StreamPush getPush(String app, String streamId) {
return streamPushMapper.selectOne(app, streamId);
}
@Override
public boolean stop(String app, String streamId) {
logger.info("[推流 ] 停止流: {}/{}", app, streamId);
StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
if (streamPushItem != null) {
gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
}
logger.info("[停止推流 ] {}/{}", app, streamId);
platformGbStreamMapper.delByAppAndStream(app, streamId);
gbStreamMapper.del(app, streamId);
int delStream = streamPushMapper.del(app, streamId);
if (delStream > 0) {
MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
StreamPush streamPushItem = streamPushMapper.selectOne(app, streamId);
if (streamPushItem == null) {
logger.info("[停止推流] 不存在 {}/{} ", app, streamId);
return false;
}
if (streamPushItem.getCommonGbChannelId() == 0) {
streamPushMapper.del(app, streamId);
}
MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
return true;
}
@ -165,14 +158,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
return;
}
// 数据库记录
List<StreamPushItem> pushList = getPushList(mediaServerId);
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
List<StreamPush> pushList = getPushList(mediaServerId);
Map<String, StreamPush> pushItemMap = new HashMap<>();
// redis记录
List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
if (pushList.size() > 0) {
for (StreamPushItem streamPushItem : pushList) {
if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
for (StreamPush streamPushItem : pushList) {
if (streamPushItem.getCommonGbChannelId() > 0) {
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
}
}
@ -195,7 +188,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
String dataStr = mediaList.getString("data");
Integer code = mediaList.getInteger("code");
List<StreamPushItem> streamPushItems = null;
List<StreamPush> streamPushItems = null;
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = handleJSON(dataStr, mediaServerItem);
@ -203,13 +196,13 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
if (streamPushItems != null) {
for (StreamPushItem streamPushItem : streamPushItems) {
for (StreamPush streamPushItem : streamPushItems) {
pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
}
}
List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
List<StreamPush> offlinePushItems = new ArrayList<>(pushItemMap.values());
if (offlinePushItems.size() > 0) {
String type = "PUSH";
int runLimit = 300;
@ -219,7 +212,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
if (i + runLimit > offlinePushItems.size()) {
toIndex = offlinePushItems.size();
}
List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
List<StreamPush> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
streamPushMapper.delAll(streamPushItemsSub);
}
}else {
@ -255,10 +248,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
public void zlmServerOffline(String mediaServerId) {
List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
List<StreamPush> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
// 移除没有GBId的推流
streamPushMapper.deleteWithoutGBId(mediaServerId);
gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
// 其他的流设置未启用
streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
@ -287,44 +279,25 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
@Override
public boolean saveToRandomGB() {
List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
long gbId = 100001;
for (StreamPushItem streamPushItem : streamPushItems) {
streamPushItem.setStreamType("push");
streamPushItem.setStatus(true);
streamPushItem.setGbId("34020000004111" + gbId);
streamPushItem.setCreateTime(DateUtil.getNow());
gbId ++;
}
int limitCount = 30;
@Transactional
public void batchAdd(List<StreamPush> streamPushItems) {
// 把存在国标Id的写入同步资源库
if (streamPushItems.size() > limitCount) {
for (int i = 0; i < streamPushItems.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > streamPushItems.size()) {
toIndex = streamPushItems.size();
}
gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
}
}else {
gbStreamMapper.batchAdd(streamPushItems);
}
return true;
}
@Override
public void batchAdd(List<StreamPushItem> streamPushItems) {
streamPushMapper.addAll(streamPushItems);
gbStreamMapper.batchAdd(streamPushItems);
}
@Override
public void batchUpdate(List<StreamPush> streamPushItemForUpdate) {
}
@Override
public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
public void batchAddForUpload(List<StreamPush> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
// 存储数据到stream_push表
streamPushMapper.addAll(streamPushItems);
List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
List<StreamPush> streamPushItemForGbStream = streamPushItems.stream()
.filter(streamPushItem-> streamPushItem.getGbId() != null)
.collect(Collectors.toList());
// 存储数据到gb_stream表 id会返回到streamPushItemForGbStream里
@ -332,7 +305,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
gbStreamMapper.batchAdd(streamPushItemForGbStream);
}
// 去除没有ID也就是没有存储到数据库的数据
List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
List<StreamPush> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
.filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
.collect(Collectors.toList());
@ -360,14 +333,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
platformInfoMap.put(platform.getServerGBId(), catalogMap);
}
List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
List<StreamPush> streamPushItemListFroPlatform = new ArrayList<>();
Map<String, List<GbStream>> platformForEvent = new HashMap<>();
// 遍历存储结果查找app+Stream->platformId+catalogId的对应关系然后执行批量写入
for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
for (StreamPush streamPushItem : streamPushItemsForPlatform) {
List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
if (platFormInfoList != null && platFormInfoList.size() > 0) {
for (String[] platFormInfoArray : platFormInfoList) {
StreamPushItem streamPushItemForPlatform = new StreamPushItem();
StreamPush streamPushItemForPlatform = new StreamPush();
streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
if (platFormInfoArray.length > 0) {
// 数组 platFormInfoArray 0 为平台ID。 1为目录ID
@ -420,11 +393,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
}
}
return true;
}
@Override
public void allStreamOffline() {
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
@ -457,7 +431,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
@Transactional
public boolean add(StreamPushItem stream, CommonGbChannel commonGbChannel) {
public boolean add(StreamPush stream, CommonGbChannel commonGbChannel) {
assert !ObjectUtils.isEmpty(commonGbChannel.getCommonGbDeviceID());
assert !ObjectUtils.isEmpty(commonGbChannel.getCommonGbName());
String now = DateUtil.getNow();
@ -478,8 +452,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
@Override
public List<String> getAllAppAndStream() {
public Map<String, StreamPush> getAllAppAndStream() {
return streamPushMapper.getAllAppAndStream();
}

View File

@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.service.impl;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
@ -32,12 +32,12 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
/**
*
*/
private final List<StreamPushItem> streamPushItems = new ArrayList<>();
private final List<StreamPush> streamPushItems = new ArrayList<>();
/**
* APP+Streamstream_pushgb_stream
*/
private final Map<String,StreamPushItem> streamPushItemForSave = new HashMap<>();
private final Map<String, StreamPush> streamPushItemForSave = new HashMap<>();
/**
* APP+StreamKEY ID+Id valuegb_streamapp+Stream
@ -80,9 +80,9 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
this.defaultMediaServerId = defaultMediaServerId;
this.errorDataHandler = errorDataHandler;
// 获取数据库已有的数据,已经存在的则忽略
List<String> allAppAndStreams = pushService.getAllAppAndStream();
Map<String, StreamPush> allAppAndStreams = pushService.getAllAppAndStream();
if (allAppAndStreams.size() > 0) {
for (String allAppAndStream : allAppAndStreams) {
for (String allAppAndStream : allAppAndStreams.keySet()) {
pushMapInDb.put(allAppAndStream, allAppAndStream);
}
}
@ -126,18 +126,16 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
}
StreamPushItem streamPushItem = new StreamPushItem();
StreamPush streamPushItem = new StreamPush();
streamPushItem.setApp(streamPushExcelDto.getApp());
streamPushItem.setStream(streamPushExcelDto.getStream());
streamPushItem.setGbId(streamPushExcelDto.getGbId());
streamPushItem.setStatus(streamPushExcelDto.getStatus());
streamPushItem.setStreamType("push");
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setMediaServerId(defaultMediaServerId);
streamPushItem.setName(streamPushExcelDto.getName());
streamPushItem.setOriginType(2);
streamPushItem.setOriginTypeStr("rtsp_push");
streamPushItem.setTotalReaderCount("0");
streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId());
streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId());

View File

@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@ -71,7 +71,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
public void onMessage(Message message, byte[] bytes) {
logger.info("[REDIS消息-推流结束] {}", new String(message.getBody()));
MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
StreamPush push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
if (push != null) {
if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(

View File

@ -2,10 +2,11 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.dto.StreamPushDto;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -19,6 +20,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@ -35,8 +37,6 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
@Resource
private IStreamPushService streamPushService;
@Resource
private IGbStreamService gbStreamService;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
@ -55,44 +55,43 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
List<StreamPush> streamPushList = JSON.parseArray(new String(msg.getBody()), StreamPush.class);
//查询全部的app+stream 用于判断是添加还是修改
List<String> allAppAndStream = streamPushService.getAllAppAndStream();
Map<String, StreamPush> allAppAndStream = streamPushService.getAllAppAndStream();
/**
* APP+Streamstream_pushgb_stream
*/
List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
for (StreamPushItem streamPushItem : streamPushItems) {
String app = streamPushItem.getApp();
String stream = streamPushItem.getStream();
boolean contains = allAppAndStream.contains(app + stream);
List<StreamPush> streamPushItemForSave = new ArrayList<>();
List<StreamPush> streamPushItemForUpdate = new ArrayList<>();
for (StreamPush streamPush : streamPushList) {
streamPush.setUpdateTime(DateUtil.getNow());
//不存在就添加
if (!contains) {
streamPushItem.setStreamType("push");
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
streamPushItem.setOriginType(2);
streamPushItem.setOriginTypeStr("rtsp_push");
streamPushItem.setTotalReaderCount("0");
streamPushItemForSave.add(streamPushItem);
if (!allAppAndStream.containsKey(streamPush.getApp() + streamPush.getStream())) {
streamPush.setCreateTime(DateUtil.getNow());
streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
streamPushItemForSave.add(streamPush);
} else {
//存在就只修改 name和gbId
streamPushItemForUpdate.add(streamPushItem);
StreamPush streamPushBoInDB = allAppAndStream.get(streamPush.getApp() + streamPush.getStream());
// 涉及可以变化的内容为名称国标Id 状态
if (!streamPush.getName().equals(streamPushBoInDB.getName())
|| !streamPush.getGbId().equals(streamPushBoInDB.getGbId())
|| !streamPush.isStatus() == streamPushBoInDB.isStatus()) {
streamPushItemForUpdate.add(streamPush);
}
}
}
if (streamPushItemForSave.size() > 0) {
if (!streamPushItemForSave.isEmpty()) {
logger.info("添加{}条",streamPushItemForSave.size());
logger.info(JSONObject.toJSONString(streamPushItemForSave));
streamPushService.batchAdd(streamPushItemForSave);
}
if(streamPushItemForUpdate.size()>0){
if(!streamPushItemForUpdate.isEmpty()){
logger.info("修改{}条",streamPushItemForUpdate.size());
logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
streamPushService.batchUpdate(streamPushItemForUpdate);
}
}catch (Exception e) {
logger.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));

View File

@ -1,12 +1,13 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Map;
@Mapper
@Repository
@ -17,7 +18,7 @@ public interface StreamPushMapper {
"(#{app}, #{stream}, #{totalReaderCount}, #{originType}, #{originTypeStr}, " +
"#{pushTime}, #{aliveSecond}, #{mediaServerId} , #{updateTime} , #{createTime}, " +
"#{pushIng}, #{self} )")
int add(StreamPushItem streamPushItem);
int add(StreamPush streamPushItem);
@Update(value = {" <script>" +
@ -33,7 +34,7 @@ public interface StreamPushMapper {
"<if test=\"self != null\">, self=#{self}</if>" +
"WHERE app=#{app} AND stream=#{stream}"+
" </script>"})
int update(StreamPushItem streamPushItem);
int update(StreamPush streamPushItem);
@Delete("DELETE FROM wvp_stream_push WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
@ -44,7 +45,7 @@ public interface StreamPushMapper {
"(sp.app=#{item.app} and sp.stream=#{item.stream} and gs.gb_id is null) " +
"</foreach>" +
"</script>")
int delAllWithoutGBId(List<StreamPushItem> streamPushItems);
int delAllWithoutGBId(List<StreamPush> streamPushItems);
@Delete("<script> "+
"DELETE FROM wvp_stream_push where " +
@ -52,7 +53,7 @@ public interface StreamPushMapper {
"(app=#{item.app} and stream=#{item.stream}) " +
"</foreach>" +
"</script>")
int delAll(List<StreamPushItem> streamPushItems);
int delAll(List<StreamPush> streamPushItems);
@Delete("<script> "+
"DELETE FROM wvp_stream_push where " +
@ -79,13 +80,13 @@ public interface StreamPushMapper {
" <if test='mediaServerId != null' > AND st.media_server_id=#{mediaServerId} </if>" +
"order by st.create_time desc" +
" </script>"})
List<StreamPushItem> selectAllForList(@Param("query") String query, @Param("pushing") Boolean pushing, @Param("mediaServerId") String mediaServerId);
List<StreamPush> selectAllForList(@Param("query") String query, @Param("pushing") Boolean pushing, @Param("mediaServerId") String mediaServerId);
@Select("SELECT st.*, gs.gb_id, gs.name, gs.longitude, gs.latitude FROM wvp_stream_push st LEFT join wvp_gb_stream gs on st.app = gs.app AND st.stream = gs.stream order by st.create_time desc")
List<StreamPushItem> selectAll();
List<StreamPush> selectAll();
@Select("SELECT st.*, gs.gb_id, gs.name, gs.longitude, gs.latitude FROM wvp_stream_push st LEFT join wvp_gb_stream gs on st.app = gs.app AND st.stream = gs.stream WHERE st.app=#{app} AND st.stream=#{stream}")
StreamPushItem selectOne(@Param("app") String app, @Param("stream") String stream);
StreamPush selectOne(@Param("app") String app, @Param("stream") String stream);
@Insert("<script>" +
"Insert INTO wvp_stream_push (app, stream, total_reader_count, origin_type, origin_type_str, " +
@ -97,27 +98,23 @@ public interface StreamPushMapper {
" </foreach>" +
"</script>")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
int addAll(List<StreamPushItem> streamPushItems);
int addAll(List<StreamPush> streamPushItems);
@Delete("DELETE FROM wvp_stream_push")
void clear();
@Delete("delete" +
@Delete(
"delete" +
" from wvp_stream_push " +
" where id in " +
" (select temp.id from " +
" (select wgs.gb_stream_id as id " +
" from wvp_gb_stream wgs" +
" left join wvp_stream_push sp on sp.id = wgs.gb_stream_id" +
" where wgs.gb_id is null and wgs.media_server_id = #{mediaServerId}) temp)"
" where media_server_id = #{mediaServerId} and common_gb_channel_id = 0"
)
void deleteWithoutGBId(String mediaServerId);
@Select("SELECT * FROM wvp_stream_push WHERE media_server_id=#{mediaServerId}")
List<StreamPushItem> selectAllByMediaServerId(String mediaServerId);
List<StreamPush> selectAllByMediaServerId(String mediaServerId);
@Select("SELECT sp.* FROM wvp_stream_push sp left join wvp_gb_stream gs on gs.app = sp.app and gs.stream= sp.stream WHERE sp.media_server_id=#{mediaServerId} and gs.gb_id is null")
List<StreamPushItem> selectAllByMediaServerIdWithOutGbID(String mediaServerId);
List<StreamPush> selectAllByMediaServerIdWithOutGbID(String mediaServerId);
@Update("UPDATE wvp_stream_push " +
"SET status=#{status} " +
@ -175,8 +172,9 @@ public interface StreamPushMapper {
@Update("UPDATE wvp_stream_push SET status=0")
void setAllStreamOffline();
@Select("SELECT CONCAT(app,stream) from wvp_gb_stream")
List<String> getAllAppAndStream();
@MapKey("key")
@Select("SELECT CONCAT(wgs.app,wgs.stream) as keyId, wgs.* from wvp_gb_stream as wgs")
Map<String, StreamPush> getAllAppAndStream();
@Select("select count(1) from wvp_stream_push ")
int getAllCount();
@ -193,5 +191,5 @@ public interface StreamPushMapper {
"(#{item.app}, #{item.stream}) " +
"</foreach>" +
"</script>")
List<StreamPushItem> getListIn(List<StreamPushItem> streamPushItems);
List<StreamPush> getListIn(List<StreamPush> streamPushItems);
}

View File

@ -3,16 +3,13 @@ package com.genersoft.iot.vmp.vmanager.streamPush;
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.ExcelReader;
import com.alibaba.excel.read.metadata.ReadSheet;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.SecurityUtils;
import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IStreamPushService;
@ -70,11 +67,11 @@ public class StreamPushController {
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "pushing", description = "是否正在推流")
@Parameter(name = "mediaServerId", description = "流媒体ID")
public PageInfo<StreamPushItem> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String query,
@RequestParam(required = false)Boolean pushing,
@RequestParam(required = false)String mediaServerId ){
public PageInfo<StreamPush> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String query,
@RequestParam(required = false)Boolean pushing,
@RequestParam(required = false)String mediaServerId ){
if (ObjectUtils.isEmpty(query)) {
query = null;
@ -82,7 +79,7 @@ public class StreamPushController {
if (ObjectUtils.isEmpty(mediaServerId)) {
mediaServerId = null;
}
PageInfo<StreamPushItem> pushList = streamPushService.getPushList(page, count, query, pushing, mediaServerId);
PageInfo<StreamPush> pushList = streamPushService.getPushList(page, count, query, pushing, mediaServerId);
return pushList;
}
@ -231,7 +228,7 @@ public class StreamPushController {
if (userInfo!= null) {
authority = true;
}
StreamPushItem push = streamPushService.getPush(app, stream);
StreamPush push = streamPushService.getPush(app, stream);
if (push != null && !push.isSelf()) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "来自其他平台的推流信息");
}
@ -252,7 +249,7 @@ public class StreamPushController {
if (ObjectUtils.isEmpty(param.getApp()) && ObjectUtils.isEmpty(param.getStream())) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "app或stream不可为空");
}
StreamPushItem streamPushItem = new StreamPushItem();
StreamPush streamPushItem = new StreamPush();
streamPushItem.setApp(param.getApp());
streamPushItem.setStream(param.getStream());