abl添加checkMediaServer实现

dev/abl支持
648540858 2024-04-09 23:41:48 +08:00
parent 48a0e88b95
commit 2dc7eecb47
3 changed files with 415 additions and 6 deletions

View File

@ -1,8 +1,12 @@
package com.genersoft.iot.vmp.media.abl;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.abl.bean.AblServerConfig;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
@ -24,6 +28,9 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
@Autowired
private ABLRESTfulUtils ablresTfulUtils;
@Autowired
private SipConfig sipConfig;
@Override
public int createRTPServer(MediaServer mediaServer, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
return ablresTfulUtils.openRtpServer(mediaServer, "rtp", stream, 96, port, tcpMode, disableAudio?1:0);
@ -46,17 +53,28 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
logger.info("关闭RTP Server " + jsonObject);
if (jsonObject != null ) {
if (jsonObject.getInteger("code") != 0) {
logger.error("关闭RTP Server 失败: " + jsonObject.getString("msg"));
logger.error("[closeRtpServer] 失败: " + jsonObject.getString("msg"));
}
}else {
// 检查ZLM状态
logger.error("关闭RTP Server 失败: 请检查ZLM服务");
logger.error("[closeRtpServer] 失败: 请检查ZLM服务");
}
}
@Override
public void closeStreams(MediaServer mediaServerItem, String rtp, String streamId) {
public void closeStreams(MediaServer mediaServer, String app, String streamId) {
Map<String, Object> param = new HashMap<>();
param.put("stream_id", streamId);
param.put("force", 1);
JSONObject jsonObject = ablresTfulUtils.closeStreams(mediaServer, app, streamId);
if (jsonObject != null ) {
if (jsonObject.getInteger("code") != 0) {
logger.error("[closeStreams] 失败: " + jsonObject.getString("msg"));
}
}else {
// 检查ZLM状态
logger.error("[closeStreams] 失败: 请检查ZLM服务");
}
}
@Override
@ -66,86 +84,125 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
@Override
public boolean checkNodeId(MediaServer mediaServerItem) {
logger.warn("[abl-checkNodeId] 未实现");
return false;
}
@Override
public void online(MediaServer mediaServerItem) {
logger.warn("[abl-online] 未实现");
}
@Override
public MediaServer checkMediaServer(String ip, int port, String secret) {
MediaServer mediaServer = new MediaServer();
mediaServer.setIp(ip);
mediaServer.setHttpPort(port);
mediaServer.setSecret(secret);
JSONObject responseJSON = ablresTfulUtils.getServerConfig(mediaServer);
JSONArray data = responseJSON.getJSONArray("params");
if (data != null && !data.isEmpty()) {
AblServerConfig config = AblServerConfig.getInstance(data);
config.setServerIp(ip);
config.setHttpServerPort(port);
return new MediaServer(config, sipConfig.getIp());
}
return null;
}
@Override
public boolean stopSendRtp(MediaServer mediaInfo, String app, String stream, String ssrc) {
// TODO 需要记录开始发流返回的KEY暂不做实现
logger.warn("[abl-stopSendRtp] 未实现");
// ablresTfulUtils.stopSendRtp()
return false;
}
@Override
public boolean deleteRecordDirectory(MediaServer mediaServerItem, String app, String stream, String date, String fileName) {
logger.warn("[abl-deleteRecordDirectory] 未实现");
return false;
}
@Override
public List<StreamInfo> getMediaList(MediaServer mediaServerItem, String app, String stream, String callId) {
logger.warn("[abl-getMediaList] 未实现");
return null;
}
@Override
public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream) {
logger.warn("[abl-connectRtpServer] 未实现");
return null;
}
@Override
public void getSnap(MediaServer mediaServerItem, String streamUrl, int timeoutSec, int expireSec, String path, String fileName) {
logger.warn("[abl-getSnap] 未实现");
}
@Override
public MediaInfo getMediaInfo(MediaServer mediaServerItem, String app, String stream) {
logger.warn("[abl-getMediaInfo] 未实现");
return null;
}
@Override
public Boolean pauseRtpCheck(MediaServer mediaServer, String streamKey) {
logger.warn("[abl-pauseRtpCheck] 未实现");
return null;
}
@Override
public Boolean resumeRtpCheck(MediaServer mediaServer, String streamKey) {
logger.warn("[abl-resumeRtpCheck] 未实现");
return null;
}
@Override
public String getFfmpegCmd(MediaServer mediaServer, String cmdKey) {
logger.warn("[abl-getFfmpegCmd] 未实现");
return null;
}
@Override
public WVPResult<String> addFFmpegSource(MediaServer mediaServer, String srcUrl, String dstUrl, int timeoutMs, boolean enableAudio, boolean enableMp4, String ffmpegCmdKey) {
logger.warn("[abl-addFFmpegSource] 未实现");
return null;
}
@Override
public WVPResult<String> addStreamProxy(MediaServer mediaServer, String app, String stream, String url, boolean enableAudio, boolean enableMp4, String rtpType) {
logger.warn("[abl-addStreamProxy] 未实现");
return null;
}
@Override
public Boolean delFFmpegSource(MediaServer mediaServer, String streamKey) {
logger.warn("[abl-delFFmpegSource] 未实现");
return null;
}
@Override
public Boolean delStreamProxy(MediaServer mediaServer, String streamKey) {
logger.warn("[abl-delStreamProxy] 未实现");
return null;
}
@Override
public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
logger.warn("[abl-getFFmpegCMDs] 未实现");
return null;
}
@Override
public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) {
logger.warn("[abl-startSendRtpPassive] 未实现");
}
@Override
public void startSendRtpStream(MediaServer mediaServer, SendRtpItem sendRtpItem) {
logger.warn("[abl-startSendRtpStream] 未实现");
}
}

