Merge branch 'wvp-28181-2.0'

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
结构优化
648540858 2022-10-17 16:56:13 +08:00
commit 82fd369ce5
31 changed files with 502 additions and 361 deletions

View File

@ -277,7 +277,6 @@ CREATE TABLE `media_server` (
`rtspSSLPort` int NOT NULL,
`autoConfig` int NOT NULL,
`secret` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`streamNoneReaderDelayMS` int NOT NULL,
`rtpEnable` int NOT NULL,
`rtpPortRange` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`sendRtpPortRange` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,

View File

@ -0,0 +1,5 @@
alter table wvp.media_server
drop column streamNoneReaderDelayMS;
alter table stream_proxy
add enable_disable_none_reader bit(1) default null;

View File

@ -1,44 +1,78 @@
package com.genersoft.iot.vmp.common;
import io.swagger.v3.oas.annotations.media.Schema;
@Schema(description = "流信息")
public class StreamInfo {
@Schema(description = "应用名")
private String app;
@Schema(description = "流ID")
private String stream;
@Schema(description = "设备编号")
private String deviceID;
@Schema(description = "通道编号")
private String channelId;
@Schema(description = "HTTP-FLV流地址")
private String flv;
@Schema(description = "IP")
private String ip;
@Schema(description = "HTTPS-FLV流地址")
private String https_flv;
@Schema(description = "Websocket-FLV流地址")
private String ws_flv;
@Schema(description = "Websockets-FLV流地址")
private String wss_flv;
@Schema(description = "HTTP-FMP4流地址")
private String fmp4;
@Schema(description = "HTTPS-FMP4流地址")
private String https_fmp4;
@Schema(description = "Websocket-FMP4流地址")
private String ws_fmp4;
@Schema(description = "Websockets-FMP4流地址")
private String wss_fmp4;
@Schema(description = "HLS流地址")
private String hls;
@Schema(description = "HTTPS-HLS流地址")
private String https_hls;
@Schema(description = "Websocket-HLS流地址")
private String ws_hls;
@Schema(description = "Websockets-HLS流地址")
private String wss_hls;
@Schema(description = "HTTP-TS流地址")
private String ts;
@Schema(description = "HTTPS-TS流地址")
private String https_ts;
@Schema(description = "Websocket-TS流地址")
private String ws_ts;
@Schema(description = "Websockets-TS流地址")
private String wss_ts;
@Schema(description = "RTMP流地址")
private String rtmp;
@Schema(description = "RTMPS流地址")
private String rtmps;
@Schema(description = "RTSP流地址")
private String rtsp;
@Schema(description = "RTSPS流地址")
private String rtsps;
@Schema(description = "RTC流地址")
private String rtc;
@Schema(description = "RTCS流地址")
private String rtcs;
@Schema(description = "流媒体ID")
private String mediaServerId;
@Schema(description = "流编码信息")
private Object tracks;
@Schema(description = "开始时间")
private String startTime;
@Schema(description = "结束时间")
private String endTime;
@Schema(description = "进度(录像下载使用)")
private double progress;
@Schema(description = "是否暂停(录像回放使用)")
private boolean pause;
public static class TransactionInfo{

View File

@ -69,9 +69,6 @@ public class MediaConfig{
@Value("${media.secret}")
private String secret;
@Value("${media.stream-none-reader-delay-ms:15000}")
private int streamNoneReaderDelayMS = 15000;
@Value("${media.rtp.enable}")
private boolean rtpEnable;
@ -151,10 +148,6 @@ public class MediaConfig{
return secret;
}
public int getStreamNoneReaderDelayMS() {
return streamNoneReaderDelayMS;
}
public boolean isRtpEnable() {
return rtpEnable;
}
@ -219,7 +212,6 @@ public class MediaConfig{
mediaServerItem.setRtspSSLPort(rtspSSLPort);
mediaServerItem.setAutoConfig(autoConfig);
mediaServerItem.setSecret(secret);
mediaServerItem.setStreamNoneReaderDelayMS(streamNoneReaderDelayMS);
mediaServerItem.setRtpEnable(rtpEnable);
mediaServerItem.setRtpPortRange(rtpPortRange);
mediaServerItem.setSendRtpPortRange(sendRtpPortRange);

View File

@ -33,6 +33,8 @@ public class UserSetting {
private Boolean usePushingAsStatus = Boolean.TRUE;
private Boolean streamOnDemand = Boolean.TRUE;
private String serverId = "000000";
private String thirdPartyGBIdReg = "[\\s\\S]*";
@ -146,4 +148,12 @@ public class UserSetting {
public void setUsePushingAsStatus(Boolean usePushingAsStatus) {
this.usePushingAsStatus = usePushingAsStatus;
}
public Boolean getStreamOnDemand() {
return streamOnDemand;
}
public void setStreamOnDemand(Boolean streamOnDemand) {
this.streamOnDemand = streamOnDemand;
}
}

View File

@ -162,7 +162,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (requesterId == null) {
logger.info("无法从FromHeader的Address中获取到平台/设备id返回400");
// 参数不全, 发400请求错误
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
}
return;
}
String ssrc = null;
@ -209,7 +213,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// return;
// }
// 通道存在发100TRYING
responseAck(serverTransaction, Response.TRYING);
try {
responseAck(serverTransaction, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite TRYING: {}", e.getMessage());
}
} else if (channel == null && gbStream != null) {
String mediaServerId = gbStream.getMediaServerId();
@ -217,13 +225,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (mediaServerItem == null) {
if ("proxy".equals(gbStream.getStreamType())) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(serverTransaction, Response.GONE);
try {
responseAck(serverTransaction, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
} else {
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(serverTransaction, Response.GONE);
try {
responseAck(serverTransaction, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
}
}
@ -232,25 +248,47 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem == null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(serverTransaction, Response.GONE);
try {
responseAck(serverTransaction, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
}
}else if("proxy".equals(gbStream.getStreamType())){
proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
if (proxyByAppAndStream == null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(serverTransaction, Response.GONE);
try {
responseAck(serverTransaction, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
}
}
}
responseAck(serverTransaction, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
try {
responseAck(serverTransaction, Response.CALL_IS_BEING_FORWARDED);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}", e.getMessage());
}
} else if (catalog != null) {
responseAck(serverTransaction, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播
try {
// 目录不支持点播
responseAck(serverTransaction, Response.BAD_REQUEST, "catalog channel can not play");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage());
}
return;
} else {
logger.info("通道不存在返回404");
responseAck(serverTransaction, Response.NOT_FOUND); // 通道不存在发404资源不存在
try {
// 通道不存在发404资源不存在
responseAck(serverTransaction, Response.NOT_FOUND);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道不存在: {}", e.getMessage());
}
return;
}
if (sdp == null || ssrc == null) {
@ -320,7 +358,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (port == -1) {
logger.info("不支持的媒体格式返回415");
// 回复不支持的格式
responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
try {
// 不支持的格式发415
responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 不支持的格式: {}", e.getMessage());
}
return;
}
String username = sdp.getOrigin().getUsername();
@ -333,13 +376,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
if (device == null) {
logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
responseAck(serverTransaction, Response.SERVER_INTERNAL_ERROR);
try {
responseAck(serverTransaction, Response.SERVER_INTERNAL_ERROR);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage());
}
return;
}
mediaServerItem = playService.getNewMediaServerItem(device);
if (mediaServerItem == null) {
logger.warn("未找到可用的zlm");
responseAck(serverTransaction, Response.BUSY_HERE);
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage());
}
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
@ -351,7 +402,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(serverTransaction, Response.BUSY_HERE);
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
}
return;
}
sendRtpItem.setCallId(callIdHeader.getCallId());
@ -526,13 +581,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
logger.warn("sdp解析错误");
e.printStackTrace();
} catch (SdpParseException e) {
e.printStackTrace();
logger.error("sdp解析错误", e);
} catch (SdpException e) {
e.printStackTrace();
}
@ -544,7 +594,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private void pushProxyStream(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
String channelId, String addressStr, String ssrc, String requesterId) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady) {
// 自平台内容
@ -554,7 +604,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(serverTransaction, Response.BUSY_HERE);
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
}
return;
}
if (tcpActive != null) {
@ -579,7 +633,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private void pushStream(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
String channelId, String addressStr, String ssrc, String requesterId) {
// 推流
if (streamPushItem.isSelf()) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
@ -591,7 +645,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(serverTransaction, Response.BUSY_HERE);
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
}
return;
}
if (tcpActive != null) {
@ -629,15 +687,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private void notifyStreamOnline(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
String channelId, String addressStr, String ssrc, String requesterId) {
if ("proxy".equals(gbStream.getStreamType())) {
// TODO 控制启用以使设备上线
logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
responseAck(serverTransaction, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
try {
responseAck(serverTransaction, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
}
} else if ("push".equals(gbStream.getStreamType())) {
if (!platform.isStartOfflinePush()) {
// 平台设置中关闭了拉起离线的推流则直接回复
responseAck(serverTransaction, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
try {
responseAck(serverTransaction, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
}
return;
}
// 发送redis消息以使设备上线
@ -765,38 +831,28 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
}, (wvpResult) -> {
try {
// 错误
if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
// 离线
// 查询是否在本机上线了
StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
if (currentStreamPushItem.isPushIng()) {
// 在线状态
pushStream(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
// 不在线 拉起
notifyStreamOnline(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
// 错误
if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
// 离线
// 查询是否在本机上线了
StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
if (currentStreamPushItem.isPushIng()) {
// 在线状态
pushStream(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
// 不在线 拉起
notifyStreamOnline(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
}
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联 点播回复 BUSY_HERE: {}", e.getMessage());
}
return;
});
}
@ -834,7 +890,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
return null;
}
public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId, String channelId) throws InvalidArgumentException, ParseException, SipException, SdpException {
public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId, String channelId) {
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = redisCatchStorage.getDevice(requesterId);
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(requesterId, channelId);
@ -846,8 +902,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Request request = serverTransaction.getRequest();
if (device != null) {
logger.info("收到设备" + requesterId + "的语音广播Invite请求");
responseAck(serverTransaction, Response.TRYING);
try {
responseAck(serverTransaction, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
}
String contentString = new String(serverTransaction.getRequest().getRawContent());
// jainSip不支持y=字段, 移除移除以解析。
String substring = contentString;
@ -944,7 +1003,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
} else {
logger.warn("来自无效设备/平台的请求");
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);; // 不支持的格式发415
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 来自无效设备/平台的请求, {}", e.getMessage());
}
}
}

View File

@ -93,46 +93,44 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
@Override
public void process(RequestEvent evt) {
ServerTransaction serverTransaction = getServerTransaction(evt);
try {
taskQueue.offer(new HandlerCatchData(evt, null, null));
ServerTransaction serverTransaction = getServerTransaction(evt);
responseAck(serverTransaction, Response.OK);
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(()-> {
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
Element rootElement = getRootElement(take.getEvt());
if (rootElement == null) {
logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
continue;
}
String cmd = XmlUtil.getText(rootElement, "CmdType");
if (CmdType.CATALOG.equals(cmd)) {
logger.info("接收到Catalog通知");
processNotifyCatalogList(take.getEvt());
} else if (CmdType.ALARM.equals(cmd)) {
logger.info("接收到Alarm通知");
processNotifyAlarm(take.getEvt());
} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition通知");
processNotifyMobilePosition(take.getEvt());
} else {
logger.info("接收到消息:" + cmd);
}
} catch (DocumentException e) {
logger.error("处理NOTIFY消息时错误", e);
}
}
taskQueueHandlerRun = false;
});
}
} catch (SipException | InvalidArgumentException | ParseException e) {
}catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
} finally {
taskQueueHandlerRun = false;
}
taskQueue.offer(new HandlerCatchData(evt, null, null));
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(()-> {
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
Element rootElement = getRootElement(take.getEvt());
if (rootElement == null) {
logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
continue;
}
String cmd = XmlUtil.getText(rootElement, "CmdType");
if (CmdType.CATALOG.equals(cmd)) {
logger.info("接收到Catalog通知");
processNotifyCatalogList(take.getEvt());
} else if (CmdType.ALARM.equals(cmd)) {
logger.info("接收到Alarm通知");
processNotifyAlarm(take.getEvt());
} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition通知");
processNotifyMobilePosition(take.getEvt());
} else {
logger.info("接收到消息:" + cmd);
}
} catch (DocumentException e) {
logger.error("处理NOTIFY消息时错误", e);
}
}
taskQueueHandlerRun = false;
});
}
}

