修复推流推流状态提示

pull/556/head
648540858 2022-07-25 11:17:46 +08:00
parent 91dc4566a2
commit c77c1a956c
13 changed files with 67 additions and 179 deletions

File diff suppressed because one or more lines are too long

View File

@ -77,5 +77,8 @@ alter table platform_catalog
alter table platform_catalog
add businessGroupId varchar(50) default null;
/********************* ADD ***************************/
alter table stream_push
add self int DEFAULT NULL;

View File

@ -461,7 +461,6 @@ public class ZLMHttpHookListener {
StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem,
app, stream, tracks, streamAuthorityInfo.getCallId());
item.setStreamInfo(streamInfoByAppAndStream);
item.setSeverId(userSetting.getServerId());
redisCatchStorage.addStream(mediaServerItem, type, app, stream, item);
if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
@ -469,20 +468,6 @@ public class ZLMHttpHookListener {
item.setSeverId(userSetting.getServerId());
zlmMediaListManager.addPush(item);
}
// List<GbStream> gbStreams = new ArrayList<>();
// if (streamPushItem == null || streamPushItem.getGbId() == null) {
// GbStream gbStream = storager.getGbStream(app, streamId);
// gbStreams.add(gbStream);
// }else {
// if (streamPushItem.getGbId() != null) {
// gbStreams.add(streamPushItem);
// }
// }
// if (gbStreams.size() > 0) {
// eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON);
// }
}else {
// 兼容流注销时类型从redis记录获取
MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, stream, mediaServerId);

View File

@ -65,60 +65,14 @@ public class ZLMMediaListManager {
private Map<String, ChannelOnlineEvent> channelOnlineEvents = new ConcurrentHashMap<>();
public void updateMediaList(MediaServerItem mediaServerItem) {
storager.clearMediaList();
// 使用异步的当时更新媒体流列表
zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
if (mediaList == null) {
return;
}
String dataStr = mediaList.getString("data");
Integer code = mediaList.getInteger("code");
Map<String, StreamPushItem> result = new HashMap<>();
List<StreamPushItem> streamPushItems = null;
// 获取所有的国标关联
// List<GbStream> gbStreams = gbStreamMapper.selectAllByMediaServerId(mediaServerItem.getId());
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem);
}
}else {
logger.warn("更新视频流失败错误code " + code);
}
if (streamPushItems != null) {
storager.updateMediaList(streamPushItems);
for (StreamPushItem streamPushItem : streamPushItems) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("app", streamPushItem.getApp());
jsonObject.put("stream", streamPushItem.getStream());
jsonObject.put("mediaServerId", mediaServerItem.getId());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_play,jsonObject,
(MediaServerItem mediaServerItemInuse, JSONObject response)->{
updateMedia(mediaServerItem, response.getString("app"), response.getString("stream"));
}
);
}
}
}));
}
public void addMedia(MediaServerItem mediaServerItem, String app, String streamId) {
//使用异步更新推流
updateMedia(mediaServerItem, app, streamId);
}
public StreamPushItem addPush(MediaItem mediaItem) {
// 查找此直播流是否存在redis预设gbId
StreamPushItem transform = streamPushService.transform(mediaItem);
StreamPushItem pushInDb = streamPushService.getPush(mediaItem.getApp(), mediaItem.getStream());
transform.setPushIng(true);
transform.setPushIng(mediaItem.isRegist());
transform.setUpdateTime(DateUtil.getNow());
transform.setPushTime(DateUtil.getNow());
transform.setSelf(userSetting.getServerId().equals(mediaItem.getSeverId()));
if (pushInDb == null) {
transform.setCreateTime(DateUtil.getNow());
streamPushMapper.add(transform);
@ -128,34 +82,6 @@ public class ZLMMediaListManager {
return transform;
}
public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) {
//使用异步更新推流
zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId, "rtmp", json->{
if (json == null) {
return;
}
String dataStr = json.getString("data");
Integer code = json.getInteger("code");
Map<String, StreamPushItem> result = new HashMap<>();
List<StreamPushItem> streamPushItems = null;
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem);
}
}else {
logger.warn("更新视频流失败错误code " + code);
}
if (streamPushItems != null && streamPushItems.size() == 1) {
storager.updateMedia(streamPushItems.get(0));
}
});
}
public int removeMedia(String app, String streamId) {
// 查找是否关联了国标, 关联了不删除, 置为离线
GbStream gbStream = gbStreamMapper.selectOne(app, streamId);

View File

@ -108,6 +108,13 @@ public class StreamPushItem extends GbStream implements Comparable<StreamPushIte
*/
private boolean pushIng;
/**
*
*/
private boolean self;
public String getVhost() {
return vhost;
}
@ -290,5 +297,13 @@ public class StreamPushItem extends GbStream implements Comparable<StreamPushIte
public void setPushIng(boolean pushIng) {
this.pushIng = pushIng;
}
public boolean isSelf() {
return self;
}
public void setSelf(boolean self) {
this.self = self;
}
}

