From 7dc8fd4a1e8c5afb6fad53454935419c239838c0 Mon Sep 17 00:00:00 2001 From: panlinlin <648540858@qq.com> Date: Thu, 1 Apr 2021 18:06:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8B=89=E6=B5=81=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E4=B8=8E=E5=9B=BD=E6=A0=87=E5=85=B3=E8=81=94,=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=BB=A3=E7=90=86rtsp/rtmp/...=EF=BC=8C?= =?UTF-8?q?=E8=BD=AC=E5=8F=91=E5=88=B0=E5=9B=BD=E6=A0=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/common/StreamInfo.java | 9 + .../iot/vmp/gb28181/bean/GbStream.java | 71 +++++ .../vmp/gb28181/bean/PlatformGbStream.java | 31 ++ .../iot/vmp/gb28181/bean/SendRtpItem.java | 9 +- .../transmit/cmd/impl/SIPCommander.java | 4 +- .../cmd/impl/SIPCommanderFroPlatform.java | 30 +- .../request/impl/AckRequestProcessor.java | 17 +- .../request/impl/ByeRequestProcessor.java | 4 +- .../request/impl/InviteRequestProcessor.java | 181 +++++++---- .../request/impl/MessageRequestProcessor.java | 33 +- .../vmp/media/zlm/ZLMRTPServerFactory.java | 47 ++- .../iot/vmp/media/zlm/ZLMRunner.java | 6 +- ...reamProxyDto.java => StreamProxyItem.java} | 7 +- .../vmp/storager/IVideoManagerStorager.java | 34 ++- .../iot/vmp/storager/dao/GbStreamMapper.java | 49 +++ .../storager/dao/PlarfotmGbStreamMapper.java | 28 ++ ...Mapper.java => PlatformChannelMapper.java} | 2 +- .../vmp/storager/dao/StreamProxyMapper.java | 18 +- .../impl/VideoManagerStoragerImpl.java | 102 +++++-- .../vmanager/gbStream/GbStreamController.java | 65 ++++ .../vmanager/gbStream/bean/GbStreamParam.java | 28 ++ .../vmp/vmanager/media/MediaController.java | 3 - .../PlatformGbStreamController.java | 35 +++ .../iot/vmp/vmanager/play/PlayController.java | 13 - .../vmanager/service/IGbStreamService.java | 41 +++ .../vmanager/service/IStreamProxyService.java | 11 +- .../service/impl/GbStreamServiceImpl.java | 89 ++++++ .../service/impl/MediaServiceImpl.java | 2 + .../service/impl/PlayServiceImpl.java | 19 -- .../service/impl/StreamProxyServiceImpl.java | 56 ++-- .../streamProxy/StreamProxyController.java | 13 +- src/main/resources/wvp.sqlite | Bin 73728 -> 81920 bytes web_src/src/components/PLatformStreamList.vue | 283 ++++++++++++++++++ web_src/src/components/ParentPlatformList.vue | 2 +- web_src/src/components/PushVideoList.vue | 21 +- web_src/src/components/StreamProxyList.vue | 7 +- .../src/components/dialog/StreamProxyEdit.vue | 11 +- .../src/components/dialog/chooseChannel.vue | 8 +- .../components/dialog/chooseChannelForGb.vue | 2 +- .../dialog/chooseChannelForStream.vue | 215 +++++++++++++ .../src/components/dialog/devicePlayer.vue | 7 +- web_src/src/router/index.js | 5 + 42 files changed, 1394 insertions(+), 224 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbStream.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformGbStream.java rename src/main/java/com/genersoft/iot/vmp/media/zlm/dto/{StreamProxyDto.java => StreamProxyItem.java} (95%) create mode 100644 src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java create mode 100644 src/main/java/com/genersoft/iot/vmp/storager/dao/PlarfotmGbStreamMapper.java rename src/main/java/com/genersoft/iot/vmp/storager/dao/{PatformChannelMapper.java => PlatformChannelMapper.java} (98%) create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/gbStream/GbStreamController.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/gbStream/bean/GbStreamParam.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/platformGbStream/PlatformGbStreamController.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/service/IGbStreamService.java create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/GbStreamServiceImpl.java create mode 100644 web_src/src/components/PLatformStreamList.vue create mode 100644 web_src/src/components/dialog/chooseChannelForStream.vue diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java index 0082a27d..1c24eb9d 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray; public class StreamInfo { + private String app; private String streamId; private String deviceID; private String channelId; @@ -19,6 +20,14 @@ public class StreamInfo { private String rtsp; private JSONArray tracks; + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + public String getDeviceID() { return deviceID; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbStream.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbStream.java new file mode 100644 index 00000000..8fb1980c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/GbStream.java @@ -0,0 +1,71 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +/** + * 直播流关联国标上级平台 + */ +public class GbStream extends PlatformGbStream{ + + private String app; + private String stream; + private String gbId; + private String name; + private double longitude; + private double latitude; + private String streamType; + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getGbId() { + return gbId; + } + + public void setGbId(String gbId) { + this.gbId = gbId; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public double getLongitude() { + return longitude; + } + + public void setLongitude(double longitude) { + this.longitude = longitude; + } + + public double getLatitude() { + return latitude; + } + + public void setLatitude(double latitude) { + this.latitude = latitude; + } + + public String getStreamType() { + return streamType; + } + + public void setStreamType(String streamType) { + this.streamType = streamType; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformGbStream.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformGbStream.java new file mode 100644 index 00000000..a4871aef --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/PlatformGbStream.java @@ -0,0 +1,31 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +public class PlatformGbStream { + private String app; + private String stream; + private String platformId; + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getPlatformId() { + return platformId; + } + + public void setPlatformId(String platformId) { + this.platformId = platformId; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index e0220dc9..15e6f1e9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -27,6 +27,11 @@ public class SendRtpItem { */ private String deviceId; + /** + * 直播流的应用名 + */ + private String app; + /** * 通道id */ @@ -40,10 +45,6 @@ public class SendRtpItem { */ private int status = 0; - /** - * 设备推流的app - */ - private String app = "rtp"; /** * 设备推流的streamId diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 313348e2..579e168b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -91,7 +91,9 @@ public class SIPCommander implements ISIPCommander { @Autowired private SipSubscribe sipSubscribe; - + public SipConfig getSipConfig() { + return sipConfig; + } /** * 云台方向放控制,使用配置文件中的默认镜头移动速度 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 08925261..7945dcd0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -197,7 +197,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Override public boolean catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) { - if (channel == null || parentPlatform ==null) { + if ( parentPlatform ==null) { return false; } try { @@ -210,20 +210,22 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { catalogXml.append("" + size + "\r\n"); catalogXml.append("\r\n"); catalogXml.append("\r\n"); + if (channel != null) { + catalogXml.append("" + channel.getChannelId() + "\r\n"); + catalogXml.append("" + channel.getName() + "\r\n"); + catalogXml.append("" + channel.getManufacture() + "\r\n"); + catalogXml.append("" + channel.getModel() + "\r\n"); + catalogXml.append("" + channel.getOwner() + "\r\n"); + catalogXml.append("" + channel.getCivilCode() + "\r\n"); + catalogXml.append("
" + channel.getAddress() + "
\r\n"); + catalogXml.append("" + channel.getParental() + "\r\n");// TODO 当前不能添加分组, 所以暂时没有父节点 + catalogXml.append("" + channel.getParentId() + "\r\n"); // TODO 当前不能添加分组, 所以暂时没有父节点 + catalogXml.append("" + channel.getSecrecy() + "\r\n"); + catalogXml.append("" + channel.getRegisterWay() + "\r\n"); + catalogXml.append("" + (channel.getStatus() == 0?"OFF":"ON") + "\r\n"); + catalogXml.append("\r\n"); + } - catalogXml.append("" + channel.getChannelId() + "\r\n"); - catalogXml.append("" + channel.getName() + "\r\n"); - catalogXml.append("" + channel.getManufacture() + "\r\n"); - catalogXml.append("" + channel.getModel() + "\r\n"); - catalogXml.append("" + channel.getOwner() + "\r\n"); - catalogXml.append("" + channel.getCivilCode() + "\r\n"); - catalogXml.append("
" + channel.getAddress() + "
\r\n"); - catalogXml.append("" + channel.getParental() + "\r\n");// TODO 当前不能添加分组, 所以暂时没有父节点 - catalogXml.append("" + channel.getParentId() + "\r\n"); // TODO 当前不能添加分组, 所以暂时没有父节点 - catalogXml.append("" + channel.getSecrecy() + "\r\n"); - catalogXml.append("" + channel.getRegisterWay() + "\r\n"); - catalogXml.append("" + (channel.getStatus() == 0?"OFF":"ON") + "\r\n"); - catalogXml.append("\r\n"); catalogXml.append("
\r\n"); catalogXml.append("
\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java index 20e1c800..00f1f7da 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java @@ -43,14 +43,23 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - sendRtpItem.setStreamId(streamInfo.getStreamId()); + StreamInfo streamInfo = null; + if (deviceId == null) { + streamInfo = new StreamInfo(); + streamInfo.setApp(sendRtpItem.getApp()); + streamInfo.setStreamId(sendRtpItem.getStreamId()); + }else { + streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + sendRtpItem.setStreamId(streamInfo.getStreamId()); + streamInfo.setApp("rtp"); + } + redisCatchStorage.updateSendRTPSever(sendRtpItem); System.out.println(platformGbId); System.out.println(channelId); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); - param.put("app","rtp"); + param.put("app",streamInfo.getApp()); param.put("stream",streamInfo.getStreamId()); param.put("ssrc", sendRtpItem.getSsrc()); param.put("dst_url",sendRtpItem.getIp()); @@ -63,7 +72,7 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { while (!rtpPushed) { try { if (System.currentTimeMillis() - startTime < 30 * 1000) { - if (zlmrtpServerFactory.isRtpReady(streamInfo.getStreamId())) { + if (zlmrtpServerFactory.isStreamReady(streamInfo.getApp(), streamInfo.getStreamId())) { rtpPushed = true; System.out.println("已获取设备推流,开始向上级推流"); zlmrtpServerFactory.startSendRtpStream(param); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java index c96501d5..ec5f921f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java @@ -51,12 +51,12 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { String streamId = sendRtpItem.getStreamId(); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); - param.put("app","rtp"); + param.put("app",sendRtpItem.getApp()); param.put("stream",streamId); System.out.println("停止向上级推流:" + streamId); zlmrtpServerFactory.stopSendRtpStream(param); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); - if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) { + if (zlmrtpServerFactory.totalReaderCount(sendRtpItem.getApp(), streamId) == 0) { System.out.println(streamId + "无其它观看者,通知设备停止推流"); cmder.streamByeCmd(streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java index a7b3d94e..15d06387 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java @@ -12,13 +12,12 @@ import javax.sip.message.Request; import javax.sip.message.Response; import com.genersoft.iot.vmp.conf.MediaServerConfig; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -30,6 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; import java.util.Vector; /** @@ -93,12 +94,14 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { if (platform != null) { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); - if (channel == null) { + GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); + // 不是通道可能是直播流 + if (channel != null || gbStream != null ) { + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 + }else { logger.info("通道不存在,返回404"); responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在 return; - }else { - responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 } // 解析sdp消息, 使用jainsip 自带的sdp解析方式 String contentString = new String(request.getRawContent()); @@ -153,67 +156,120 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { String addressStr = sdp.getOrigin().getAddress(); //String sessionName = sdp.getSessionName().getValue(); logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc); + Device device = null; + // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 + if (channel != null) { + device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); + if (device == null) { + logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); + responseAck(evt, Response.SERVER_INTERNAL_ERROR); + return; + } + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, + device.getDeviceId(), channelId, + mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAck(evt, Response.BUSY_HERE); + return; + } - Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); - if (device == null) { - logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); - responseAck(evt, Response.SERVER_INTERNAL_ERROR); - return; - } - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId, - mediaTransmissionTCP); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - if (sendRtpItem == null) { - logger.warn("服务器端口资源不足"); - responseAck(evt, Response.BUSY_HERE); - return; - } - - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - // 通知下级推流, - PlayResult playResult = playService.play(device.getDeviceId(), channelId, (responseJSON)->{ - // 收到推流, 回复200OK, 等待ack - sendRtpItem.setStatus(1); + // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); - // TODO 添加对tcp的支持 - MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n"); - content.append("s=Play\r\n"); - content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n"); - content.append("t=0 0\r\n"); - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - content.append("y="+ ssrc + "\r\n"); - content.append("f=\r\n"); + // 通知下级推流, + PlayResult playResult = playService.play(device.getDeviceId(), channelId, (responseJSON)->{ + // 收到推流, 回复200OK, 等待ack + if (sendRtpItem == null) return; + sendRtpItem.setStatus(1); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // TODO 添加对tcp的支持 + MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n"); + content.append("t=0 0\r\n"); + content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y="+ ssrc + "\r\n"); + content.append("f=\r\n"); - try { - responseAck(evt, content.toString()); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); + try { + responseAck(evt, content.toString()); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } ,(event -> { + // 未知错误。直接转发设备点播的错误 + Response response = null; + try { + response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest()); + getServerTransaction(evt).sendResponse(response); + } catch (ParseException | SipException | InvalidArgumentException e) { + e.printStackTrace(); + } + })); + if (logger.isDebugEnabled()) { + logger.debug(playResult.getResult().toString()); } - },(event -> { - // 未知错误。直接转发设备点播的错误 - Response response = null; - try { - response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest()); - getServerTransaction(evt).sendResponse(response); - } catch (ParseException | SipException | InvalidArgumentException e) { - e.printStackTrace(); + + }else if (gbStream != null) { + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, + mediaTransmissionTCP); + + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAck(evt, Response.BUSY_HERE); + return; + } + + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + + // 检测直播流是否在线 + Boolean streamReady = zlmrtpServerFactory.isStreamReady(gbStream.getApp(), gbStream.getStream()); + if (streamReady) { + sendRtpItem.setStatus(1); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // TODO 添加对tcp的支持 + MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o="+"00000"+" 0 0 IN IP4 "+mediaInfo.getWanIp()+"\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 "+mediaInfo.getWanIp()+"\r\n"); + content.append("t=0 0\r\n"); + content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y="+ ssrc + "\r\n"); + content.append("f=\r\n"); + + try { + responseAck(evt, content.toString()); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } } - })); - if (logger.isDebugEnabled()) { - logger.debug(playResult.getResult().toString()); } + } else { // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) Device device = storager.queryVideoDevice(requesterId); @@ -298,6 +354,7 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { } } + /*** * 回复状态码 * 100 trying diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java index 5a242021..8176f459 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java @@ -529,13 +529,44 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { String sn = snElement.getText(); // 准备回复通道信息 List channelReduces = storager.queryChannelListInParentPlatform(parentPlatform.getServerGBId()); + // 查询关联的直播通道 + List gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId()); + int size = channelReduces.size() + gbStreams.size(); + // 回复级联的通道 if (channelReduces.size() > 0) { for (ChannelReduce channelReduce : channelReduces) { DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId()); - cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), channelReduces.size()); + cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); } } + // 回复直播的通道 + if (gbStreams.size() > 0) { + for (GbStream gbStream : gbStreams) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(gbStream.getGbId()); + deviceChannel.setName(gbStream.getName()); + deviceChannel.setLongitude(gbStream.getLongitude()); + deviceChannel.setLatitude(gbStream.getLatitude()); + deviceChannel.setDeviceId(parentPlatform.getDeviceGBId()); + deviceChannel.setManufacture("wvp-pro"); + deviceChannel.setStatus(1); +// deviceChannel.setParentId(parentPlatform.getDeviceGBId()); + deviceChannel.setRegisterWay(1); + deviceChannel.setCivilCode(cmder.getSipConfig().getSipDomain()); + deviceChannel.setModel("live"); + deviceChannel.setOwner("wvp-pro"); +// deviceChannel.setAddress("test"); + deviceChannel.setParental(0); + deviceChannel.setSecrecy("0"); + deviceChannel.setSecrecy("0"); + cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); + } + } + if (size == 0) { + // 回复无通道 + cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), size); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 1f1693df..02fb742e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -96,7 +96,7 @@ public class ZLMRTPServerFactory { } /** - * 创建一个推流 + * 创建一个国标推流 * @param ip 推流ip * @param port 推流端口 * @param ssrc 推流唯一标识 @@ -122,6 +122,39 @@ public class ZLMRTPServerFactory { sendRtpItem.setDeviceId(deviceId); sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(tcp); + sendRtpItem.setApp("rtp"); + sendRtpItem.setLocalPort(localPort); + return sendRtpItem; + } + + /** + * 创建一个直播推流 + * @param ip 推流ip + * @param port 推流端口 + * @param ssrc 推流唯一标识 + * @param platformId 平台id + * @param channelId 通道id + * @param tcp 是否为tcp + * @return SendRtpItem + */ + public SendRtpItem createSendRtpItem(String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){ + String playSsrc = SsrcUtil.getPlaySsrc(); + int localPort = createRTPServer(SsrcUtil.getPlaySsrc()); + if (localPort != -1) { + closeRTPServer(playSsrc); + }else { + logger.error("没有可用的端口"); + return null; + } + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setIp(ip); + sendRtpItem.setPort(port); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setApp(app); + sendRtpItem.setStreamId(stream); + sendRtpItem.setPlatformId(platformId); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setTcp(tcp); sendRtpItem.setLocalPort(localPort); return sendRtpItem; } @@ -152,13 +185,21 @@ public class ZLMRTPServerFactory { return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); } + /** + * 查询待转推的流是否就绪 + */ + public Boolean isStreamReady(String app, String streamId) { + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(app, "rtmp", streamId); + return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); + } + /** * 查询转推的流是否有其它观看者 * @param streamId * @return */ - public int totalReaderCount(String streamId) { - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId); + public int totalReaderCount(String app, String streamId) { + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(app, "rtmp", streamId); return mediaInfo.getInteger("totalReaderCount"); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index 15fa9573..1340a68f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.MediaServerConfig; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; //import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -165,8 +165,8 @@ public class ZLMRunner implements CommandLineRunner { // 更新流列表 zlmMediaListManager.updateMediaList(); // 恢复流代理 - List streamProxyListForEnable = storager.getStreamProxyListForEnable(true); - for (StreamProxyDto streamProxyDto : streamProxyListForEnable) { + List streamProxyListForEnable = storager.getStreamProxyListForEnable(true); + for (StreamProxyItem streamProxyDto : streamProxyListForEnable) { logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); streamProxyService.addStreamProxyToZlm(streamProxyDto); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyDto.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java similarity index 95% rename from src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyDto.java rename to src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java index 8f036028..f04e7566 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyDto.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java @@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.media.zlm.dto; -public class StreamProxyDto { +import com.genersoft.iot.vmp.gb28181.bean.GbStream; + +public class StreamProxyItem extends GbStream { + private String type; private String app; private String stream; @@ -109,4 +112,6 @@ public class StreamProxyDto { public void setEnable_mp4(boolean enable_mp4) { this.enable_mp4 = enable_mp4; } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 76568986..e8f32409 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -2,12 +2,9 @@ package com.genersoft.iot.vmp.storager; import java.util.List; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce; -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.github.pagehelper.PageInfo; /** @@ -238,7 +235,7 @@ public interface IVideoManagerStorager { /** * 添加Mobile Position设备移动位置 - * @param MobilePosition + * @param mobilePosition * @return */ public boolean insertMobilePosition(MobilePosition mobilePosition); @@ -268,14 +265,14 @@ public interface IVideoManagerStorager { * @param streamProxyDto * @return */ - public int addStreamProxy(StreamProxyDto streamProxyDto); + public boolean addStreamProxy(StreamProxyItem streamProxyDto); /** * 更新代理流 * @param streamProxyDto * @return */ - public int updateStreamProxy(StreamProxyDto streamProxyDto); + public boolean updateStreamProxy(StreamProxyItem streamProxyDto); /** * 移除代理流 @@ -290,7 +287,7 @@ public interface IVideoManagerStorager { * @param enable * @return */ - public List getStreamProxyListForEnable(boolean enable); + public List getStreamProxyListForEnable(boolean enable); /** * 按照是app和stream获取代理流 @@ -298,7 +295,7 @@ public interface IVideoManagerStorager { * @param stream * @return */ - public StreamProxyDto queryStreamProxy(String app, String stream); + public StreamProxyItem queryStreamProxy(String app, String stream); /** * 获取代理流 @@ -306,5 +303,20 @@ public interface IVideoManagerStorager { * @param count * @return */ - PageInfo queryStreamProxyList(Integer page, Integer count); + PageInfo queryStreamProxyList(Integer page, Integer count); + + /** + * 根据国标ID获取平台关联的直播流 + * @param platformId + * @param channelId + * @return + */ + GbStream queryStreamInParentPlatform(String platformId, String channelId); + + /** + * 获取平台关联的直播流 + * @param platformId + * @return + */ + List queryGbStreamListInPlatform(String platformId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java new file mode 100644 index 00000000..2b83a4a7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -0,0 +1,49 @@ +package com.genersoft.iot.vmp.storager.dao; + +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import org.apache.ibatis.annotations.*; +import org.springframework.stereotype.Repository; + +import java.util.List; + +@Mapper +@Repository +public interface GbStreamMapper { + + @Insert("INSERT INTO gb_stream (app, stream, gbId, name, " + + "longitude, latitude, streamType) VALUES" + + "('${app}', '${stream}', '${gbId}', '${name}', " + + "'${longitude}', '${latitude}', '${streamType}')") + int add(GbStream gbStream); + + @Update("UPDATE gb_stream " + + "SET app=#{app}," + + "stream=#{stream}," + + "gbId=#{gbId}," + + "name=#{name}," + + "streamType=#{streamType}," + + "longitude=#{longitude}, " + + "latitude=#{latitude}, " + + "WHERE app=#{app} AND stream=#{stream} AND gbId=#{gbId}") + int update(GbStream gbStream); + + @Delete("DELETE FROM gb_stream WHERE app=#{app} AND stream=#{stream}") + int del(String app, String stream); + + @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream") + List selectAll(); + + @Select("SELECT * FROM gb_stream WHERE app=#{app} AND stream=#{stream}") + StreamProxyItem selectOne(String app, String stream); + + @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs " + + "LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " + + "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'") + GbStream queryStreamInPlatform(String platformId, String gbId); + + @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs " + + "LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " + + "WHERE pgs.platformId = '${platformId}'") + List queryGbStreamListInPlatform(String platformId); +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlarfotmGbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlarfotmGbStreamMapper.java new file mode 100644 index 00000000..1172aebb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlarfotmGbStreamMapper.java @@ -0,0 +1,28 @@ +package com.genersoft.iot.vmp.storager.dao; + +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import org.apache.ibatis.annotations.*; +import org.springframework.stereotype.Repository; + +import java.util.List; + + +@Mapper +@Repository +public interface PlarfotmGbStreamMapper { + + @Insert("INSERT INTO platform_gb_stream (app, stream, platformId) VALUES" + + "('${app}', '${stream}', '${platformId}')") + int add(PlatformGbStream platformGbStream); + + @Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}") + int delByAppAndStream(String app, String stream); + + @Delete("DELETE FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}") + int delByPlatformId(String platformId); + + @Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{platformId}") + StreamProxyItem selectOne(String app, String stream, String platformId); +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java similarity index 98% rename from src/main/java/com/genersoft/iot/vmp/storager/dao/PatformChannelMapper.java rename to src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java index 50eb3e3d..a88f2664 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java @@ -13,7 +13,7 @@ import java.util.List; @Mapper @Repository -public interface PatformChannelMapper { +public interface PlatformChannelMapper { /** * 查询列表里已经关联的 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 414125b0..3f53c362 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -1,6 +1,6 @@ package com.genersoft.iot.vmp.storager.dao; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -14,7 +14,7 @@ public interface StreamProxyMapper { "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable) VALUES" + "('${type}','${app}', '${stream}', '${url}', '${src_url}', '${dst_url}', " + "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable} )") - int add(StreamProxyDto streamProxyDto); + int add(StreamProxyItem streamProxyDto); @Update("UPDATE stream_proxy " + "SET type=#{type}, " + @@ -30,17 +30,17 @@ public interface StreamProxyMapper { "enable=#{enable}, " + "enable_mp4=#{enable_mp4} " + "WHERE app=#{app} AND stream=#{stream}") - int update(StreamProxyDto streamProxyDto); + int update(StreamProxyItem streamProxyDto); @Delete("DELETE FROM stream_proxy WHERE app=#{app} AND stream=#{stream}") int del(String app, String stream); - @Select("SELECT * FROM stream_proxy") - List selectAll(); + @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream") + List selectAll(); - @Select("SELECT * FROM stream_proxy WHERE enable=${enable}") - List selectForEnable(boolean enable); + @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=${enable}") + List selectForEnable(boolean enable); - @Select("SELECT * FROM stream_proxy WHERE app=#{app} AND stream=#{stream}") - StreamProxyDto selectOne(String app, String stream); + @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream}") + StreamProxyItem selectOne(String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 071bf68f..b2756e27 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -2,21 +2,20 @@ package com.genersoft.iot.vmp.storager.impl; import java.util.*; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; -import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Transactional; /** @@ -27,6 +26,11 @@ import org.springframework.transaction.annotation.Transactional; @SuppressWarnings("rawtypes") @Component public class VideoManagerStoragerImpl implements IVideoManagerStorager { + @Autowired + DataSourceTransactionManager dataSourceTransactionManager; + + @Autowired + TransactionDefinition transactionDefinition; @Autowired private DeviceMapper deviceMapper; @@ -44,12 +48,13 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { private IRedisCatchStorage redisCatchStorage; @Autowired - private PatformChannelMapper patformChannelMapper; + private PlatformChannelMapper platformChannelMapper; @Autowired private StreamProxyMapper streamProxyMapper; - + @Autowired + private GbStreamMapper gbStreamMapper; /** @@ -283,7 +288,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { public boolean deleteParentPlatform(ParentPlatform parentPlatform) { int result = platformMapper.delParentPlatform(parentPlatform); // 删除关联的通道 - patformChannelMapper.cleanChannelForGB(parentPlatform.getServerGBId()); + platformChannelMapper.cleanChannelForGB(parentPlatform.getServerGBId()); return result > 0; } @@ -333,7 +338,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { } List deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet()); // 查询当前已经存在的 - List relatedPlatformchannels = patformChannelMapper.findChannelRelatedPlatform(platformId, deviceAndChannelList); + List relatedPlatformchannels = platformChannelMapper.findChannelRelatedPlatform(platformId, deviceAndChannelList); if (relatedPlatformchannels != null) { deviceAndChannelList.removeAll(relatedPlatformchannels); } @@ -344,7 +349,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { // 对剩下的数据进行存储 int result = 0; if (channelReducesToAdd.size() > 0) { - result = patformChannelMapper.addChannels(platformId, channelReducesToAdd); + result = platformChannelMapper.addChannels(platformId, channelReducesToAdd); } return result; @@ -354,20 +359,20 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @Override public int delChannelForGB(String platformId, List channelReduces) { - int result = patformChannelMapper.delChannelForGB(platformId, channelReduces); + int result = platformChannelMapper.delChannelForGB(platformId, channelReduces); return result; } @Override public DeviceChannel queryChannelInParentPlatform(String platformId, String channelId) { - DeviceChannel channel = patformChannelMapper.queryChannelInParentPlatform(platformId, channelId); + DeviceChannel channel = platformChannelMapper.queryChannelInParentPlatform(platformId, channelId); return channel; } @Override public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) { - Device device = patformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId); + Device device = platformChannelMapper.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId); return device; } @@ -390,27 +395,54 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { /** * 新增代理流 - * @param streamProxyDto + * @param streamProxyItem * @return */ @Override - public int addStreamProxy(StreamProxyDto streamProxyDto) { - return streamProxyMapper.add(streamProxyDto); + public boolean addStreamProxy(StreamProxyItem streamProxyItem) { + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + boolean result = false; + streamProxyItem.setStreamType("proxy"); + try { + if (gbStreamMapper.add(streamProxyItem)<0 || streamProxyMapper.add(streamProxyItem) < 0) { + //事务回滚 + dataSourceTransactionManager.rollback(transactionStatus); + } + result = true; + dataSourceTransactionManager.commit(transactionStatus); //手动提交 + }catch (Exception e) { + dataSourceTransactionManager.rollback(transactionStatus); + } + return result; } /** * 更新代理流 - * @param streamProxyDto + * @param streamProxyItem * @return */ @Override - public int updateStreamProxy(StreamProxyDto streamProxyDto) { - return streamProxyMapper.update(streamProxyDto); + public boolean updateStreamProxy(StreamProxyItem streamProxyItem) { + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + boolean result = false; + streamProxyItem.setStreamType("proxy"); + try { + if (gbStreamMapper.update(streamProxyItem)<0 || streamProxyMapper.update(streamProxyItem) < 0) { + //事务回滚 + dataSourceTransactionManager.rollback(transactionStatus); + } + dataSourceTransactionManager.commit(transactionStatus); //手动提交 + result = true; + }catch (Exception e) { + dataSourceTransactionManager.rollback(transactionStatus); + } + return result; } /** * 移除代理流 - * @param id + * @param app + * @param stream * @return */ @Override @@ -424,7 +456,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { * @return */ @Override - public List getStreamProxyListForEnable(boolean enable) { + public List getStreamProxyListForEnable(boolean enable) { return streamProxyMapper.selectForEnable(enable); } @@ -435,12 +467,32 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { * @return */ @Override - public PageInfo queryStreamProxyList(Integer page, Integer count) { + public PageInfo queryStreamProxyList(Integer page, Integer count) { PageHelper.startPage(page, count); - List all = streamProxyMapper.selectAll(); + List all = streamProxyMapper.selectAll(); return new PageInfo<>(all); } + /** + * 根据国标ID获取平台关联的直播流 + * @param platformId + * @param gbId + * @return + */ + @Override + public GbStream queryStreamInParentPlatform(String platformId, String gbId) { + return gbStreamMapper.queryStreamInPlatform(platformId, gbId); + } + + /** + * 获取平台关联的直播流 + * @param platformId + * @return + */ + @Override + public List queryGbStreamListInPlatform(String platformId) { + return gbStreamMapper.queryGbStreamListInPlatform(platformId); + } /** * 按照是app和stream获取代理流 @@ -449,7 +501,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { * @return */ @Override - public StreamProxyDto queryStreamProxy(String app, String stream){ + public StreamProxyItem queryStreamProxy(String app, String stream){ return streamProxyMapper.selectOne(app, stream); } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gbStream/GbStreamController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gbStream/GbStreamController.java new file mode 100644 index 00000000..d6afb09e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gbStream/GbStreamController.java @@ -0,0 +1,65 @@ +package com.genersoft.iot.vmp.vmanager.gbStream; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.gbStream.bean.GbStreamParam; +import com.genersoft.iot.vmp.vmanager.platform.bean.UpdateChannelParam; +import com.genersoft.iot.vmp.vmanager.service.IGbStreamService; +import com.github.pagehelper.PageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +@CrossOrigin +@RestController +@RequestMapping("/api/gbStream") +public class GbStreamController { + + private final static Logger logger = LoggerFactory.getLogger(GbStreamController.class); + + @Autowired + private IGbStreamService gbStreamService; + + @Autowired + private IVideoManagerStorager storager; + + + @RequestMapping(value = "/list") + @ResponseBody + public PageInfo list(@RequestParam(required = false)Integer page, + @RequestParam(required = false)Integer count){ + + return gbStreamService.getAll(page, count); + } + + + @RequestMapping(value = "/del") + @ResponseBody + public Object del(@RequestBody GbStreamParam gbStreamParam){ + System.out.println(2222); + System.out.println(gbStreamParam.getGbStreams().size()); + if (gbStreamService.delPlatformInfo(gbStreamParam.getGbStreams())) { + return "success"; + }else { + return "fail"; + } + + } + + @RequestMapping(value = "/add") + @ResponseBody + public Object add(@RequestBody GbStreamParam gbStreamParam){ + System.out.println(3333); + System.out.println(gbStreamParam.getGbStreams().size()); + if (gbStreamService.addPlatformInfo(gbStreamParam.getGbStreams(), gbStreamParam.getPlatformId())) { + return "success"; + }else { + return "fail"; + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gbStream/bean/GbStreamParam.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gbStream/bean/GbStreamParam.java new file mode 100644 index 00000000..2181cc8b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gbStream/bean/GbStreamParam.java @@ -0,0 +1,28 @@ +package com.genersoft.iot.vmp.vmanager.gbStream.bean; + +import com.genersoft.iot.vmp.gb28181.bean.GbStream; + +import java.util.List; + +public class GbStreamParam { + + private String platformId; + + private List gbStreams; + + public String getPlatformId() { + return platformId; + } + + public void setPlatformId(String platformId) { + this.platformId = platformId; + } + + public List getGbStreams() { + return gbStreams; + } + + public void setGbStreams(List gbStreams) { + this.gbStreams = gbStreams; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/media/MediaController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/media/MediaController.java index 115e8408..0710f9b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/media/MediaController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/media/MediaController.java @@ -46,7 +46,4 @@ public class MediaController { return mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream); } - - - } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/platformGbStream/PlatformGbStreamController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/platformGbStream/PlatformGbStreamController.java new file mode 100644 index 00000000..f2db3403 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/platformGbStream/PlatformGbStreamController.java @@ -0,0 +1,35 @@ +package com.genersoft.iot.vmp.vmanager.platformGbStream; + +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.service.IGbStreamService; +import com.github.pagehelper.PageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +@CrossOrigin +@RestController +@RequestMapping("/api") +public class PlatformGbStreamController { + + private final static Logger logger = LoggerFactory.getLogger(PlatformGbStreamController.class); + + @Autowired + private IGbStreamService gbStreamService; + + @Autowired + private IVideoManagerStorager storager; + + @RequestMapping(value = "/list") + @ResponseBody + public PageInfo list(@RequestParam(required = false)Integer page, + @RequestParam(required = false)Integer count){ + + return gbStreamService.getAll(page, count); + } + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java index 67045b44..c4bee6d1 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java @@ -163,20 +163,7 @@ public class PlayController { JSONObject data = jsonObject.getJSONObject("data"); if (data != null) { result.put("key", data.getString("key")); -// StreamInfo streamInfoResult = new StreamInfo(); -// streamInfoResult.setRtmp(dstUrl); -// streamInfoResult.setRtsp(String.format("rtsp://%s:%s/convert/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), streamId)); -// streamInfoResult.setStreamId(streamId); -// streamInfoResult.setFlv(String.format("http://%s:%s/convert/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId)); -// streamInfoResult.setWs_flv(String.format("ws://%s:%s/convert/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId)); -// streamInfoResult.setHls(String.format("http://%s:%s/convert/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId)); -// streamInfoResult.setWs_hls(String.format("ws://%s:%s/convert/%s/hls.m3u8", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId)); -// streamInfoResult.setFmp4(String.format("http://%s:%s/convert/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId)); -// streamInfoResult.setWs_fmp4(String.format("ws://%s:%s/convert/%s.live.mp4", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId)); -// streamInfoResult.setTs(String.format("http://%s:%s/convert/%s.live.ts", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId)); -// streamInfoResult.setWs_ts(String.format("ws://%s:%s/convert/%s.live.ts", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), streamId)); StreamInfo streamInfoResult = mediaService.getStreamInfoByAppAndStream("convert", streamId); - streamInfoResult.setStreamId(streamId); result.put("data", streamInfoResult); } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/IGbStreamService.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/IGbStreamService.java new file mode 100644 index 00000000..877ec2a6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/IGbStreamService.java @@ -0,0 +1,41 @@ +package com.genersoft.iot.vmp.vmanager.service; + +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; +import com.github.pagehelper.PageInfo; + +import java.util.List; + +/** + * 级联国标平台关联流业务接口 + */ +public interface IGbStreamService { + + /** + * 分页获取所有 + * @param page + * @param count + * @return + */ + PageInfo getAll(Integer page, Integer count); + + + /** + * 移除 + * @param app + * @param stream + */ + void del(String app, String stream); + + /** + * 保存国标关联 + * @param gbStreams + */ + boolean addPlatformInfo(List gbStreams, String platformId); + + /** + * 移除国标关联 + * @param gbStreams + */ + boolean delPlatformInfo(List gbStreams); +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/IStreamProxyService.java index 4a93cd2a..49d1e8aa 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/IStreamProxyService.java @@ -1,8 +1,7 @@ package com.genersoft.iot.vmp.vmanager.service; import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto; -import com.genersoft.iot.vmp.vmanager.streamProxy.StreamProxyController; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.github.pagehelper.PageInfo; public interface IStreamProxyService { @@ -11,21 +10,21 @@ public interface IStreamProxyService { * 保存视频代理 * @param param */ - void save(StreamProxyDto param); + void save(StreamProxyItem param); /** * 添加视频代理到zlm * @param param * @return */ - JSONObject addStreamProxyToZlm(StreamProxyDto param); + JSONObject addStreamProxyToZlm(StreamProxyItem param); /** * 从zlm移除视频代理 * @param param * @return */ - JSONObject removeStreamProxyFromZlm(StreamProxyDto param); + JSONObject removeStreamProxyFromZlm(StreamProxyItem param); /** * 分页查询 @@ -33,7 +32,7 @@ public interface IStreamProxyService { * @param count * @return */ - PageInfo getAll(Integer page, Integer count); + PageInfo getAll(Integer page, Integer count); /** * 删除视频代理 diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/GbStreamServiceImpl.java new file mode 100644 index 00000000..b8caacd7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/GbStreamServiceImpl.java @@ -0,0 +1,89 @@ +package com.genersoft.iot.vmp.vmanager.service.impl; + +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.PlarfotmGbStreamMapper; +import com.genersoft.iot.vmp.vmanager.platform.PlatformController; +import com.genersoft.iot.vmp.vmanager.service.IGbStreamService; +import com.github.pagehelper.PageHelper; +import com.github.pagehelper.PageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.stereotype.Service; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; + +import java.util.List; + +@Service +public class GbStreamServiceImpl implements IGbStreamService { + + private final static Logger logger = LoggerFactory.getLogger(GbStreamServiceImpl.class); + + @Autowired + DataSourceTransactionManager dataSourceTransactionManager; + + @Autowired + TransactionDefinition transactionDefinition; + + @Autowired + private GbStreamMapper gbStreamMapper; + + @Autowired + private PlarfotmGbStreamMapper plarfotmGbStreamMapper; + + @Override + public PageInfo getAll(Integer page, Integer count) { + PageHelper.startPage(page, count); + List all = gbStreamMapper.selectAll(); + return new PageInfo<>(all); + } + + @Override + public void del(String app, String stream) { + gbStreamMapper.del(app, stream); + } + + + @Override + public boolean addPlatformInfo(List gbStreams, String platformId) { + // 放在事务内执行 + boolean result = false; + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + try { + for (GbStream gbStream : gbStreams) { + gbStream.setPlatformId(platformId); + plarfotmGbStreamMapper.add(gbStream); + } + dataSourceTransactionManager.commit(transactionStatus); //手动提交 + result = true; + }catch (Exception e) { + logger.error("批量保存流与平台的关系时错误", e); + dataSourceTransactionManager.rollback(transactionStatus); + } + return result; + + } + + @Override + public boolean delPlatformInfo(List gbStreams) { + // 放在事务内执行 + boolean result = false; + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + try { + for (GbStream gbStream : gbStreams) { + plarfotmGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream()); + } + dataSourceTransactionManager.commit(transactionStatus); //手动提交 + result = true; + }catch (Exception e) { + logger.error("批量移除流与平台的关系时错误", e); + dataSourceTransactionManager.rollback(transactionStatus); + } + return result; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/MediaServiceImpl.java index afd6cc96..68a1c440 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/MediaServiceImpl.java @@ -27,6 +27,8 @@ public class MediaServiceImpl implements IMediaService { public StreamInfo getStreamInfoByAppAndStream(String app, String stream) { MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); StreamInfo streamInfoResult = new StreamInfo(); + streamInfoResult.setStreamId(stream); + streamInfoResult.setApp(app); streamInfoResult.setRtmp(String.format("rtmp://%s:%s/%s/%s", mediaInfo.getWanIp(), mediaInfo.getRtmpPort(), app, stream)); streamInfoResult.setRtsp(String.format("rtsp://%s:%s/%s/%s", mediaInfo.getWanIp(), mediaInfo.getRtspPort(), app, stream)); streamInfoResult.setFlv(String.format("http://%s:%s/%s/%s.flv", mediaInfo.getWanIp(), mediaInfo.getHttpPort(), app, stream)); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java index edd8710c..694c74f2 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java @@ -153,27 +153,8 @@ public class PlayServiceImpl implements IPlayService { public StreamInfo onPublishHandler(JSONObject resonse, String deviceId, String channelId, String uuid) { String streamId = resonse.getString("id"); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream("rtp", streamId); -// StreamInfo streamInfo = new StreamInfo(); - streamInfo.setStreamId(streamId); streamInfo.setDeviceID(deviceId); streamInfo.setChannelId(channelId); -// MediaServerConfig mediaServerConfig = redisCatchStorage.getMediaInfo(); - -// streamInfo.setFlv(String.format("http://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); -// streamInfo.setWs_flv(String.format("ws://%s:%s/rtp/%s.flv", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); -// -// streamInfo.setFmp4(String.format("http://%s:%s/rtp/%s.live.mp4", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); -// streamInfo.setWs_fmp4(String.format("ws://%s:%s/rtp/%s.live.mp4", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); -// -// streamInfo.setHls(String.format("http://%s:%s/rtp/%s/hls.m3u8", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); -// streamInfo.setWs_hls(String.format("ws://%s:%s/rtp/%s/hls.m3u8", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); -// -// streamInfo.setTs(String.format("http://%s:%s/rtp/%s.live.ts", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); -// streamInfo.setWs_ts(String.format("ws://%s:%s/rtp/%s.live.ts", mediaServerConfig.getWanIp(), mediaServerConfig.getHttpPort(), streamId)); -// -// streamInfo.setRtmp(String.format("rtmp://%s:%s/rtp/%s", mediaServerConfig.getWanIp(), mediaServerConfig.getRtmpPort(), streamId)); -// streamInfo.setRtsp(String.format("rtsp://%s:%s/rtp/%s", mediaServerConfig.getWanIp(), mediaServerConfig.getRtspPort(), streamId)); - return streamInfo; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/StreamProxyServiceImpl.java index 06559800..ec94844c 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/StreamProxyServiceImpl.java @@ -2,21 +2,18 @@ package com.genersoft.iot.vmp.vmanager.service.impl; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.MediaServerConfig; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.PlarfotmGbStreamMapper; import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper; import com.genersoft.iot.vmp.vmanager.service.IStreamProxyService; -import com.genersoft.iot.vmp.vmanager.streamProxy.StreamProxyController; -import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.List; - /** * 视频代理业务 */ @@ -35,29 +32,35 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private StreamProxyMapper streamProxyMapper; + @Autowired + private GbStreamMapper gbStreamMapper; + + @Autowired + private PlarfotmGbStreamMapper plarfotmGbStreamMapper; + @Override - public void save(StreamProxyDto param) { + public void save(StreamProxyItem param) { MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo(); String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(), param.getStream() ); param.setDst_url(dstUrl); // 更新 if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) { - int result = videoManagerStorager.updateStreamProxy(param); - if (result > 0 && param.isEnable()) { + boolean result = videoManagerStorager.updateStreamProxy(param); + if (result && param.isEnable()) { addStreamProxyToZlm(param); } }else { // 新增 - int result = videoManagerStorager.addStreamProxy(param); - if (result > 0 && param.isEnable()) { + boolean result = videoManagerStorager.addStreamProxy(param); + if (result && param.isEnable()) { addStreamProxyToZlm(param); } } } @Override - public JSONObject addStreamProxyToZlm(StreamProxyDto param) { + public JSONObject addStreamProxyToZlm(StreamProxyItem param) { JSONObject result = null; if ("default".equals(param.getType())){ result = zlmresTfulUtils.addStreamProxy(param.getApp(), param.getStream(), param.getUrl(), @@ -70,37 +73,42 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } @Override - public JSONObject removeStreamProxyFromZlm(StreamProxyDto param) { + public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) { JSONObject result = zlmresTfulUtils.closeStreams(param.getApp(), param.getStream()); + return result; } @Override - public PageInfo getAll(Integer page, Integer count) { + public PageInfo getAll(Integer page, Integer count) { return videoManagerStorager.queryStreamProxyList(page, count); } @Override public void del(String app, String stream) { - StreamProxyDto streamProxyDto = new StreamProxyDto(); - streamProxyDto.setApp(app); - streamProxyDto.setStream(stream); - JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto); + StreamProxyItem streamProxyItem = new StreamProxyItem(); + streamProxyItem.setApp(app); + streamProxyItem.setStream(stream); + JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); if (jsonObject.getInteger("code") == 0) { videoManagerStorager.deleteStreamProxy(app, stream); + // 如果关联了国标那么移除关联 + gbStreamMapper.del(app, stream); + plarfotmGbStreamMapper.delByAppAndStream(app, stream); + // TODO 如果关联的推流, 那么状态设置为离线 } } @Override public boolean start(String app, String stream) { boolean result = false; - StreamProxyDto streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream); - if (!streamProxyDto.isEnable() && streamProxyDto != null) { - JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto); + StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream); + if (!streamProxy.isEnable() && streamProxy != null) { + JSONObject jsonObject = addStreamProxyToZlm(streamProxy); if (jsonObject.getInteger("code") == 0) { result = true; - streamProxyDto.setEnable(true); - videoManagerStorager.updateStreamProxy(streamProxyDto); + streamProxy.setEnable(true); + videoManagerStorager.updateStreamProxy(streamProxy); } } return result; @@ -109,7 +117,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public boolean stop(String app, String stream) { boolean result = false; - StreamProxyDto streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream); + StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream); if (streamProxyDto.isEnable() && streamProxyDto != null) { JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto); if (jsonObject.getInteger("code") == 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java index 09d4174c..95b83ef1 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/streamProxy/StreamProxyController.java @@ -1,8 +1,7 @@ package com.genersoft.iot.vmp.vmanager.streamProxy; import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.vmanager.service.IStreamProxyService; import com.github.pagehelper.PageInfo; @@ -31,17 +30,17 @@ public class StreamProxyController { @RequestMapping(value = "/list") @ResponseBody - public PageInfo list(@RequestParam(required = false)Integer page, - @RequestParam(required = false)Integer count, - @RequestParam(required = false)String q, - @RequestParam(required = false)Boolean online ){ + public PageInfo list(@RequestParam(required = false)Integer page, + @RequestParam(required = false)Integer count, + @RequestParam(required = false)String q, + @RequestParam(required = false)Boolean online ){ return streamProxyService.getAll(page, count); } @RequestMapping(value = "/save") @ResponseBody - public Object save(@RequestBody StreamProxyDto param){ + public Object save(@RequestBody StreamProxyItem param){ logger.info("添加代理: " + JSONObject.toJSONString(param)); streamProxyService.save(param); return "success"; diff --git a/src/main/resources/wvp.sqlite b/src/main/resources/wvp.sqlite index 9147e22da5c6845d1bbc7d05cc5420d723384039..315c6899503c2ec9d0cd7588f2e23221fc72aae5 100644 GIT binary patch delta 546 zcmZoTz|zpbIzd|S2?GOz2oMVZF%ts=WU^=AU&j~AXTZIO zyPm6(%a9|SL!7OKO^>;sIg~Y$Rh6lg$)1UYi3O+~23Wo@?wvTXLkOtz1F#~~S=lUS0LUz8i4o)lkPQk0sQ8*f;EAtu!5)8x)B zZfMBZNN=1od$;@+x2=SfUn$0Ehb*Ob_#IQcA#3M234Pb@Z!AP*Z(oT$rg*yO|}?kW#b zSdZj>I8&rCu*s2KTwb2BsSxaJxD1>j>>T9k7~-lB;^^e#s-OfH2RfLG6YMh15Z8zh z9UuqlKsY}=3Fvwlk6S4(F*g;$oji$GpDU!Y0LJBGlV#HZ8ts{~S&Qx3&P56Wivk2T XD{NpFU}Xe)g@3c)hA;dOQ3weD^_!e^ delta 187 zcmZo@U~M?SGC^AKDgy(95D*IjF$)6&quWFsWBscPdiFPYfg-#t{S5r;_+t4Cxc6|^ zb5(K~a)fh;v(>Qav7BV-XE9_kWd6XsZnL1kWJVSiE5_c53td@Q%oy7yF7#nxDQ7GP z%9JrO&X~9`iru_1giYLAb+R0X#N-lYDOQ1|#Kz#slbKZ*`8V%iwqe|?$98GwW`zsS f +
+ + + + + +
+ 直播级联列表 +
+
+ 添加代理 +
+ + + + + + + + + + + + + + + + + + + + + + + +
+
+
+ + + + + diff --git a/web_src/src/components/ParentPlatformList.vue b/web_src/src/components/ParentPlatformList.vue index 32c999d5..1121f6e0 100644 --- a/web_src/src/components/ParentPlatformList.vue +++ b/web_src/src/components/ParentPlatformList.vue @@ -19,7 +19,7 @@ diff --git a/web_src/src/components/PushVideoList.vue b/web_src/src/components/PushVideoList.vue index 4a93d7a8..bbfc5f8b 100644 --- a/web_src/src/components/PushVideoList.vue +++ b/web_src/src/components/PushVideoList.vue @@ -19,10 +19,14 @@ - + + -