View File

@ -112,10 +112,10 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
if (deviceForPlatform == null) {
try {
responseAck(serverTransaction, Response.NOT_FOUND);
return;
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 错误信息: {}", e.getMessage());
}
return;
}
try {
cmder.fronEndCmd(deviceForPlatform, channelId, cmdString, eventResult -> {

View File

@ -52,35 +52,36 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
// 未注册的设备不做处理
return;
}
// 回复200 OK
try {
// 判断RPort是否改变改变则说明路由nat信息变化修改设备信息
// 获取到通信地址等信息
ViaHeader viaHeader = (ViaHeader) evt.getRequest().getHeader(ViaHeader.NAME);
String received = viaHeader.getReceived();
int rPort = viaHeader.getRPort();
// 解析本地地址替代
if (ObjectUtils.isEmpty(received) || rPort == -1) {
received = viaHeader.getHost();
rPort = viaHeader.getPort();
}
if (device.getPort() != rPort) {
device.setPort(rPort);
device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
}
device.setKeepaliveTime(DateUtil.getNow());
// 回复200 OK
responseAck(getServerTransaction(evt), Response.OK);
if (device.getOnline() == 1) {
deviceService.updateDevice(device);
}else {
// 对于已经离线的设备判断他的注册是否已经过期
if (!deviceService.expire(device)){
deviceService.online(device);
}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 心跳回复: {}", e.getMessage());
}
// 判断RPort是否改变改变则说明路由nat信息变化修改设备信息
// 获取到通信地址等信息
ViaHeader viaHeader = (ViaHeader) evt.getRequest().getHeader(ViaHeader.NAME);
String received = viaHeader.getReceived();
int rPort = viaHeader.getRPort();
// 解析本地地址替代
if (ObjectUtils.isEmpty(received) || rPort == -1) {
received = viaHeader.getHost();
rPort = viaHeader.getPort();
}
if (device.getPort() != rPort) {
device.setPort(rPort);
device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
}
device.setKeepaliveTime(DateUtil.getNow());
if (device.getOnline() == 1) {
deviceService.updateDevice(device);
}else {
// 对于已经离线的设备判断他的注册是否已经过期
if (!deviceService.expire(device)){
deviceService.online(device);
}
}
}
@Override

View File

@ -81,8 +81,12 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
try {
Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
if (rootElementAfterCharset == null) {
logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
try {
logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage());
}
continue;
}
MobilePosition mobilePosition = new MobilePosition();
@ -133,7 +137,11 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
}
storager.updateChannelPosition(deviceChannel);
//回复 200 OK
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
try {
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage());
}
// 发送redis消息。 通知位置信息的变化
JSONObject jsonObject = new JSONObject();
@ -147,7 +155,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
jsonObject.put("speed", mobilePosition.getSpeed());
redisCatchStorage.sendMobilePositionMsg(jsonObject);
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
} catch (DocumentException e) {
e.printStackTrace();
}