View File

@ -633,7 +633,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
MediaServerItem mediaServerItem = getOne(mediaServerId);
if (mediaServerItem == null) {
// zlm连接重试
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
return;
}

View File

@ -46,7 +46,7 @@ public class RedisStreamMsgListener implements MessageListener {
JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class);
if (steamMsgJson == null) {
logger.warn("[REDIS的ALARM通知]消息解析失败");
logger.warn("[收到 redis 流变化]消息解析失败");
return;
}
String serverId = steamMsgJson.getString("serverId");
@ -55,7 +55,7 @@ public class RedisStreamMsgListener implements MessageListener {
// 自己发送的消息忽略即可
return;
}
logger.info("[REDIS通知] 流变化 {}", new String(message.getBody()));
logger.info("[收到 redis 流变化] {}", new String(message.getBody()));
String app = steamMsgJson.getString("app");
String stream = steamMsgJson.getString("stream");
boolean register = steamMsgJson.getBoolean("register");

View File

@ -299,18 +299,6 @@ public interface IVideoManagerStorage {
*/
List<DeviceChannel> queryGbStreamListInPlatform(String platformId);
/**
*
* @param streamPushItems
*/
void updateMediaList(List<StreamPushItem> streamPushItems);
/**
*
* @param streamPushItem
*/
void updateMedia(StreamPushItem streamPushItem);
/**
*
* @param app
@ -318,21 +306,6 @@ public interface IVideoManagerStorage {
*/
int removeMedia(String app, String stream);
/**
*
* @param app
* @param stream
* @return
*/
StreamPushItem getMedia(String app, String stream);
/**
*
*/
void clearMediaList();
/**
* 线
*/

View File

@ -15,9 +15,10 @@ import java.util.List;
public interface StreamPushMapper {
@Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
"pushTime, aliveSecond, mediaServerId, serverId, updateTime, createTime, pushIng) VALUES" +
"pushTime, aliveSecond, mediaServerId, serverId, updateTime, createTime, pushIng, self) VALUES" +
"('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " +
"'${pushTime}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' , '${updateTime}' , '${createTime}', ${pushIng} )")
"'${pushTime}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' , '${updateTime}' , '${createTime}', " +
"${pushIng}, ${self} )")
int add(StreamPushItem streamPushItem);
@ -31,6 +32,7 @@ public interface StreamPushMapper {
"<if test=\"pushTime != null\">, pushTime='${pushTime}'</if>" +
"<if test=\"aliveSecond != null\">, aliveSecond='${aliveSecond}'</if>" +
"<if test=\"pushIng != null\">, pushIng=${pushIng}</if>" +
"<if test=\"self != null\">, self=${self}</if>" +
"WHERE app=#{app} AND stream=#{stream}"+
" </script>"})
int update(StreamPushItem streamPushItem);
@ -119,7 +121,7 @@ public interface StreamPushMapper {
@Update("UPDATE stream_push " +
"SET pushIng=${pushIng} " +
"WHERE app=#{app} AND stream=#{stream}")
int updatePushStatus(String app, String stream, boolean status);
int updatePushStatus(String app, String stream, boolean pushIng);
@Update("UPDATE stream_push " +
"SET status=#{status} " +

View File

@ -479,7 +479,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override
public void sendStreamChangeMsg(String type, JSONObject jsonObject) {
String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type;
logger.debug("[redis 流变化事件] {}: {}", key, jsonObject.toString());
logger.info("[redis 流变化事件] {}: {}", key, jsonObject.toString());
redis.convertAndSend(key, jsonObject);
}

View File

@ -635,47 +635,11 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
return streamProxyMapper.selectOne(app, stream);
}
@Override
public void updateMediaList(List<StreamPushItem> streamPushItems) {
if (streamPushItems == null || streamPushItems.size() == 0) {
return;
}
logger.info("updateMediaList: " + streamPushItems.size());
streamPushMapper.addAll(streamPushItems);
// TODO 待优化
for (int i = 0; i < streamPushItems.size(); i++) {
int onlineResult = mediaOnline(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream());
if (onlineResult > 0) {
// 发送上线通知
eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON);
}
}
}
@Override
public void updateMedia(StreamPushItem streamPushItem) {
streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream());
streamPushMapper.add(streamPushItem);
mediaOffline(streamPushItem.getApp(), streamPushItem.getStream());
}
@Override
public int removeMedia(String app, String stream) {
return streamPushMapper.del(app, stream);
}
@Override
public StreamPushItem getMedia(String app, String stream) {
return streamPushMapper.selectOne(app, stream);
}
@Override
public void clearMediaList() {
streamPushMapper.clear();
}
@Override
public int mediaOffline(String app, String stream) {
GbStream gbStream = gbStreamMapper.selectOne(app, stream);
@ -683,7 +647,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
if ("proxy".equals(gbStream.getStreamType())) {
result = streamProxyMapper.updateStatus(app, stream, false);
}else {
result = streamPushMapper.updateStatus(app, stream, false);
result = streamPushMapper.updatePushStatus(app, stream, false);
}
return result;
}
@ -695,7 +659,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
if ("proxy".equals(gbStream.getStreamType())) {
result = streamProxyMapper.updateStatus(app, stream, true);
}else {
result = streamPushMapper.updateStatus(app, stream, true);
result = streamPushMapper.updatePushStatus(app, stream, true);
}
return result;
}

