新增RPC录像回放暂停和恢复

master
648540858 2024-12-13 17:28:38 +08:00
parent 381d92e047
commit e16ca179a6
9 changed files with 135 additions and 13 deletions

View File

@ -41,6 +41,8 @@ import java.net.URL;
import java.text.ParseException; import java.text.ParseException;
import java.util.UUID; import java.util.UUID;
import static sun.audio.AudioDevice.device;
/** /**
* @author lin * @author lin
*/ */
@ -166,8 +168,7 @@ public class PlaybackController {
@Parameter(name = "streamId", description = "回放流ID", required = true) @Parameter(name = "streamId", description = "回放流ID", required = true)
@GetMapping("/pause/{streamId}") @GetMapping("/pause/{streamId}")
public void playPause(@PathVariable String streamId) { public void playPause(@PathVariable String streamId) {
log.info("playPause: "+streamId); log.info("[回放暂停] streamId: {}", streamId);
try { try {
playService.pauseRtp(streamId); playService.pauseRtp(streamId);
} catch (ServiceException e) { } catch (ServiceException e) {

View File

@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo; import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.ErrorCallback;
public interface IGbChannelPlayService { public interface IGbChannelPlayService {
@ -27,4 +26,8 @@ public interface IGbChannelPlayService {
void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback<StreamInfo> callback); void playPush(CommonGBChannel channel, String platformDeviceId, String platformName, ErrorCallback<StreamInfo> callback);
void stopPlayPush(CommonGBChannel channel); void stopPlayPush(CommonGBChannel channel);
void pauseRtp(String streamId);
void resumeRtp(String streamId);
} }

View File

@ -89,4 +89,5 @@ public interface IGbChannelService {
PageInfo<CommonGBChannel> queryList(int page, int count, String query, Boolean online, Boolean hasRecordPlan, Integer channelType); PageInfo<CommonGBChannel> queryList(int page, int count, String query, Boolean online, Boolean hasRecordPlan, Integer channelType);
void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> callback); void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> callback);
} }

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.gb28181.service.IPlayService;
@ -13,7 +14,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException;
@Slf4j @Slf4j
@Service @Service
@ -208,6 +212,20 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
} }
} }
@Override
public void pauseRtp(String streamId) {
try {
deviceChannelPlayService.pauseRtp(streamId);
} catch (ServiceException | InvalidArgumentException | ParseException | SipException ignore) {}
}
@Override
public void resumeRtp(String streamId) {
try {
deviceChannelPlayService.resumeRtp(streamId);
} catch (ServiceException | InvalidArgumentException | ParseException | SipException ignore) {}
}
private void downloadGbDeviceChannel(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, private void downloadGbDeviceChannel(CommonGBChannel channel, Long startTime, Long stopTime, Integer downloadSpeed,
ErrorCallback<StreamInfo> callback){ ErrorCallback<StreamInfo> callback){
try { try {

View File

@ -9,14 +9,12 @@ import com.genersoft.iot.vmp.gb28181.dao.RegionMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.streamPush.bean.StreamPush;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -27,7 +25,10 @@ import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.util.*; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@Slf4j @Slf4j
@Service @Service

View File

@ -1093,6 +1093,8 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public StreamInfo getDownLoadInfo(Device device, DeviceChannel channel, String stream) { public StreamInfo getDownLoadInfo(Device device, DeviceChannel channel, String stream) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channel.getId(), stream); InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channel.getId(), stream);
if (inviteInfo == null) { if (inviteInfo == null) {
String app = "rtp"; String app = "rtp";
@ -1369,11 +1371,20 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException { public void pauseRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo || inviteInfo.getStreamInfo() == null) { if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
log.warn("streamId不存在!"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "streamId不存在");
throw new ServiceException("streamId不存在");
} }
Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId());
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
}
if (!userSetting.getServerId().equals(device.getServerId())) {
redisRpcPlayService.pauseRtp(device.getServerId(), streamId);
return;
}
inviteInfo.getStreamInfo().setPause(true); inviteInfo.getStreamInfo().setPause(true);
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer(); MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer();
@ -1391,7 +1402,7 @@ public class PlayServiceImpl implements IPlayService {
if (!result) { if (!result) {
throw new ServiceException("暂停RTP接收失败"); throw new ServiceException("暂停RTP接收失败");
} }
Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId());
DeviceChannel channel = deviceChannelService.getOneById(inviteInfo.getChannelId()); DeviceChannel channel = deviceChannelService.getOneById(inviteInfo.getChannelId());
cmder.playPauseCmd(device, channel, inviteInfo.getStreamInfo()); cmder.playPauseCmd(device, channel, inviteInfo.getStreamInfo());
} }
@ -1400,9 +1411,17 @@ public class PlayServiceImpl implements IPlayService {
public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException { public void resumeRtp(String streamId) throws ServiceException, InvalidArgumentException, ParseException, SipException {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId); InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(InviteSessionType.PLAYBACK, streamId);
if (null == inviteInfo || inviteInfo.getStreamInfo() == null) { if (null == inviteInfo || inviteInfo.getStreamInfo() == null) {
log.warn("streamId不存在!"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "streamId不存在");
throw new ServiceException("streamId不存在");
} }
Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId());
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
}
if (!userSetting.getServerId().equals(device.getServerId())) {
redisRpcPlayService.resumeRtp(device.getServerId(), streamId);
return;
}
inviteInfo.getStreamInfo().setPause(false); inviteInfo.getStreamInfo().setPause(false);
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer(); MediaServer mediaServerItem = inviteInfo.getStreamInfo().getMediaServer();
@ -1410,7 +1429,6 @@ public class PlayServiceImpl implements IPlayService {
log.warn("mediaServer 不存在!"); log.warn("mediaServer 不存在!");
throw new ServiceException("mediaServer不存在"); throw new ServiceException("mediaServer不存在");
} }
// zlm 暂停RTP超时检查
// 使用zlm中的流ID // 使用zlm中的流ID
String streamKey = inviteInfo.getStream(); String streamKey = inviteInfo.getStream();
if (!mediaServerItem.isRtpEnable()) { if (!mediaServerItem.isRtpEnable()) {
@ -1420,7 +1438,6 @@ public class PlayServiceImpl implements IPlayService {
if (!result) { if (!result) {
throw new ServiceException("继续RTP接收失败"); throw new ServiceException("继续RTP接收失败");
} }
Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId());
DeviceChannel channel = deviceChannelService.getOneById(inviteInfo.getChannelId()); DeviceChannel channel = deviceChannelService.getOneById(inviteInfo.getChannelId());
cmder.playResumeCmd(device, channel, inviteInfo.getStreamInfo()); cmder.playResumeCmd(device, channel, inviteInfo.getStreamInfo());
} }