View File

@ -67,33 +67,37 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
try {
// 回复200 OK
responseAck(getServerTransaction(evt), Response.OK);
Element snElement = rootElement.element("SN");
String sn = snElement.getText();
// 准备回复通道信息
List<DeviceChannel> deviceChannelInPlatforms = storager.queryChannelWithCatalog(parentPlatform.getServerGBId());
// 查询关联的直播通道
List<DeviceChannel> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
// 回复目录信息
List<DeviceChannel> catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 目录查询回复200OK: {}", e.getMessage());
}
Element snElement = rootElement.element("SN");
String sn = snElement.getText();
// 准备回复通道信息
List<DeviceChannel> deviceChannelInPlatforms = storager.queryChannelWithCatalog(parentPlatform.getServerGBId());
// 查询关联的直播通道
List<DeviceChannel> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
// 回复目录信息
List<DeviceChannel> catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
List<DeviceChannel> allChannels = new ArrayList<>();
List<DeviceChannel> allChannels = new ArrayList<>();
// 回复平台
// 回复平台
// DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
// allChannels.add(deviceChannel);
// 回复目录
if (catalogs.size() > 0) {
allChannels.addAll(catalogs);
}
// 回复级联的通道
if (deviceChannelInPlatforms.size() > 0) {
allChannels.addAll(deviceChannelInPlatforms);
}
// 回复直播的通道
if (gbStreams.size() > 0) {
allChannels.addAll(gbStreams);
}
// 回复目录
if (catalogs.size() > 0) {
allChannels.addAll(catalogs);
}
// 回复级联的通道
if (deviceChannelInPlatforms.size() > 0) {
allChannels.addAll(deviceChannelInPlatforms);
}
// 回复直播的通道
if (gbStreams.size() > 0) {
allChannels.addAll(gbStreams);
}
try {
if (allChannels.size() > 0) {
cmderFroPlatform.catalogQuery(allChannels, parentPlatform, sn, fromHeader.getTag());
}else {
@ -101,9 +105,11 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), 0);
}
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 目录查询: {}", e.getMessage());
logger.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage());
}
}
private DeviceChannel getChannelForPlatform(ParentPlatform platform) {

View File

@ -53,19 +53,20 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
try {
// 回复200 OK
responseAck(getServerTransaction(evt), Response.OK);
// 此处是对本平台发出DeviceControl指令的应答
JSONObject json = new JSONObject();
XmlUtil.node2Json(element, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 设备配置查询: {}", e.getMessage());
logger.error("[命令发送失败] 设备配置查询: {}", e.getMessage());
}
// 此处是对本平台发出DeviceControl指令的应答
JSONObject json = new JSONObject();
XmlUtil.node2Json(element, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
}

View File

@ -47,20 +47,21 @@ public class DeviceControlResponseMessageHandler extends SIPRequestProcessorPare
// 此处是对本平台发出DeviceControl指令的应答
try {
responseAck(getServerTransaction(evt), Response.OK);
JSONObject json = new JSONObject();
String channelId = getText(element, "DeviceID");
XmlUtil.node2Json(element, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + device.getDeviceId() + channelId;
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 设备控制: {}", e.getMessage());
}
JSONObject json = new JSONObject();
String channelId = getText(element, "DeviceID");
XmlUtil.node2Json(element, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + device.getDeviceId() + channelId;
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
}
@Override

View File

@ -78,9 +78,14 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent
ServerTransaction serverTransaction = getServerTransaction(evt);
try {
rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
if (rootElement == null) {
logger.warn("[ 接收到DeviceInfo应答消息 ] content cannot be null, {}", evt.getRequest());
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] DeviceInfo应答消息 BAD_REQUEST: {}", e.getMessage());
}
return;
}
Element deviceIdElement = rootElement.element("DeviceID");
@ -100,17 +105,16 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent
msg.setKey(key);
msg.setData(device);
deferredResultHolder.invokeAllResult(msg);
} catch (DocumentException e) {
throw new RuntimeException(e);
}
try {
// 回复200 OK
responseAck(serverTransaction, Response.OK);
} catch (DocumentException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (SipException e) {
e.printStackTrace();
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] DeviceInfo应答消息 200: {}", e.getMessage());
}
}
@Override

