Merge branch 'wvp-28181-2.0' into database-structure-optimization

# Conflicts:
#	sql/2.6.9更新.sql
结构优化
648540858 2023-10-01 23:59:04 +08:00
commit 8f7673a083
11 changed files with 126 additions and 35 deletions

View File

@ -9,3 +9,6 @@ alter table wvp_platform
alter table wvp_device
add auto_sync_channel bool default true
alter table wvp_stream_proxy
add stream_key varying(255)

View File

@ -246,6 +246,7 @@ create table wvp_stream_proxy (
create_time character varying(50),
name character varying(255),
update_time character varying(50),
stream_key character varying(255),
enable_disable_none_reader bool default false,
constraint uk_stream_proxy_app_stream unique (app, stream)
);

View File

@ -93,7 +93,10 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
}
if (event.getGbStreams() != null && !event.getGbStreams().isEmpty()){
for (GbStream gbStream : event.getGbStreams()) {
if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
if (gbStream != null
&& gbStream.getStreamType() != null
&& gbStream.getStreamType().equals("push")
&& !userSetting.isUsePushingAsStatus()) {
continue;
}
DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);

View File

@ -199,6 +199,13 @@ public class ZLMHttpHookListener {
}
// 推流鉴权的处理
if (!"rtp".equals(param.getApp())) {
StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (stream != null) {
HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
result.setEnable_audio(stream.isEnableAudio());
result.setEnable_mp4(stream.isEnableMp4());
return result;
}
if (userSetting.getPushAuthority()) {
// 推流鉴权
if (param.getParams() == null) {

View File

@ -32,13 +32,20 @@ public class ZLMRESTfulUtils {
}
private OkHttpClient getClient(){
return getClient(null);
}
private OkHttpClient getClient(Integer readTimeOut){
if (client == null) {
if (readTimeOut == null) {
readTimeOut = 10;
}
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
//todo 暂时写死超时时间 均为5s
// 设置连接超时时间
httpClientBuilder.connectTimeout(5,TimeUnit.SECONDS);
httpClientBuilder.connectTimeout(8,TimeUnit.SECONDS);
// 设置读取超时时间
httpClientBuilder.readTimeout(10,TimeUnit.SECONDS);
httpClientBuilder.readTimeout(readTimeOut,TimeUnit.SECONDS);
// 设置连接池
httpClientBuilder.connectionPool(new ConnectionPool(16, 5, TimeUnit.MINUTES));
if (logger.isDebugEnabled()) {
@ -55,9 +62,13 @@ public class ZLMRESTfulUtils {
}
public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback) {
OkHttpClient client = getClient();
return sendPost(mediaServerItem, api, param, callback, null);
}
public JSONObject sendPost(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback, Integer readTimeOut) {
OkHttpClient client = getClient(readTimeOut);
if (mediaServerItem == null) {
return null;
@ -261,6 +272,12 @@ public class ZLMRESTfulUtils {
return sendPost(mediaServerItem, "delFFmpegSource",param, null);
}
public JSONObject delStreamProxy(MediaServerItem mediaServerItem, String key){
Map<String, Object> param = new HashMap<>();
param.put("key", key);
return sendPost(mediaServerItem, "delStreamProxy",param, null);
}
public JSONObject getMediaServerConfig(MediaServerItem mediaServerItem){
return sendPost(mediaServerItem, "getServerConfig",null, null);
}
@ -310,7 +327,7 @@ public class ZLMRESTfulUtils {
param.put("enable_mp4", enable_mp4?1:0);
param.put("enable_audio", enable_audio?1:0);
param.put("rtp_type", rtp_type);
return sendPost(mediaServerItem, "addStreamProxy",param, null);
return sendPost(mediaServerItem, "addStreamProxy",param, null, 20);
}
public JSONObject closeStreams(MediaServerItem mediaServerItem, String app, String stream) {

View File

@ -41,6 +41,9 @@ public class StreamProxyItem extends GbStream {
@Schema(description = "是否 无人观看时自动停用")
private boolean enableDisableNoneReader;
@Schema(description = "拉流代理时zlm返回的key用于停止拉流代理")
private String streamKey;
public String getType() {
return type;
}
@ -167,5 +170,11 @@ public class StreamProxyItem extends GbStream {
this.enableAudio = enable_audio;
}
public String getStreamKey() {
return streamKey;
}
public void setStreamKey(String streamKey) {
this.streamKey = streamKey;
}
}

View File

@ -10,7 +10,10 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
@ -59,6 +62,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private StreamProxyMapper streamProxyMapper;
@ -143,7 +149,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
param.getStream());
}else {
dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtspPort(), param.getApp(),
param.getStream());
}
param.setDstUrl(dstUrl);
@ -160,15 +166,14 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
return;
}
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId());
hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> {
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
});
if (param.isEnable()) {
String talkKey = UUID.randomUUID().toString();
dynamicTask.startCron(talkKey, ()->{
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
if (streamInfo != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}
}, 1000);
String delayTalkKey = UUID.randomUUID().toString();
dynamicTask.startDelay(delayTalkKey, ()->{
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false);
@ -178,9 +183,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
}
}, 5000);
}, 7000);
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
hookSubscribe.removeSubscribe(hookSubscribeForStreamChange);
dynamicTask.stop(talkKey);
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
@ -310,13 +316,32 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (mediaServerItem == null) {
return null;
}
if ("default".equals(param.getType())){
result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
}else if ("ffmpeg".equals(param.getType())) {
if (zlmServerFactory.isStreamReady(mediaServerItem, param.getApp(), param.getStream())) {
zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
}
if ("ffmpeg".equalsIgnoreCase(param.getType())){
result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(),
param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(),
param.getFfmpegCmdKey());
}else {
result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl().trim(),
param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
}
System.out.println("addStreamProxyToZlm====");
System.out.println(result);
if (result != null && result.getInteger("code") == 0) {
JSONObject data = result.getJSONObject("data");
if (data == null) {
logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
return result;
}
String key = data.getString("key");
if (key == null) {
logger.warn("[获取拉流代理的结果数据Data中的KEY] 失败: {}", result );
return result;
}
param.setStreamKey(key);
streamProxyMapper.update(param);
}
return result;
}
@ -327,7 +352,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return null;
}
MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
JSONObject result = null;
if ("ffmpeg".equalsIgnoreCase(param.getType())){
result = zlmresTfulUtils.delFFmpegSource(mediaServerItem, param.getStreamKey());
}else {
result = zlmresTfulUtils.delStreamProxy(mediaServerItem, param.getStreamKey());
}
return result;
}
@ -342,19 +372,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (streamProxyItem != null) {
gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
// 如果关联了国标那么移除关联
int i = platformGbStreamMapper.delByAppAndStream(app, stream);
gbStreamMapper.del(app, stream);
System.out.println();
// TODO 如果关联的推流, 那么状态设置为离线
}
// 如果关联了国标那么移除关联
platformGbStreamMapper.delByAppAndStream(app, stream);
gbStreamMapper.del(app, stream);
videoManagerStorager.deleteStreamProxy(app, stream);
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除成功", app, stream);
}else {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除失败", app, stream);
}
}
}
@Override

