diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java index b3e86271..3a734b6f 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java @@ -38,32 +38,33 @@ public class SipPlatformRunner implements CommandLineRunner { @Override public void run(String... args) throws Exception { // 获取所有启用的平台 - List parentPlatforms = storager.queryEnableParentPlatformList(true); + List parentPlatforms = platformService.queryEnablePlatformList(); - for (Platform parentPlatform : parentPlatforms) { + for (Platform platform : parentPlatforms) { - PlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + PlatformCatch parentPlatformCatchOld = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId()); // 更新缓存 PlatformCatch parentPlatformCatch = new PlatformCatch(); - parentPlatformCatch.setPlatform(parentPlatform); - parentPlatformCatch.setId(parentPlatform.getServerGBId()); + parentPlatformCatch.setPlatform(platform); + parentPlatformCatch.setId(platform.getServerGBId()); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); if (parentPlatformCatchOld != null) { // 取消订阅 try { - sipCommanderForPlatform.unregister(parentPlatform, parentPlatformCatchOld.getSipTransactionInfo(), null, (eventResult)->{ - platformService.login(parentPlatform); + log.info("[平台主动注销] {}({})", platform.getName(), platform.getServerGBId()); + sipCommanderForPlatform.unregister(platform, parentPlatformCatchOld.getSipTransactionInfo(), null, (eventResult)->{ + platformService.login(platform); }); } catch (Exception e) { log.error("[命令发送失败] 国标级联 注销: {}", e.getMessage()); - platformService.offline(parentPlatform, true); + platformService.offline(platform, true); continue; } } - // 设置所有平台离线 - platformService.offline(parentPlatform, false); + // 设置平台离线 + platformService.offline(platform, false); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java index 0e188e5c..e1a2d5cd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java @@ -93,7 +93,7 @@ public interface PlatformChannelMapper { "left join wvp_device_channel dc on " + "dc.id = pgc.device_channel_id " + "WHERE " + - "dc.channel_id = #{channelId} and pp.status = true " + + "dc.device_id = #{channelId} and pp.status = true " + "AND pp.server_gb_id IN" + " #{item}" + " ") diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java index 2399ea29..b15b0d2e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java @@ -83,4 +83,7 @@ public interface PlatformMapper { @Update("UPDATE wvp_platform SET status=#{online} WHERE server_gb_id=#{platformGbID}" ) int updateStatus(@Param("platformGbID") String platformGbID, @Param("online") boolean online); + @Select("SELECT * FROM wvp_platform WHERE enable=true") + List queryEnablePlatformList(); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java index 067c7f24..411b54d5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java @@ -28,11 +28,11 @@ public class RecordEndEventListener implements ApplicationListener queryEnablePlatformList(); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java index 57af9c9c..df94e217 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.PlayException; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.gb28181.service.IPlayService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; import com.genersoft.iot.vmp.streamPush.service.IStreamPushPlayService; import lombok.extern.slf4j.Slf4j; @@ -38,13 +39,13 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { } log.info("[点播通用通道] 类型:{}, 通道: {}({})", inviteInfo.getSessionName(), channel.getGbName(), channel.getGbDeviceId()); if ("Play".equalsIgnoreCase(inviteInfo.getSessionName())) { - if (channel.getGbDeviceDbId() > 0) { + if (channel.getGbDeviceDbId() != null) { // 国标通道 playGbDeviceChannel(channel, callback); - } else if (channel.getStreamProxyId() > 0) { + } else if (channel.getStreamProxyId() != null) { // 拉流代理 playProxy(channel, callback); - } else if (channel.getStreamPushId() > 0) { + } else if (channel.getStreamPushId() != null) { // 推流 playPush(channel, platform.getServerGBId(), platform.getName(), callback); } else { @@ -53,14 +54,14 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); } }else if ("Playback".equals(inviteInfo.getSessionName())) { - if (channel.getGbDeviceDbId() > 0) { + if (channel.getGbDeviceDbId() != null) { // 国标通道 playbackGbDeviceChannel(channel, inviteInfo.getStartTime(), inviteInfo.getStopTime(), callback); - } else if (channel.getStreamProxyId() > 0) { + } else if (channel.getStreamProxyId() != null) { // 拉流代理 log.warn("[回放通用通道] 不支持回放拉流代理的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); throw new PlayException(Response.FORBIDDEN, "forbidden"); - } else if (channel.getStreamPushId() > 0) { + } else if (channel.getStreamPushId() != null) { // 推流 log.warn("[回放通用通道] 不支持回放推流的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); throw new PlayException(Response.FORBIDDEN, "forbidden"); @@ -70,7 +71,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); } }else if ("Download".equals(inviteInfo.getSessionName())) { - if (channel.getGbDeviceDbId() > 0) { + if (channel.getGbDeviceDbId() != null) { int downloadSpeed = 4; try { if (inviteInfo.getDownloadSpeed() != null){ @@ -80,11 +81,11 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { // 国标通道 downloadGbDeviceChannel(channel, inviteInfo.getStartTime(), inviteInfo.getStopTime(), downloadSpeed, callback); - } else if (channel.getStreamProxyId() > 0) { + } else if (channel.getStreamProxyId() != null) { // 拉流代理 log.warn("[下载通用通道录像] 不支持下载拉流代理的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); throw new PlayException(Response.FORBIDDEN, "forbidden"); - } else if (channel.getStreamPushId() > 0) { + } else if (channel.getStreamPushId() != null) { // 推流 log.warn("[下载通用通道录像] 不支持下载推流的录像: {}({})", channel.getGbName(), channel.getGbDeviceId()); throw new PlayException(Response.FORBIDDEN, "forbidden"); @@ -107,6 +108,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { } catch (PlayException e) { callback.run(e.getCode(), e.getMsg(), null); } catch (Exception e) { + log.error("[点播失败] {}({})", channel.getGbName(), channel.getGbDeviceId(), e); callback.run(Response.BUSY_HERE, "busy here", null); } } @@ -118,7 +120,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService { if (streamInfo == null) { callback.run(Response.BUSY_HERE, "busy here", null); }else { - callback.run(Response.OK, "success", streamInfo); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); } }catch (Exception e) { callback.run(Response.BUSY_HERE, "busy here", null); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 02bdc80e..20631e5d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -370,7 +370,6 @@ public class GbChannelServiceImpl implements IGbChannelService { return channelList; } - private Set getAllGroup(Set regionChannelList ) { if (regionChannelList.isEmpty()) { return new HashSet<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 47c14829..1b2a53be 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -43,6 +43,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.ResponseEvent; import javax.sip.SipException; import java.text.ParseException; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.Vector; @@ -365,7 +366,7 @@ public class PlatformServiceImpl implements IPlatformService { @Override public void offline(Platform platform, boolean stopRegister) { - log.info("[平台离线]:{}", platform.getServerGBId()); + log.info("[平台离线]:{}({})", platform.getName(), platform.getServerGBId()); PlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(platform.getServerGBId()); platformCatch.setKeepAliveReply(0); platformCatch.setRegisterAliveReply(0); @@ -376,17 +377,17 @@ public class PlatformServiceImpl implements IPlatformService { platformMapper.updateStatus(platform.getServerGBId(), false); // 停止所有推流 - log.info("[平台离线] {}, 停止所有推流", platform.getServerGBId()); + log.info("[平台离线] {}({}), 停止所有推流", platform.getName(), platform.getServerGBId()); stopAllPush(platform.getServerGBId()); // 清除注册定时 - log.info("[平台离线] {}, 停止定时注册任务", platform.getServerGBId()); + log.info("[平台离线] {}({}), 停止定时注册任务", platform.getName(), platform.getServerGBId()); final String registerTaskKey = REGISTER_KEY_PREFIX + platform.getServerGBId(); if (dynamicTask.contains(registerTaskKey)) { dynamicTask.stop(registerTaskKey); } // 清除心跳定时 - log.info("[平台离线] {}, 停止定时发送心跳任务", platform.getServerGBId()); + log.info("[平台离线] {}({}), 停止定时发送心跳任务", platform.getName(), platform.getServerGBId()); final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + platform.getServerGBId(); if (dynamicTask.contains(keepaliveTaskKey)) { // 清除心跳任务 @@ -396,11 +397,11 @@ public class PlatformServiceImpl implements IPlatformService { SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(platform.getServerGBId()); if (catalogSubscribe != null) { if (catalogSubscribe.getExpires() > 0) { - log.info("[平台离线] {}, 停止目录订阅回复", platform.getServerGBId()); + log.info("[平台离线] {}({}), 停止目录订阅回复", platform.getName(), platform.getServerGBId()); subscribeHolder.removeCatalogSubscribe(platform.getServerGBId()); } } - log.info("[平台离线] {}, 停止移动位置订阅回复", platform.getServerGBId()); + log.info("[平台离线] {}({}), 停止移动位置订阅回复", platform.getName(), platform.getServerGBId()); subscribeHolder.removeMobilePositionSubscribe(platform.getServerGBId()); // 发起定时自动重新注册 if (!stopRegister) { @@ -436,7 +437,7 @@ public class PlatformServiceImpl implements IPlatformService { // 添加注册任务 dynamicTask.startCron(registerTaskKey, // 注册失败(注册成功时由程序直接调用了online方法) - ()-> log.info("[国标级联] {},平台离线后持续发起注册,失败", platform.getServerGBId()), + ()-> log.info("[国标级联] {}({}),平台离线后持续发起注册,失败", platform.getName(), platform.getServerGBId()), 60*1000); }, null); } catch (InvalidArgumentException | ParseException | SipException e) { @@ -809,4 +810,9 @@ public class PlatformServiceImpl implements IPlatformService { public Platform queryOne(Integer platformId) { return platformMapper.query(platformId); } + + @Override + public List queryEnablePlatformList() { + return platformMapper.queryEnablePlatformList(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 1c622b99..fe88997e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -585,7 +585,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { recordXml.append("\r\n") .append("\r\n"); - log.info("[国标级联] 发送录像数据通道:{}, 内容: {}", recordInfo.getChannelId(), recordXml); + log.debug("[国标级联] 发送录像数据通道:{}, 内容: {}", recordInfo.getChannelId(), recordXml); // callid CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 7a303bcb..061a9eae 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -28,6 +28,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener; @@ -180,7 +181,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } channelPlayService.start(channel, inviteInfo, platform, ((code, msg, streamInfo) -> { - if (code != Response.OK) { + if (code != InviteErrorCode.SUCCESS.getCode()) { try { responseAck(request, code, msg); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java index 55782a0b..2343d232 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/RecordInfoQueryMessageHandler.java @@ -106,7 +106,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp } return; } - if (channel.getStreamProxyId() > 0 || channel.getStreamPushId() > 0) { + if (channel.getStreamProxyId() != null || channel.getStreamPushId() != null ) { log.info("[平台查询录像记录] 不支持查询推流和拉流代理的录像数据 {}/{}", parentPlatform.getName(), channelId ); try { responseAck(request, Response.NOT_IMPLEMENTED); // 回复未实现 @@ -126,7 +126,7 @@ public class RecordInfoQueryMessageHandler extends SIPRequestProcessorParent imp return; } // 接收录像数据 - recordEndEventListener.addEndEventHandler(channel.getGbDeviceId(), channelId, (recordInfo)->{ + recordEndEventListener.addEndEventHandler(device.getDeviceId(), channelId, (recordInfo)->{ try { log.info("[国标级联] 录像查询收到数据, 通道: {},准备转发===", channelId); cmderFroPlatform.recordInfo(channel, parentPlatform, request.getFromTag(), recordInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 9a0fda0f..699be99a 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -847,7 +847,7 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[startSendRtpStream] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类"); } - log.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(), + log.info("[开始推流] {}/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp()); mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java index 915dade1..5e26e889 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java @@ -28,10 +28,6 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { @Autowired private IMediaServerService mediaServerService; - - - - @Override public StreamInfo start(int id) { StreamProxy streamProxy = streamProxyMapper.select(id);