View File

@ -71,7 +71,11 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
logger.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest());
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage());
}
return;
}
MobilePosition mobilePosition = new MobilePosition();
@ -133,8 +137,13 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
jsonObject.put("speed", mobilePosition.getSpeed());
redisCatchStorage.sendMobilePositionMsg(jsonObject);
//回复 200 OK
responseAck(serverTransaction, Response.OK);
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
try {
responseAck(serverTransaction, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage());
}
} catch (DocumentException e) {
e.printStackTrace();
}
}

View File

@ -58,7 +58,11 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
if (rootElement == null) {
logger.warn("[ 设备预置位查询应答 ] content cannot be null, {}", evt.getRequest());
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
return;
}
Element presetListNumElement = rootElement.element("PresetList");
@ -67,7 +71,11 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
String deviceId = getText(rootElement, "DeviceID");
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId;
if (snElement == null || presetListNumElement == null) {
responseAck(serverTransaction, Response.BAD_REQUEST, "xml error");
try {
responseAck(serverTransaction, Response.BAD_REQUEST, "xml error");
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
return;
}
int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num"));
@ -94,11 +102,13 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
requestMessage.setKey(key);
requestMessage.setData(presetQuerySipReqList);
deferredResultHolder.invokeAllResult(requestMessage);
responseAck(serverTransaction, Response.OK);
try {
responseAck(serverTransaction, Response.OK);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
} catch (DocumentException e) {
logger.error("[解析xml]失败: ", e);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
}

View File

@ -69,95 +69,91 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
// 回复200 OK
try {
// 回复200 OK
responseAck(getServerTransaction(evt), Response.OK);
taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(()->{
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
if (rootElement == null) {
logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest());
continue;
}
String sn = getText(rootElementForCharset, "SN");
String channelId = getText(rootElementForCharset, "DeviceID");
RecordInfo recordInfo = new RecordInfo();
recordInfo.setChannelId(channelId);
recordInfo.setDeviceId(take.getDevice().getDeviceId());
recordInfo.setSn(sn);
recordInfo.setName(getText(rootElementForCharset, "Name"));
String sumNumStr = getText(rootElementForCharset, "SumNum");
int sumNum = 0;
if (!ObjectUtils.isEmpty(sumNumStr)) {
sumNum = Integer.parseInt(sumNumStr);
}
recordInfo.setSumNum(sumNum);
Element recordListElement = rootElementForCharset.element("RecordList");
if (recordListElement == null || sumNum == 0) {
logger.info("无录像数据");
eventPublisher.recordEndEventPush(recordInfo);
recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>());
releaseRequest(take.getDevice().getDeviceId(), sn);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
List<RecordItem> recordList = new ArrayList<>();
// 遍历DeviceList
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
logger.info("记录为空,下一个...");
continue;
}
RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address"));
String startTimeStr = getText(itemRecord, "StartTime");
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
recordInfo.setRecordList(recordList);
// 发送消息,如果是上级查询此录像,则会通过这里通知给上级
eventPublisher.recordEndEventPush(recordInfo);
int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList);
logger.info("[国标录像] {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
}
if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
releaseRequest(take.getDevice().getDeviceId(), sn);
}
}
} catch (DocumentException e) {
logger.error("xml解析异常 ", e);
}
}
taskQueueHandlerRun = false;
});
}
} catch (SipException | InvalidArgumentException | ParseException e) {
}catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
} finally {
taskQueueHandlerRun = false;
}
taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(()->{
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
if (rootElement == null) {
logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest());
continue;
}
String sn = getText(rootElementForCharset, "SN");
String channelId = getText(rootElementForCharset, "DeviceID");
RecordInfo recordInfo = new RecordInfo();
recordInfo.setChannelId(channelId);
recordInfo.setDeviceId(take.getDevice().getDeviceId());
recordInfo.setSn(sn);
recordInfo.setName(getText(rootElementForCharset, "Name"));
String sumNumStr = getText(rootElementForCharset, "SumNum");
int sumNum = 0;
if (!ObjectUtils.isEmpty(sumNumStr)) {
sumNum = Integer.parseInt(sumNumStr);
}
recordInfo.setSumNum(sumNum);
Element recordListElement = rootElementForCharset.element("RecordList");
if (recordListElement == null || sumNum == 0) {
logger.info("无录像数据");
eventPublisher.recordEndEventPush(recordInfo);
recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>());
releaseRequest(take.getDevice().getDeviceId(), sn);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
List<RecordItem> recordList = new ArrayList<>();
// 遍历DeviceList
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
logger.info("记录为空,下一个...");
continue;
}
RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address"));
String startTimeStr = getText(itemRecord, "StartTime");
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
recordInfo.setRecordList(recordList);
// 发送消息,如果是上级查询此录像,则会通过这里通知给上级
eventPublisher.recordEndEventPush(recordInfo);
int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList);
logger.info("[国标录像] {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
}
if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
releaseRequest(take.getDevice().getDeviceId(), sn);
}
}
} catch (DocumentException e) {
logger.error("xml解析异常 ", e);
}
}
taskQueueHandlerRun = false;
});
}
}

