From 6290e94733ae49fc3aa986c6fbe559bf08b678d7 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 19 Jan 2024 14:33:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 1 + .../iot/vmp/media/zlm/dto/StreamProxy.java | 24 ++- .../iot/vmp/service/IStreamProxyService.java | 9 +- .../service/impl/StreamProxyServiceImpl.java | 161 ++++++------------ .../service/impl/StreamPushServiceImpl.java | 2 +- .../vmp/storager/dao/StreamProxyMapper.java | 43 ++--- .../streamProxy/StreamProxyController.java | 8 +- 数据库/初始化-mysql.sql | 4 +- 数据库/初始化-postgresql-kingbase.sql | 4 +- 数据库/结构优化/common.sql | 11 +- 数据库/结构优化/初始化-mysql.sql | 4 +- .../结构优化/初始化-postgresql-kingbase.sql | 4 +- 12 files changed, 109 insertions(+), 166 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 3f4e15b9..c5b9ceb0 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -13,6 +13,7 @@ import org.springframework.stereotype.Component; import java.io.*; import java.net.ConnectException; import java.net.SocketTimeoutException; +import java.net.URLEncoder; import java.util.HashMap; import java.util.Map; import java.util.Objects; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java index e1b0a7b4..6305b193 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxy.java @@ -146,14 +146,6 @@ public class StreamProxy { this.rtpType = rtpType; } - public boolean isEnable() { - return enable; - } - - public void setEnable(boolean enable) { - this.enable = enable; - } - public boolean isEnableAudio() { return enableAudio; } @@ -257,4 +249,20 @@ public class StreamProxy { public void setProxyError(String proxyError) { this.proxyError = proxyError; } + + public boolean isPulling() { + return pulling; + } + + public void setPulling(boolean pulling) { + this.pulling = pulling; + } + + public String getPullTime() { + return pullTime; + } + + public void setPullTime(String pullTime) { + this.pullTime = pullTime; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 7edd497a..899665da 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -73,11 +73,6 @@ public interface IStreamProxyService { */ void updateStreamGPS(List gpsMsgInfoList); - /** - * 获取所有启用的拉流代理 - */ - List getAllForEnable(); - /** * 添加拉流代理 */ @@ -98,4 +93,8 @@ public interface IStreamProxyService { */ void getStreamProxyById(Integer id, GeneralCallback callback); + /** + * 播放代理流 + */ + void play(Integer id, GeneralCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index a90e48c1..3ea5b174 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -10,7 +10,6 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; @@ -24,7 +23,6 @@ import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -37,11 +35,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; -import org.springframework.util.CollectionUtils; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -51,8 +47,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; import java.util.*; /** @@ -166,6 +160,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }else { // 新增 addProxyToDb(param); } + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( @@ -174,76 +169,40 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } }); - if (param.isEnable()) { - startProxy(param, mediaInfo, (code, msg, data) -> { - if (code != ErrorCode.SUCCESS.getCode()) { - if (callback != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); - } - param.setStatus(true); - streamProxyMapper.update(param); - }else { - if (callback != null) { - callback.run(code, msg, null); - } - param.setEnable(false); - // 直接移除 - if (param.isEnableRemoveNoneReader()) { - delProxyFromDb(param); - }else { - updateProxyToDb(param); - } + startProxy(param, mediaInfo, (code, msg, data) -> { + if (code != ErrorCode.SUCCESS.getCode()) { + if (callback != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); + } + param.setPulling(true); + streamProxyMapper.update(param); + }else { + if (callback != null) { + callback.run(code, msg, null); + } + param.setPulling(false); + // 直接移除 + if (param.isEnableRemoveNoneReader()) { + delProxyFromDb(param); + }else { + updateProxyToDb(param); } - }); - } else{ - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - if (callback != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } - } + }); } @Override @Transactional public void add(StreamProxy param, GeneralCallback callback) { - MediaServerItem mediaInfo; - if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){ - mediaInfo = mediaServerService.getMediaServerForMinimumLoad(null); - }else { - mediaInfo = mediaServerService.getOne(param.getMediaServerId()); - } - if (mediaInfo == null) { - logger.warn("[添加拉流代理] 未找到在线的ZLM..."); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到可用的ZLM"); - } proxyParamHandler(param); - param.setMediaServerId(mediaInfo.getId()); StreamProxy streamProxyInDb = streamProxyMapper.selectOne(param.getApp(), param.getStream()); if (streamProxyInDb != null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "app/stream已经存在"); } - if (!param.isEnable()) { - param.setStatus(false); - } addProxyToDb(param); if (callback != null) { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), null); } - taskExecutor.execute(()->{ - startProxy(param, mediaInfo, (code, msg, data) -> { - if (code == ErrorCode.SUCCESS.getCode()) { - param.setStatus(true); - } else { - if (param.isEnableRemoveNoneReader()) { - return; - } - param.setProxyError(msg); - param.setStatus(false); - } -// updateStatusById(param.get); - }); - }); } @Override @@ -279,7 +238,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { boolean stopOldProxy = !streamProxyInDb.getType().equals(param.getType()) || !streamProxyInDb.getUrl().equals(param.getUrl()) || !streamProxyInDb.getMediaServerId().equals(param.getMediaServerId()) - || (streamProxyInDb.isEnable() && !param.isEnable()) || (streamProxyInDb.getType().equals("ffmpeg") && ( streamProxyInDb.getDstUrl().equals(param.getDstUrl()) || streamProxyInDb.getFfmpegCmdKey().equals(param.getFfmpegCmdKey()) @@ -288,9 +246,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { // 如果是开启代理这是开启代理结束后的回调 final GeneralCallback startProxyCallback = (code, msg, data) -> { if (code == ErrorCode.SUCCESS.getCode()) { - param.setStatus(true); + param.setPulling(true); } else { - param.setStatus(false); + param.setPulling(false); if (param.isEnableRemoveNoneReader()) { return; } @@ -300,17 +258,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }; if(stopOldProxy) { stopProxy(param, mediaInfo, (code, msg, data) -> { - if (param.isEnable()) { + if (param.isPulling()) { startProxy(param, mediaInfo, startProxyCallback); } }); - }else { - if (param.isEnable()) { - startProxy(param, mediaInfo, startProxyCallback); - }else { - param.setStatus(false); - updateProxyToDb(param); - } } }); } @@ -335,15 +286,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (callback != null) { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } + return; } - return; } String delayTalkKey = UUID.randomUUID().toString(); HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(streamProxy.getApp(), streamProxy.getStream(), true, "rtsp", mediaInfo.getId()); hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { dynamicTask.stop(delayTalkKey); - streamProxy.setStatus(true); + streamProxy.setPulling(true); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( mediaInfo, streamProxy.getApp(), streamProxy.getStream(), null, null); logger.info("[开始拉流代理] 成功: {}/{}", streamProxy.getApp(), streamProxy.getStream()); @@ -558,7 +509,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { if (streamProxy == null) { return; } - if (streamProxy.isEnable()) { + if (streamProxy.isPulling()) { String mediaServerId = streamProxy.getMediaServerId(); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { @@ -581,9 +532,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Transactional public void start(String app, String stream, GeneralCallback callback) { StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); - if (streamProxy == null || !streamProxy.isEnable()){ + if (streamProxy == null ){ if (callback != null) { - callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null); + callback.run(ErrorCode.ERROR100.getCode(), "代理不存在", null); } return; } @@ -597,9 +548,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } startProxy(streamProxy, mediaServerItem, (code, msg, data) -> { if (code == ErrorCode.SUCCESS.getCode()) { - streamProxy.setStatus(true); + streamProxy.setPulling(true); }else { - streamProxy.setStatus(false); + streamProxy.setPulling(false); } streamProxy.setUpdateTime(DateUtil.getNow()); updateProxyToDb(streamProxy); @@ -613,9 +564,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Transactional public void stop(String app, String stream, GeneralCallback callback) { StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream); - if (streamProxy == null || !streamProxy.isEnable()){ + if (streamProxy == null){ if (callback != null) { - callback.run(ErrorCode.ERROR100.getCode(), "代理不存在或未启用", null); + callback.run(ErrorCode.ERROR100.getCode(), "代理不存在", null); } return; } @@ -628,7 +579,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { return; } stopProxy(streamProxy, mediaServerItem, (code, msg, data) -> { - streamProxy.setStatus(false); + streamProxy.setPulling(false); streamProxy.setUpdateTime(DateUtil.getNow()); updateProxyToDb(streamProxy); if (callback != null) { @@ -680,31 +631,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService { // 移除拉流代理生成的流信息 syncPullStream(mediaServerId); - - // 恢复流代理, 只查找这个这个流媒体 - List streamProxyListForEnable = streamProxyMapper.selectForEnableInMediaServer( - mediaServerId, true); - for (StreamProxy streamProxyDto : streamProxyListForEnable) { - logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); - MediaServerItem mediaServerItem = mediaServerService.getOne(streamProxyDto.getMediaServerId()); - startProxy(streamProxyDto, mediaServerItem, (code, msg, data) -> { - if (code == ErrorCode.ERROR100.getCode()) { - if (!streamProxyDto.isStatus()) { - updateStatusById(streamProxyDto, true); - } - } else { - if (streamProxyDto.isStatus()) { - updateStatusById(streamProxyDto, false); - } - } - - }); - } } @Transactional public void updateStatusById(StreamProxy streamProxy, boolean status) { - streamProxyMapper.updateStatusById(streamProxy.getId(), status); + streamProxyMapper.updatePullingById(streamProxy.getId(), status); if (streamProxy.getCommonGbChannelId() > 0) { List ids = new ArrayList<>(); ids.add(streamProxy.getCommonGbChannelId()); @@ -733,7 +664,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); // 其他的流设置离线 - streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); + streamProxyMapper.updatePullingStatusByMediaServerId(mediaServerId, false); String type = "PULL"; // 发送redis消息 @@ -808,7 +739,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { public ResourceBaseInfo getOverview() { int total = streamProxyMapper.getAllCount(); - int online = streamProxyMapper.getOnline(); + int online = streamProxyMapper.getPulline(); return new ResourceBaseInfo(total, online); } @@ -860,11 +791,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService { streamProxyMapper.updateStreamGPS(gpsMsgInfoList); } - @Override - public List getAllForEnable() { - return streamProxyMapper.selectForEnable(true); - } - @Override public void getStreamProxyById(Integer id, GeneralCallback callback) { assert id != null; @@ -878,4 +804,21 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); } } + + @Override + public void play(Integer id, GeneralCallback callback) { + StreamProxy streamProxy = streamProxyMapper.selectOneById(id); + assert streamProxy != null; + String mediaServerId = streamProxy.getMediaServerId(); + MediaServerItem mediaServerItem; + if (ObjectUtils.isEmpty(mediaServerId) || mediaServerId.equals("auto")) { + mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); + }else { + mediaServerItem = mediaServerService.getOne(mediaServerId); + } + if (mediaServerItem == null && callback != null) { + callback.run(ErrorCode.ERROR100.getCode(), "未找到可用的节点", null); + } + startProxy(streamProxy, mediaServerItem, callback); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 42204479..1207ab42 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -248,7 +248,7 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushMapper.deleteWithoutGBId(mediaServerId); // 其他的流设置未启用 streamPushMapper.updateStatusByMediaServerId(mediaServerId, false); - streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false); + streamProxyMapper.updatePullingStatusByMediaServerId(mediaServerId, false); // 发送流停止消息 String type = "PUSH"; // 发送redis消息 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 5128aaa2..cb3e7565 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -13,11 +13,11 @@ import java.util.List; public interface StreamProxyMapper { @Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, dst_url, " + - "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, " + + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, pulling, stream_key, " + "enable_remove_none_reader, enable_disable_none_reader, create_time, longitude, latitude, " + "common_gb_channel_id, gb_id) VALUES " + "(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{dstUrl}, " + - "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " + + "#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{pulling}, #{streamKey}, " + "#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} , #{longitude} , #{latitude}, " + "#{commonGbChannelId}, #{gbId})") @Options(useGeneratedKeys=true, keyProperty="id", keyColumn="id") @@ -35,8 +35,7 @@ public interface StreamProxyMapper { "ffmpeg_cmd_key=#{ffmpegCmdKey}, " + "rtp_type=#{rtpType}, " + "enable_audio=#{enableAudio}, " + - "enable=#{enable}, " + - "status=#{status}, " + + "pulling=#{pulling}, " + "stream_key=#{streamKey}, " + "enable_remove_none_reader=#{enableRemoveNoneReader}, " + "enable_disable_none_reader=#{enableDisableNoneReader}, " + @@ -51,42 +50,26 @@ public interface StreamProxyMapper { "SELECT * FROM wvp_stream_proxy where 1 = 1 " + " AND (app LIKE '%${query}%' OR stream LIKE '%${query}%' OR name LIKE '%${query}%') " + " AND media_server_id=#{mediaServerId} " + - " AND status=true" + - " AND status=false" + + " AND pulling=true" + + " AND pulling=false" + "order by create_time desc"+ " " ) List selectAll(@Param("query") String query, - @Param("online") Boolean online, + @Param("pulling") Boolean pulling, @Param("mediaServerId") String mediaServerId); - @Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable=#{enable} order by st.create_time desc") - List selectForEnable(boolean enable); @Select("SELECT st.* from wvp_stream_proxy st WHERE st.app=#{app} AND st.stream=#{stream}") StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream); - @Select("SELECT st.* FROM wvp_stream_proxy st " + - "WHERE st.enable=#{enable} and st.media_server_id= #{id} order by st.create_time desc") - List selectForEnableInMediaServer(@Param("id") String id, @Param("enable") boolean enable); - - @Select("SELECT st.* FROM wvp_stream_proxy st " + - "WHERE st.media_server_id= #{id} order by st.create_time desc") - List selectInMediaServer(String id); - @Update("UPDATE wvp_stream_proxy " + - "SET status=#{status} " + + "SET pulling=#{pulling} " + "WHERE media_server_id=#{mediaServerId}") - void updateStatusByMediaServerId(@Param("mediaServerId") String mediaServerId, @Param("status") boolean status); - + void updatePullingStatusByMediaServerId(@Param("mediaServerId") String mediaServerId, @Param("pulling") boolean pulling); @Update("UPDATE wvp_stream_proxy " + - "SET status=#{status} " + - "WHERE app=#{app} AND stream=#{stream}") - int updateStatus(@Param("app") String app, @Param("stream") String stream, @Param("status") boolean status); - - @Update("UPDATE wvp_stream_proxy " + - "SET status=#{status} " + + "SET pulling=#{pulling} " + "WHERE id=#{id}") - int updateStatusById(@Param("id") int id, @Param("status") boolean status); + int updatePullingById(@Param("id") int id, @Param("pulling") boolean pulling); @Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}") void deleteAutoRemoveItemByMediaServerId(String mediaServerId); @@ -94,15 +77,15 @@ public interface StreamProxyMapper { @Select("SELECT st.* FROM wvp_stream_proxy st WHERE st.enable_remove_none_reader=true AND st.media_server_id=#{mediaServerId} order by st.create_time desc") List selectAutoRemoveItemByMediaServerId(String mediaServerId); - @Select("select count(1) as total, sum(status) as online from wvp_stream_proxy") + @Select("select count(1) as total, sum(pulling) as online from wvp_stream_proxy") ResourceBaseInfo getOverview(); @Select("select count(1) from wvp_stream_proxy") int getAllCount(); - @Select("select count(1) from wvp_stream_proxy where status = true") - int getOnline(); + @Select("select count(1) from wvp_stream_proxy where pulling = true") + int getPulline(); @Update({"