Merge branch 'wvp-28181-2.0' into main-dev

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
结构优化
648540858 2023-07-04 10:10:48 +08:00
commit becea82736
18 changed files with 392 additions and 106 deletions

View File

@ -158,6 +158,7 @@ public class VideoManagerConstants {
public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";
public static final String WVP_OTHER_SEND_RTP_INFO = "VMP_OTHER_SEND_RTP_INFO_";
/** /**
* Redis Const * Redis Const

View File

@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
@ -68,7 +69,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
private SipSubscribe sipSubscribe; private SipSubscribe sipSubscribe;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private SipLayer sipLayer; private SipLayer sipLayer;

View File

@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IDeviceService;
@ -66,7 +66,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
private IDeviceService deviceService; private IDeviceService deviceService;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private ZlmHttpHookSubscribe hookSubscribe; private ZlmHttpHookSubscribe hookSubscribe;

View File

@ -13,7 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@ -72,7 +72,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
private IVideoManagerStorage storager; private IVideoManagerStorage storager;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private SSRCFactory ssrcFactory; private SSRCFactory ssrcFactory;
@ -125,7 +125,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null); callIdHeader.getCallId(), null);
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); ZLMServerFactory.stopSendRtpStream(mediaInfo, param);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
if (platform != null) { if (platform != null) {
@ -139,7 +139,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
} }
} }
int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); int totalReaderCount = ZLMServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) { if (totalReaderCount <= 0) {
logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {

View File

@ -20,6 +20,7 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@ -102,7 +103,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private AudioBroadcastManager audioBroadcastManager; private AudioBroadcastManager audioBroadcastManager;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@ -374,7 +375,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
streamTypeStr = "UDP"; streamTypeStr = "UDP";
} }
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = ZLMServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (tcpActive != null) { if (tcpActive != null) {
@ -577,10 +578,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
CallIdHeader callIdHeader, MediaServerItem mediaServerItem, CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP, int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) { String channelId, String addressStr, String ssrc, String requesterId) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); Boolean streamReady = ZLMServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) { if (streamReady != null && streamReady) {
// 自平台内容 // 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = ZLMServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) { if (sendRtpItem == null) {
@ -617,10 +618,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
String channelId, String addressStr, String ssrc, String requesterId) { String channelId, String addressStr, String ssrc, String requesterId) {
// 推流 // 推流
if (streamPushItem.isSelf()) { if (streamPushItem.isSelf()) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); Boolean streamReady = ZLMServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) { if (streamReady != null && streamReady) {
// 自平台内容 // 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, SendRtpItem sendRtpItem = ZLMServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) { if (sendRtpItem == null) {
@ -735,7 +736,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> { mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
dynamicTask.stop(callIdHeader.getCallId()); dynamicTask.stop(callIdHeader.getCallId());
if (serverId.equals(userSetting.getServerId())) { if (serverId.equals(userSetting.getServerId())) {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, SendRtpItem sendRtpItem = ZLMServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) { if (sendRtpItem == null) {

View File

@ -454,33 +454,35 @@ public class ZLMHttpHookListener {
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) { if (gbStream != null) {
// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); // eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
}
zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
} }
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream()); zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
if (gbStream != null) { }
eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist() ? CatalogEvent.ON : CatalogEvent.OFF); GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
} if (gbStream != null) {
if (type != null) { if (userSetting.isUsePushingAsStatus()) {
// 发送流变化redis消息 eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", param.getApp());
jsonObject.put("stream", param.getStream());
jsonObject.put("register", param.isRegist());
jsonObject.put("mediaServerId", param.getMediaServerId());
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
} }
} }
if (type != null) {
// 发送流变化redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", param.getApp());
jsonObject.put("stream", param.getStream());
jsonObject.put("register", param.isRegist());
jsonObject.put("mediaServerId", param.getMediaServerId());
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
}
} }
if (!param.isRegist()) { }
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream()); if (!param.isRegist()) {
if (sendRtpItems.size() > 0) { List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(param.getStream());
for (SendRtpItem sendRtpItem : sendRtpItems) { if (sendRtpItems.size() > 0) {
if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) { for (SendRtpItem sendRtpItem : sendRtpItems) {
String platformId = sendRtpItem.getPlatformId(); if (sendRtpItem != null && sendRtpItem.getApp().equals(param.getApp())) {
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); String platformId = sendRtpItem.getPlatformId();
Device device = deviceService.getDevice(platformId); ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
Device device = deviceService.getDevice(platformId);
try { try {
if (platform != null) { if (platform != null) {

View File

@ -61,7 +61,7 @@ public class ZLMMediaListManager {
private UserSetting userSetting; private UserSetting userSetting;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@ -97,7 +97,7 @@ public class ZLMMediaListManager {
public void sendStreamEvent(String app, String stream, String mediaServerId) { public void sendStreamEvent(String app, String stream, String mediaServerId) {
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
// 查看推流状态 // 查看推流状态
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream); Boolean streamReady = ZLMServerFactory.isStreamReady(mediaServerItem, app, stream);
if (streamReady != null && streamReady) { if (streamReady != null && streamReady) {
ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream); ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream);
if (channelOnlineEventLister != null) { if (channelOnlineEventLister != null) {

View File

@ -15,7 +15,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
@Component @Component
public class ZLMRTPServerFactory { public class ZLMServerFactory {
private Logger logger = LoggerFactory.getLogger("ZLMRTPServerFactory"); private Logger logger = LoggerFactory.getLogger("ZLMRTPServerFactory");

View File

@ -87,7 +87,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private EventPublisher publisher; private EventPublisher publisher;
@ -201,7 +201,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (mediaServerItem == null) { if (mediaServerItem == null) {
return; return;
} }
zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId); ZLMServerFactory.closeRtpServer(mediaServerItem, streamId);
} }
@Override @Override
@ -210,7 +210,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
callback.run(false); callback.run(false);
return; return;
} }
zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId, callback); ZLMServerFactory.closeRtpServer(mediaServerItem, streamId, callback);
} }
@Override @Override
@ -221,7 +221,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override @Override
public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) { public Boolean updateRtpServerSSRC(MediaServerItem mediaServerItem, String streamId, String ssrc) {
return zlmrtpServerFactory.updateRtpServerSSRC(mediaServerItem, streamId, ssrc); return ZLMServerFactory.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
} }
@Override @Override

View File

@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IInviteStreamService;
@ -83,7 +84,7 @@ public class PlatformServiceImpl implements IPlatformService {
private DynamicTask dynamicTask; private DynamicTask dynamicTask;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private SubscribeHolder subscribeHolder; private SubscribeHolder subscribeHolder;
@ -364,7 +365,7 @@ public class PlatformServiceImpl implements IPlatformService {
param.put("vhost", "__defaultVhost__"); param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp()); param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStream()); param.put("stream", sendRtpItem.getStream());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); ZLMServerFactory.stopSendRtpStream(mediaInfo, param);
} }
} }
} }

View File

@ -20,6 +20,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.zlm.*; import com.genersoft.iot.vmp.media.zlm.*;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -95,6 +100,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired @Autowired
private ZLMRESTfulUtils zlmresTfulUtils; private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private AssistRESTfulUtils assistRESTfulUtils; private AssistRESTfulUtils assistRESTfulUtils;
@ -167,7 +175,7 @@ public class PlayServiceImpl implements IPlayService {
String mediaServerId = streamInfo.getMediaServerId(); String mediaServerId = streamInfo.getMediaServerId();
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
Boolean ready = zlmrtpServerFactory.isStreamReady(mediaInfo, "rtp", streamId); Boolean ready = ZLMServerFactory.isStreamReady(mediaInfo, "rtp", streamId);
if (ready != null && ready) { if (ready != null && ready) {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,
@ -183,12 +191,11 @@ public class PlayServiceImpl implements IPlayService {
} }
} }
} }
String streamId = null; String streamId = null;
if (mediaServerItem.isRtpEnable()) { if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId); streamId = String.format("%s_%s", device.getDeviceId(), channelId);
} }
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, false, device.getStreamModeForParam()); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam());
if (ssrcInfo == null) { if (ssrcInfo == null) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null); callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(), null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null,

View File

@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
@ -72,7 +72,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
private RedisTemplate<Object, Object> redisTemplate; private RedisTemplate<Object, Object> redisTemplate;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@ -230,7 +230,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
param.put("pt", requestPushStreamMsg.getPt()); param.put("pt", requestPushStreamMsg.getPt());
param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); JSONObject jsonObject = ZLMServerFactory.startSendRtpStream(mediaInfo, param);
// 回复消息 // 回复消息
responsePushStream(jsonObject, fromId, serial); responsePushStream(jsonObject, fromId, serial);
} }
@ -267,7 +267,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
return; return;
} }
// 确定流是否在线 // 确定流是否在线
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); Boolean streamReady = ZLMServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
if (streamReady != null && streamReady) { if (streamReady != null && streamReady) {
logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream()); logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream());
responseSendItem(mediaServerItem, content, toId, serial); responseSendItem(mediaServerItem, content, toId, serial);
@ -311,7 +311,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
* sendItem * sendItem
*/ */
private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), SendRtpItem sendRtpItem = ZLMServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
content.getPort(), content.getSsrc(), content.getPlatformId(), content.getPort(), content.getSsrc(), content.getPlatformId(),
content.getApp(), content.getStream(), content.getChannelId(), content.getApp(), content.getStream(), content.getChannelId(),
content.getTcp(), content.getRtcp()); content.getTcp(), content.getRtcp());

