diff --git a/pom.xml b/pom.xml
index 7ec73b81..df841d52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
com.genersoft
wvp-pro
- 2.7.0
+ 2.7.1
web video platform
国标28181视频平台
${project.packaging}
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index bfd39c25..df230d44 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -12,9 +12,9 @@ public class VideoManagerConstants {
public static final String WVP_SERVER_STREAM_PREFIX = "VMP_SIGNALLING_STREAM_";
- public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_INFO_";
+ public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER:";
- public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_";
+ public static final String ONLINE_MEDIA_SERVERS_PREFIX = "VMP_ONLINE_MEDIA_SERVERS:";
public static final String DEVICE_PREFIX = "VMP_DEVICE_";
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index d1eb5d30..5410d67d 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -3,6 +3,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
+import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
@@ -25,12 +28,15 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
+import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
+import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
@@ -115,19 +121,24 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(fromUserId);
if (parentPlatform != null) {
- Map param = getSendRtpParam(sendRtpItem);
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
- playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader);
+ playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
});
} else {
- JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
- if (startSendRtpStreamResult != null) {
- playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader);
+ try {
+ if (sendRtpItem.isTcpActive()) {
+ mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null);
+ } else {
+ mediaServerService.startSendRtpStream(mediaInfo, parentPlatform, sendRtpItem);
+ }
+ }catch (ControllerException e) {
+ logger.error("RTP推流失败: {}", e.getMessage());
+ playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
}
}
}else {
@@ -144,56 +155,17 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
logger.warn("[收到ACK]:来自{},目标为({})的推流信息为找到流体服务[{}]信息",fromUserId, toUserId, sendRtpItem.getMediaServerId());
return;
}
- Map param = getSendRtpParam(sendRtpItem);
- JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
- if (startSendRtpStreamResult != null) {
- playService.startSendRtpStreamHand(sendRtpItem, device, startSendRtpStreamResult, param, callIdHeader);
+ try {
+ if (sendRtpItem.isTcpActive()) {
+ mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null);
+ } else {
+ mediaServerService.startSendRtpStream(mediaInfo, null, sendRtpItem);
+ }
+ }catch (ControllerException e) {
+ logger.error("RTP推流失败: {}", e.getMessage());
+ playService.startSendRtpStreamFailHand(sendRtpItem, null, callIdHeader);
}
}
}
- private Map getSendRtpParam(SendRtpItem sendRtpItem) {
- String isUdp = sendRtpItem.isTcp() ? "0" : "1";
- Map param = new HashMap<>(12);
- param.put("vhost","__defaultVhost__");
- param.put("app",sendRtpItem.getApp());
- param.put("stream",sendRtpItem.getStream());
- param.put("ssrc", sendRtpItem.getSsrc());
- param.put("dst_url",sendRtpItem.getIp());
- param.put("dst_port", sendRtpItem.getPort());
- param.put("src_port", sendRtpItem.getLocalPort());
- param.put("pt", sendRtpItem.getPt());
- param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
- param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
- param.put("is_udp", isUdp);
- if (!sendRtpItem.isTcp()) {
- // udp模式下开启rtcp保活
- param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
- }
- return param;
- }
-
- private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServer mediaInfo, Map param){
- JSONObject startSendRtpStreamResult = null;
- if (sendRtpItem.getLocalPort() != 0) {
- if (sendRtpItem.isTcpActive()) {
- startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
- }else {
- param.put("dst_url", sendRtpItem.getIp());
- param.put("dst_port", sendRtpItem.getPort());
- startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
- }
- }else {
- if (sendRtpItem.isTcpActive()) {
- startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
- }else {
- param.put("dst_url", sendRtpItem.getIp());
- param.put("dst_port", sendRtpItem.getPort());
- startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
- }
- }
- return startSendRtpStreamResult;
-
- }
-
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index 7303594b..b4d183ed 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -641,7 +641,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
CallIdHeader callIdHeader, MediaServer mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) {
- Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) {
// 自平台内容
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
@@ -681,7 +681,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
String channelId, String addressStr, String ssrc, String requesterId) {
// 推流
if (streamPushItem.isSelf()) {
- Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) {
// 自平台内容
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
@@ -1108,7 +1108,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
redisCatchStorage.updateSendRTPSever(sendRtpItem);
- Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream());
+ Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream());
if (streamReady) {
sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, gb28181Sdp.getSsrc());
} else {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java
index 0c23acea..34d7077e 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java
@@ -37,7 +37,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
private final String method = "MESSAGE";
- private static Map messageHandlerMap = new ConcurrentHashMap<>();
+ private static final Map messageHandlerMap = new ConcurrentHashMap<>();
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@@ -100,9 +100,9 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
deviceNotFoundEvent.setCallId(callIdHeader.getCallId());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(deviceNotFoundEvent);
sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()).response(eventResult);
- };
+ }
}else {
- Element rootElement = null;
+ Element rootElement;
try {
rootElement = getRootElement(evt);
if (rootElement == null) {
@@ -110,24 +110,24 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
responseAck(request, Response.BAD_REQUEST, "content is null");
return;
}
+ String name = rootElement.getName();
+ IMessageHandler messageHandler = messageHandlerMap.get(name);
+ if (messageHandler != null) {
+ if (device != null) {
+ messageHandler.handForDevice(evt, device, rootElement);
+ }else { // 由于上面已经判断都为null则直接返回,所以这里device和parentPlatform必有一个不为null
+ messageHandler.handForPlatform(evt, parentPlatform, rootElement);
+ }
+ }else {
+ // 不支持的message
+ // 不存在则回复415
+ responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
+ }
} catch (DocumentException e) {
logger.warn("解析XML消息内容异常", e);
// 不存在则回复404
responseAck(request, Response.BAD_REQUEST, e.getMessage());
}
- String name = rootElement.getName();
- IMessageHandler messageHandler = messageHandlerMap.get(name);
- if (messageHandler != null) {
- if (device != null) {
- messageHandler.handForDevice(evt, device, rootElement);
- }else { // 由于上面已经判断都为null则直接返回,所以这里device和parentPlatform必有一个不为null
- messageHandler.handForPlatform(evt, parentPlatform, rootElement);
- }
- }else {
- // 不支持的message
- // 不存在则回复415
- responseAck(request, Response.UNSUPPORTED_MEDIA_TYPE, "Unsupported message type, must Control/Notify/Query/Response");
- }
}
} catch (SipException e) {
logger.warn("SIP 回复错误", e);
diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java
index 4b6b2215..87399382 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java
@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -53,4 +54,8 @@ public interface IMediaNodeServerService {
Boolean delStreamProxy(MediaServer mediaServer, String streamKey);
Map getFFmpegCMDs(MediaServer mediaServer);
+
+ void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout);
+
+ void startSendRtpStream(MediaServer mediaServer, SendRtpItem sendRtpItem);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
index 2a72eb43..fb56ab6f 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
@@ -2,6 +2,8 @@ package com.genersoft.iot.vmp.media.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
@@ -133,5 +135,11 @@ public interface IMediaServerService {
* @param stream
* @return
*/
- StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String addr, String callId, boolean isPlay);
+ StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServer, String app, String stream, MediaInfo mediaInfo, String addr, String callId, boolean isPlay);
+
+ Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId);
+
+ void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout);
+
+ void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
index 3721c8b0..fab8a3de 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -7,6 +7,9 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
@@ -19,6 +22,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
@@ -121,7 +125,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
ssrcFactory.initMediaServerSSRC(mediaServer.getId(), null);
}
// 查询redis是否存在此mediaServer
- String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServer.getId();
+ String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServer.getId();
Boolean hasKey = redisTemplate.hasKey(key);
if (hasKey != null && ! hasKey) {
redisTemplate.opsForValue().set(key, mediaServer);
@@ -257,7 +261,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServerInDataBase.getId())) {
ssrcFactory.initMediaServerSSRC(mediaServerInDataBase.getId(),null);
}
- String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerInDataBase.getId();
+ String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServerInDataBase.getId();
redisTemplate.opsForValue().set(key, mediaServerInDataBase);
if (mediaServerInDataBase.isStatus()) {
resetOnlineServerItem(mediaServerInDataBase);
@@ -273,8 +277,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public List getAllOnlineList() {
List result = new ArrayList<>();
- List