View File

@ -18,4 +18,7 @@ public interface IRedisRpcPlayService {
void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<RecordInfo> callback); void queryRecordInfo(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<RecordInfo> callback);
void pauseRtp(String serverId, String streamId);
void resumeRtp(String serverId, String streamId);
} }

View File

@ -131,6 +131,56 @@ public class RedisRpcChannelPlayController extends RpcController {
return null; return null;
} }
/**
*
*/
@RedisRpcMapping("pauseRtp")
public RedisRpcResponse pauseRtp(RedisRpcRequest request) {
String streamId = request.getParam().toString();
RedisRpcResponse response = request.getResponse();
if (streamId == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
channelPlayService.pauseRtp(streamId);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}catch (ControllerException e) {
response.setStatusCode(ErrorCode.ERROR100.getCode());
response.setBody(e.getMessage());
}
return response;
}
/**
*
*/
@RedisRpcMapping("resumeRtp")
public RedisRpcResponse resumeRtp(RedisRpcRequest request) {
String streamId = request.getParam().toString();
RedisRpcResponse response = request.getResponse();
if (streamId == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("param error");
return response;
}
try {
channelPlayService.resumeRtp(streamId);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
}catch (ControllerException e) {
response.setStatusCode(ErrorCode.ERROR100.getCode());
response.setBody(e.getMessage());
}
return response;
}
/** /**
* *

View File

@ -118,6 +118,34 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
} }
} }
@Override
public void pauseRtp(String serverId, String streamId) {
RedisRpcRequest request = buildRequest("channel/pauseRtp", streamId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 5, TimeUnit.SECONDS);
if (response == null) {
log.info("[RPC 暂停回放] 失败, streamId: {}", streamId);
}else {
if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) {
log.info("[RPC 暂停回放] 失败, {}, streamId: {}", response.getBody(), streamId);
}
}
}
@Override
public void resumeRtp(String serverId, String streamId) {
RedisRpcRequest request = buildRequest("channel/resumeRtp", streamId);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, 5, TimeUnit.SECONDS);
if (response == null) {
log.info("[RPC 恢复回放] 失败, streamId: {}", streamId);
}else {
if (response.getStatusCode() != ErrorCode.SUCCESS.getCode()) {
log.info("[RPC 恢复回放] 失败, {}, streamId: {}", response.getBody(), streamId);
}
}
}
@Override @Override
public void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) { public void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) {