View File

@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
@ -58,7 +58,7 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();

View File

@ -304,7 +304,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void sendStreamChangeMsg(String type, JSONObject jsonObject) { public void sendStreamChangeMsg(String type, JSONObject jsonObject) {
String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type; String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type;
logger.info("[redis 流变化事件] {}: {}", key, jsonObject.toString()); logger.info("[redis 流变化事件] 发送 {}: {}", key, jsonObject.toString());
redisTemplate.convertAndSend(key, jsonObject); redisTemplate.convertAndSend(key, jsonObject);
} }
@ -540,14 +540,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void sendMobilePositionMsg(JSONObject jsonObject) { public void sendMobilePositionMsg(JSONObject jsonObject) {
String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
logger.info("[redis发送通知] 移动位置 {}: {}", key, jsonObject.toString()); logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString());
redisTemplate.convertAndSend(key, jsonObject); redisTemplate.convertAndSend(key, jsonObject);
} }
@Override @Override
public void sendStreamPushRequestedMsg(MessageForPushChannel msg) { public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
logger.info("[redis发送通知] 推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream()); logger.info("[redis发送通知] 发送 推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream());
redisTemplate.convertAndSend(key, JSON.toJSON(msg)); redisTemplate.convertAndSend(key, JSON.toJSON(msg));
} }
@ -555,7 +555,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
public void sendAlarmMsg(AlarmChannelMessage msg) { public void sendAlarmMsg(AlarmChannelMessage msg) {
// 此消息用于对接第三方服务下级来的消息内容 // 此消息用于对接第三方服务下级来的消息内容
String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM; String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM;
logger.info("[redis发送通知] 报警{}: {}", key, JSON.toJSON(msg)); logger.info("[redis发送通知] 发送 报警{}: {}", key, JSON.toJSON(msg));
redisTemplate.convertAndSend(key, JSON.toJSON(msg)); redisTemplate.convertAndSend(key, JSON.toJSON(msg));
} }
@ -568,7 +568,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void sendStreamPushRequestedMsgForStatus() { public void sendStreamPushRequestedMsgForStatus() {
String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED; String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED;
logger.info("[redis通知]获取所有推流设备的状态"); logger.info("[redis通知] 发送 获取所有推流设备的状态");
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put(key, key); jsonObject.put(key, key);
redisTemplate.convertAndSend(key, jsonObject); redisTemplate.convertAndSend(key, jsonObject);
@ -596,6 +596,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online) { public void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online) {
String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS; String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS;
logger.info("[redis通知] 发送 推送设备/通道状态, {}/{}-{}", deviceId, channelId, online);
StringBuilder msg = new StringBuilder(); StringBuilder msg = new StringBuilder();
msg.append(deviceId); msg.append(deviceId);
if (channelId != null) { if (channelId != null) {
@ -626,14 +627,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void sendPlatformStartPlayMsg(MessageForPushChannel msg) { public void sendPlatformStartPlayMsg(MessageForPushChannel msg) {
String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY; String key = VideoManagerConstants.VM_MSG_STREAM_START_PLAY_NOTIFY;
logger.info("[redis发送通知] 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); logger.info("[redis发送通知] 发送 推流被上级平台观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg)); redisTemplate.convertAndSend(key, JSON.toJSON(msg));
} }
@Override @Override
public void sendPlatformStopPlayMsg(MessageForPushChannel msg) { public void sendPlatformStopPlayMsg(MessageForPushChannel msg) {
String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY; String key = VideoManagerConstants.VM_MSG_STREAM_STOP_PLAY_NOTIFY;
logger.info("[redis发送通知] 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId()); logger.info("[redis发送通知] 发送 上级平台停止观看 {}: {}/{}->{}", key, msg.getApp(), msg.getStream(), msg.getPlatFormId());
redisTemplate.convertAndSend(key, JSON.toJSON(msg)); redisTemplate.convertAndSend(key, JSON.toJSON(msg));
} }
} }

View File

@ -0,0 +1,135 @@
package com.genersoft.iot.vmp.vmanager.bean;
public class OtherRtpSendInfo {
/**
* IP
*/
private String ip;
/**
*
*/
private int port;
/**
* IP
*/
private String receiveIp;
/**
*
*/
private int receivePort;
/**
* ID
*/
private String callId;
/**
* ID
*/
private String stream;
/**
*
*/
private String pushApp;
/**
* ID
*/
private String pushStream;
/**
* SSRC
*/
private String pushSSRC;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getReceiveIp() {
return receiveIp;
}
public void setReceiveIp(String receiveIp) {
this.receiveIp = receiveIp;
}
public int getReceivePort() {
return receivePort;
}
public void setReceivePort(int receivePort) {
this.receivePort = receivePort;
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public String getPushApp() {
return pushApp;
}
public void setPushApp(String pushApp) {
this.pushApp = pushApp;
}
public String getPushStream() {
return pushStream;
}
public void setPushStream(String pushStream) {
this.pushStream = pushStream;
}
public String getPushSSRC() {
return pushSSRC;
}
public void setPushSSRC(String pushSSRC) {
this.pushSSRC = pushSSRC;
}
@Override
public String toString() {
return "OtherRtpSendInfo{" +
"ip='" + ip + '\'' +
", port=" + port +
", receiveIp='" + receiveIp + '\'' +
", receivePort=" + receivePort +
", callId='" + callId + '\'' +
", stream='" + stream + '\'' +
'}';
}
}

View File

@ -11,7 +11,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
@ -54,7 +54,7 @@ public class PlaybackController {
private SIPCommander cmder; private SIPCommander cmder;
@Autowired @Autowired
private ZLMRTPServerFactory zlmrtpServerFactory; private ZLMServerFactory ZLMServerFactory;
@Autowired @Autowired
private IVideoManagerStorage storager; private IVideoManagerStorage storager;

View File

@ -1,25 +1,45 @@
package com.genersoft.iot.vmp.vmanager.rtp; package com.genersoft.iot.vmp.vmanager.rtp;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.VersionInfo; import com.genersoft.iot.vmp.conf.VersionInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@Tag(name = "第三方服务对接") @Tag(name = "第三方服务对接")
@ -28,7 +48,15 @@ import org.springframework.web.bind.annotation.RestController;
public class RtpController { public class RtpController {
@Autowired @Autowired
private ZlmHttpHookSubscribe zlmHttpHookSubscribe; private ZLMServerFactory zlmServerFactory;
@Autowired
private SendRtpPortManager sendRtpPortManager;
private final static Logger logger = LoggerFactory.getLogger(RtpController.class);
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@ -49,11 +77,11 @@ public class RtpController {
private IDeviceChannelService channelService; private IDeviceChannelService channelService;
@Autowired @Autowired
private IStreamPushService pushService; private DynamicTask dynamicTask;
@Autowired @Autowired
private IStreamProxyService proxyService; private RedisTemplate<Object, Object> redisTemplate;
@Value("${server.port}") @Value("${server.port}")
@ -73,12 +101,67 @@ public class RtpController {
@Parameter(name = "stream", description = "形成的流的ID", required = true) @Parameter(name = "stream", description = "形成的流的ID", required = true)
@Parameter(name = "tcpMode", description = "收流模式, 0为UDP 1为TCP被动", required = true) @Parameter(name = "tcpMode", description = "收流模式, 0为UDP 1为TCP被动", required = true)
@Parameter(name = "callBack", description = "回调地址如果收流超时会通道回调通知回调为get请求参数为callId", required = true) @Parameter(name = "callBack", description = "回调地址如果收流超时会通道回调通知回调为get请求参数为callId", required = true)
public SendRtpItem openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) { public OtherRtpSendInfo openRtpServer(Boolean isSend, String ssrc, String callId, String stream, Integer tcpMode, String callBack) {
MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
logger.info("[第三方服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
if (mediaServerItem == null) { if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer"); throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
} }
return null; if (stream == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"stream参数不可为空");
}
if (isSend != null && isSend && callId == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"isSend为true时CallID不能为空");
}
int ssrcInt = 0;
if (ssrc != null) {
try {
ssrcInt = Integer.parseInt(ssrc);
}catch (NumberFormatException e) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"ssrc格式错误");
}
}
int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, tcpMode);
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, mediaServerItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(mediaServerItemInUse, hookParam)->{
OnRtpServerTimeoutHookParam serverTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
if (stream.equals(serverTimeoutHookParam.getStream_id())) {
logger.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
logger.error("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
}
}
});
}
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo();
otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
otherRtpSendInfo.setReceivePort(localPort);
otherRtpSendInfo.setCallId(callId);
otherRtpSendInfo.setStream(stream);
if (isSend != null && isSend) {
int port = sendRtpPortManager.getNextPort(mediaServerItem.getId());
otherRtpSendInfo.setIp(mediaServerItem.getSdpIp());
otherRtpSendInfo.setPort(port);
logger.info("[开启收流和获取发流信息] 结果callId->{} {}", callId, otherRtpSendInfo);
}
// 将信息写入redis中以备后用
redisTemplate.opsForValue().set(key, otherRtpSendInfo, 300, TimeUnit.SECONDS);
return otherRtpSendInfo;
} }
@GetMapping(value = "/receive/close") @GetMapping(value = "/receive/close")
@ -86,7 +169,9 @@ public class RtpController {
@Operation(summary = "关闭收流") @Operation(summary = "关闭收流")
@Parameter(name = "stream", description = "流的ID", required = true) @Parameter(name = "stream", description = "流的ID", required = true)
public void closeRtpServer(String stream) { public void closeRtpServer(String stream) {
logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
zlmServerFactory.closeRtpServer(mediaServerItem,stream);
} }
@GetMapping(value = "/send/start") @GetMapping(value = "/send/start")
@ -99,9 +184,44 @@ public class RtpController {
@Parameter(name = "stream", description = "待发送流Id", required = true) @Parameter(name = "stream", description = "待发送流Id", required = true)
@Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
@Parameter(name = "onlyAudio", description = "是否只有音频", required = true) @Parameter(name = "onlyAudio", description = "是否只有音频", required = true)
@Parameter(name = "isUdp", description = "是否为UDP", required = true)
@Parameter(name = "streamType", description = "流类型1为es流2为ps流 默认es流", required = false) @Parameter(name = "streamType", description = "流类型1为es流2为ps流 默认es流", required = false)
public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Integer streamType) { public void sendRTP(String ssrc, String ip, Integer port, String app, String stream, String callId, Boolean onlyAudio, Boolean isUdp, Integer streamType) {
logger.info("[第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{}",
ssrc, ip, port, app, stream, callId, onlyAudio, streamType == 1? "ES":"PS");
MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
if (sendInfo == null) {
sendInfo = new OtherRtpSendInfo();
}
sendInfo.setPushApp(app);
sendInfo.setPushStream(stream);
sendInfo.setPushSSRC(ssrc);
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app",app);
param.put("stream",stream);
param.put("ssrc", ssrc);
param.put("dst_url",ip);
param.put("dst_port", port);
String is_Udp = isUdp ? "1" : "0";
param.put("is_udp", is_Udp);
param.put("src_port", sendInfo.getPort());
param.put("use_ps", streamType==2 ? "1" : "0");
param.put("only_audio", onlyAudio ? "1" : "0");
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方服务对接->发送流] 发流成功callId->{}", callId);
redisTemplate.opsForValue().set(key, sendInfo);
}else {
redisTemplate.delete(key);
logger.info("[第三方服务对接->发送流] 发流失败callId->{}, {}", callId, jsonObject.getString("msg"));
throw new ControllerException(ErrorCode.ERROR100.getCode(), "[发流失败] " + jsonObject.getString("msg"));
}
} }
@ -111,7 +231,25 @@ public class RtpController {
@Operation(summary = "关闭发送流") @Operation(summary = "关闭发送流")
@Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true) @Parameter(name = "callId", description = "整个过程的唯一标识,不传则使用随机端口发流", required = true)
public void closeSendRTP(String callId) { public void closeSendRTP(String callId) {
logger.info("[第三方服务对接->关闭发送流] callId->{}", callId);
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + callId;
OtherRtpSendInfo sendInfo = (OtherRtpSendInfo)redisTemplate.opsForValue().get(key);
if (sendInfo == null){
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");
}
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendInfo.getPushApp());
param.put("stream",sendInfo.getPushStream());
param.put("ssrc",sendInfo.getPushSSRC());
MediaServerItem mediaServerItem = mediaServerService.getDefaultMediaServer();
Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param);
if (!result) {
logger.info("[第三方服务对接->关闭发送流] 失败 callId->{}", callId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "停止发流失败");
}else {
logger.info("[第三方服务对接->关闭发送流] 成功 callId->{}", callId);
}
} }
} }

