diff --git a/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java b/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java index dbe9e090..8c91dc82 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/InviteInfo.java @@ -1,15 +1,17 @@ package com.genersoft.iot.vmp.common; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import lombok.Data; /** * 记录每次发送invite消息的状态 */ +@Data public class InviteInfo { private String deviceId; - private String channelId; + private Integer channelId; private String stream; @@ -28,7 +30,7 @@ public class InviteInfo { private StreamInfo streamInfo; - public static InviteInfo getInviteInfo(String deviceId, String channelId, String stream, SSRCInfo ssrcInfo, + public static InviteInfo getInviteInfo(String deviceId, Integer channelId, String stream, SSRCInfo ssrcInfo, String receiveIp, Integer receivePort, String streamMode, InviteSessionType type, InviteSessionStatus status) { InviteInfo inviteInfo = new InviteInfo(); @@ -44,84 +46,4 @@ public class InviteInfo { return inviteInfo; } - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - - public String getChannelId() { - return channelId; - } - - public void setChannelId(String channelId) { - this.channelId = channelId; - } - - public InviteSessionType getType() { - return type; - } - - public void setType(InviteSessionType type) { - this.type = type; - } - - public InviteSessionStatus getStatus() { - return status; - } - - public void setStatus(InviteSessionStatus status) { - this.status = status; - } - - public StreamInfo getStreamInfo() { - return streamInfo; - } - - public void setStreamInfo(StreamInfo streamInfo) { - this.streamInfo = streamInfo; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public SSRCInfo getSsrcInfo() { - return ssrcInfo; - } - - public void setSsrcInfo(SSRCInfo ssrcInfo) { - this.ssrcInfo = ssrcInfo; - } - - public String getReceiveIp() { - return receiveIp; - } - - public void setReceiveIp(String receiveIp) { - this.receiveIp = receiveIp; - } - - public Integer getReceivePort() { - return receivePort; - } - - public void setReceivePort(Integer receivePort) { - this.receivePort = receivePort; - } - - public String getStreamMode() { - return streamMode; - } - - public void setStreamMode(String streamMode) { - this.streamMode = streamMode; - } - } diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index c4911e4b..ad353d3b 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -26,7 +26,7 @@ public class VideoManagerConstants { public static final String KEEPLIVEKEY_PREFIX = "VMP_KEEPALIVE_"; // TODO 此处多了一个_,暂不修改 - public static final String INVITE_PREFIX = "VMP_INVITE"; + public static final String INVITE_PREFIX = "VMP_INVITE_INFO_"; public static final String PLAYER_PREFIX = "VMP_INVITE_PLAY_"; public static final String PLAY_BLACK_PREFIX = "VMP_INVITE_PLAYBACK_"; public static final String DOWNLOAD_PREFIX = "VMP_INVITE_DOWNLOAD_"; @@ -39,7 +39,7 @@ public class VideoManagerConstants { public static final String PLATFORM_REGISTER_INFO_PREFIX = "VMP_PLATFORM_REGISTER_INFO_"; - public static final String PLATFORM_SEND_RTP_INFO_PREFIX = "VMP_PLATFORM_SEND_RTP_INFO_"; + public static final String SEND_RTP_INFO_PREFIX = "VMP_SEND_RTP_INFO"; public static final String EVENT_ONLINE_REGISTER = "1"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java index 74179cae..b3a41392 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java @@ -301,8 +301,10 @@ public class CommonGBChannel { if (this.getGbCivilCode() != null) { content.append("" + this.getGbCivilCode() + "\n"); } - content.append("" + this.getGbParentId() + "\n") - .append("" + this.getGbBusinessGroupId() + "\n"); + if (this.getGbParentId() != null) { + content.append("" + this.getGbParentId() + "\n"); + } + content.append("" + this.getGbBusinessGroupId() + "\n"); } else { if (this.getGbManufacturer() != null) { content.append("" + this.getGbManufacturer() + "\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java index 8b08930c..52b9d798 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java @@ -18,7 +18,7 @@ public class MobilePosition { /** * 通道Id */ - private String channelId; + private Integer channelId; /** * 设备名称 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 55f09df3..effcad8d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -3,7 +3,9 @@ package com.genersoft.iot.vmp.gb28181.bean; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.common.VideoManagerConstants; +import lombok.Data; +@Data public class SendRtpItem { /** @@ -44,7 +46,7 @@ public class SendRtpItem { /** * 通道id */ - private String channelId; + private Integer channelId; /** * 推流状态 @@ -174,214 +176,6 @@ public class SendRtpItem { return sendRtpItem; } - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getSsrc() { - return ssrc; - } - - public void setSsrc(String ssrc) { - this.ssrc = ssrc; - } - - public String getPlatformId() { - return platformId; - } - - public void setPlatformId(String platformId) { - this.platformId = platformId; - } - - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - - public String getChannelId() { - return channelId; - } - - public void setChannelId(String channelId) { - this.channelId = channelId; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public boolean isTcp() { - return tcp; - } - - public void setTcp(boolean tcp) { - this.tcp = tcp; - } - - public int getLocalPort() { - return localPort; - } - - public void setLocalPort(int localPort) { - this.localPort = localPort; - } - - public boolean isTcpActive() { - return tcpActive; - } - - public void setTcpActive(boolean tcpActive) { - this.tcpActive = tcpActive; - } - - public String getMediaServerId() { - return mediaServerId; - } - - public void setMediaServerId(String mediaServerId) { - this.mediaServerId = mediaServerId; - } - - public String getCallId() { - return callId; - } - - public void setCallId(String callId) { - this.callId = callId; - } - - public InviteStreamType getPlayType() { - return playType; - } - - public void setPlayType(InviteStreamType playType) { - this.playType = playType; - } - - public int getPt() { - return pt; - } - - public void setPt(int pt) { - this.pt = pt; - } - - public boolean isUsePs() { - return usePs; - } - - public void setUsePs(boolean usePs) { - this.usePs = usePs; - } - - public boolean isOnlyAudio() { - return onlyAudio; - } - - public void setOnlyAudio(boolean onlyAudio) { - this.onlyAudio = onlyAudio; - } - - public String getServerId() { - return serverId; - } - - public void setServerId(String serverId) { - this.serverId = serverId; - } - - public String getFromTag() { - return fromTag; - } - - public void setFromTag(String fromTag) { - this.fromTag = fromTag; - } - - public String getToTag() { - return toTag; - } - - public void setToTag(String toTag) { - this.toTag = toTag; - } - - public boolean isRtcp() { - return rtcp; - } - - public void setRtcp(boolean rtcp) { - this.rtcp = rtcp; - } - - public String getReceiveStream() { - return receiveStream; - } - - public void setReceiveStream(String receiveStream) { - this.receiveStream = receiveStream; - } - - public String getPlatformName() { - return platformName; - } - - public void setPlatformName(String platformName) { - this.platformName = platformName; - } - - public String getLocalIp() { - return localIp; - } - - public void setLocalIp(String localIp) { - this.localIp = localIp; - } - - public String getSessionName() { - return sessionName; - } - - public void setSessionName(String sessionName) { - this.sessionName = sessionName; - } - @Override public String toString() { return "SendRtpItem{" + @@ -415,13 +209,12 @@ public class SendRtpItem { } public String getRedisKey() { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + return VideoManagerConstants.SEND_RTP_INFO_PREFIX + serverId + "_" + mediaServerId + "_" + platformId + "_" + channelId + "_" + stream + "_" + callId; - return key; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java index 6ed8d144..27f3f4cb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java @@ -1,11 +1,14 @@ package com.genersoft.iot.vmp.gb28181.bean; import com.genersoft.iot.vmp.common.InviteSessionType; +import lombok.Data; +@Data public class SsrcTransaction { private String deviceId; - private String channelId; + private String platformId; + private Integer channelId; private String callId; private String stream; private String mediaServerId; @@ -15,67 +18,20 @@ public class SsrcTransaction { private InviteSessionType type; - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { + public SsrcTransaction(String deviceId, String platformId, Integer channelId, String callId, + String stream, String mediaServerId, String ssrc, + SipTransactionInfo sipTransactionInfo, InviteSessionType type) { this.deviceId = deviceId; - } - - public String getChannelId() { - return channelId; - } - - public void setChannelId(String channelId) { + this.platformId = platformId; this.channelId = channelId; - } - - public String getCallId() { - return callId; - } - - public void setCallId(String callId) { this.callId = callId; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { this.stream = stream; - } - - public String getMediaServerId() { - return mediaServerId; - } - - public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; - } - - public String getSsrc() { - return ssrc; - } - - public void setSsrc(String ssrc) { this.ssrc = ssrc; - } - - public InviteSessionType getType() { - return type; - } - - public void setType(InviteSessionType type) { + this.sipTransactionInfo = sipTransactionInfo; this.type = type; } - public SipTransactionInfo getSipTransactionInfo() { - return sipTransactionInfo; - } - - public void setSipTransactionInfo(SipTransactionInfo sipTransactionInfo) { - this.sipTransactionInfo = sipTransactionInfo; + public SsrcTransaction() { } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index 5659828c..74fc8d78 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -539,7 +539,7 @@ public interface DeviceChannelMapper { List queryAllChannels(@Param("deviceDbId") int deviceDbId); @Select("select de.* from wvp_device de left join wvp_device_channel dc on de.device_id = dc.deviceId where dc.device_id=#{channelId}") - List getDeviceByChannelId(String channelId); + List getDeviceByChannelDeviceId(String channelId); @Delete({""}) DeviceChannel getOneByDeviceId(@Param("deviceDbId") int deviceDbId, @Param("channelId") String channelId); + + + @Update(value = {"UPDATE wvp_device_channel SET stream_id=null WHERE id=#{channelId}"}) + void stopPlayById(@Param("channelId") Integer channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java index cc607ae2..b82b61c0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java @@ -39,19 +39,17 @@ public interface PlatformChannelMapper { @Select(" ") - List queryPlatFormListForGBWithGBId(@Param("channelId") String channelId, @Param("platforms") List platforms); + List queryPlatFormListForGBWithGBId(@Param("channelId") Integer channelId, List platforms); @Select("select dc.channel_id, dc.device_id,dc.name,d.manufacturer,d.model,d.firmware\n" + "from wvp_platform_channel pgc\n" + @@ -336,7 +334,7 @@ public interface PlatformChannelMapper { " (#{platformId}, #{item.id} )" + "" + "") - int addPlatformGroup(List groupListNotShare, @Param("platformId") Integer platformId); + int addPlatformGroup(Collection groupListNotShare, @Param("platformId") Integer platformId); @Insert("") - Set queryShareChildrenGroup(@Param("parentId") String parentId, @Param("platformId") Integer platformId); + Set queryShareChildrenGroup(@Param("parentId") Integer parentId, @Param("platformId") Integer platformId); @Select(" ") CommonGBChannel queryShareChannel(@Param("platformId") int platformId, @Param("gbId") int gbId); + + + @Select(" ") + Set queryShareGroup(@Param("platformId") Integer platformId); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 91f2bdd4..28a42264 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -4,9 +4,9 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; @@ -28,7 +28,7 @@ import java.util.Map; public class CatalogEventLister implements ApplicationListener { @Autowired - private IVideoManagerStorage storager; + private IPlatformChannelService platformChannelService; @Autowired private IPlatformService platformService; @@ -62,7 +62,8 @@ public class CatalogEventLister implements ApplicationListener { if (event.getChannels() != null) { if (!platforms.isEmpty()) { for (CommonGBChannel deviceChannel : event.getChannels()) { - List parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getGbDeviceId(), platforms); + List parentPlatformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId( + deviceChannel.getGbId(), platforms); parentPlatformMap.put(deviceChannel.getGbDeviceId(), parentPlatformsForGB); channelMap.put(deviceChannel.getGbDeviceId(), deviceChannel); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java index 06866517..f6a4ad75 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java @@ -1,20 +1,17 @@ package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import lombok.Getter; +import lombok.Setter; import org.springframework.context.ApplicationEvent; + public class MobilePositionEvent extends ApplicationEvent { public MobilePositionEvent(Object source) { super(source); } + @Getter + @Setter private MobilePosition mobilePosition; - - public MobilePosition getMobilePosition() { - return mobilePosition; - } - - public void setMobilePosition(MobilePosition mobilePosition) { - this.mobilePosition = mobilePosition; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java index 83aba09d..2ba42af5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java @@ -1,11 +1,12 @@ package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; @@ -24,7 +25,7 @@ import java.util.List; public class MobilePositionEventLister implements ApplicationListener { @Autowired - private IVideoManagerStorage storager; + private IPlatformChannelService platformChannelService; @Autowired private SIPCommanderFroPlatform sipCommanderFroPlatform; @@ -39,14 +40,18 @@ public class MobilePositionEventLister implements ApplicationListener parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(event.getMobilePosition().getChannelId(), platforms); + + List parentPlatformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId(event.getMobilePosition().getChannelId(), platforms); for (Platform platform : parentPlatformsForGB) { log.info("[向上级发送MobilePosition] 通道:{},平台:{}, 位置: {}:{}", event.getMobilePosition().getChannelId(), platform.getServerGBId(), event.getMobilePosition().getLongitude(), event.getMobilePosition().getLatitude()); SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); try { - sipCommanderFroPlatform.sendNotifyMobilePosition(platform, GPSMsgInfo.getInstance(event.getMobilePosition()), + GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(event.getMobilePosition()); + // 获取通道编号 + CommonGBChannel commonGBChannel = platformChannelService.queryChannelByPlatformIdAndChannelId(platform.getId(), event.getMobilePosition().getChannelId()); + sipCommanderFroPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel, subscribe); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java index e176c355..ffb8c23a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java @@ -93,6 +93,8 @@ public interface IDeviceChannelService { void stopPlay(String deviceId, String channelId); + void stopPlay(Integer channelId); + void batchUpdateChannelGPS(List channelList); void batchAddMobilePosition(List addMobilePositionList); @@ -118,4 +120,5 @@ public interface IDeviceChannelService { DeviceChannel getRawChannel(int id); + DeviceChannel getOneById(Integer channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java index 88c43b39..9903d59b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IInviteStreamService.java @@ -5,6 +5,8 @@ import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import java.util.List; + /** * 记录国标点播的状态,包括实时预览,下载,录像回放 */ @@ -22,18 +24,12 @@ public interface IInviteStreamService { /** * 获取点播的状态信息 */ - InviteInfo getInviteInfo(InviteSessionType type, - String deviceId, - String channelId, - String stream); + InviteInfo getInviteInfo(InviteSessionType type, Integer channelId, String stream); /** * 移除点播的状态信息 */ - void removeInviteInfo(InviteSessionType type, - String deviceId, - String channelId, - String stream); + void removeInviteInfo(InviteSessionType type, Integer channelId, String stream); /** * 移除点播的状态信息 */ @@ -41,14 +37,14 @@ public interface IInviteStreamService { /** * 移除点播的状态信息 */ - void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId); + void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, Integer channelId); + + List getAllInviteInfo(InviteSessionType type, Integer channelId, String stream); /** * 获取点播的状态信息 */ - InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, - String deviceId, - String channelId); + InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, Integer channelId); /** * 获取点播的状态信息 @@ -59,12 +55,12 @@ public interface IInviteStreamService { /** * 添加一个invite回调 */ - void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback callback); + void once(InviteSessionType type, Integer channelId, String stream, ErrorCallback callback); /** * 调用一个invite回调 */ - void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, StreamInfo data); + void call(InviteSessionType type, Integer channelId, String stream, int code, String msg, StreamInfo data); /** * 清空一个设备的所有invite信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java index 983fdf76..66233a6f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.service; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; +import com.genersoft.iot.vmp.gb28181.bean.Group; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.PlatformChannel; import com.github.pagehelper.PageInfo; @@ -36,4 +37,12 @@ public interface IPlatformChannelService { void removeChannelByDevice(Integer platformId, List deviceIds); void updateCustomChannel(PlatformChannel channel); + + void checkGroupRemove(List channelList, List groups); + + void checkGroupAdd(List channelList); + + List queryPlatFormListByChannelDeviceId(Integer channelId, List platforms); + + CommonGBChannel queryChannelByPlatformIdAndChannelId(Integer platformId, Integer channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java index 7d087160..6c43dcf5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java @@ -75,7 +75,7 @@ public interface IPlatformService { * @param errorEvent 信令错误事件 * @param timeoutCallback 超时事件 */ - void broadcastInvite(Platform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent, + void broadcastInvite(Platform platform, CommonGBChannel channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException; /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java index 45f88406..1e7d69ef 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlayService.java @@ -21,7 +21,7 @@ import java.text.ParseException; */ public interface IPlayService { - void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channelId, + void play(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, ErrorCallback callback); SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback); @@ -30,7 +30,6 @@ public interface IPlayService { MediaServer getNewMediaServerItem(Device device); void playBack(String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); - void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, ErrorCallback callback); void zlmServerOffline(String mediaServerId); void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index 9e51ff48..d9a1b6f6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -208,7 +208,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Override public List getDeviceByChannelId(String channelId) { - return channelMapper.getDeviceByChannelId(channelId); + return channelMapper.getDeviceByChannelDeviceId(channelId); } @Override @@ -340,7 +340,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } for (DeviceChannel channel : deviceChannels) { // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 - mobilePosition.setChannelId(channel.getDeviceId()); + mobilePosition.setChannelId(channel.getId()); try { eventPublisher.mobilePositionEventPublish(mobilePosition); }catch (Exception e) { @@ -378,6 +378,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { channelMapper.stopPlay(device.getId(), channelId); } + @Override + public void stopPlay(Integer channelId) { + channelMapper.stopPlayById(channelId); + } + @Override @Transactional public void batchUpdateChannelGPS(List channelList) { @@ -596,4 +601,9 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { public DeviceChannel getRawChannel(int id) { return deviceMapper.getRawChannel(id); } + + @Override + public DeviceChannel getOneById(Integer channelId) { + return channelMapper.getOne(channelId); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 2fae3b02..ffbe6f64 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -500,12 +500,15 @@ public class GbChannelServiceImpl implements IGbChannelService { } @Override + @Transactional public void removeParentIdByBusinessGroup(String businessGroup) { List channelList = commonGBChannelMapper.queryByBusinessGroup(businessGroup); if (channelList.isEmpty()) { return; } int result = commonGBChannelMapper.removeParentIdByChannels(channelList); + List groupList = groupMapper.queryByBusinessGroup(businessGroup); + platformChannelService.checkGroupRemove(channelList, groupList); } @@ -516,7 +519,7 @@ public class GbChannelServiceImpl implements IGbChannelService { return; } commonGBChannelMapper.removeParentIdByChannels(channelList); - // TODO 可能需要发送通道更新通知 + platformChannelService.checkGroupRemove(channelList, groupList); } @Override @@ -560,18 +563,21 @@ public class GbChannelServiceImpl implements IGbChannelService { } @Override + @Transactional public void addChannelToGroup(String parentId, String businessGroup, List channelIds) { List channelList = commonGBChannelMapper.queryByIds(channelIds); if (channelList.isEmpty()) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "所有通道Id不存在"); } int result = commonGBChannelMapper.updateGroup(parentId, businessGroup, channelList); + for (CommonGBChannel commonGBChannel : channelList) { + commonGBChannel.setGbParentId(parentId); + commonGBChannel.setGbBusinessGroupId(businessGroup); + } + // 发送通知 if (result > 0) { - for (CommonGBChannel channel : channelList) { - channel.setGbBusinessGroupId(businessGroup); - channel.setGbParentId(parentId); - } + platformChannelService.checkGroupAdd(channelList); try { // 发送catalog eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); @@ -587,10 +593,20 @@ public class GbChannelServiceImpl implements IGbChannelService { if (channelList.isEmpty()) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "所有通道Id不存在"); } - int result = commonGBChannelMapper.removeParentIdByChannels(channelList); + commonGBChannelMapper.removeParentIdByChannels(channelList); + + Group group = groupMapper.queryOneByDeviceId(parentId, businessGroup); + if (group == null) { + platformChannelService.checkGroupRemove(channelList, null); + }else { + List groupList = new ArrayList<>(); + groupList.add(group); + platformChannelService.checkGroupRemove(channelList, groupList); + } } @Override + @Transactional public void addChannelToGroupByGbDevice(String parentId, String businessGroup, List deviceIds) { List channelList = commonGBChannelMapper.queryByGbDeviceIds(deviceIds); if (channelList.isEmpty()) { @@ -601,12 +617,14 @@ public class GbChannelServiceImpl implements IGbChannelService { channel.setGbBusinessGroupId(businessGroup); } int result = commonGBChannelMapper.updateGroup(parentId, businessGroup, channelList); + + for (CommonGBChannel commonGBChannel : channelList) { + commonGBChannel.setGbParentId(parentId); + commonGBChannel.setGbBusinessGroupId(businessGroup); + } // 发送通知 if (result > 0) { - for (CommonGBChannel channel : channelList) { - channel.setGbBusinessGroupId(businessGroup); - channel.setGbParentId(parentId); - } + platformChannelService.checkGroupAdd(channelList); try { // 发送catalog eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); @@ -623,10 +641,12 @@ public class GbChannelServiceImpl implements IGbChannelService { throw new ControllerException(ErrorCode.ERROR100.getCode(), "所有通道Id不存在"); } commonGBChannelMapper.removeParentIdByChannels(channelList); + platformChannelService.checkGroupRemove(channelList, null); } @Override public CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId) { + // 防止共享的通道编号重复 List channelList = platformChannelMapper.queryOneWithPlatform(platformId, channelDeviceId); if (!channelList.isEmpty()) { return channelList.get(channelList.size() - 1); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index b81f07a0..9b983336 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -7,9 +7,8 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; -import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; -import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import lombok.extern.slf4j.Slf4j; @@ -19,6 +18,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -44,17 +44,6 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Autowired private DeviceChannelMapper deviceChannelMapper; - /** - * 流到来的处理 - */ - @Async("taskExecutor") - @org.springframework.context.event.EventListener - public void onApplicationEvent(MediaArrivalEvent event) { -// if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { -// -// } - } - /** * 流离开的处理 */ @@ -67,7 +56,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { removeInviteInfo(inviteInfo); Device device = deviceMapper.getDeviceByDeviceId(inviteInfo.getDeviceId()); if (device != null) { - deviceChannelMapper.stopPlay(device.getId(), inviteInfo.getChannelId()); + deviceChannelMapper.stopPlayById(inviteInfo.getChannelId()); } } } @@ -87,7 +76,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { log.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo)); return; } - InviteInfo inviteInfoForUpdate = null; + InviteInfo inviteInfoForUpdate; if (InviteSessionStatus.ready == inviteInfo.getStatus()) { if (inviteInfo.getDeviceId() == null @@ -99,8 +88,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } inviteInfoForUpdate = inviteInfo; } else { - InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), - inviteInfo.getChannelId(), inviteInfo.getStream()); + InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream()); if (inviteInfoInRedis == null) { log.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}", inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); @@ -144,7 +132,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Override public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) { - InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream()); if (inviteInfoInDb == null) { return null; } @@ -169,10 +157,9 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } @Override - public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { + public InviteInfo getInviteInfo(InviteSessionType type, Integer channelId, String stream) { String key = VideoManagerConstants.INVITE_PREFIX + ":" + (type != null ? type : "*") + - ":" + (deviceId != null ? deviceId : "*") + ":" + (channelId != null ? channelId : "*") + ":" + (stream != null ? stream : "*") + ":*"; @@ -188,25 +175,42 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } @Override - public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) { - return getInviteInfo(type, deviceId, channelId, null); + public List getAllInviteInfo(InviteSessionType type, Integer channelId, String stream) { + String key = VideoManagerConstants.INVITE_PREFIX + + ":" + (type != null ? type : "*") + + ":" + (channelId != null ? channelId : "*") + + ":" + (stream != null ? stream : "*") + + ":*"; + List scanResult = RedisUtil.scan(redisTemplate, key); + if (scanResult.isEmpty()) { + return new ArrayList<>(); + } + List result = new ArrayList<>(); + for (Object keyObj : scanResult) { + result.add((InviteInfo) redisTemplate.opsForValue().get(keyObj)); + } + return result; + } + + @Override + public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, Integer channelId) { + return getInviteInfo(type, channelId, null); } @Override public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) { - return getInviteInfo(type, null, null, stream); + return getInviteInfo(type, null, stream); } @Override - public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) { + public void removeInviteInfo(InviteSessionType type, Integer channelId, String stream) { String scanKey = VideoManagerConstants.INVITE_PREFIX + ":" + (type != null ? type : "*") + - ":" + (deviceId != null ? deviceId : "*") + ":" + (channelId != null ? channelId : "*") + ":" + (stream != null ? stream : "*") + ":*"; List scanResult = RedisUtil.scan(redisTemplate, scanKey); - if (scanResult.size() > 0) { + if (!scanResult.isEmpty()) { for (Object keyObj : scanResult) { String key = (String) keyObj; InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key); @@ -214,35 +218,31 @@ public class InviteStreamServiceImpl implements IInviteStreamService { continue; } redisTemplate.delete(key); - inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream())); + inviteErrorCallbackMap.remove(buildKey(type,channelId, inviteInfo.getStream())); } } } @Override - public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) { - removeInviteInfo(inviteSessionType, deviceId, channelId, null); + public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, Integer channelId) { + removeInviteInfo(inviteSessionType, channelId, null); } @Override public void removeInviteInfo(InviteInfo inviteInfo) { - removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + removeInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream()); } @Override - public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback callback) { - String key = buildKey(type, deviceId, channelId, stream); - List> callbacks = inviteErrorCallbackMap.get(key); - if (callbacks == null) { - callbacks = new CopyOnWriteArrayList<>(); - inviteErrorCallbackMap.put(key, callbacks); - } + public void once(InviteSessionType type, Integer channelId, String stream, ErrorCallback callback) { + String key = buildKey(type, channelId, stream); + List> callbacks = inviteErrorCallbackMap.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()); callbacks.add(callback); } - private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) { - String key = type + ":" + deviceId + ":" + channelId; + private String buildKey(InviteSessionType type, Integer channelId, String stream) { + String key = type + ":" + channelId; // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite if (stream != null) { key += (":" + stream); @@ -253,7 +253,12 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Override public void clearInviteInfo(String deviceId) { - removeInviteInfo(null, deviceId, null, null); + List inviteInfoList = getAllInviteInfo(null, null, null); + for (InviteInfo inviteInfo : inviteInfoList) { + if (inviteInfo.getDeviceId().equals(deviceId)) { + removeInviteInfo(inviteInfo); + } + } } @Override @@ -261,7 +266,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { int count = 0; String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*"; List scanResult = RedisUtil.scan(redisTemplate, key); - if (scanResult.size() == 0) { + if (scanResult.isEmpty()) { return 0; }else { for (Object keyObj : scanResult) { @@ -282,8 +287,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } @Override - public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, StreamInfo data) { - String key = buildSubStreamKey(type, deviceId, channelId, stream); + public void call(InviteSessionType type, Integer channelId, String stream, int code, String msg, StreamInfo data) { + String key = buildSubStreamKey(type, channelId, stream); List> callbacks = inviteErrorCallbackMap.get(key); if (callbacks == null) { return; @@ -295,8 +300,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService { } - private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) { - String key = type + ":" + ":" + deviceId + ":" + channelId; + private String buildSubStreamKey(InviteSessionType type, Integer channelId, String stream) { + String key = type + ":" + channelId; // 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite if (stream != null) { key += (":" + stream); @@ -317,7 +322,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Override public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) { - InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); + InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream()); if (inviteInfoInDb == null) { return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index eb09d7b0..20d8b73a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -63,7 +63,8 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { /** * 获取通道使用的分组中未分享的 */ - private Set getGroupNotShareByChannelList(List channelList, Integer platformId) { + @Transactional + public Set getGroupNotShareByChannelList(List channelList, Integer platformId) { // 获取分组中未分享的节点 Set groupList = groupMapper.queryNotShareGroupForPlatformByChannelList(channelList, platformId); // 获取这些节点的所有父节点 @@ -92,16 +93,19 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { return regionMapper.queryNotShareRegionForPlatformByRegionList(allRegion, platformId); } + + /** * 移除空的共享,并返回移除的分组 */ - private Set deleteEmptyGroup(Set groupSet, Integer platformId) { + @Transactional + public Set deleteEmptyGroup(Set groupSet, Integer platformId) { Iterator iterator = groupSet.iterator(); while (iterator.hasNext()) { Group group = iterator.next(); // groupSet 为当前通道直接使用的分组,如果已经没有子分组与其他的通道,则可以移除 // 获取分组子节点 - Set children = platformChannelMapper.queryShareChildrenGroup(group.getDeviceId(), platformId); + Set children = platformChannelMapper.queryShareChildrenGroup(group.getId(), platformId); if (!children.isEmpty()) { iterator.remove(); continue; @@ -427,4 +431,85 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { channel.getGbName(), channel.getGbDeviceDbId(), e); } } + + @Override + @Transactional + public void checkGroupRemove(List channelList, List groupList) { + + List channelIds = new ArrayList<>(); + channelList.stream().forEach(commonGBChannel -> { + channelIds.add(commonGBChannel.getGbId()); + }); + // 获取关联这些通道的平台 + List platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds); + if (platformList.isEmpty()) { + return; + } + for (Platform platform : platformList) { + Set groupSet; + if (groupList == null || groupList.isEmpty()) { + groupSet = platformChannelMapper.queryShareGroup(platform.getId()); + }else { + groupSet = new HashSet<>(groupList); + } + // 清理空的分组并发送消息 + Set deleteGroup = deleteEmptyGroup(groupSet, platform.getId()); + + List channelListForEvent = new ArrayList<>(); + if (!deleteGroup.isEmpty()) { + for (Group group : deleteGroup) { + channelListForEvent.add(0, CommonGBChannel.build(group)); + } + } + // 发送消息 + try { + // 发送catalog + eventPublisher.catalogEventPublish(platform.getId(), channelListForEvent, CatalogEvent.DEL); + } catch (Exception e) { + log.warn("[移除关联通道] 发送失败,数量:{}", channelList.size(), e); + } + } + } + + @Override + @Transactional + public void checkGroupAdd(List channelList) { + List channelIds = new ArrayList<>(); + channelList.stream().forEach(commonGBChannel -> { + channelIds.add(commonGBChannel.getGbId()); + }); + List platformList = platformChannelMapper.queryPlatFormListByChannelList(channelIds); + if (platformList.isEmpty()) { + return; + } + for (Platform platform : platformList) { + + Set addGroup = getGroupNotShareByChannelList(channelList, platform.getId()); + + List channelListForEvent = new ArrayList<>(); + if (!addGroup.isEmpty()) { + for (Group group : addGroup) { + channelListForEvent.add(0, CommonGBChannel.build(group)); + } + platformChannelMapper.addPlatformGroup(addGroup, platform.getId()); + // 发送消息 + try { + // 发送catalog + eventPublisher.catalogEventPublish(platform.getId(), channelListForEvent, CatalogEvent.ADD); + } catch (Exception e) { + log.warn("[移除关联通道] 发送失败,数量:{}", channelList.size(), e); + } + } + } + } + + @Override + public List queryPlatFormListByChannelDeviceId(Integer channelId, List platforms) { + return platformChannelMapper.queryPlatFormListForGBWithGBId(channelId, platforms); + } + + @Override + public CommonGBChannel queryChannelByPlatformIdAndChannelId(Integer platformId, Integer channelId) { + return platformChannelMapper.queryShareChannel(platformId, channelId); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 02ed3e34..1a4fd841 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -466,9 +466,13 @@ public class PlatformServiceImpl implements IPlatformService { if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { continue; } + CommonGBChannel commonGBChannel = platformChannelMapper.queryShareChannel(platform.getId(), deviceChannel.getId()); + if (commonGBChannel == null) { + continue; + } // 发送GPS消息 try { - commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe); + commanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel, subscribe); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 移动位置通知: {}", e.getMessage()); @@ -479,14 +483,14 @@ public class PlatformServiceImpl implements IPlatformService { } @Override - public void broadcastInvite(Platform platform, String channelId, MediaServer mediaServerItem, HookSubscribe.Event hookEvent, + public void broadcastInvite(Platform platform, CommonGBChannel channel, MediaServer mediaServerItem, HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, InviteTimeOutCallback timeoutCallback) throws InvalidArgumentException, ParseException, SipException { if (mediaServerItem == null) { log.info("[国标级联] 语音喊话未找到可用的zlm. platform: {}", platform.getServerGBId()); return; } - InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channelId); + InviteInfo inviteInfoForOld = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, platform.getServerGBId(), channel.getGbDeviceId()); if (inviteInfoForOld != null && inviteInfoForOld.getStreamInfo() != null) { // 如果zlm不存在这个流,则删除数据即可 @@ -510,7 +514,7 @@ public class PlatformServiceImpl implements IPlatformService { String streamId = null; if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", platform.getServerGBId(), channelId); + streamId = String.format("%s_%s", platform.getServerGBId(), channel.getGbDeviceId()); } // 默认不进行SSRC校验, TODO 后续可改为配置 boolean ssrcCheck = false; @@ -524,7 +528,7 @@ public class PlatformServiceImpl implements IPlatformService { } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode); if (ssrcInfo == null || ssrcInfo.getPort() < 0) { - log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channelId); + log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channel.getGbDeviceId()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(); eventResult.statusCode = -1; eventResult.msg = "端口监听失败"; @@ -533,45 +537,45 @@ public class PlatformServiceImpl implements IPlatformService { return; } log.info("[国标级联] 语音喊话,发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", - platform.getServerGBId(), channelId, ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck); + platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), ssrcInfo.getSsrc(), ssrcCheck); // 初始化redis中的invite消息状态 - InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channelId, ssrcInfo.getStream(), ssrcInfo, + InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channel.getGbId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST, InviteSessionStatus.ready); inviteStreamService.updateInviteInfo(inviteInfo); String timeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(timeOutTaskKey, () -> { // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 - InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channelId, null); + InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, platform.getServerGBId(), channel.getGbDeviceId(), null); if (inviteInfoForBroadcast == null) { - log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channelId, ssrcInfo.getPort(), ssrcInfo.getSsrc()); + log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc()); // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 try { - commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null); + commanderForPlatform.streamByeCmd(platform, channel.getGbDeviceId(), ssrcInfo.getStream(), null, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); } finally { timeoutCallback.run(1, "收流超时"); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + streamSession.remove(platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getStream()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); } } }, userSetting.getPlayTimeout()); - commanderForPlatform.broadcastInviteCmd(platform, channelId, mediaServerItem, ssrcInfo, (hookData)->{ - log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channelId); + commanderForPlatform.broadcastInviteCmd(platform, channel.getGbDeviceId(), mediaServerItem, ssrcInfo, (hookData)->{ + log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId()); dynamicTask.stop(timeOutTaskKey); // hook响应 - onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channelId); + onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform.getServerGBId(), channel.getGbDeviceId()); // 收到流 if (hookEvent != null) { hookEvent.response(hookData); } }, event -> { - inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channelId, timeOutTaskKey, + inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel, timeOutTaskKey, null, inviteInfo, InviteSessionType.BROADCAST); // // 收到200OK 检测ssrc是否有变化,防止上级自定义了ssrc // ResponseEvent responseEvent = (ResponseEvent) event.event; @@ -633,7 +637,7 @@ public class PlatformServiceImpl implements IPlatformService { } private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem, - Platform platform, String channelId, String timeOutTaskKey, ErrorCallback callback, + Platform platform, CommonGBChannel channel, String timeOutTaskKey, ErrorCallback callback, InviteInfo inviteInfo, InviteSessionType inviteSessionType){ inviteInfo.setStatus(InviteSessionStatus.ok); ResponseEvent responseEvent = (ResponseEvent) eventResult.event; @@ -648,7 +652,7 @@ public class PlatformServiceImpl implements IPlatformService { if (mediaServerItem.isRtpEnable()) { // 多端口 if (tcpMode == 2) { - tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, + tcpActiveHandler(platform, channel.getGbDeviceId(), contentString, mediaServerItem, tcpMode, ssrcCheck, timeOutTaskKey, ssrcInfo, callback); } }else { @@ -671,8 +675,8 @@ public class PlatformServiceImpl implements IPlatformService { Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); if (!result) { try { - log.warn("[Invite 200OK] 更新ssrc失败,停止喊话 {}/{}", platform.getServerGBId(), channelId); - commanderForPlatform.streamByeCmd(platform, channelId, ssrcInfo.getStream(), null, null); + log.warn("[Invite 200OK] 更新ssrc失败,停止喊话 {}/{}", platform.getServerGBId(), channel.getGbDeviceId()); + commanderForPlatform.streamByeCmd(platform, channel.getGbDeviceId(), ssrcInfo.getStream(), null, null); } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); } @@ -681,11 +685,11 @@ public class PlatformServiceImpl implements IPlatformService { // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - streamSession.remove(platform.getServerGBId(), channelId, ssrcInfo.getStream()); + streamSession.remove(platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getStream()); callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), "下级自定义了ssrc,重新设置收流信息失败", null); - inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channelId, null, + inviteStreamService.call(inviteSessionType, platform.getServerGBId(), channel.getGbDeviceId(), null, InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), "下级自定义了ssrc,重新设置收流信息失败", null); @@ -695,7 +699,7 @@ public class PlatformServiceImpl implements IPlatformService { inviteInfo.setStream(ssrcInfo.getStream()); if (tcpMode == 2) { if (mediaServerItem.isRtpEnable()) { - tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, + tcpActiveHandler(platform, channel.getGbDeviceId(), contentString, mediaServerItem, tcpMode, ssrcCheck, timeOutTaskKey, ssrcInfo, callback); }else { log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); @@ -709,7 +713,7 @@ public class PlatformServiceImpl implements IPlatformService { inviteInfo.setStream(ssrcInfo.getStream()); if (tcpMode == 2) { if (mediaServerItem.isRtpEnable()) { - tcpActiveHandler(platform, channelId, contentString, mediaServerItem, tcpMode, ssrcCheck, + tcpActiveHandler(platform, channel.getGbDeviceId(), contentString, mediaServerItem, tcpMode, ssrcCheck, timeOutTaskKey, ssrcInfo, callback); }else { log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); @@ -721,13 +725,20 @@ public class PlatformServiceImpl implements IPlatformService { if (ssrcInResponse != null) { // 单端口 // 重新订阅流上线 - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(), - inviteInfo.getChannelId(), null, inviteInfo.getStream()); + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getChannelId(), null, inviteInfo.getStream()); streamSession.remove(inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse); - streamSession.put(platform.getServerGBId(), channelId, ssrcTransaction.getCallId(), - inviteInfo.getStream(), ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType); + + ssrcTransaction.setPlatformId(platform.getServerGBId()); + ssrcTransaction.setChannelId(channel.getGbId()); + ssrcTransaction.setStream(inviteInfo.getStream()); + ssrcTransaction.setSsrc(ssrcInResponse); + ssrcTransaction.setMediaServerId(mediaServerItem.getId()); + ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse())); + ssrcTransaction.setType(inviteSessionType); + + streamSession.put(ssrcTransaction); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index c91e6cc7..b1d53f3b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -261,7 +261,7 @@ public class PlayServiceImpl implements IPlayService { int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(event.getMediaServer(), event.getStream(), null, device.isSsrcCheck(), true, 0, false, !deviceChannel.isHasAudio(), false, tcpMode); - playBack(event.getMediaServer(), ssrcInfo, deviceId, channelId, startTime, endTime, null); + playBack(event.getMediaServer(), ssrcInfo, deviceId, deviceChannel, startTime, endTime, null); } } @@ -494,7 +494,7 @@ public class PlayServiceImpl implements IPlayService { } // 初始化redis中的invite消息状态 - InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getStream(), ssrcInfo, + InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY, InviteSessionStatus.ready); inviteStreamService.updateInviteInfo(inviteInfo); @@ -780,12 +780,11 @@ public class PlayServiceImpl implements IPlayService { String stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr; int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, !channel.isHasAudio(), false, tcpMode); - playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, callback); + playBack(newMediaServerItem, ssrcInfo, deviceId, channel, startTime, endTime, callback); } - @Override public void playBack(MediaServer mediaServerItem, SSRCInfo ssrcInfo, - String deviceId, String channelId, String startTime, + String deviceId, DeviceChannel channel, String startTime, String endTime, ErrorCallback callback) { if (mediaServerItem == null || ssrcInfo == null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), @@ -799,28 +798,28 @@ public class PlayServiceImpl implements IPlayService { throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在"); } log.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", - device.getDeviceId(), channelId, startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(), + device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); // 初始化redis中的invite消息状态 - InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channelId, ssrcInfo.getStream(), ssrcInfo, + InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK, InviteSessionStatus.ready); inviteStreamService.updateInviteInfo(inviteInfo); String playBackTimeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(playBackTimeOutTaskKey, () -> { - log.warn("[录像回放] 超时,deviceId:{} ,channelId:{}", deviceId, channelId); + log.warn("[录像回放] 超时,deviceId:{} ,channelId:{}", deviceId, channel.getGbDeviceId()); inviteStreamService.removeInviteInfo(inviteInfo); callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); try { - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null); + cmder.streamByeCmd(device, channel.getGbDeviceId(), ssrcInfo.getStream(), null); } catch (InvalidArgumentException | ParseException | SipException e) { log.error("[录像回放] 超时 发送BYE失败 {}", e.getMessage()); } catch (SsrcTransactionNotFoundException e) { // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); + streamSession.remove(deviceId, channel.getGbDeviceId(), ssrcInfo.getStream()); } }, userSetting.getPlayTimeout()); @@ -831,14 +830,14 @@ public class PlayServiceImpl implements IPlayService { String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg), null); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + streamSession.remove(device.getDeviceId(), channel.getGbDeviceId(), ssrcInfo.getStream()); inviteStreamService.removeInviteInfo(inviteInfo); }; HookSubscribe.Event hookEvent = (hookData) -> { log.info("收到回放订阅消息: " + hookData); dynamicTask.stop(playBackTimeOutTaskKey); - StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channelId, startTime, endTime); + StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), deviceId, channel.getGbDeviceId(), startTime, endTime); if (streamInfo == null) { log.warn("设备回放API调用失败!"); callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), @@ -846,14 +845,14 @@ public class PlayServiceImpl implements IPlayService { return; } callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - log.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channelId, startTime, endTime); + log.info("[录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime); }; try { - cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, + cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channel.getGbDeviceId(), startTime, endTime, hookEvent, eventResult -> { // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 - InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel.getGbDeviceId(), playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK); }, errorEvent); } catch (InvalidArgumentException | SipException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index cb2caeca..86f4bf41 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -53,21 +53,21 @@ public class VideoStreamSessionManager { + ":" + deviceId + ":" + channelId + ":" + callId + ":" + stream, ssrcTransaction); } - public SsrcTransaction getSsrcTransaction(String deviceId, String channelId, String callId, String stream){ + public void put(SsrcTransaction ssrcTransaction){ + redisTemplate.opsForValue().set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + + ":" + ssrcTransaction.getChannelId() + ":" + ssrcTransaction.getCallId() + ":" + ssrcTransaction.getStream(), ssrcTransaction); + } - if (ObjectUtils.isEmpty(deviceId)) { - deviceId ="*"; - } - if (ObjectUtils.isEmpty(channelId)) { - channelId ="*"; - } + public SsrcTransaction getSsrcTransaction(Integer channelId, String callId, String stream){ + + String chanelStr = channelId==null?"*":channelId.toString(); if (ObjectUtils.isEmpty(callId)) { callId ="*"; } if (ObjectUtils.isEmpty(stream)) { stream ="*"; } - String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":" + deviceId + ":" + channelId + ":" + callId+ ":" + stream; + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":" + chanelStr + ":" + callId+ ":" + stream; List scanResult = RedisUtil.scan(redisTemplate, key); if (scanResult.size() == 0) { return null; @@ -80,12 +80,12 @@ public class VideoStreamSessionManager { if (ObjectUtils.isEmpty(callId)) { return null; } - String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":*:*:" + callId+ ":*"; + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":*:" + callId+ ":*"; List scanResult = RedisUtil.scan(redisTemplate, key); if (!scanResult.isEmpty()) { return (SsrcTransaction)redisTemplate.opsForValue().get(scanResult.get(0)); }else { - key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":*:*:play:*"; + key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":*:play:*"; scanResult = RedisUtil.scan(redisTemplate, key); if (scanResult.isEmpty()) { return null; @@ -102,20 +102,15 @@ public class VideoStreamSessionManager { } - public List getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){ - if (ObjectUtils.isEmpty(deviceId)) { - deviceId ="*"; - } - if (ObjectUtils.isEmpty(channelId)) { - channelId ="*"; - } + public List getSsrcTransactionForAll(Integer channelId, String callId, String stream){ + String chanelStr = channelId==null?"*":channelId.toString(); if (ObjectUtils.isEmpty(callId)) { callId ="*"; } if (ObjectUtils.isEmpty(stream)) { stream ="*"; } - String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":" + deviceId + ":" + channelId + ":" + callId+ ":" + stream; + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetting.getServerId() + ":" + chanelStr + ":" + callId+ ":" + stream; List scanResult = RedisUtil.scan(redisTemplate, key); if (scanResult.size() == 0) { return null; @@ -127,24 +122,24 @@ public class VideoStreamSessionManager { return result; } - public String getMediaServerId(String deviceId, String channelId, String stream){ - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); + public String getMediaServerId(Integer channelId, String stream){ + SsrcTransaction ssrcTransaction = getSsrcTransaction( channelId, null, stream); if (ssrcTransaction == null) { return null; } return ssrcTransaction.getMediaServerId(); } - public String getSSRC(String deviceId, String channelId, String stream){ - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); + public String getSSRC(Integer channelId, String stream){ + SsrcTransaction ssrcTransaction = getSsrcTransaction(channelId, null, stream); if (ssrcTransaction == null) { return null; } return ssrcTransaction.getSsrc(); } - public void remove(String deviceId, String channelId, String stream) { - List ssrcTransactionList = getSsrcTransactionForAll(deviceId, channelId, null, stream); + public void remove(String deviceId, Integer channelId, String stream) { + List ssrcTransactionList = getSsrcTransactionForAll(channelId, null, stream); if (ssrcTransactionList == null || ssrcTransactionList.isEmpty()) { return; } @@ -154,8 +149,8 @@ public class VideoStreamSessionManager { } } - public void removeByCallId(String deviceId, String channelId, String callId) { - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callId, null); + public void removeByCallId(String deviceId, Integer channelId, String callId) { + SsrcTransaction ssrcTransaction = getSsrcTransaction(channelId, callId, null); if (ssrcTransaction == null ) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 393d0aac..7eefe8eb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -92,7 +92,7 @@ public interface ISIPCommanderForPlatform { * @param subscribeInfo 订阅相关的信息 * @return */ - void sendNotifyMobilePosition(Platform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) + void sendNotifyMobilePosition(Platform parentPlatform, GPSMsgInfo gpsMsgInfo, CommonGBChannel channel, SubscribeInfo subscribeInfo) throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException; /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index fe88997e..01a776ad 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -357,11 +357,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public void sendNotifyMobilePosition(Platform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException { + public void sendNotifyMobilePosition(Platform parentPlatform, GPSMsgInfo gpsMsgInfo, CommonGBChannel channel, SubscribeInfo subscribeInfo) throws InvalidArgumentException, ParseException, NoSuchFieldException, SipException, IllegalAccessException { if (parentPlatform == null) { return; } - log.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); + log.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getChannelId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceStatusXml = new StringBuffer(600); @@ -369,7 +369,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { .append("\r\n") .append("MobilePosition\r\n") .append("" + (int)((Math.random()*9+1)*100000) + "\r\n") - .append("" + gpsMsgInfo.getId() + "\r\n") + .append("" + channel.getGbDeviceId() + "\r\n") .append("\r\n") .append("" + gpsMsgInfo.getLng() + "\r\n") .append("" + gpsMsgInfo.getLat() + "\r\n") @@ -436,10 +436,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { Integer finalIndex = index; String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels, deviceChannels.size(), type, subscribeInfo); - System.out.println(catalogXmlContent); log.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size()); sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { log.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); + log.error(catalogXmlContent); }, (eventResult -> { try { sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index a57bca1a..948bcbc9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -183,7 +183,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (ssrcTransaction == null) { return; } - log.info("[收到bye] 来自设备:{}, 通道: {}, 类型: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getType()); + log.info("[收到bye] 来自:{}, 通道: {}, 类型: {}", ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getType()); Platform platform = platformService.queryPlatformByServerGBId(ssrcTransaction.getDeviceId()); if (platform != null ) { @@ -240,8 +240,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In audioBroadcastManager.del(ssrcTransaction.getDeviceId(), channel.getDeviceId()); } break; - - } // 释放ssrc MediaServer mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index ac20a564..8f3c5cd8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -95,7 +95,10 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor case "DeviceID": String channelId = element.getStringValue(); if (!deviceId.equals(channelId)) { - mobilePosition.setChannelId(channelId); + DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); + if (deviceChannel != null) { + mobilePosition.setChannelId(deviceChannel.getId()); + } } continue; case "Time": diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index d2a47c60..ba1cd623 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -147,24 +147,26 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } log.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); if ("4".equals(deviceAlarm.getAlarmMethod())) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setChannelId(channelId); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); + DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); + if (deviceChannel == null) { + log.warn("[解析报警通知] 未找到通道:{}/{}", device.getDeviceId(), channelId); + }else { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(deviceChannel.getId()); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setGbDeviceDbId(device.getId()); - deviceChannel.setDeviceId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + // 更新device channel 的经纬度 + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + } } // 回复200 OK diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index f7d0492e..8bab63b4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -134,21 +134,21 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme deviceAlarm.setLatitude(0.00); } - if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { - if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { + if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod()) && deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { + DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); + if (deviceChannel == null) { + log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), channelId); + } else { MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setChannelId(channelId); + mobilePosition.setChannelId(deviceChannel.getId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); mobilePosition.setLongitude(deviceAlarm.getLongitude()); mobilePosition.setLatitude(deviceAlarm.getLatitude()); mobilePosition.setReportSource("GPS Alarm"); // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setGbDeviceDbId(sipMsgInfo.getDevice().getId()); - deviceChannel.setDeviceId(channelId); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java index ceca2576..76d9ac4b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java @@ -125,7 +125,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp // 消息发送成功, 向上级发送invite,获取推流 try { - platformService.broadcastInvite(platform, channel.getGbDeviceId(), mediaServerForMinimumLoad, (hookData)->{ + platformService.broadcastInvite(platform, channel, mediaServerForMinimumLoad, (hookData)->{ // 上级平台推流成功 AudioBroadcastCatch broadcastCatch = audioBroadcastManager.get(device.getDeviceId(), targetId); if (broadcastCatch != null ) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index 66088684..6db78e65 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -91,13 +91,22 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); continue; } + String channelId = getText(rootElementAfterCharset, "DeviceID"); + DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); + if (deviceChannel == null) { + log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), channelId); + continue; + } + + MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); if (!ObjectUtils.isEmpty(sipMsgInfo.getDevice().getName())) { mobilePosition.setDeviceName(sipMsgInfo.getDevice().getName()); } mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); - mobilePosition.setChannelId(getText(rootElementAfterCharset, "DeviceID")); + + mobilePosition.setChannelId(deviceChannel.getId()); String time = getText(rootElementAfterCharset, "Time"); if (ObjectUtils.isEmpty(time)){ mobilePosition.setTime(DateUtil.getNow()); @@ -123,11 +132,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen } mobilePosition.setReportSource("Mobile Position"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); - deviceChannel.setDeviceId(mobilePosition.getChannelId()); deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java index a239b385..15461ec2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java @@ -69,54 +69,57 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar } return; } - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(getText(rootElement, "DeviceID")); - //兼容ISO 8601格式时间 - String time = getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)){ - mobilePosition.setTime(DateUtil.getNow()); + String channelId = getText(rootElement, "DeviceID"); + DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); + if (deviceChannel == null) { + log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), channelId); }else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - mobilePosition.setReportSource("Mobile Position"); + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(deviceChannel.getId()); + //兼容ISO 8601格式时间 + String time = getText(rootElement, "Time"); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTime(DateUtil.getNow()); + }else { + mobilePosition.setTime(SipUtils.parseTime(time)); + } + mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); + mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); + if (NumericUtil.isDouble(getText(rootElement, "Speed"))) { + mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed"))); + } else { + mobilePosition.setSpeed(0.0); + } + if (NumericUtil.isDouble(getText(rootElement, "Direction"))) { + mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction"))); + } else { + mobilePosition.setDirection(0.0); + } + if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) { + mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude"))); + } else { + mobilePosition.setAltitude(0.0); + } + mobilePosition.setReportSource("Mobile Position"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setGbDeviceDbId(device.getId()); - deviceChannel.setDeviceId(mobilePosition.getChannelId()); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + // 更新device channel 的经纬度 + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId(); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(mobilePosition); - resultHolder.invokeAllResult(msg); + String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId(); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(mobilePosition); + resultHolder.invokeAllResult(msg); + } //回复 200 OK try { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java index 74ad7af1..f5255190 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/SendRtpPortManager.java @@ -34,7 +34,7 @@ public class SendRtpPortManager { return -1; } String sendIndexKey = KEY + userSetting.getServerId() + "_" + mediaServer.getId(); - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*"; List queryResult = RedisUtil.scan(redisTemplate, key); Map sendRtpItemMap = new HashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java index 445b08e7..c3a90101 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java @@ -8,9 +8,9 @@ import lombok.Data; public class GPSMsgInfo { /** - * ID + * 通道ID */ - private String id; + private Integer channelId; /** * 经度 (必选) @@ -46,7 +46,7 @@ public class GPSMsgInfo { public static GPSMsgInfo getInstance(MobilePosition mobilePosition) { GPSMsgInfo gpsMsgInfo = new GPSMsgInfo(); - gpsMsgInfo.setId(mobilePosition.getChannelId()); + gpsMsgInfo.setChannelId(mobilePosition.getChannelId()); gpsMsgInfo.setAltitude(mobilePosition.getAltitude() + ""); gpsMsgInfo.setLng(mobilePosition.getLongitude()); gpsMsgInfo.setLat(mobilePosition.getLatitude()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 6a4f866c..3c8aba52 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -1,9 +1,13 @@ package com.genersoft.iot.vmp.service.bean; +import lombok.Data; + /** * 当上级平台 * @author lin */ + +@Data public class MessageForPushChannel { /** * 消息类型 @@ -67,77 +71,4 @@ public class MessageForPushChannel { messageForPushChannel.setPlatFormName(platFormName); return messageForPushChannel; } - - - public int getType() { - return type; - } - - public void setType(int type) { - this.type = type; - } - - public String getApp() { - return app; - } - - public void setApp(String app) { - this.app = app; - } - - public String getStream() { - return stream; - } - - public void setStream(String stream) { - this.stream = stream; - } - - public String getGbId() { - return gbId; - } - - public void setGbId(String gbId) { - this.gbId = gbId; - } - - public String getPlatFormId() { - return platFormId; - } - - public void setPlatFormId(String platFormId) { - this.platFormId = platFormId; - } - - public String getPlatFormName() { - return platFormName; - } - - public void setPlatFormName(String platFormName) { - this.platFormName = platFormName; - } - - public String getServerId() { - return serverId; - } - - public void setServerId(String serverId) { - this.serverId = serverId; - } - - public String getMediaServerId() { - return mediaServerId; - } - - public void setMediaServerId(String mediaServerId) { - this.mediaServerId = mediaServerId; - } - - public int getPlatFormIndex() { - return platFormIndex; - } - - public void setPlatFormIndex(int platFormIndex) { - this.platFormIndex = platFormIndex; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 6f122e7b..72b3837f 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -161,7 +161,7 @@ public class MediaServiceImpl implements IMediaService { } // 设置音频信息及录制信息 - List ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream); + List ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, stream); if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) { // 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用 @@ -173,15 +173,15 @@ public class MediaServiceImpl implements IMediaService { redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo); String deviceId = ssrcTransactionForAll.get(0).getDeviceId(); - String channelId = ssrcTransactionForAll.get(0).getChannelId(); - DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channelId); + Integer channelId = ssrcTransactionForAll.get(0).getChannelId(); + DeviceChannel deviceChannel = deviceChannelService.getOneById(channelId); if (deviceChannel != null) { result.setEnable_audio(deviceChannel.isHasAudio()); } // 如果是录像下载就设置视频间隔十秒 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) { // 获取录像的总时长,然后设置为这个视频的时长 - InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream); + InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channelId, stream); if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) { String startTime = inviteInfoForDownload.getStreamInfo().getStartTime(); String endTime = inviteInfoForDownload.getStreamInfo().getEndTime(); @@ -231,8 +231,7 @@ public class MediaServiceImpl implements IMediaService { } // 收到无人观看说明流也没有在往上级推送 if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) { - List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId( - inviteInfo.getChannelId()); + List sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(inviteInfo.getChannelId()); if (!sendRtpItems.isEmpty()) { for (SendRtpItem sendRtpItem : sendRtpItems) { Platform parentPlatform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); @@ -252,12 +251,11 @@ public class MediaServiceImpl implements IMediaService { Device device = deviceService.getDeviceByDeviceId(inviteInfo.getDeviceId()); if (device != null) { try { + DeviceChannel channel = deviceChannelService.getOneById(inviteInfo.getChannelId()); // 多查询一次防止已经被处理了 - InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), - inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); - if (info != null) { - commander.streamByeCmd(device, inviteInfo.getChannelId(), - inviteInfo.getStream(), null); + InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream()); + if (info != null && channel != null) { + commander.streamByeCmd(device, channel.getDeviceId(), inviteInfo.getStream(), null); } else { log.info("[无人观看] 未找到设备的点播信息: {}, 流:{}", inviteInfo.getDeviceId(), stream); } @@ -269,9 +267,8 @@ public class MediaServiceImpl implements IMediaService { log.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), stream); } - inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), - inviteInfo.getChannelId(), inviteInfo.getStream()); - deviceChannelService.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); + inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getChannelId(), inviteInfo.getStream()); + deviceChannelService.stopPlay(inviteInfo.getChannelId()); return result; } SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index c62d45cd..66e1af42 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -47,9 +47,7 @@ public interface IVideoManagerStorage { * @return */ Device queryVideoDeviceByChannelId(String channelId); - - List queryPlatFormListForGBWithGBId(String channelId, List platforms); - + List queryEnablePlatformListWithAsMessageChannel(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 895efa3c..247eadf4 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -140,7 +140,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public List querySendRTPServer(String platformGbId, String channelId, String streamId) { - String scanKey = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String scanKey = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*_" + platformGbId + "_" + channelId + "_" @@ -171,7 +171,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { if (callId == null) { callId = "*"; } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + "*_*_" + platformGbId + "_" + channelId + "_" @@ -193,7 +193,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String platformGbId = "*"; String callId = "*"; String streamId = "*"; - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*_" + platformGbId + "_" + channelId + "_" @@ -215,7 +215,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String platformGbId = "*"; String callId = "*"; String channelId = "*"; - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*_" + platformGbId + "_" + channelId + "_" @@ -234,7 +234,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { if (platformGbId == null) { platformGbId = "*"; } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*_" + platformGbId + "_*" + "_*" + "_*"; List queryResult = RedisUtil.scan(redisTemplate, key); @@ -259,7 +259,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { if (callId == null) { callId = "*"; } - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*_" + platformGbId + "_" + channelId + "_" @@ -284,7 +284,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public List queryAllSendRTPServer() { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*"; List queryResult = RedisUtil.scan(redisTemplate, key); List result= new ArrayList<>(); @@ -302,7 +302,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { */ @Override public boolean isChannelSendingRTP(String channelId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*_*_" + channelId + "*_" + "*_"; List RtpStreams = RedisUtil.scan(redisTemplate, key); @@ -414,7 +414,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) { - String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_" + gpsMsgInfo.getId(); + String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_" + gpsMsgInfo.getChannelId(); Duration duration = Duration.ofSeconds(60L); redisTemplate.opsForValue().set(key, gpsMsgInfo, duration); // 默认GPS消息保存1分钟 @@ -626,7 +626,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public int getGbSendCount(String id) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + String key = VideoManagerConstants.SEND_RTP_INFO_PREFIX + userSetting.getServerId() + "_*_" + id + "_*"; return RedisUtil.scan(redisTemplate, key).size(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index a26b17ce..5818611a 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.dao.*; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -38,9 +37,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @Autowired private PlatformMapper platformMapper; - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Autowired private PlatformChannelMapper platformChannelMapper; @@ -111,8 +107,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return result; } - @Override - public List queryPlatFormListForGBWithGBId(String channelId, List platforms) { - return platformChannelMapper.queryPlatFormListForGBWithGBId(channelId, platforms); - } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index e8566dee..82a45060 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -244,7 +244,7 @@ public class ApiStreamController { return result; } inviteStreamService.removeInviteInfo(inviteInfo); - deviceChannelService.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); + deviceChannelService.stopPlay(inviteInfo.getChannelId()); return null; }