View File

@ -0,0 +1,325 @@
package com.genersoft.iot.vmp.media.abl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Component
public class ABLRESTfulUtils {
private final static Logger logger = LoggerFactory.getLogger(ABLRESTfulUtils.class);
private OkHttpClient client;
public interface RequestCallback{
void run(JSONObject response);
}
private OkHttpClient getClient(){
return getClient(null);
}
private OkHttpClient getClient(Integer readTimeOut){
if (client == null) {
if (readTimeOut == null) {
readTimeOut = 10;
}
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
//todo 暂时写死超时时间 均为5s
// 设置连接超时时间
httpClientBuilder.connectTimeout(8,TimeUnit.SECONDS);
// 设置读取超时时间
httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS);
// 设置连接池
httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES));
if (logger.isDebugEnabled()) {
HttpLoggingInterceptor logging = new HttpLoggingInterceptor(message -> {
logger.debug("http请求参数" + message);
});
logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
// OkHttp進行添加攔截器loggingInterceptor
httpClientBuilder.addInterceptor(logging);
}
client = httpClientBuilder.build();
}
return client;
}
public JSONObject sendPost(MediaServer mediaServerItem, String api, Map<String, Object> param, RequestCallback callback) {
return sendPost(mediaServerItem, api, param, callback, null);
}
public JSONObject sendPost(MediaServer mediaServerItem, String api, Map<String, Object> param, RequestCallback callback, Integer readTimeOut) {
OkHttpClient client = getClient(readTimeOut);
if (mediaServerItem == null) {
return null;
}
String url = String.format("http://%s:%s/index/api/%s", mediaServerItem.getIp(), mediaServerItem.getHttpPort(), api);
JSONObject responseJSON = new JSONObject();
//-2自定义流媒体 调用错误码
responseJSON.put("code",-2);
responseJSON.put("msg","流媒体调用失败");
FormBody.Builder builder = new FormBody.Builder();
builder.add("secret",mediaServerItem.getSecret());
if (param != null && param.keySet().size() > 0) {
for (String key : param.keySet()){
if (param.get(key) != null) {
builder.add(key, param.get(key).toString());
}
}
}
FormBody body = builder.build();
Request request = new Request.Builder()
.post(body)
.url(url)
.build();
if (callback == null) {
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
ResponseBody responseBody = response.body();
if (responseBody != null) {
String responseStr = responseBody.string();
responseJSON = JSON.parseObject(responseStr);
}
}else {
System.out.println( 2222);
System.out.println( response.code());
response.close();
Objects.requireNonNull(response.body()).close();
}
}catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
if(e instanceof SocketTimeoutException){
//读取超时超时异常
logger.error(String.format("读取ABL数据超时失败: %s, %s", url, e.getMessage()));
}
if(e instanceof ConnectException){
//判断连接异常我这里是报Failed to connect to 10.7.5.144
logger.error(String.format("连接ABL连接失败: %s, %s", url, e.getMessage()));
}
}catch (Exception e){
logger.error(String.format("访问ABL失败: %s, %s", url, e.getMessage()));
}
}else {
client.newCall(request).enqueue(new Callback(){
@Override
public void onResponse(@NotNull Call call, @NotNull Response response){
if (response.isSuccessful()) {
try {
String responseStr = Objects.requireNonNull(response.body()).string();
callback.run(JSON.parseObject(responseStr));
} catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
}else {
response.close();
Objects.requireNonNull(response.body()).close();
}
}
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
logger.error(String.format("连接ABL失败: %s, %s", call.request().toString(), e.getMessage()));
if(e instanceof SocketTimeoutException){
//读取超时超时异常
logger.error(String.format("读取ABL数据失败: %s, %s", call.request().toString(), e.getMessage()));
}
if(e instanceof ConnectException){
//判断连接异常我这里是报Failed to connect to 10.7.5.144
logger.error(String.format("连接ABL失败: %s, %s", call.request().toString(), e.getMessage()));
}
}
});
}
return responseJSON;
}
public JSONObject sendGet(MediaServer mediaServerItem, String api, Map<String, Object> param) {
OkHttpClient client = getClient();
if (mediaServerItem == null) {
return null;
}
JSONObject responseJSON = null;
StringBuilder stringBuffer = new StringBuilder();
stringBuffer.append(String.format("http://%s:%s/index/api/%s", mediaServerItem.getIp(), mediaServerItem.getHttpPort(), api));
if (param != null && !param.keySet().isEmpty()) {
stringBuffer.append("?secret=").append(mediaServerItem.getSecret()).append("&");
int index = 1;
for (String key : param.keySet()){
if (param.get(key) != null) {
stringBuffer.append(key + "=" + param.get(key));
if (index < param.size()) {
stringBuffer.append("&");
}
}
index++;
}
}
String url = stringBuffer.toString();
logger.info("[访问ABL] {}", url);
Request request = new Request.Builder()
.get()
.url(url)
.build();
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
ResponseBody responseBody = response.body();
if (responseBody != null) {
String responseStr = responseBody.string();
responseJSON = JSON.parseObject(responseStr);
}
}else {
response.close();
Objects.requireNonNull(response.body()).close();
}
} catch (ConnectException e) {
logger.error(String.format("连接ABL失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认ABL已启动...");
}catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
return responseJSON;
}
public void sendGetForImg(MediaServer mediaServerItem, String api, Map<String, Object> params, String targetPath, String fileName) {
String url = String.format("http://%s:%s/index/api/%s", mediaServerItem.getIp(), mediaServerItem.getHttpPort(), api);
HttpUrl parseUrl = HttpUrl.parse(url);
if (parseUrl == null) {
return;
}
HttpUrl.Builder httpBuilder = parseUrl.newBuilder();
httpBuilder.addQueryParameter("secret", mediaServerItem.getSecret());
if (params != null) {
for (Map.Entry<String, Object> param : params.entrySet()) {
httpBuilder.addQueryParameter(param.getKey(), param.getValue().toString());
}
}
Request request = new Request.Builder()
.url(httpBuilder.build())
.build();
logger.info(request.toString());
try {
OkHttpClient client = getClient();
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
if (targetPath != null) {
File snapFolder = new File(targetPath);
if (!snapFolder.exists()) {
if (!snapFolder.mkdirs()) {
logger.warn("{}路径创建失败", snapFolder.getAbsolutePath());
}
}
File snapFile = new File(targetPath + File.separator + fileName);
FileOutputStream outStream = new FileOutputStream(snapFile);
outStream.write(Objects.requireNonNull(response.body()).bytes());
outStream.flush();
outStream.close();
} else {
logger.error(String.format("[ %s ]请求失败: %s %s", url, response.code(), response.message()));
}
} else {
logger.error(String.format("[ %s ]请求失败: %s %s", url, response.code(), response.message()));
}
Objects.requireNonNull(response.body()).close();
} catch (ConnectException e) {
logger.error(String.format("连接ABL失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认ABL已启动...");
} catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
}
public Integer openRtpServer(MediaServer mediaServer, String app, String stream, int payload, Integer port, Integer tcpMode, Integer disableAudio) {
Map<String, Object> param = new HashMap<>();
param.put("vhost", "_defaultVhost_");
param.put("app", app);
param.put("stream_id", stream);
param.put("payload", payload);
if (port != null) {
param.put("port", port);
}
if (tcpMode != null) {
param.put("enable_tcp", tcpMode);
}
if (disableAudio != null) {
param.put("disableAudio", disableAudio);
}
JSONObject jsonObject = sendPost(mediaServer, "openRtpServer", param, null);
if (jsonObject.getInteger("code") == 0) {
return jsonObject.getInteger("port");
}else {
return 0;
}
}
public JSONObject closeStreams(MediaServer mediaServerItem, String app, String stream) {
Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
param.put("app", app);
param.put("stream", stream);
param.put("force", 1);
return sendPost(mediaServerItem, "close_streams",param, null);
}
public JSONObject getServerConfig(MediaServer mediaServerItem){
return sendPost(mediaServerItem, "getServerConfig",null, null);
}
public JSONObject setConfigParamValue(MediaServer mediaServerItem, String key, Object value){
Map<String, Object> param = new HashMap<>();
param.put("key", key);
param.put("value", value);
return sendGet(mediaServerItem,"setConfigParamValue", param);
}
public void stopSendRtp(MediaServer mediaServer,String key) {
Map<String, Object> param = new HashMap<>();
param.put("key", key);
sendPost(mediaServer,"stopSendRtp", param, null);
}
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media.bean;
import com.genersoft.iot.vmp.media.abl.bean.AblServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.ZLMServerConfig;
import io.swagger.v3.oas.annotations.media.Schema;
import org.springframework.util.ObjectUtils;
@ -129,6 +130,32 @@ public class MediaServer {
}
public MediaServer(AblServerConfig config, String sipIp) {
id = config.getMediaServerId();
ip = config.getServerIp();
hookIp = sipIp;
sdpIp = config.getServerIp();
streamIp = config.getServerIp();
httpPort = config.getHttpServerPort();
flvPort = config.getHttpFlvPort();
wsFlvPort = config.getWsPort();
// httpSSlPort = config.getHttpSSLport();
// flvSSLPort = config.getHttpSSLport();
// wsFlvSSLPort = config.getHttpSSLport();
rtmpPort = config.getRtmpPort();
// rtmpSSlPort = config.getRtmpSslPort();
rtpProxyPort = config.getPsTsRecvPort();
rtspPort = config.getRtspPort();
// rtspSSLPort = config.getRtspSSlport();
autoConfig = true; // 默认值true;
secret = config.getSecret();
// hookAliveInterval = config.getHookAliveInterval();
rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口
// rtpPortRange = config.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号
rtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
recordAssistPort = 0; // 默认关闭
}
public String getId() {
return id;
}