优化拉流代理的播放

结构优化
648540858 2024-01-22 00:02:50 +08:00
parent d9d8aaca6e
commit 7cb4c5c1df
11 changed files with 90 additions and 84 deletions

View File

@ -330,9 +330,11 @@ public class ZLMHttpHookListener {
public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) {
if (param.isRegist()) {
logger.info("[ZLM HOOK] 流注册, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
logger.info("[ZLM HOOK] 流注册, {}->{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
OriginType.values()[param.getOriginType()].getType(), param.getApp(), param.getStream());
} else {
logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
logger.info("[ZLM HOOK] 流注销, {}->{}->{}->{}/{}", param.getMediaServerId(), param.getSchema(),
OriginType.values()[param.getOriginType()].getType(), param.getApp(), param.getStream());
}
@ -373,18 +375,25 @@ public class ZLMHttpHookListener {
} else {
mediaServerService.removeCount(param.getMediaServerId());
}
// 设置拉流代理上线/离线
streamProxyService.updateStatus(param.isRegist(), param.getApp(), param.getStream());
if ("rtp".equals(param.getApp()) && !param.isRegist()) {
if ("rtp".equals(param.getApp())) {
if (!param.isRegist()) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
inviteStreamService.removeInviteInfo(inviteInfo);
storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
}
}
} else {
if (!"rtp".equals(param.getApp())) {
String type = OriginType.values()[param.getOriginType()].getType();
String type;
if (param.getOriginType() == 0) {
// 源类型为unknown则主动查询类型
type = mediaService.getStreamType(param.getApp(), param.getStream());
}else {
type = OriginType.values()[param.getOriginType()].getType();
}
if (type != null) {
if (param.isRegist()) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(
param.getApp(), param.getStream());
@ -415,12 +424,12 @@ public class ZLMHttpHookListener {
if ("PUSH".equalsIgnoreCase(type)) {
// 冗余数据,自己系统中自用
redisCatchStorage.removePushListItem(param.getApp(), param.getStream(), param.getMediaServerId());
zlmMediaListManager.removePush(param);
streamPushService.offline(param.getApp(), param.getStream());
}
}
zlmMediaListManager.streamOffline(param.getApp(), param.getStream());
}
if (type != null) {
// 设置拉流代理拉流状态
streamProxyService.updatePullingStatus(param.isRegist(), param.getApp(), param.getStream());
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
@ -431,7 +440,6 @@ public class ZLMHttpHookListener {
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
}
}
}
if (!param.isRegist()) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
if (sendRtpItems.size() > 0) {
@ -556,12 +564,6 @@ public class ZLMHttpHookListener {
}
return ret;
}
// TODO 推流具有主动性,暂时不做处理
// StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
// if (streamPushItem != null) {
// // TODO 发送停止
//
// }
}
return ret;
}

View File