View File

@ -12,15 +12,6 @@
> >
<div id="shared" style="margin-top: 1rem;margin-right: 100px;"> <div id="shared" style="margin-top: 1rem;margin-right: 100px;">
<el-form ref="form" :rules="rules" :model="form" label-width="140px" > <el-form ref="form" :rules="rules" :model="form" label-width="140px" >
<!-- <el-form-item >-->
<!-- 建议的类型-->
<!-- <br/>-->
<!-- &emsp;&emsp;行政区划可选2位/4/6/8/10位数字例如130432表示河北省邯郸市广平县-->
<!-- <br/>-->
<!-- &emsp;&emsp;业务分组第111213位215例如34020000002150000001-->
<!-- <br/>-->
<!-- &emsp;&emsp;虚拟组织第111213位216例如34020000002160000001-->
<!-- </el-form-item>-->
<el-form-item label="节点编号" prop="id" > <el-form-item label="节点编号" prop="id" >
<el-input v-model="form.id" :disabled="isEdit" clearable></el-input> <el-input v-model="form.id" :disabled="isEdit" clearable></el-input>
</el-form-item> </el-form-item>
@ -63,7 +54,11 @@ export default {
return callback(new Error('行政区划编号必须为2/4/6/8位')); return callback(new Error('行政区划编号必须为2/4/6/8位'));
} }
if (this.form.parentId !== this.platformDeviceId && this.form.parentId.length >= value.trim().length) { if (this.form.parentId !== this.platformDeviceId && this.form.parentId.length >= value.trim().length) {
return callback(new Error('行政区划编号长度应该每次两位递增')); if (this.form.parentId.length === 20) {
return callback(new Error('业务分组/虚拟组织下不可创建行政区划'));
}else {
return callback(new Error('行政区划编号长度应该每次两位递增'));
}
} }
}else { }else {
if (value.trim().length !== 20) { if (value.trim().length !== 20) {
@ -122,27 +117,31 @@ export default {
this.level = level; this.level = level;
}, },
onSubmit: function () { onSubmit: function () {
console.log("onSubmit"); this.$refs["form"].validate((valid) => {
console.log(this.form); if (valid) {
this.$axios({ this.$axios({
method:"post", method:"post",
url:`/api/platform/catalog/${!this.isEdit? "add":"edit"}`, url:`/api/platform/catalog/${!this.isEdit? "add":"edit"}`,
data: this.form data: this.form
}).then((res)=> { }).then((res)=> {
if (res.data.code === 0) { if (res.data.code === 0) {
if (this.submitCallback)this.submitCallback(this.form) if (this.submitCallback)this.submitCallback(this.form)
}else { }else {
this.$message({ this.$message({
showClose: true, showClose: true,
message: res.data.msg, message: res.data.msg,
type: "error", type: "error",
});
}
this.close();
})
.catch((error)=> {
console.log(error);
}); });
} } else {
this.close(); return false;
}) }
.catch((error)=> { });
console.log(error);
});
}, },
close: function () { close: function () {
this.isEdit = false; this.isEdit = false;