From 5a75381a00a555443925bbbd8e333b14473b3ed1 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 17 Oct 2023 15:34:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9F=BA=E4=BA=8E=E6=96=B0=E7=9A=84=E4=BA=91?= =?UTF-8?q?=E7=AB=AF=E5=BD=95=E5=83=8F=E7=BB=93=E6=9E=84=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=9B=BD=E6=A0=87=E5=BD=95=E5=83=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../session/VideoStreamSessionManager.java | 27 +++ .../request/impl/ByeRequestProcessor.java | 3 +- .../cmd/KeepaliveNotifyMessageHandler.java | 2 +- .../iot/vmp/media/zlm/AssistRESTfulUtils.java | 180 ++++++++++++++---- .../vmp/media/zlm/ZLMHttpHookListener.java | 35 ++-- .../iot/vmp/service/ICloudRecordService.java | 12 ++ .../iot/vmp/service/IMediaServerService.java | 11 -- .../service/impl/CloudRecordServiceImpl.java | 84 +++++++- .../service/impl/MediaServerServiceImpl.java | 95 --------- .../iot/vmp/service/impl/PlayServiceImpl.java | 65 ++++--- .../dao/CloudRecordServiceMapper.java | 30 ++- .../cloudRecord/CloudRecordController.java | 50 ++++- web_src/src/components/CloudRecordDetail.vue | 14 +- .../src/components/dialog/recordDownload.vue | 8 +- 14 files changed, 404 insertions(+), 212 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index c46e38a9..f6573246 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -75,6 +75,33 @@ public class VideoStreamSessionManager { return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0)); } + public SsrcTransaction getSsrcTransactionByCallId(String callId){ + + if (ObjectUtils.isEmpty(callId)) { + return null; + } + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_*_*_" + callId+ "_*"; + List scanResult = RedisUtil.scan(redisTemplate, key); + if (!scanResult.isEmpty()) { + return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0)); + }else { + key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + "_*_*_play_*"; + scanResult = RedisUtil.scan(redisTemplate, key); + if (scanResult.isEmpty()) { + return null; + } + for (Object keyObj : scanResult) { + SsrcTransaction ssrcTransaction = (SsrcTransaction)redisTemplate.opsForValue().get(keyObj); + if (ssrcTransaction.getSipTransactionInfo() != null && + ssrcTransaction.getSipTransactionInfo().getCallId().equals(callId)) { + return ssrcTransaction; + } + } + return null; + } + + } + public List getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){ if (ObjectUtils.isEmpty(deviceId)) { deviceId ="*"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index b6aac9c7..39009836 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -31,6 +31,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -149,7 +150,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In }else { // 可能是设备发送的停止 - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(null, null, callIdHeader.getCallId(), null); + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransactionByCallId(callIdHeader.getCallId()); if (ssrcTransaction == null) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 7d94787e..41a7dd88 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -76,7 +76,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) { - logger.info("[心跳] 设备{}地址变化, 远程地址为: {}:{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort()); + logger.info("[收到心跳] 设备{}地址变化, 远程地址为: {}:{}", device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort()); device.setPort(remoteAddressInfo.getPort()); device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort()))); device.setIp(remoteAddressInfo.getIp()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java index cf71bf1d..f018eaec 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java @@ -9,12 +9,16 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import java.io.IOException; import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; @Component public class AssistRESTfulUtils { @@ -22,21 +26,43 @@ public class AssistRESTfulUtils { private final static Logger logger = LoggerFactory.getLogger(AssistRESTfulUtils.class); + private OkHttpClient client; + + + + public interface RequestCallback{ void run(JSONObject response); } private OkHttpClient getClient(){ - OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); - if (logger.isDebugEnabled()) { - HttpLoggingInterceptor logging = new HttpLoggingInterceptor(message -> { - logger.debug("http请求参数:" + message); - }); - logging.setLevel(HttpLoggingInterceptor.Level.BASIC); - // OkHttp進行添加攔截器loggingInterceptor - httpClientBuilder.addInterceptor(logging); + return getClient(null); + } + + private OkHttpClient getClient(Integer readTimeOut){ + if (client == null) { + if (readTimeOut == null) { + readTimeOut = 10; + } + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); + // 设置连接超时时间 + httpClientBuilder.connectTimeout(8, TimeUnit.SECONDS); + // 设置读取超时时间 + httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS); + // 设置连接池 + httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES)); + if (logger.isDebugEnabled()) { + HttpLoggingInterceptor logging = new HttpLoggingInterceptor(message -> { + logger.debug("http请求参数:" + message); + }); + logging.setLevel(HttpLoggingInterceptor.Level.BASIC); + // OkHttp進行添加攔截器loggingInterceptor + httpClientBuilder.addInterceptor(logging); + } + client = httpClientBuilder.build(); } - return httpClientBuilder.build(); + return client; + } @@ -124,13 +150,91 @@ public class AssistRESTfulUtils { return responseJSON; } + public JSONObject sendPost(MediaServerItem mediaServerItem, String api, JSONObject param, ZLMRESTfulUtils.RequestCallback callback, Integer readTimeOut) { + OkHttpClient client = getClient(readTimeOut); - public JSONObject fileDuration(MediaServerItem mediaServerItem, String app, String stream, RequestCallback callback){ - Map param = new HashMap<>(); - param.put("app",app); - param.put("stream",stream); - param.put("recordIng",true); - return sendGet(mediaServerItem, "api/record/file/duration",param, callback); + if (mediaServerItem == null) { + return null; + } + String url = String.format("http://%s:%s/%s", mediaServerItem.getIp(), mediaServerItem.getRecordAssistPort(), api); + JSONObject responseJSON = new JSONObject(); + //-2自定义流媒体 调用错误码 + responseJSON.put("code",-2); + responseJSON.put("msg","ASSIST调用失败"); + + RequestBody requestBodyJson = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), param.toString()); + + Request request = new Request.Builder() + .post(requestBodyJson) + .url(url) + .addHeader("Content-Type", "application/json") + .build(); + if (callback == null) { + try { + Response response = client.newCall(request).execute(); + if (response.isSuccessful()) { + ResponseBody responseBody = response.body(); + if (responseBody != null) { + String responseStr = responseBody.string(); + responseJSON = JSON.parseObject(responseStr); + } + }else { + response.close(); + Objects.requireNonNull(response.body()).close(); + } + }catch (IOException e) { + logger.error(String.format("[ %s ]ASSIST请求失败: %s", url, e.getMessage())); + + if(e instanceof SocketTimeoutException){ + //读取超时超时异常 + logger.error(String.format("读取ASSIST数据失败: %s, %s", url, e.getMessage())); + } + if(e instanceof ConnectException){ + //判断连接异常,我这里是报Failed to connect to 10.7.5.144 + logger.error(String.format("连接ASSIST失败: %s, %s", url, e.getMessage())); + } + + }catch (Exception e){ + logger.error(String.format("访问ASSIST失败: %s, %s", url, e.getMessage())); + } + }else { + client.newCall(request).enqueue(new Callback(){ + + @Override + public void onResponse(@NotNull Call call, @NotNull Response response){ + if (response.isSuccessful()) { + try { + String responseStr = Objects.requireNonNull(response.body()).string(); + callback.run(JSON.parseObject(responseStr)); + } catch (IOException e) { + logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage())); + } + + }else { + response.close(); + Objects.requireNonNull(response.body()).close(); + } + } + + @Override + public void onFailure(@NotNull Call call, @NotNull IOException e) { + logger.error(String.format("连接ZLM失败: %s, %s", call.request().toString(), e.getMessage())); + + if(e instanceof SocketTimeoutException){ + //读取超时超时异常 + logger.error(String.format("读取ZLM数据失败: %s, %s", call.request().toString(), e.getMessage())); + } + if(e instanceof ConnectException){ + //判断连接异常,我这里是报Failed to connect to 10.7.5.144 + logger.error(String.format("连接ZLM失败: %s, %s", call.request().toString(), e.getMessage())); + } + } + }); + } + + + + return responseJSON; } public JSONObject getInfo(MediaServerItem mediaServerItem, RequestCallback callback){ @@ -138,33 +242,33 @@ public class AssistRESTfulUtils { return sendGet(mediaServerItem, "api/record/info",param, callback); } - public JSONObject addStreamCallInfo(MediaServerItem mediaServerItem, String app, String stream, String callId, RequestCallback callback){ - Map param = new HashMap<>(); - param.put("app",app); - param.put("stream",stream); - param.put("callId",callId); - return sendGet(mediaServerItem, "api/record/addStreamCallInfo",param, callback); + public JSONObject addTask(MediaServerItem mediaServerItem, String app, String stream, String startTime, + String endTime, String callId, List filePathList, String remoteHost) { + + JSONObject videoTaskInfoJSON = new JSONObject(); + videoTaskInfoJSON.put("app", app); + videoTaskInfoJSON.put("stream", stream); + videoTaskInfoJSON.put("startTime", startTime); + videoTaskInfoJSON.put("endTime", endTime); + videoTaskInfoJSON.put("callId", callId); + videoTaskInfoJSON.put("filePathList", filePathList); + if (!ObjectUtils.isEmpty(remoteHost)) { + videoTaskInfoJSON.put("remoteHost", remoteHost); + } + + return sendPost(mediaServerItem, "api/record/file/download/task/add", videoTaskInfoJSON, null, 30); } - public JSONObject getDateList(MediaServerItem mediaServerItem, String app, String stream, int year, int month) { + public JSONObject queryTaskList(MediaServerItem mediaServerItem, String taskId, Boolean isEnd) { Map param = new HashMap<>(); - param.put("app", app); - param.put("stream", stream); - param.put("year", year); - param.put("month", month); - return sendGet(mediaServerItem, "api/record/date/list", param, null); - } + if (!ObjectUtils.isEmpty(taskId)) { + param.put("taskId", taskId); + } + if (!ObjectUtils.isEmpty(isEnd)) { + param.put("isEnd", isEnd); + } - public JSONObject getFileList(MediaServerItem mediaServerItem, int page, int count, String app, String stream, - String startTime, String endTime) { - Map param = new HashMap<>(); - param.put("app", app); - param.put("stream", stream); - param.put("page", page); - param.put("count", count); - param.put("startTime", startTime); - param.put("endTime", endTime); - return sendGet(mediaServerItem, "api/record/file/listWithDate", param, null); + return sendGet(mediaServerItem, "api/record/file/download/task/list", param, null); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 045bd7e6..e385ad74 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -234,12 +234,6 @@ public class ZLMHttpHookListener { streamAuthorityInfo.setSign(sign); // 鉴权通过 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); - // 通知assist新的callId - if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) { - taskExecutor.execute(() -> { - assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null); - }); - } } } else { zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId()); @@ -267,15 +261,28 @@ public class ZLMHttpHookListener { } // 替换流地址 if ("rtp".equals(param.getApp()) && !mediaInfo.isRtpEnable()) { - String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));; - InviteInfo inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc); - if (inviteInfo != null) { - result.setStream_replace(inviteInfo.getStream()); - logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", param.getStream(), inviteInfo.getStream()); + if (!mediaInfo.isRtpEnable()) { + String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));; + InviteInfo inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc); + if (inviteInfo != null) { + result.setStream_replace(inviteInfo.getStream()); + logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", param.getStream(), inviteInfo.getStream()); + } } + } + List ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream()); if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { + + // 为录制国标模拟一个鉴权信息 + StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); + streamAuthorityInfo.setApp(param.getApp()); + streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream()); + streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId()); + + redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo); + String deviceId = ssrcTransactionForAll.get(0).getDeviceId(); String channelId = ssrcTransactionForAll.get(0).getChannelId(); DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); @@ -349,13 +356,11 @@ public class ZLMHttpHookListener { List tracks = param.getTracks(); // TODO 重构此处逻辑 - boolean isPush = false; if (param.isRegist()) { - // 处理流注册的鉴权信息 + // 处理流注册的鉴权信息, 流注销这里不再删除鉴权信息,下次来了新的鉴权信息会对就的进行覆盖 if (param.getOriginType() == OriginType.RTMP_PUSH.ordinal() || param.getOriginType() == OriginType.RTSP_PUSH.ordinal() || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) { - isPush = true; StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); if (streamAuthorityInfo == null) { streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param); @@ -365,8 +370,6 @@ public class ZLMHttpHookListener { } redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); } - } else { - redisCatchStorage.removeStreamAuthorityInfo(param.getApp(), param.getStream()); } if ("rtsp".equals(param.getSchema())) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java b/src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java index e353c7a5..5a5eb650 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/ICloudRecordService.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.service; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.service.bean.CloudRecordItem; @@ -28,4 +30,14 @@ public interface ICloudRecordService { */ List getDateList(String app, String stream, int year, int month, List mediaServerItems); + /** + * 添加合并任务 + */ + String addTask(String app, String stream, String mediaServerId, String startTime, String endTime, String callId, String remoteHost); + + + /** + * 查询合并任务列表 + */ + JSONArray queryTask(String taskId, String mediaServerId, Boolean isEnd); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 8cfdd88f..c5f11961 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -87,21 +87,10 @@ public interface IMediaServerService { void updateMediaServerKeepalive(String mediaServerId, ServerKeepaliveData data); - boolean checkRtpServer(MediaServerItem mediaServerItem, String rtp, String stream); - /** * 获取负载信息 * @return */ MediaServerLoad getLoad(MediaServerItem mediaServerItem); - /** - * 按时间查找录像文件 - */ - List getRecords(String app, String stream, String startTime, String endTime, List mediaServerItems); - - /** - * 查找存在录像文件的时间 - */ - List getRecordDates(String app, String stream, int year, int month, List mediaServerItems); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java index d9f51891..1d8a5374 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java @@ -1,11 +1,17 @@ package com.genersoft.iot.vmp.service.impl; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam; import com.genersoft.iot.vmp.service.ICloudRecordService; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.CloudRecordItem; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; @@ -31,9 +37,18 @@ public class CloudRecordServiceImpl implements ICloudRecordService { @Autowired private CloudRecordServiceMapper cloudRecordServiceMapper; + @Autowired + private IMediaServerService mediaServerService; + @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private AssistRESTfulUtils assistRESTfulUtils; + + @Autowired + private VideoStreamSessionManager streamSession; + @Override public PageInfo getList(int page, int count, String app, String stream, String startTime, String endTime, List mediaServerItems) { // 开始时间和结束时间在数据库中都是以秒为单位的 @@ -54,7 +69,8 @@ public class CloudRecordServiceImpl implements ICloudRecordService { } PageHelper.startPage(page, count); - List all = cloudRecordServiceMapper.getList(app, stream, startTimeStamp, endTimeStamp, mediaServerItems); + List all = cloudRecordServiceMapper.getList(app, stream, startTimeStamp, endTimeStamp, + null, mediaServerItems); return new PageInfo<>(all); } @@ -69,7 +85,8 @@ public class CloudRecordServiceImpl implements ICloudRecordService { } long startTimeStamp = startDate.atStartOfDay().toInstant(ZoneOffset.ofHours(8)).getEpochSecond(); long endTimeStamp = endDate.atStartOfDay().toInstant(ZoneOffset.ofHours(8)).getEpochSecond(); - List cloudRecordItemList = cloudRecordServiceMapper.getList(app, stream, startTimeStamp, endTimeStamp, mediaServerItems); + List cloudRecordItemList = cloudRecordServiceMapper.getList(app, stream, startTimeStamp, + endTimeStamp, null, mediaServerItems); if (cloudRecordItemList.isEmpty()) { return new ArrayList<>(); } @@ -83,12 +100,71 @@ public class CloudRecordServiceImpl implements ICloudRecordService { @Override public void addRecord(OnRecordMp4HookParam param) { - CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(param); + CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(param); StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(param.getApp(), param.getStream()); if (streamAuthorityInfo != null) { cloudRecordItem.setCallId(streamAuthorityInfo.getCallId()); } - logger.info("[添加录像记录] {}/{} 文件大小:{}", param.getApp(), param.getStream(), param.getFile_size()); + logger.info("[添加录像记录] {}/{} 文件大小:{}, 时长: {}秒", param.getApp(), param.getStream(), param.getFile_size(),param.getTime_len()); cloudRecordServiceMapper.add(cloudRecordItem); } + + @Override + public String addTask(String app, String stream, String mediaServerId, String startTime, String endTime, String callId, String remoteHost) { + // 参数校验 + assert app != null; + assert stream != null; + MediaServerItem mediaServerItem = null; + if (mediaServerId == null) { + mediaServerItem = mediaServerService.getDefaultMediaServer(); + }else { + mediaServerItem = mediaServerService.getOne(mediaServerId); + } + if (mediaServerItem == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的流媒体"); + }else { + if (remoteHost == null) { + remoteHost = "http://" + mediaServerItem.getStreamIp() + ":" + mediaServerItem.getRecordAssistPort(); + } + } + Long startTimeStamp = null; + Long endTimeStamp = null; + if (startTime != null) { + startTimeStamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime); + } + if (endTime != null) { + endTimeStamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime); + } + + List mediaServers = new ArrayList<>(); + mediaServers.add(mediaServerItem); + // 检索相关的录像文件 + List filePathList = cloudRecordServiceMapper.queryRecordFilePathList(app, stream, startTimeStamp, endTimeStamp, callId, mediaServers); + if (filePathList == null || filePathList.isEmpty()) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未检索到视频文件"); + } + JSONObject result = assistRESTfulUtils.addTask(mediaServerItem, app, stream, startTime, endTime, callId, filePathList, remoteHost); + if (result.getInteger("code") != 0) { + throw new ControllerException(result.getInteger("code"), result.getString("msg")); + } + return result.getString("data"); + } + + @Override + public JSONArray queryTask(String taskId, String mediaServerId, Boolean isEnd) { + MediaServerItem mediaServerItem = null; + if (mediaServerId == null) { + mediaServerItem = mediaServerService.getDefaultMediaServer(); + }else { + mediaServerItem = mediaServerService.getOne(mediaServerId); + } + if (mediaServerItem == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的流媒体"); + } + JSONObject result = assistRESTfulUtils.queryTaskList(mediaServerItem, taskId, isEnd); + if (result.getInteger("code") != 0) { + throw new ControllerException(result.getInteger("code"), result.getString("msg")); + } + return result.getJSONArray("data"); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 4c149d4b..4759d3a0 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -741,15 +741,6 @@ public class MediaServerServiceImpl implements IMediaServerService { } } - @Override - public boolean checkRtpServer(MediaServerItem mediaServerItem, String app, String stream) { - JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, stream); - if(rtpInfo.getInteger("code") == 0){ - return rtpInfo.getBoolean("exist"); - } - return false; - } - @Override public MediaServerLoad getLoad(MediaServerItem mediaServerItem) { MediaServerLoad result = new MediaServerLoad(); @@ -761,90 +752,4 @@ public class MediaServerServiceImpl implements IMediaServerService { result.setGbSend(redisCatchStorage.getGbSendCount(mediaServerItem.getId())); return result; } - - @Override - public List getRecords(String app, String stream, String startTime, String endTime, List mediaServerItems) { - Assert.notNull(app, "app不存在"); - Assert.notNull(stream, "stream不存在"); - Assert.notNull(startTime, "startTime不存在"); - Assert.notNull(endTime, "endTime不存在"); - Assert.notEmpty(mediaServerItems, "流媒体列表为空"); - - CompletableFuture[] completableFutures = new CompletableFuture[mediaServerItems.size()]; - for (int i = 0; i < mediaServerItems.size(); i++) { - completableFutures[i] = getRecordFilesForOne(app, stream, startTime, endTime, mediaServerItems.get(i)); - } - List result = new ArrayList<>(); - for (int i = 0; i < completableFutures.length; i++) { - try { - List list = (List) completableFutures[i].get(); - if (!list.isEmpty()) { - for (int g = 0; g < list.size(); g++) { - list.get(g).setMediaServerId(mediaServerItems.get(i).getId()); - } - result.addAll(list); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - Comparator comparator = Comparator.comparing(RecordFile::getFileName); - result.sort(comparator); - return result; - } - - @Override - public List getRecordDates(String app, String stream, int year, int month, List mediaServerItems) { - Assert.notNull(app, "app不存在"); - Assert.notNull(stream, "stream不存在"); - Assert.notEmpty(mediaServerItems, "流媒体列表为空"); - CompletableFuture[] completableFutures = new CompletableFuture[mediaServerItems.size()]; - - for (int i = 0; i < mediaServerItems.size(); i++) { - completableFutures[i] = getRecordDatesForOne(app, stream, year, month, mediaServerItems.get(i)); - } - List result = new ArrayList<>(); - CompletableFuture.allOf(completableFutures).join(); - for (CompletableFuture completableFuture : completableFutures) { - try { - List list = (List) completableFuture.get(); - result.addAll(list); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - Collections.sort(result); - return result; - } - - @Async - public CompletableFuture> getRecordDatesForOne(String app, String stream, int year, int month, MediaServerItem mediaServerItem) { - JSONObject fileListJson = assistRESTfulUtils.getDateList(mediaServerItem, app, stream, year, month); - if (fileListJson != null && !fileListJson.isEmpty()) { - if (fileListJson.getString("code") != null && fileListJson.getInteger("code") == 0) { - JSONArray data = fileListJson.getJSONArray("data"); - return CompletableFuture.completedFuture(data.toJavaList(String.class)); - } - } - return CompletableFuture.completedFuture(new ArrayList<>()); - } - - @Async - public CompletableFuture> getRecordFilesForOne(String app, String stream, String startTime, String endTime, MediaServerItem mediaServerItem) { - JSONObject fileListJson = assistRESTfulUtils.getFileList(mediaServerItem, 1, 100000000, app, stream, startTime, endTime); - if (fileListJson != null && !fileListJson.isEmpty()) { - if (fileListJson.getString("code") != null && fileListJson.getInteger("code") == 0) { - JSONObject data = fileListJson.getJSONObject("data"); - JSONArray list = data.getJSONArray("list"); - if (list != null) { - return CompletableFuture.completedFuture(list.toJavaList(RecordFile.class)); - } - } - } - return CompletableFuture.completedFuture(new ArrayList<>()); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index f2653f70..eb9e2ef6 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -27,11 +27,13 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; import com.genersoft.iot.vmp.service.*; +import com.genersoft.iot.vmp.service.bean.CloudRecordItem; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import gov.nist.javax.sip.message.SIPResponse; @@ -107,6 +109,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ZlmHttpHookSubscribe subscribe; + @Autowired + private CloudRecordServiceMapper cloudRecordServiceMapper; + @Override public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { @@ -749,31 +754,43 @@ public class PlayServiceImpl implements IPlayService { logger.warn("查询录像信息时发现节点已离线"); return null; } - if (mediaServerItem.getRecordAssistPort() > 0) { - JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream(), null); - if (jsonObject == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接Assist服务失败"); - } - if (jsonObject.getInteger("code") == 0) { - long duration = jsonObject.getLong("data"); - - if (duration == 0) { - inviteInfo.getStreamInfo().setProgress(0); - } else { - String startTime = inviteInfo.getStreamInfo().getStartTime(); - String endTime = inviteInfo.getStreamInfo().getEndTime(); - long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime); - long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime); - - BigDecimal currentCount = new BigDecimal(duration / 1000); - BigDecimal totalCount = new BigDecimal(end - start); - BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP); - double process = divide.doubleValue(); - inviteInfo.getStreamInfo().setProgress(process); - } - inviteStreamService.updateInviteInfo(inviteInfo); - } + if (mediaServerItem.getRecordAssistPort() == 0) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未配置Assist服务,无法完成录像下载"); } + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream); + + if (ssrcTransaction == null) { + logger.warn("[获取下载进度],未找到下载事务信息"); + return null; + } + + // 为了支持多个数据库,这里不能使用求和函数来直接获取总数了 + List cloudRecordItemList = cloudRecordServiceMapper.getList("rtp", inviteInfo.getStream(), null, null, ssrcTransaction.getCallId(), null); + + if (cloudRecordItemList.isEmpty()) { + logger.warn("[获取下载进度],未找到下载视频信息"); + return null; + } + long duration = 0; + for (CloudRecordItem cloudRecordItem : cloudRecordItemList) { + duration += cloudRecordItem.getTimeLen(); + } + if (duration == 0) { + inviteInfo.getStreamInfo().setProgress(0); + } else { + String startTime = inviteInfo.getStreamInfo().getStartTime(); + String endTime = inviteInfo.getStreamInfo().getEndTime(); + long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime); + long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime); + + BigDecimal currentCount = new BigDecimal(duration); + BigDecimal totalCount = new BigDecimal(end - start); + BigDecimal divide = currentCount.divide(totalCount, 2, RoundingMode.HALF_UP); + double process = divide.doubleValue(); + inviteInfo.getStreamInfo().setProgress(process); + } + inviteStreamService.updateInviteInfo(inviteInfo); + return inviteInfo.getStreamInfo(); } return null; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/CloudRecordServiceMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/CloudRecordServiceMapper.java index 10595ef7..d5a08592 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/CloudRecordServiceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/CloudRecordServiceMapper.java @@ -42,17 +42,37 @@ public interface CloudRecordServiceMapper { @Select(" ") List getList(@Param("app") String app, @Param("stream") String stream, @Param("startTimeStamp")Long startTimeStamp, @Param("endTimeStamp")Long endTimeStamp, - List mediaServerItemList); + @Param("callId")String callId, List mediaServerItemList); + + + @Select(" ") + List queryRecordFilePathList(@Param("app") String app, @Param("stream") String stream, + @Param("startTimeStamp")Long startTimeStamp, @Param("endTimeStamp")Long endTimeStamp, + @Param("callId")String callId, List mediaServerItemList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java index 0d79675b..04778c36 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/cloudRecord/CloudRecordController.java @@ -1,5 +1,7 @@ package com.genersoft.iot.vmp.vmanager.cloudRecord; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; @@ -145,19 +147,53 @@ public class CloudRecordController { @Operation(summary = "添加合并任务") @Parameter(name = "app", description = "应用名", required = true) @Parameter(name = "stream", description = "流ID", required = true) + @Parameter(name = "mediaServerId", description = "流媒体ID", required = false) @Parameter(name = "startTime", description = "鉴权ID", required = false) @Parameter(name = "endTime", description = "鉴权ID", required = false) @Parameter(name = "callId", description = "鉴权ID", required = false) @Parameter(name = "remoteHost", description = "返回地址时的远程地址", required = false) public String addTask( - @RequestParam String app, - @RequestParam String stream, - @RequestParam String startTime, - @RequestParam String endTime, - @RequestParam String callId, - @RequestParam String remoteHost + @RequestParam(required = true) String app, + @RequestParam(required = true) String stream, + @RequestParam(required = false) String mediaServerId, + @RequestParam(required = false) String startTime, + @RequestParam(required = false) String endTime, + @RequestParam(required = false) String callId, + @RequestParam(required = false) String remoteHost ){ - return cloudRecordService.addTask(app, stream, startTime, endTime, callId, remoteHost); + return cloudRecordService.addTask(app, stream, mediaServerId, startTime, endTime, callId, remoteHost); + } + + @ResponseBody + @GetMapping("/task/list") + @Operation(summary = "查询合并任务") + @Parameter(name = "taskId", description = "任务Id", required = false) + @Parameter(name = "mediaServerId", description = "流媒体ID", required = false) + @Parameter(name = "isEnd", description = "是否结束", required = false) + public JSONArray queryTaskList( + @RequestParam(required = false) String taskId, + @RequestParam(required = false) String mediaServerId, + @RequestParam(required = false) Boolean isEnd + ){ + return cloudRecordService.queryTask(taskId, mediaServerId, isEnd); + } + + @ResponseBody + @GetMapping("/collect/add") + @Operation(summary = "添加收藏") + @Parameter(name = "app", description = "应用名", required = true) + @Parameter(name = "stream", description = "流ID", required = true) + @Parameter(name = "mediaServerId", description = "流媒体ID", required = false) + @Parameter(name = "startTime", description = "鉴权ID", required = false) + @Parameter(name = "endTime", description = "鉴权ID", required = false) + @Parameter(name = "callId", description = "鉴权ID", required = false) + @Parameter(name = "collectType", description = "收藏类型", required = false) + public JSONArray addCollect( + @RequestParam(required = false) String taskId, + @RequestParam(required = false) String mediaServerId, + @RequestParam(required = false) Boolean isEnd + ){ + return cloudRecordService.queryTask(taskId, mediaServerId, isEnd); } diff --git a/web_src/src/components/CloudRecordDetail.vue b/web_src/src/components/CloudRecordDetail.vue index 207fbef0..a3b1bc6b 100755 --- a/web_src/src/components/CloudRecordDetail.vue +++ b/web_src/src/components/CloudRecordDetail.vue @@ -480,12 +480,13 @@ let that = this; this.$axios({ method: 'get', - url:`/record_proxy/${that.mediaServerId}/api/record/file/download/task/add`, + url:`/api/cloud/record/task/add`, params: { - app: that.app, - stream: that.stream, - startTime: moment(this.taskTimeRange[0]).format('YYYY-MM-DD HH:mm:ss'), - endTime: moment(this.taskTimeRange[1]).format('YYYY-MM-DD HH:mm:ss'), + app: this.app, + stream: this.stream, + mediaServerId: this.mediaServerId, + startTime: moment(this.taskTimeRange[0]).format('YYYY-MM-DD HH:mm:ss'), + endTime: moment(this.taskTimeRange[1]).format('YYYY-MM-DD HH:mm:ss'), } }).then(function (res) { if (res.data.code === 0 ) { @@ -505,8 +506,9 @@ let that = this; this.$axios({ method: 'get', - url:`/record_proxy/${that.mediaServerId}/api/record/file/download/task/list`, + url:`/api/cloud/record/task/list`, params: { + mediaServerId: this.mediaServerId, isEnd: isEnd, } }).then(function (res) { diff --git a/web_src/src/components/dialog/recordDownload.vue b/web_src/src/components/dialog/recordDownload.vue index 7a945400..95a512e8 100755 --- a/web_src/src/components/dialog/recordDownload.vue +++ b/web_src/src/components/dialog/recordDownload.vue @@ -137,10 +137,11 @@ export default { getFileDownload: function (){ this.$axios({ method: 'get', - url:`/record_proxy/${this.mediaServerId}/api/record/file/download/task/add`, + url:`/api/cloud/record/task/add`, params: { app: this.app, stream: this.stream, + mediaServerId: this.mediaServerId, startTime: null, endTime: null, } @@ -169,10 +170,9 @@ export default { getProgressForFile: function (callback){ this.$axios({ method: 'get', - url:`/record_proxy/${this.mediaServerId}/api/record/file/download/task/list`, + url:`/api/cloud/record/task/list`, params: { - app: this.app, - stream: this.stream, + mediaServerId: this.mediaServerId, taskId: this.taskId, isEnd: true, }