增加RPC录像回放和录像下载

master
648540858 2024-12-12 17:46:51 +08:00
parent 26feb7a9b4
commit 5a63e7f958
8 changed files with 167 additions and 19 deletions

View File

@ -4,7 +4,7 @@ import lombok.Data;
// 从INVITE消息中解析需要的信息 // 从INVITE消息中解析需要的信息
@Data @Data
public class InviteInfo { public class InviteMessageInfo {
private String requesterId; private String requesterId;
private String targetChannelId; private String targetChannelId;
private String sourceChannelId; private String sourceChannelId;

View File

@ -3,13 +3,13 @@ package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.InviteInfo; import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.ErrorCallback;
public interface IGbChannelPlayService { public interface IGbChannelPlayService {
void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback<StreamInfo> callback); void start(CommonGBChannel channel, InviteMessageInfo inviteInfo, Platform platform, ErrorCallback<StreamInfo> callback);
void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream); void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream);

View File

@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.InviteInfo; import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.PlayException; import com.genersoft.iot.vmp.gb28181.bean.PlayException;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
@ -33,7 +33,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
@Override @Override
public void start(CommonGBChannel channel, InviteInfo inviteInfo, Platform platform, ErrorCallback<StreamInfo> callback) { public void start(CommonGBChannel channel, InviteMessageInfo inviteInfo, Platform platform, ErrorCallback<StreamInfo> callback) {
if (channel == null || inviteInfo == null || callback == null) { if (channel == null || inviteInfo == null || callback == null) {
log.warn("[通用通道点播] 参数异常, channel: {}, inviteInfo: {}, callback: {}", channel != null, inviteInfo != null, callback != null); log.warn("[通用通道点播] 参数异常, channel: {}, inviteInfo: {}, callback: {}", channel != null, inviteInfo != null, callback != null);
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");

View File

@ -295,14 +295,14 @@ public class PlayServiceImpl implements IPlayService {
// 判断设备是否属于当前平台, 如果不属于则发起自动调用 // 判断设备是否属于当前平台, 如果不属于则发起自动调用
if (!userSetting.getServerId().equals(device.getServerId())) { if (!userSetting.getServerId().equals(device.getServerId())) {
redisRpcPlayService.play(device.getServerId(), channel.getId(), callback); redisRpcPlayService.play(device.getServerId(), channel.getId(), callback);
}else { return;
MediaServer mediaServerItem = getNewMediaServerItem(device);
if (mediaServerItem == null) {
log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
}
play(mediaServerItem, device, channel, null, callback);
} }
MediaServer mediaServerItem = getNewMediaServerItem(device);
if (mediaServerItem == null) {
log.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", device.getDeviceId(), channel.getDeviceId());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
}
play(mediaServerItem, device, channel, null, callback);
} }
@Override @Override
@ -746,6 +746,11 @@ public class PlayServiceImpl implements IPlayService {
if (channel == null) { if (channel == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在");
} }
if (!userSetting.getServerId().equals(device.getServerId())) {
redisRpcPlayService.playback(device.getServerId(), channel.getId(), startTime, endTime, callback);
return;
}
MediaServer newMediaServerItem = getNewMediaServerItem(device); MediaServer newMediaServerItem = getNewMediaServerItem(device);
if (newMediaServerItem == null) { if (newMediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的节点"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的节点");
@ -954,6 +959,11 @@ public class PlayServiceImpl implements IPlayService {
@Override @Override
public void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) { public void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) {
if (!userSetting.getServerId().equals(device.getServerId())) {
redisRpcPlayService.download(device.getServerId(), channel.getId(), startTime, endTime, downloadSpeed, callback);
return;
}
MediaServer newMediaServerItem = this.getNewMediaServerItem(device); MediaServer newMediaServerItem = this.getNewMediaServerItem(device);
if (newMediaServerItem == null) { if (newMediaServerItem == null) {
callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(), callback.run(InviteErrorCode.ERROR_FOR_ASSIST_NOT_READY.getCode(),

View File

@ -121,7 +121,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
SIPRequest request = (SIPRequest)evt.getRequest(); SIPRequest request = (SIPRequest)evt.getRequest();
try { try {
InviteInfo inviteInfo = decode(evt); InviteMessageInfo inviteInfo = decode(evt);
// 查询请求是否来自上级平台\设备 // 查询请求是否来自上级平台\设备
Platform platform = platformService.queryPlatformByServerGBId(inviteInfo.getRequesterId()); Platform platform = platformService.queryPlatformByServerGBId(inviteInfo.getRequesterId());
@ -247,9 +247,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
} }
private InviteInfo decode(RequestEvent evt) throws SdpException { private InviteMessageInfo decode(RequestEvent evt) throws SdpException {
InviteInfo inviteInfo = new InviteInfo(); InviteMessageInfo inviteInfo = new InviteMessageInfo();
SIPRequest request = (SIPRequest)evt.getRequest(); SIPRequest request = (SIPRequest)evt.getRequest();
String[] channelIdArrayFromSub = SipUtils.getChannelIdFromRequest(request); String[] channelIdArrayFromSub = SipUtils.getChannelIdFromRequest(request);
@ -349,7 +349,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
private String createSendSdp(SendRtpInfo sendRtpItem, InviteInfo inviteInfo, String sdpIp) { private String createSendSdp(SendRtpInfo sendRtpItem, InviteMessageInfo inviteInfo, String sdpIp) {
StringBuilder content = new StringBuilder(200); StringBuilder content = new StringBuilder(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o=" + inviteInfo.getTargetChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n"); content.append("o=" + inviteInfo.getTargetChannelId() + " 0 0 IN IP4 " + sdpIp + "\r\n");
@ -393,7 +393,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
} }
public void inviteFromDeviceHandle(SIPRequest request, InviteInfo inviteInfo) { public void inviteFromDeviceHandle(SIPRequest request, InviteMessageInfo inviteInfo) {
if (inviteInfo.getSourceChannelId() == null) { if (inviteInfo.getSourceChannelId() == null) {
log.warn("来自设备的Invite请求无法从请求信息中确定请求来自的通道已忽略requesterId {}", inviteInfo.getRequesterId()); log.warn("来自设备的Invite请求无法从请求信息中确定请求来自的通道已忽略requesterId {}", inviteInfo.getRequesterId());

View File

@ -10,4 +10,8 @@ public interface IRedisRpcPlayService {
void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback); void play(String serverId, Integer channelId, ErrorCallback<StreamInfo> callback);
void stop(String serverId, InviteSessionType type, int channelId, String stream); void stop(String serverId, InviteSessionType type, int channelId, String stream);
void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<StreamInfo> callback);
void download(String serverId, Integer id, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback);
} }

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.redisMsg.control; package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
@ -8,12 +9,15 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.InviteMessageInfo;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController; import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcController;
import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping; import com.genersoft.iot.vmp.service.redisMsg.dto.RedisRpcMapping;
import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController; import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -68,7 +72,9 @@ public class RedisRpcChannelPlayController extends RpcController {
return response; return response;
} }
channelPlayService.play(channel, null, (code, msg, data) ->{ InviteMessageInfo inviteInfo = new InviteMessageInfo();
inviteInfo.setSessionName("Play");
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) { if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(Response.OK); response.setStatusCode(Response.OK);
response.setBody(data); response.setBody(data);
@ -87,7 +93,6 @@ public class RedisRpcChannelPlayController extends RpcController {
*/ */
@RedisRpcMapping("stop") @RedisRpcMapping("stop")
public RedisRpcResponse stop(RedisRpcRequest request) { public RedisRpcResponse stop(RedisRpcRequest request) {
System.out.println(request.getParam().toString());
JSONObject jsonObject = JSONObject.parseObject(request.getParam().toString()); JSONObject jsonObject = JSONObject.parseObject(request.getParam().toString());
RedisRpcResponse response = request.getResponse(); RedisRpcResponse response = request.getResponse();
@ -119,4 +124,88 @@ public class RedisRpcChannelPlayController extends RpcController {
return response; return response;
} }
/**
*
*/
@RedisRpcMapping("playback")
public RedisRpcResponse playbackChannel(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int channelId = paramJson.getIntValue("channelId");
String startTime = paramJson.getString("startTime");
String endTime = paramJson.getString("endTime");
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(Response.BAD_REQUEST);
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(Response.BAD_REQUEST);
response.setBody("param error");
return response;
}
InviteMessageInfo inviteInfo = new InviteMessageInfo();
inviteInfo.setSessionName("Playback");
inviteInfo.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime));
inviteInfo.setStopTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime));
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(Response.OK);
response.setBody(data);
}else {
response.setStatusCode(code);
}
// 手动发送结果
sendResponse(response);
});
return null;
}
/**
*
*/
@RedisRpcMapping("download")
public RedisRpcResponse downloadChannel(RedisRpcRequest request) {
JSONObject paramJson = JSONObject.parseObject(request.getParam().toString());
int channelId = paramJson.getIntValue("channelId");
String startTime = paramJson.getString("startTime");
String endTime = paramJson.getString("endTime");
int downloadSpeed = paramJson.getIntValue("downloadSpeed");
RedisRpcResponse response = request.getResponse();
if (channelId <= 0) {
response.setStatusCode(Response.BAD_REQUEST);
response.setBody("param error");
return response;
}
// 获取对应的设备和通道信息
CommonGBChannel channel = channelService.getOne(channelId);
if (channel == null) {
response.setStatusCode(Response.BAD_REQUEST);
response.setBody("param error");
return response;
}
InviteMessageInfo inviteInfo = new InviteMessageInfo();
inviteInfo.setSessionName("Download");
inviteInfo.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime));
inviteInfo.setStopTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime));
inviteInfo.setDownloadSpeed(downloadSpeed + "");
channelPlayService.start(channel, inviteInfo, null, (code, msg, data) ->{
if (code == InviteErrorCode.SUCCESS.getCode()) {
response.setStatusCode(Response.OK);
response.setBody(data);
}else {
response.setStatusCode(code);
}
// 手动发送结果
sendResponse(response);
});
return null;
}
} }

View File

@ -74,5 +74,50 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
} }
} }
} }
@Override
public void playback(String serverId, Integer channelId, String startTime, String endTime, ErrorCallback<StreamInfo> callback) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("channelId", channelId);
jsonObject.put("startTime", startTime);
jsonObject.put("endTime", endTime);
RedisRpcRequest request = buildRequest("channel/playback", jsonObject.toString());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MILLISECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == Response.OK) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
@Override
public void download(String serverId, Integer channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("channelId", channelId);
jsonObject.put("startTime", startTime);
jsonObject.put("endTime", endTime);
jsonObject.put("downloadSpeed", downloadSpeed);
RedisRpcRequest request = buildRequest("channel/download", jsonObject.toString());
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.MILLISECONDS);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == Response.OK) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
}
} }