@ -77,22 +77,6 @@ public class ZLMMediaListManager {
return transform;
}
public void removePush(OnStreamChangedHookParam param) {
StreamPush pushInDb = streamPushService.getPush(param.getApp(), param.getStream());
if (pushInDb == null) {
return;
}
if (ObjectUtils.isEmpty(pushInDb.getGbId())) {
streamPushService.remove(pushInDb.getId());
}else {
List<Integer> onlinePushers = new ArrayList<>();
onlinePushers.add(pushInDb.getCommonGbChannelId());
commonGbChannelService.offlineForList(onlinePushers);
streamPushService.offline(pushInDb.getId());
}
}
public void sendStreamEvent(String app, String stream, String mediaServerId) {
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
// 查看推流状态

View File

@ -51,4 +51,6 @@ public interface IMediaService {
* zlm
*/
boolean closeStream(MediaServerItem mediaInfo, String app, String stream);
String getStreamType(String app, String stream);
}

View File

@ -32,7 +32,7 @@ public interface IStreamProxyService {
/**
*
*/
void updateStatus(boolean status, String app, String stream);
void updatePullingStatus(boolean status, String app, String stream);

View File

@ -116,5 +116,5 @@ public interface IStreamPushService {
/**
* 线
*/
void offline(Integer id);
void offline(String app, String stream);
}

View File

@ -9,10 +9,14 @@ import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
@ -26,7 +30,10 @@ public class MediaServiceImpl implements IMediaService {
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorage storager;
private StreamProxyMapper streamProxyMapper;
@Autowired
private StreamPushMapper streamPushMapper;
@Autowired
private IMediaServerService mediaServerService;
@ -121,4 +128,19 @@ public class MediaServiceImpl implements IMediaService {
JSONObject jsonObject = zlmresTfulUtils.closeStreams(mediaInfo, app, stream);
return jsonObject != null && jsonObject.getInteger("code") == 0 && jsonObject.getInteger("count_hit") > 0;
}
@Override
public String getStreamType(String app, String stream) {
String result = null;
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy != null) {
result = "PULL";
}else {
StreamPush streamPush = streamPushMapper.selectOneByAppAndStream(app, stream);
if (streamPush != null) {
result = "PUSH";
}
}
return result;
}
}

View File

@ -87,6 +87,9 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
}
return;
}else {
if (streamProxy.getStreamKey() != null) {
zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey());
}
redisCatchStorage.removeStream(streamChangedHookParam.getMediaServerId(), "PULL", streamChangedHookParam.getApp(),
streamChangedHookParam.getStream());
}
@ -94,10 +97,12 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
redisCatchStorage.removeStream(streamChangedHookParam.getMediaServerId(), "PULL", streamChangedHookParam.getApp(),
streamChangedHookParam.getStream());
}
} else {
// 查询zlm是否含有这个流存在则停止
boolean ready = mediaService.isReady(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
if (ready) {
mediaService.closeStream(mediaInfo, streamProxy.getApp(), streamProxy.getStream());
}
if (streamProxy.getStreamKey() != null) {
zlmresTfulUtils.delStreamProxy(mediaInfo, streamProxy.getStreamKey());
}
String delayTalkKey = UUID.randomUUID().toString();
@ -140,6 +145,7 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
return;
}
if (result.getInteger("code") != 0) {
logger.info("[开始拉流代理] 失败: {}", result.getString("msg") );
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(delayTalkKey);
if (callback != null) {

View File

@ -518,20 +518,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
syncPullStream(mediaServerId);
}
@Transactional
public void updateStatusById(StreamProxy streamProxy, boolean status) {
streamProxyMapper.updatePullingById(streamProxy.getId(), status);
if (streamProxy.getCommonGbChannelId() > 0) {
List<Integer> ids = new ArrayList<>();
ids.add(streamProxy.getCommonGbChannelId());
if (status) {
commonGbChannelService.onlineForList(ids);
}else {
commonGbChannelService.offlineForList(ids);
}
}
}
@Override
public void zlmServerOffline(String mediaServerId) {
// 移除开启了无人观看自动移除的流
@ -571,12 +557,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
@Transactional
public void updateStatus(boolean status, String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy == null) {
return;
}
updateStatusById(streamProxy, status);
public void updatePullingStatus(boolean status, String app, String stream) {
streamProxyMapper.updatePullingById(app, stream, status);
}
private void syncPullStream(String mediaServerId){

View File

@ -529,14 +529,17 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
@Override
public void offline(Integer id) {
if (id == null) {
public void offline(String app, String stream) {
if (app == null || stream == null) {
return;
}
StreamPush streamPush = streamPushMapper.selectOne(id);
StreamPush streamPush = streamPushMapper.selectOneByAppAndStream(app, stream);
if (streamPush == null) {
return;
}
if (streamPush.getGbId() == null) {
streamPushMapper.del(streamPush.getId());
}else {
if (userSetting.isUsePushingAsStatus()) {
if (streamPush.getCommonGbChannelId() > 0) {
List<Integer> pushers = new ArrayList<>();
@ -546,4 +549,5 @@ public class StreamPushServiceImpl implements IStreamPushService {
streamPushMapper.offlineById(streamPush.getId());
}
}
}
}

View File

@ -68,8 +68,8 @@ public interface StreamProxyMapper {
void updatePullingStatusByMediaServerId(@Param("mediaServerId") String mediaServerId, @Param("pulling") boolean pulling);
@Update("UPDATE wvp_stream_proxy " +
"SET pulling=#{pulling} " +
"WHERE id=#{id}")
int updatePullingById(@Param("id") int id, @Param("pulling") boolean pulling);
"WHERE app=#{app} and stream=#{stream}")
int updatePullingById(@Param("app") String app, @Param("stream") String stream, @Param("pulling") boolean pulling);
@Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}")
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);

View File

@ -479,11 +479,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override
public OnStreamChangedHookParam getProxyStreamInfo(String app, String streamId, String mediaServerId) {
if (mediaServerId == null) {
mediaServerId = "*";
}
String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_PULL_" + app + "_" + streamId + "_" + mediaServerId;
OnStreamChangedHookParam result = null;
List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
if (keys.size() > 0) {
if (!keys.isEmpty()) {
String key = (String) keys.get(0);
result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class);
}
@ -679,4 +682,5 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
}
}