View File

@ -167,8 +167,8 @@ public interface DeviceChannelMapper {
" <if test='query != null'> AND (dc.channel_id LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%') OR dc.name LIKE concat('%',#{query},'%'))</if> " +
" <if test='online == true' > AND dc.status=true</if> " +
" <if test='online == false' > AND dc.status=false</if> " +
" <if test='hasSubChannel!= null and has_sub_channel == true' > AND dc.sub_count > 0</if> " +
" <if test='hasSubChannel!= null and has_sub_channel == false' > AND dc.sub_count = 0</if> " +
" <if test='hasSubChannel!= null and hasSubChannel == true' > AND dc.sub_count > 0</if> " +
" <if test='hasSubChannel!= null and hasSubChannel == false' > AND dc.sub_count = 0</if> " +
" <if test='catalogId == null ' > AND dc.id not in (select device_channel_id from wvp_platform_gb_channel where platform_id=#{platformId} ) </if> " +
" <if test='catalogId != null ' > AND pgc.platform_id = #{platformId} and pgc.catalog_id=#{catalogId} </if> " +
" ORDER BY dc.device_id, dc.channel_id ASC" +

View File

@ -12,9 +12,9 @@ import java.util.List;
public interface StreamProxyMapper {
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, " +
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )")
int add(StreamProxyItem streamProxyDto);
@ -33,6 +33,7 @@ public interface StreamProxyMapper {
"enable_audio=#{enableAudio}, " +
"enable=#{enable}, " +
"status=#{status}, " +
"stream_key=#{streamKey}, " +
"enable_remove_none_reader=#{enableRemoveNoneReader}, " +
"enable_disable_none_reader=#{enableDisableNoneReader}, " +
"enable_mp4=#{enableMp4} " +
@ -45,7 +46,7 @@ public interface StreamProxyMapper {
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc")
List<StreamProxyItem> selectAll();
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
List<StreamProxyItem> selectForEnable(boolean enable);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")

View File

@ -36,6 +36,7 @@ import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
@ -477,7 +478,10 @@ public class DeviceQuery {
try {
final InputStream in = Files.newInputStream(new File("snap" + File.separator + deviceId + "_" + channelId + (mark == null? ".jpg": ("_" + mark + ".jpg"))).toPath());
resp.setContentType(MediaType.IMAGE_PNG_VALUE);
ServletOutputStream outputStream = resp.getOutputStream();
IOUtils.copy(in, resp.getOutputStream());
in.close();
outputStream.close();
} catch (IOException e) {
resp.setStatus(HttpServletResponse.SC_NOT_FOUND);
}

View File

@ -67,6 +67,16 @@ public class StreamProxyController {
return streamProxyService.getAll(page, count);
}
@Operation(summary = "查询流代理")
@Parameter(name = "app", description = "应用名")
@Parameter(name = "stream", description = "流Id")
@GetMapping(value = "/one")
@ResponseBody
public StreamProxyItem one(String app, String stream){
return streamProxyService.getStreamProxyByAppAndStream(app, stream);
}
@Operation(summary = "保存代理", parameters = {
@Parameter(name = "param", description = "代理参数", required = true),
})
@ -80,9 +90,16 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
if (ObjectUtils.isEmpty(param.getRtpType())) {
param.setRtpType("1");
}
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
}
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
streamProxyService.del(param.getApp(), param.getStream());
}
RequestMessage requestMessage = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();