临时提交

pull/1642/head
648540858 2024-09-25 15:42:04 +08:00
parent abcaa32cec
commit 76d3a5d8e4
8 changed files with 41 additions and 40 deletions

View File

@ -137,6 +137,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
}catch (PlayException e) { }catch (PlayException e) {
callback.run(e.getCode(), e.getMsg(), null); callback.run(e.getCode(), e.getMsg(), null);
}catch (Exception e) { }catch (Exception e) {
log.error("[点播推流通道失败] 通道: {}({})", channel.getGbName(), channel.getGbDeviceId(), e);
callback.run(Response.BUSY_HERE, "busy here", null); callback.run(Response.BUSY_HERE, "busy here", null);
} }
} }

View File

@ -115,17 +115,23 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (parentPlatform != null) { if (parentPlatform != null) {
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId()); DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId());
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) { if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem.getChannelId(), sendRtpItem); WVPResult wvpResult = redisRpcService.startSendRtp(callIdHeader.getCallId(), sendRtpItem);
if (wvpResult.getCode() == 0) { if (wvpResult.getCode() == 0) {
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, parentPlatform); redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, parentPlatform);
} }
} else { } else {
try { try {
if (sendRtpItem.isTcpActive()) { if (mediaInfo != null) {
mediaServerService.startSendRtpPassive(mediaInfo,sendRtpItem, null); if (sendRtpItem.isTcpActive()) {
} else { mediaServerService.startSendRtpPassive(mediaInfo,sendRtpItem, null);
mediaServerService.startSendRtp(mediaInfo, sendRtpItem); } else {
mediaServerService.startSendRtp(mediaInfo, sendRtpItem);
}
}else {
// mediaInfo 在集群的其他wvp里
} }
redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, parentPlatform); redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, deviceChannel, parentPlatform);
}catch (ControllerException e) { }catch (ControllerException e) {
log.error("RTP推流失败: {}", e.getMessage()); log.error("RTP推流失败: {}", e.getMessage());

View File

@ -189,7 +189,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setStatus(1); sendRtpItem.setStatus(1);
sendRtpItem.setCallId(inviteInfo.getCallId()); sendRtpItem.setCallId(inviteInfo.getCallId());
sendRtpItem.setPlayType("Play".equalsIgnoreCase(inviteInfo.getSessionName()) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); sendRtpItem.setPlayType("Play".equalsIgnoreCase(inviteInfo.getSessionName()) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK);
sendRtpItem.setServerId(streamInfo.getServerId());
sendRtpServerService.update(sendRtpItem); sendRtpServerService.update(sendRtpItem);
String sdpIp = streamInfo.getMediaServer().getSdpIp(); String sdpIp = streamInfo.getMediaServer().getSdpIp();
if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) { if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {

View File

@ -2,14 +2,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.QueryMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.QueryMessageHandler;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element; import org.dom4j.Element;

View File

@ -7,17 +7,17 @@ import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
public interface IRedisRpcService { public interface IRedisRpcService {
SendRtpInfo getSendRtpItem(Integer sendRtpChannelId); SendRtpInfo getSendRtpItem(String callId);
WVPResult startSendRtp(Integer sendRtpChannelId, SendRtpInfo sendRtpItem); WVPResult startSendRtp(String callId, SendRtpInfo sendRtpItem);
WVPResult stopSendRtp(Integer sendRtpChannelId); WVPResult stopSendRtp(String callId);
long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback<Integer> callback); long waitePushStreamOnline(SendRtpInfo sendRtpItem, CommonCallback<Integer> callback);
void stopWaitePushStreamOnline(SendRtpInfo sendRtpItem); void stopWaitePushStreamOnline(SendRtpInfo sendRtpItem);
void rtpSendStopped(Integer sendRtpChannelId); void rtpSendStopped(String callId);
void removeCallback(long key); void removeCallback(long key);

View File

@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaInfo;
@ -20,7 +19,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.impl.SendRtpServerServiceImpl;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@ -61,9 +59,6 @@ public class RedisRpcController {
@Autowired @Autowired
private ISIPCommanderForPlatform commanderFroPlatform; private ISIPCommanderForPlatform commanderFroPlatform;
@Autowired
private IPlatformService platformService;
@Autowired @Autowired
private ISendRtpServerService sendRtpServerService; private ISendRtpServerService sendRtpServerService;
@ -72,10 +67,10 @@ public class RedisRpcController {
* *
*/ */
public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) { public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString(); String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey); SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
if (sendRtpItem == null) { if (sendRtpItem == null) {
log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 key{}", sendRtpItemKey); log.info("[redis-rpc] 获取发流的信息, 未找到redis中的发流信息 callId{}", callId);
RedisRpcResponse response = request.getResponse(); RedisRpcResponse response = request.getResponse();
response.setStatusCode(200); response.setStatusCode(200);
return response; return response;
@ -104,10 +99,9 @@ public class RedisRpcController {
sendRtpItem.setSsrc(ssrc); sendRtpItem.setSsrc(ssrc);
} }
sendRtpServerService.update(sendRtpItem); sendRtpServerService.update(sendRtpItem);
redisTemplate.opsForValue().set(sendRtpItemKey, sendRtpItem);
RedisRpcResponse response = request.getResponse(); RedisRpcResponse response = request.getResponse();
response.setStatusCode(200); response.setStatusCode(200);
response.setBody(sendRtpItemKey); response.setBody(callId);
return response; return response;
} }
@ -228,12 +222,12 @@ public class RedisRpcController {
* *
*/ */
public RedisRpcResponse startSendRtp(RedisRpcRequest request) { public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString(); String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey); SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse(); RedisRpcResponse response = request.getResponse();
response.setStatusCode(200); response.setStatusCode(200);
if (sendRtpItem == null) { if (sendRtpItem == null) {
log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息 key{}", sendRtpItemKey); log.info("[redis-rpc] 开始发流, 未找到redis中的发流信息 callId{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult); response.setBody(wvpResult);
return response; return response;
@ -271,12 +265,12 @@ public class RedisRpcController {
* *
*/ */
public RedisRpcResponse stopSendRtp(RedisRpcRequest request) { public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
String sendRtpItemKey = request.getParam().toString(); String callId = request.getParam().toString();
SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(sendRtpItemKey); SendRtpInfo sendRtpItem = sendRtpServerService.queryByCallId(callId);
RedisRpcResponse response = request.getResponse(); RedisRpcResponse response = request.getResponse();
response.setStatusCode(200); response.setStatusCode(200);
if (sendRtpItem == null) { if (sendRtpItem == null) {
log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息 key{}", sendRtpItemKey); log.info("[redis-rpc] 停止推流, 未找到redis中的发流信息 key{}", callId);
WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息"); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到redis中的发流信息");
response.setBody(wvpResult); response.setBody(wvpResult);
return response; return response;

View File

@ -57,8 +57,8 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
} }
@Override @Override
public SendRtpInfo getSendRtpItem(Integer sendRtpItemKey) { public SendRtpInfo getSendRtpItem(String callId) {
RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItemKey); RedisRpcRequest request = buildRequest("getSendRtpItem", callId);
RedisRpcResponse response = redisRpcConfig.request(request, 10); RedisRpcResponse response = redisRpcConfig.request(request, 10);
if (response.getBody() == null) { if (response.getBody() == null) {
return null; return null;
@ -67,23 +67,23 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
} }
@Override @Override
public WVPResult startSendRtp(Integer sendRtpItemKey, SendRtpInfo sendRtpItem) { public WVPResult startSendRtp(String callId, SendRtpInfo sendRtpItem) {
log.info("[请求其他WVP] 开始推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); log.info("[请求其他WVP] 开始推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItemKey); RedisRpcRequest request = buildRequest("startSendRtp", callId);
request.setToId(sendRtpItem.getServerId()); request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10); RedisRpcResponse response = redisRpcConfig.request(request, 10);
return JSON.parseObject(response.getBody().toString(), WVPResult.class); return JSON.parseObject(response.getBody().toString(), WVPResult.class);
} }
@Override @Override
public WVPResult stopSendRtp(Integer sendRtpItemKey) { public WVPResult stopSendRtp(String callId) {
SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey); SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(callId);
if (sendRtpItem == null) { if (sendRtpItem == null) {
log.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息 key{}", sendRtpItemKey); log.info("[请求其他WVP] 停止推流, 未找到redis中的发流信息 key{}", callId);
return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息"); return WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到发流信息");
} }
log.info("[请求其他WVP] 停止推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream()); log.info("[请求其他WVP] 停止推流wvp{} {}/{}", sendRtpItem.getServerId(), sendRtpItem.getApp(), sendRtpItem.getStream());
RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItemKey); RedisRpcRequest request = buildRequest("stopSendRtp", callId);
request.setToId(sendRtpItem.getServerId()); request.setToId(sendRtpItem.getServerId());
RedisRpcResponse response = redisRpcConfig.request(request, 10); RedisRpcResponse response = redisRpcConfig.request(request, 10);
return JSON.parseObject(response.getBody().toString(), WVPResult.class); return JSON.parseObject(response.getBody().toString(), WVPResult.class);
@ -141,13 +141,13 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
} }
@Override @Override
public void rtpSendStopped(Integer sendRtpItemKey) { public void rtpSendStopped(String callId) {
SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(sendRtpItemKey); SendRtpInfo sendRtpItem = (SendRtpInfo)redisTemplate.opsForValue().get(callId);
if (sendRtpItem == null) { if (sendRtpItem == null) {
log.info("[停止WVP监听流上线] 未找到redis中的发流信息 key{}", sendRtpItemKey); log.info("[停止WVP监听流上线] 未找到redis中的发流信息 key{}", callId);
return; return;
} }
RedisRpcRequest request = buildRequest("rtpSendStopped", sendRtpItemKey); RedisRpcRequest request = buildRequest("rtpSendStopped", callId);
request.setToId(sendRtpItem.getServerId()); request.setToId(sendRtpItem.getServerId());
redisRpcConfig.request(request, 10); redisRpcConfig.request(request, 10);
} }

View File

@ -58,6 +58,8 @@
</el-table-column> </el-table-column>
<el-table-column prop="gbDeviceId" label="国标编码" min-width="200" > <el-table-column prop="gbDeviceId" label="国标编码" min-width="200" >
</el-table-column> </el-table-column>
<el-table-column prop="gbDeviceId" label="位置" min-width="200" >
</el-table-column>
<el-table-column prop="mediaServerId" label="流媒体" min-width="200" > <el-table-column prop="mediaServerId" label="流媒体" min-width="200" >
</el-table-column> </el-table-column>
<el-table-column label="开始时间" min-width="200"> <el-table-column label="开始时间" min-width="200">