View File

@ -695,9 +695,12 @@ public class ZLMHttpHookListener {
String app = json.getString("app");
JSONObject ret = new JSONObject();
ret.put("code", 0);
// 录像下载
ret.put("close", userSetting.getStreamOnDemand());
if ("rtp".equals(app)){
ret.put("close", true);
// 国标流, 点播/录像回放/录像下载
StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId);
// 点播
if (streamInfoForPlayCatch != null) {
// 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
@ -727,40 +730,39 @@ public class ZLMHttpHookListener {
redisCatchStorage.stopPlay(streamInfoForPlayCatch);
storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
}else{
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (streamInfoForPlayBackCatch != null ) {
if (streamInfoForPlayBackCatch.isPause()) {
ret.put("close", false);
}else {
Device device = deviceService.queryDevice(streamInfoForPlayBackCatch.getDeviceID());
if (device != null) {
try {
cmder.streamByeCmd(device,streamInfoForPlayBackCatch.getChannelId(),
streamInfoForPlayBackCatch.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage());
}
}
redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(),
streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null);
}
return ret;
}
// 录像回放
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (streamInfoForPlayBackCatch != null ) {
if (streamInfoForPlayBackCatch.isPause()) {
ret.put("close", false);
}else {
StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, streamId, null);
// 进行录像下载时无人观看不断流
if (streamInfoForDownload != null) {
ret.put("close", false);
Device device = deviceService.queryDevice(streamInfoForPlayBackCatch.getDeviceID());
if (device != null) {
try {
cmder.streamByeCmd(device,streamInfoForPlayBackCatch.getChannelId(),
streamInfoForPlayBackCatch.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage());
}
}
redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(),
streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null);
}
return ret;
}
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null && mediaServerItem.getStreamNoneReaderDelayMS() == -1) {
// 录像下载
StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, streamId, null);
// 进行录像下载时无人观看不断流
if (streamInfoForDownload != null) {
ret.put("close", false);
return ret;
}
return ret;
}else {
// 非国标流 推流/拉流代理
// 拉流代理
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
if (streamProxyItem != null ) {
if (streamProxyItem.isEnable_remove_none_reader()) {
@ -772,12 +774,21 @@ public class ZLMHttpHookListener {
}else if (streamProxyItem.isEnable_disable_none_reader()) {
// 无人观看停用
ret.put("close", true);
// 修改数据
streamProxyService.stop(app, streamId);
}else {
ret.put("close", false);
}
return ret;
}
return ret;
// 推流具有主动性,暂时不做处理
// StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
// if (streamPushItem != null) {
// // TODO 发送停止
//
// }
}
return ret;
}
/**
@ -792,19 +803,27 @@ public class ZLMHttpHookListener {
}
String mediaServerId = json.getString("mediaServerId");
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (userSetting.isAutoApplyPlay() && mediaInfo != null && mediaInfo.isRtpEnable()) {
if (userSetting.isAutoApplyPlay() && mediaInfo != null) {
String app = json.getString("app");
String streamId = json.getString("stream");
if ("rtp".equals(app)) {
String[] s = streamId.split("_");
if (s.length == 2) {
String deviceId = s[0];
String channelId = s[1];
Device device = redisCatchStorage.getDevice(deviceId);
if (device != null) {
playService.play(mediaInfo,deviceId, channelId, null, null, null);
if (mediaInfo.isRtpEnable()) {
String[] s = streamId.split("_");
if (s.length == 2) {
String deviceId = s[0];
String channelId = s[1];
Device device = redisCatchStorage.getDevice(deviceId);
if (device != null) {
playService.play(mediaInfo,deviceId, channelId, null, null, null);
}
}
}
}else {
// 拉流代理
StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnable_disable_none_reader()) {
streamProxyService.start(app, streamId);
}
}
}

View File

@ -54,9 +54,6 @@ public class MediaServerItem{
@Schema(description = "ZLM鉴权参数")
private String secret;
@Schema(description = "某个流无人观看时触发hook.on_stream_none_reader事件的最大等待时间单位毫秒")
private int streamNoneReaderDelayMS;
@Schema(description = "keepalive hook触发间隔,单位秒")
private int hookAliveInterval;
@ -119,7 +116,6 @@ public class MediaServerItem{
rtspSSLPort = zlmServerConfig.getRtspSSlport();
autoConfig = true; // 默认值true;
secret = zlmServerConfig.getApiSecret();
streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS();
hookAliveInterval = zlmServerConfig.getHookAliveInterval();
rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口
rtpPortRange = zlmServerConfig.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号
@ -240,14 +236,6 @@ public class MediaServerItem{
this.secret = secret;
}
public int getStreamNoneReaderDelayMS() {
return streamNoneReaderDelayMS;
}
public void setStreamNoneReaderDelayMS(int streamNoneReaderDelayMS) {
this.streamNoneReaderDelayMS = streamNoneReaderDelayMS;
}
public boolean isRtpEnable() {
return rtpEnable;
}

View File

@ -38,7 +38,7 @@ public class StreamProxyItem extends GbStream {
@Schema(description = "是否 无人观看时删除")
private boolean enable_remove_none_reader;
@Schema(description = "是否 无人观看时不启用")
@Schema(description = "是否 无人观看时自动停用")
private boolean enable_disable_none_reader;
@Schema(description = "上级平台国标ID")
private String platformGbId;

View File

@ -547,7 +547,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
param.put("hook.on_record_mp4","");
}
param.put("hook.timeoutSec","20");
param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()==-1?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
// 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。
// 置0关闭此特性(推流断开会导致立即断开播放器)
// 此参数不应大于播放器超时时间
@ -612,7 +611,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaServerItem.setStreamIp(ip);
mediaServerItem.setHookIp(sipConfig.getIp());
mediaServerItem.setSdpIp(ip);
mediaServerItem.setStreamNoneReaderDelayMS(zlmServerConfig.getGeneralStreamNoneReaderDelayMS());
return mediaServerItem;
}

View File

@ -143,7 +143,7 @@ public class PlayServiceImpl implements IPlayService {
String uuid = UUID.randomUUID().toString();
msg.setId(uuid);
playResult.setUuid(uuid);
DeferredResult<WVPResult<String>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
DeferredResult<WVPResult<StreamInfo>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
playResult.setResult(result);
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);

View File

@ -26,7 +26,6 @@ public interface MediaServerMapper {
"rtspSSLPort, " +
"autoConfig, " +
"secret, " +
"streamNoneReaderDelayMS, " +
"rtpEnable, " +
"rtpPortRange, " +
"sendRtpPortRange, " +
@ -51,7 +50,6 @@ public interface MediaServerMapper {
"${rtspSSLPort}, " +
"${autoConfig}, " +
"'${secret}', " +
"${streamNoneReaderDelayMS}, " +
"${rtpEnable}, " +
"'${rtpPortRange}', " +
"'${sendRtpPortRange}', " +
@ -77,7 +75,6 @@ public interface MediaServerMapper {
"<if test=\"rtspPort != null\">, rtspPort=${rtspPort}</if>" +
"<if test=\"rtspSSLPort != null\">, rtspSSLPort=${rtspSSLPort}</if>" +
"<if test=\"autoConfig != null\">, autoConfig=${autoConfig}</if>" +
"<if test=\"streamNoneReaderDelayMS != null\">, streamNoneReaderDelayMS=${streamNoneReaderDelayMS}</if>" +
"<if test=\"rtpEnable != null\">, rtpEnable=${rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtpPortRange='${rtpPortRange}'</if>" +
"<if test=\"sendRtpPortRange != null\">, sendRtpPortRange='${sendRtpPortRange}'</if>" +
@ -102,7 +99,6 @@ public interface MediaServerMapper {
"<if test=\"rtspPort != null\">, rtspPort=${rtspPort}</if>" +
"<if test=\"rtspSSLPort != null\">, rtspSSLPort=${rtspSSLPort}</if>" +
"<if test=\"autoConfig != null\">, autoConfig=${autoConfig}</if>" +
"<if test=\"streamNoneReaderDelayMS != null\">, streamNoneReaderDelayMS=${streamNoneReaderDelayMS}</if>" +
"<if test=\"rtpEnable != null\">, rtpEnable=${rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtpPortRange='${rtpPortRange}'</if>" +
"<if test=\"sendRtpPortRange != null\">, sendRtpPortRange='${sendRtpPortRange}'</if>" +

View File

@ -11,10 +11,10 @@ import java.util.List;
public interface StreamProxyMapper {
@Insert("INSERT INTO stream_proxy (type, name, app, stream,mediaServerId, url, src_url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, status, enable_remove_none_reader, createTime) VALUES" +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, status, enable_remove_none_reader, enable_disable_none_reader, createTime) VALUES" +
"('${type}','${name}', '${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " +
"'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, ${status}, " +
"${enable_remove_none_reader}, '${createTime}' )")
"${enable_remove_none_reader}, ${enable_disable_none_reader}, '${createTime}' )")
int add(StreamProxyItem streamProxyDto);
@Update("UPDATE stream_proxy " +
@ -33,6 +33,7 @@ public interface StreamProxyMapper {
"enable=#{enable}, " +
"status=#{status}, " +
"enable_remove_none_reader=#{enable_remove_none_reader}, " +
"enable_disable_none_reader=#{enable_disable_none_reader}, " +
"enable_mp4=#{enable_mp4} " +
"WHERE app=#{app} AND stream=#{stream}")
int update(StreamProxyItem streamProxyDto);

View File

@ -87,7 +87,7 @@ public class PlayController {
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号", required = true)
@GetMapping("/start/{deviceId}/{channelId}")
public DeferredResult<WVPResult<String>> play(@PathVariable String deviceId,
public DeferredResult<WVPResult<StreamInfo>> play(@PathVariable String deviceId,
@PathVariable String channelId) {
// 获取可用的zlm

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.vmanager.gb28181.play.bean;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.http.ResponseEntity;
@ -7,16 +8,16 @@ import org.springframework.web.context.request.async.DeferredResult;
public class PlayResult {
private DeferredResult<WVPResult<String>> result;
private DeferredResult<WVPResult<StreamInfo>> result;
private String uuid;
private Device device;
public DeferredResult<WVPResult<String>> getResult() {
public DeferredResult<WVPResult<StreamInfo>> getResult() {
return result;
}
public void setResult(DeferredResult<WVPResult<String>> result) {
public void setResult(DeferredResult<WVPResult<StreamInfo>> result) {
this.result = result;
}

View File

@ -125,8 +125,8 @@ public class UserController {
}
}
@DeleteMapping("/删除用户")
@Operation(summary = "停止视频回放")
@DeleteMapping("/delete")
@Operation(summary = "删除用户")
@Parameter(name = "id", description = "用户Id", required = true)
public void delete(@RequestParam Integer id){
// 获取当前登录用户id

View File

@ -146,8 +146,6 @@ media:
auto-config: true
# [可选] zlm服务器的hook.admin_params=secret
secret: 035c73f7-bb6b-4889-a715-d9eb2d1925cc
# [可选] zlm服务器的general.streamNoneReaderDelayMS
stream-none-reader-delay-ms: 18000 # 无人观看多久自动关闭流, -1表示永不自动关闭,即 关闭按需拉流
# 启用多端口模式, 多端口模式使用端口区分每路流,兼容性更好。 单端口使用流的ssrc区分 点播超时建议使用多端口测试
rtp:
# [可选] 是否启用多端口模式, 开启后会在portRange范围内选择端口用于媒体流传输
@ -190,6 +188,8 @@ user-settings:
logInDatebase: true
# 使用推流状态作为推流通道状态
use-pushing-as-status: true
# 按需拉流, true有人观看拉流无人观看释放 false拉起后不自动释放
stream-on-demand: true
# 关闭在线文档(生产环境建议关闭)
springdoc:

View File

@ -41,10 +41,6 @@
<el-input v-if="currentStep === 2" v-model="mediaServerForm.httpPort" disabled :disabled="mediaServerForm.defaultServer"></el-input>
<el-input v-if="currentStep === 3" v-model="mediaServerForm.httpPort" :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
<el-form-item label="SECRET" prop="secret">
<el-input v-if="currentStep === 2" v-model="mediaServerForm.secret" disabled :disabled="mediaServerForm.defaultServer"></el-input>
<el-input v-if="currentStep === 3" v-model="mediaServerForm.secret" :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
<el-form-item label="HOOK IP" prop="ip">
<el-input v-model="mediaServerForm.hookIp" placeholder="媒体服务HOOK_IP" clearable :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
@ -74,6 +70,10 @@
<el-form-item label="RTMPS PORT" prop="rtmpSSlPort">
<el-input v-model="mediaServerForm.rtmpSSlPort" placeholder="媒体服务RTMPS_PORT" clearable :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
<el-form-item label="SECRET" prop="secret">
<el-input v-if="currentStep === 2" v-model="mediaServerForm.secret" disabled :disabled="mediaServerForm.defaultServer"></el-input>
<el-input v-if="currentStep === 3" v-model="mediaServerForm.secret" :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
<el-form-item label="自动配置媒体服务" >
<el-switch v-model="mediaServerForm.autoConfig" :disabled="mediaServerForm.defaultServer"></el-switch>
</el-form-item>
@ -94,9 +94,6 @@
-
<el-input v-model="sendRtpPortRange2" placeholder="终止" @change="portRangeChange" clearable style="width: 100px" prop="sendRtpPortRange2" :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
<el-form-item label="无人观看多久后停止拉流" >
<el-input v-model.number="mediaServerForm.streamNoneReaderDelayMS" clearable :disabled="mediaServerForm.defaultServer"></el-input>
</el-form-item>
<el-form-item label="录像管理服务端口" prop="recordAssistPort">
<el-input v-model.number="mediaServerForm.recordAssistPort" :disabled="mediaServerForm.defaultServer">
<!-- <el-button v-if="mediaServerForm.recordAssistPort > 0" slot="append" type="primary" @click="checkRecordServer"></el-button>-->
@ -172,7 +169,6 @@ export default {
hookIp: "",
sdpIp: "",
streamIp: "",
streamNoneReaderDelayMS: "",
secret: "035c73f7-bb6b-4889-a715-d9eb2d1925cc",
httpPort: "",
httpSSlPort: "",
@ -332,7 +328,6 @@ export default {
hookIp: "",
sdpIp: "",
streamIp: "",
streamNoneReaderDelayMS: "",
secret: "035c73f7-bb6b-4889-a715-d9eb2d1925cc",
httpPort: "",
httpSSlPort: "",

View File

@ -105,7 +105,9 @@
<el-checkbox label="启用" v-model="proxyParam.enable" ></el-checkbox>
<el-checkbox label="转HLS" v-model="proxyParam.enable_hls" ></el-checkbox>
<el-checkbox label="MP4录制" v-model="proxyParam.enable_mp4" ></el-checkbox>
<el-checkbox label="无人观看自动删除" v-model="proxyParam.enable_remove_none_reader" ></el-checkbox>
<el-checkbox label="无人观看自动删除" v-model="proxyParam.enable_remove_none_reader" @change="removeNoneReader"></el-checkbox>
<el-checkbox label="无人观看停止拉流" v-model="proxyParam.enable_disable_none_reader" @change="disableNoneReaderHandType"></el-checkbox>
</div>
</el-form-item>
@ -170,6 +172,7 @@ export default {
enable_hls: true,
enable_mp4: false,
enable_remove_none_reader: false,
enable_disable_none_reader: true,
platformGbId: null,
mediaServerId: null,
},
@ -276,6 +279,12 @@ export default {
if (this.platform.enable && this.platform.expires == "0") {
this.platform.expires = "300";
}
},
removeNoneReader: function(checked) {
this.proxyParam.enable_disable_none_reader = !checked;
},
disableNoneReaderHandType: function(checked) {
this.proxyParam.enable_remove_none_reader = !checked;
}
},
};

View File

@ -42,9 +42,6 @@
<el-form-item label="接口密钥" prop="secret">
<el-input v-model="form.secret" clearable></el-input>
</el-form-item>
<el-form-item label="无人观看触发时长">
<el-input v-model.number="form.streamNoneReaderDelayMS" clearable></el-input>
</el-form-item>
<el-form-item label="自动配置">
<el-switch v-model="form.autoConfig"></el-switch>
</el-form-item>