View File

@ -4,6 +4,7 @@ import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.ExcelReader;
import com.alibaba.excel.read.metadata.ReadSheet;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.security.SecurityUtils;
import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
@ -63,6 +64,9 @@ public class StreamPushController {
@Autowired
private IMediaService mediaService;
@Autowired
private UserSetting userSetting;
@ApiOperation("推流列表查询")
@ApiImplicitParams({
@ApiImplicitParam(name="page", value = "当前页", required = true, dataTypeClass = Integer.class),
@ -269,18 +273,23 @@ public class StreamPushController {
if (userInfo!= null) {
authority = true;
}
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
WVPResult<StreamInfo> result = new WVPResult<>();
StreamPushItem push = streamPushService.getPush(app, stream);
if (!userSetting.getServerId().equals(push.getServerId()) ) {
result.setCode(-1);
result.setMsg("来自其他平台的推流信息");
return result;
}
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
if (streamInfo != null){
result.setCode(0);
result.setMsg("scccess");
result.setData(streamInfo);
}else {
result.setCode(-1);
result.setMsg("fail");
result.setMsg("获取播放地址失败");
}
return result;
}

View File

@ -69,7 +69,7 @@
</el-table-column>
<el-table-column label="本平台推流" min-width="100">
<template slot-scope="scope">
{{scope.row.pushIng && !!!scope.row.serverId ? '是' : '否' }}
{{scope.row.pushIng && !!scope.row.self ? '是' : '否' }}
</template>
</el-table-column>
@ -202,10 +202,15 @@ export default {
}
}).then(function (res) {
that.getListLoading = false;
if (res.data.code === 0 ) {
that.$refs.devicePlayer.openDialog("streamPlay", null, null, {
streamInfo: res.data.data,
hasAudio: true
});
}else {
that.$message.error(res.data.msg);
}
}).catch(function (error) {
console.error(error);
that.getListLoading = false;