临时提交

结构优化
648540858 2023-12-12 17:57:19 +08:00
parent e0686dd426
commit dc841a9dcf
24 changed files with 714 additions and 589 deletions

View File

@ -57,20 +57,20 @@ public class GB28181ResourceServiceImpl implements IResourceService {
assert callback != null; assert callback != null;
if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) { if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) {
logger.warn("[资源类-国标28181] 收到播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); logger.warn("[资源类-国标28181] 收到播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId());
callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); callback.call(commonGbChannel, null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null);
return; return;
} }
DeviceChannel channel = deviceChannelMapper.getChannelByCommonChannelId(commonGbChannel.getCommonGbId()); DeviceChannel channel = deviceChannelMapper.getChannelByCommonChannelId(commonGbChannel.getCommonGbId());
if (channel == null) { if (channel == null) {
logger.warn("[资源类-国标28181] 收到播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId()); logger.warn("[资源类-国标28181] 收到播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId());
callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); callback.call(commonGbChannel, null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null);
return; return;
} }
Device device = deviceMapper.getDeviceByDeviceId(channel.getDeviceId()); Device device = deviceMapper.getDeviceByDeviceId(channel.getDeviceId());
if (device == null) { if (device == null) {
logger.warn("[资源类-国标28181] 收到播放通道: {} 时未找到通道 {} 所属的国标设备", logger.warn("[资源类-国标28181] 收到播放通道: {} 时未找到通道 {} 所属的国标设备",
commonGbChannel.getCommonGbId(), channel.getDeviceId()); commonGbChannel.getCommonGbId(), channel.getDeviceId());
callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); callback.call(commonGbChannel, null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null);
return; return;
} }
MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device); MediaServerItem mediaServerItem = playService.getNewMediaServerItem(device);
@ -78,10 +78,10 @@ public class GB28181ResourceServiceImpl implements IResourceService {
if (code == InviteErrorCode.SUCCESS.getCode()) { if (code == InviteErrorCode.SUCCESS.getCode()) {
if (data != null) { if (data != null) {
StreamInfo streamInfo = (StreamInfo)data; StreamInfo streamInfo = (StreamInfo)data;
callback.call(commonGbChannel, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); callback.call(commonGbChannel, mediaServerItem, ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
} }
}else { }else {
callback.call(commonGbChannel, code, msg, null); callback.call(commonGbChannel, null, code, msg, null);
} }
}); });
} }
@ -91,7 +91,7 @@ public class GB28181ResourceServiceImpl implements IResourceService {
if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) { if (!CommonGbChannelType.GB28181.equals(commonGbChannel.getType())) {
logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId()); logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时发现类型不为28181", commonGbChannel.getCommonGbId());
if (callback != null) { if (callback != null) {
callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); callback.call(commonGbChannel,null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null);
} }
return; return;
} }
@ -99,7 +99,7 @@ public class GB28181ResourceServiceImpl implements IResourceService {
if (channel == null) { if (channel == null) {
logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId()); logger.warn("[资源类-国标28181] 收到停止播放通道: {} 时未找到国标通道", commonGbChannel.getCommonGbId());
if (callback != null) { if (callback != null) {
callback.call(commonGbChannel, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null); callback.call(commonGbChannel, null, ErrorCode.ERROR500.getCode(), ErrorCode.ERROR500.getMsg(), null);
} }
return; return;
} }
@ -107,7 +107,7 @@ public class GB28181ResourceServiceImpl implements IResourceService {
playService.stop(channel.getDeviceId(), channel.getChannelId()); playService.stop(channel.getDeviceId(), channel.getChannelId());
} catch (ControllerException exception) { } catch (ControllerException exception) {
if (callback != null) { if (callback != null) {
callback.call(commonGbChannel, exception.getCode(), exception.getMsg(), null); callback.call(commonGbChannel, null,exception.getCode(), exception.getMsg(), null);
} }
} }
} }
@ -119,4 +119,8 @@ public class GB28181ResourceServiceImpl implements IResourceService {
return false; return false;
} }
@Override
public void streamOffline(String app, String streamId) {
// TODO
}
} }

View File

@ -1,6 +1,14 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import javax.sdp.Media;
import javax.sdp.MediaDescription;
import javax.sdp.SdpException;
import javax.sdp.SessionDescription; import javax.sdp.SessionDescription;
import java.time.Instant;
import java.util.Vector;
/** /**
* 28181 SDP * 28181 SDP
@ -11,11 +19,67 @@ public class Gb28181Sdp {
private String mediaDescription; private String mediaDescription;
public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescription) { private Long startTime = null;
private Long stopTime = null;
private boolean tcp;
private boolean tcpActive;
private String sdpIp;
private Integer sdpPort;
private String username;
private String addressStr;
private Integer downloadSpeed;
public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescriptionStr) throws SdpException {
Gb28181Sdp gb28181Sdp = new Gb28181Sdp(); Gb28181Sdp gb28181Sdp = new Gb28181Sdp();
gb28181Sdp.setBaseSdb(baseSdb); gb28181Sdp.setBaseSdb(baseSdb);
gb28181Sdp.setSsrc(ssrc); gb28181Sdp.setSsrc(ssrc);
gb28181Sdp.setMediaDescription(mediaDescription); gb28181Sdp.setMediaDescription(mediaDescriptionStr);
if (baseSdb.getTimeDescriptions(false) != null && baseSdb.getTimeDescriptions(false).size() > 0) {
TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (baseSdb.getTimeDescriptions(false).get(0));
TimeField startTimeFiled = (TimeField) timeDescription.getTime();
Long startTime = startTimeFiled.getStartTime();
Long stopTime = startTimeFiled.getStopTime();
gb28181Sdp.setStartTime(startTime);
gb28181Sdp.setStopTime(stopTime);
}
// 获取支持的格式
Vector mediaDescriptions = baseSdb.getMediaDescriptions(true);
for (Object description : mediaDescriptions) {
MediaDescription mediaDescription = (MediaDescription) description;
gb28181Sdp.setDownloadSpeed(Integer.parseInt(mediaDescription.getAttribute("downloadspeed")));
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
// 查看是否支持PS 负载96
if (mediaFormats.contains("96")) {
gb28181Sdp.setSdpPort(media.getMediaPort());
String protocol = media.getProtocol();
// 区分TCP发流还是udp 当前默认udp
if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) {
String setup = mediaDescription.getAttribute("setup");
if (setup != null) {
gb28181Sdp.setTcp(true);
if ("active".equalsIgnoreCase(setup)) {
gb28181Sdp.setTcpActive(true);
} else if ("passive".equalsIgnoreCase(setup)) {
gb28181Sdp.setTcpActive(false);
}
}
}
break;
}
}
gb28181Sdp.setUsername(baseSdb.getOrigin().getUsername());
gb28181Sdp.setAddressStr(baseSdb.getConnection().getAddress());
return gb28181Sdp; return gb28181Sdp;
} }
@ -43,4 +107,76 @@ public class Gb28181Sdp {
public void setMediaDescription(String mediaDescription) { public void setMediaDescription(String mediaDescription) {
this.mediaDescription = mediaDescription; this.mediaDescription = mediaDescription;
} }
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
public Long getStopTime() {
return stopTime;
}
public void setStopTime(Long stopTime) {
this.stopTime = stopTime;
}
public boolean isTcp() {
return tcp;
}
public void setTcp(boolean tcp) {
this.tcp = tcp;
}
public boolean isTcpActive() {
return tcpActive;
}
public void setTcpActive(boolean tcpActive) {
this.tcpActive = tcpActive;
}
public String getSdpIp() {
return sdpIp;
}
public void setSdpIp(String sdpIp) {
this.sdpIp = sdpIp;
}
public Integer getSdpPort() {
return sdpPort;
}
public void setSdpPort(Integer sdpPort) {
this.sdpPort = sdpPort;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getAddressStr() {
return addressStr;
}
public void setAddressStr(String addressStr) {
this.addressStr = addressStr;
}
public Integer getDownloadSpeed() {
return downloadSpeed;
}
public void setDownloadSpeed(Integer downloadSpeed) {
this.downloadSpeed = downloadSpeed;
}
} }

View File

@ -90,13 +90,6 @@ public class EventPublisher {
applicationEventPublisher.publishEvent(outEvent); applicationEventPublisher.publishEvent(outEvent);
} }
public void catalogEventPublishForStream(Integer platformId, GbStream gbStream, String type) {
List<GbStream> gbStreamList = new ArrayList<>();
gbStreamList.add(gbStream);
catalogEventPublishForStream(platformId, gbStreamList, type);
}
public void recordEndEventPush(RecordInfo recordInfo) { public void recordEndEventPush(RecordInfo recordInfo) {
RecordEndEvent outEvent = new RecordEndEvent(this); RecordEndEvent outEvent = new RecordEndEvent(this);
outEvent.setRecordInfo(recordInfo); outEvent.setRecordInfo(recordInfo);

View File

@ -51,7 +51,7 @@ public interface ISIPCommanderForPlatform {
* @return * @return
*/ */
void catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) throws SipException, InvalidArgumentException, ParseException; void catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size) throws SipException, InvalidArgumentException, ParseException;
void catalogQuery(List<DeviceChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException; void catalogQuery(List<CommonGbChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException;
/** /**
* DeviceInfo * DeviceInfo

View File

@ -185,15 +185,15 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
@Override @Override
public void catalogQuery(List<DeviceChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException { public void catalogQuery(List<CommonGbChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag) throws InvalidArgumentException, ParseException, SipException {
if ( parentPlatform ==null) { if ( parentPlatform ==null) {
return ; return ;
} }
sendCatalogResponse(channels, parentPlatform, sn, fromTag, 0, true); sendCatalogResponse(channels, parentPlatform, sn, fromTag, 0, true);
} }
private String getCatalogXml(List<DeviceChannel> channels, String sn, ParentPlatform parentPlatform, int size) { private String getCatalogXml(List<CommonGbChannel> channels, String sn, ParentPlatform parentPlatform, int size) {
String characterSet = parentPlatform.getCharacterSet(); String characterSet = parentPlatform.getCharacterSet();
StringBuffer catalogXml = new StringBuffer(600); StringBuilder catalogXml = new StringBuilder(600);
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet +"\"?>\r\n") catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet +"\"?>\r\n")
.append("<Response>\r\n") .append("<Response>\r\n")
.append("<CmdType>Catalog</CmdType>\r\n") .append("<CmdType>Catalog</CmdType>\r\n")
@ -201,180 +201,127 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n") .append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n")
.append("<SumNum>" + size + "</SumNum>\r\n") .append("<SumNum>" + size + "</SumNum>\r\n")
.append("<DeviceList Num=\"" + channels.size() +"\">\r\n"); .append("<DeviceList Num=\"" + channels.size() +"\">\r\n");
if (channels.size() > 0) { if (!channels.isEmpty()) {
for (DeviceChannel channel : channels) { for (CommonGbChannel channel : channels) {
if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
channel.setParentId(parentPlatform.getDeviceGBId());
}
catalogXml.append("<Item>\r\n"); catalogXml.append("<Item>\r\n");
// 行政区划分组只需要这两项就可以 catalogXml.append("<DeviceID>" + channel.getCommonGbDeviceID() + "</DeviceID>\r\n");
catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n"); catalogXml.append("<Name>" + channel.getCommonGbName() + "</Name>\r\n");
catalogXml.append("<Name>" + channel.getName() + "</Name>\r\n"); if (!ObjectUtils.isEmpty(channel.getCommonGbManufacturer())) {
if (channel.getChannelId().length() <= 8) { catalogXml.append("<Manufacturer>" + channel.getCommonGbManufacturer() + "</Manufacturer>\r\n");
catalogXml.append("</Item>\r\n");
continue;
}else {
if (channel.getChannelId().length() != 20) {
catalogXml.append("</Item>\r\n");
logger.warn("[编号长度异常] {} 长度错误请使用20位长度的国标编号,当前长度:{}", channel.getChannelId(), channel.getChannelId().length());
catalogXml.append("</Item>\r\n");
continue;
} }
switch (Integer.parseInt(channel.getChannelId().substring(10, 13))){ if (!ObjectUtils.isEmpty(channel.getCommonGbModel())) {
case 200: catalogXml.append("<Model>" + channel.getCommonGbModel() + "</Model>\r\n");
// catalogXml.append("<Manufacturer>三永华通</Manufacturer>\r\n"); }
// GitUtil gitUtil = SpringBeanFactory.getBean("gitUtil"); if (!ObjectUtils.isEmpty(channel.getCommonGbOwner())) {
// String model = (gitUtil == null || gitUtil.getBuildVersion() == null)?"1.0": gitUtil.getBuildVersion(); catalogXml.append("<Owner> " + channel.getCommonGbOwner()+ "</Owner>\r\n");
// catalogXml.append("<Model>" + model + "</Manufacturer>\r\n"); }
// catalogXml.append("<Owner>三永华通</Owner>\r\n"); if (!ObjectUtils.isEmpty(channel.getCommonGbCivilCode())) {
if (channel.getCivilCode() != null) { catalogXml.append("<CivilCode> " + channel.getCommonGbCivilCode()+ "</CivilCode>\r\n");
catalogXml.append("<CivilCode>"+channel.getCivilCode()+"</CivilCode>\r\n"); }
}else { if (!ObjectUtils.isEmpty(channel.getCommonGbBlock())) {
catalogXml.append("<CivilCode></CivilCode>\r\n"); catalogXml.append("<Block>" + channel.getCommonGbBlock() + "</Block>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbAddress())) {
catalogXml.append("<Address> " + channel.getCommonGbAddress()+ "</Address>\r\n");
}
catalogXml.append("<Parental>" + channel.getCommonGbParental() + "</Parental>\r\n");
if (!ObjectUtils.isEmpty(channel.getCommonGbParentID())) {
// 业务分组加上这一项即可,提高兼容性,
catalogXml.append("<ParentID>" + channel.getCommonGbParentID() + "</ParentID>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbSafetyWay())) {
catalogXml.append("<SafetyWay>" + channel.getCommonGbSafetyWay() + "</SafetyWay>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbRegisterWay())) {
catalogXml.append("<RegisterWay>" + channel.getCommonGbRegisterWay() + "</RegisterWay>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbCertNum())) {
catalogXml.append("<CertNum>" + channel.getCommonGbCertNum() + "</CertNum>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbCertifiable())) {
catalogXml.append("<Certifiable>" + channel.getCommonGbCertifiable() + "</Certifiable>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbErrCode())) {
catalogXml.append("<ErrCode>" + channel.getCommonGbErrCode() + "</ErrCode>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbEndTime())) {
catalogXml.append("<EndTime>" + channel.getCommonGbEndTime() + "</EndTime>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbSecrecy())) {
catalogXml.append("<Secrecy>" + channel.getCommonGbSecrecy() + "</Secrecy>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbIPAddress())) {
catalogXml.append("<IPAddress>" + channel.getCommonGbIPAddress() + "</IPAddress>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbPort())) {
catalogXml.append("<Port>" + channel.getCommonGbPort() + "</Port>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbPassword())) {
catalogXml.append("<Password>" + channel.getCommonGbPassword() + "</Password>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbStatus())) {
catalogXml.append("<Status>" + (channel.getCommonGbStatus() ? "ON" : "OFF") + "</Status>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbLongitude())) {
catalogXml.append("<Longitude>" + channel.getCommonGbLongitude() + "</Longitude>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbLatitude())) {
catalogXml.append("<Latitude>" + channel.getCommonGbLatitude() + "</Latitude>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbPtzType())) {
catalogXml.append("<PTZType>" + channel.getCommonGbPtzType() + "</PTZType>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbPositionType())) {
catalogXml.append("<PositionType>" + channel.getCommonGbPositionType() + "</PositionType>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbRoomType())) {
catalogXml.append("<RoomType>" + channel.getCommonGbRoomType() + "</RoomType>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbUseType())) {
catalogXml.append("<UseType>" + channel.getCommonGbUseType() + "</UseType>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbSupplyLightType())) {
catalogXml.append("<SupplyLightType>" + channel.getCommonGbSupplyLightType() + "</SupplyLightType>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbDirectionType())) {
catalogXml.append("<DirectionType>" + channel.getCommonGbDirectionType() + "</DirectionType>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbResolution())) {
catalogXml.append("<Resolution>" + channel.getCommonGbResolution() + "</Resolution>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbBusinessGroupID())) {
catalogXml.append("<BusinessGroupID>" + channel.getCommonGbBusinessGroupID() + "</BusinessGroupID>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbDownloadSpeed())) {
catalogXml.append("<DownloadSpeed>" + channel.getCommonGbDownloadSpeed() + "</DownloadSpeed>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getSVCSpaceSupportMode())) {
catalogXml.append("<SVCSpaceSupportMode>" + channel.getSVCSpaceSupportMode() + "</SVCSpaceSupportMode>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCommonGbSVCTimeSupportMode())) {
catalogXml.append("<SVCTimeSupportMode>" + channel.getCommonGbSVCTimeSupportMode() + "</SVCTimeSupportMode>\r\n");
} }
catalogXml.append("<RegisterWay>1</RegisterWay>\r\n");
catalogXml.append("<Secrecy>0</Secrecy>\r\n");
break;
case 215:
if (!ObjectUtils.isEmpty(channel.getParentId())) {
catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n");
}
break;
case 216:
if (!ObjectUtils.isEmpty(channel.getParentId())) {
catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n");
}else {
catalogXml.append("<ParentID></ParentID>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getBusinessGroupId())) {
catalogXml.append("<BusinessGroupID>" + channel.getBusinessGroupId() + "</BusinessGroupID>\r\n");
}else {
catalogXml.append("<BusinessGroupID></BusinessGroupID>\r\n");
}
break;
default:
// 通道项
if (channel.getManufacture() != null) {
catalogXml.append("<Manufacturer>" + channel.getManufacture() + "</Manufacturer>\r\n");
}else {
catalogXml.append("<Manufacturer></Manufacturer>\r\n");
}
if (channel.getSecrecy() != null) {
catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
}else {
catalogXml.append("<Secrecy></Secrecy>\r\n");
}
catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
if (channel.getModel() != null) {
catalogXml.append("<Model>" + channel.getModel() + "</Model>\r\n");
}else {
catalogXml.append("<Model></Model>\r\n");
}
if (channel.getOwner() != null) {
catalogXml.append("<Owner>" + channel.getOwner()+ "</Owner>\r\n");
}else {
catalogXml.append("<Owner></Owner>\r\n");
}
if (channel.getCivilCode() != null) {
catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n");
}else {
catalogXml.append("<CivilCode></CivilCode>\r\n");
}
if (channel.getAddress() == null) {
catalogXml.append("<Address></Address>\r\n");
}else {
catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getParentId())) {
catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n");
}else {
catalogXml.append("<ParentID></ParentID>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getBlock())) {
catalogXml.append("<Block>" + channel.getBlock() + "</Block>\r\n");
}else {
catalogXml.append("<Block></Block>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getSafetyWay())) {
catalogXml.append("<SafetyWay>" + channel.getSafetyWay() + "</SafetyWay>\r\n");
}else {
catalogXml.append("<SafetyWay></SafetyWay>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCertNum())) {
catalogXml.append("<CertNum>" + channel.getCertNum() + "</CertNum>\r\n");
}else {
catalogXml.append("<CertNum></CertNum>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getCertifiable())) {
catalogXml.append("<Certifiable>" + channel.getCertifiable() + "</Certifiable>\r\n");
}else {
catalogXml.append("<Certifiable></Certifiable>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getErrCode())) {
catalogXml.append("<ErrCode>" + channel.getErrCode() + "</ErrCode>\r\n");
}else {
catalogXml.append("<ErrCode></ErrCode>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getEndTime())) {
catalogXml.append("<EndTime>" + channel.getEndTime() + "</EndTime>\r\n");
}else {
catalogXml.append("<EndTime></EndTime>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getSecrecy())) {
catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
}else {
catalogXml.append("<Secrecy></Secrecy>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getIpAddress())) {
catalogXml.append("<IPAddress>" + channel.getIpAddress() + "</IPAddress>\r\n");
}else {
catalogXml.append("<IPAddress></IPAddress>\r\n");
}
catalogXml.append("<Port>" + channel.getPort() + "</Port>\r\n");
if (!ObjectUtils.isEmpty(channel.getPassword())) {
catalogXml.append("<Password>" + channel.getPassword() + "</Password>\r\n");
}else {
catalogXml.append("<Password></Password>\r\n");
}
if (!ObjectUtils.isEmpty(channel.getPTZType())) {
catalogXml.append("<PTZType>" + channel.getPTZType() + "</PTZType>\r\n");
}else {
catalogXml.append("<PTZType></PTZType>\r\n");
}
catalogXml.append("<Status>" + (channel.isStatus() ?"ON":"OFF") + "</Status>\r\n");
catalogXml.append("<Longitude>" +
(channel.getLongitudeWgs84() != 0? channel.getLongitudeWgs84():channel.getLongitude())
+ "</Longitude>\r\n");
catalogXml.append("<Latitude>" +
(channel.getLatitudeWgs84() != 0? channel.getLatitudeWgs84():channel.getLatitude())
+ "</Latitude>\r\n");
break;
}
catalogXml.append("</Item>\r\n"); catalogXml.append("</Item>\r\n");
} }
} }
}
catalogXml.append("</DeviceList>\r\n"); catalogXml.append("</DeviceList>\r\n");
catalogXml.append("</Response>\r\n"); catalogXml.append("</Response>\r\n");
return catalogXml.toString(); return catalogXml.toString();
} }
private void sendCatalogResponse(List<DeviceChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag, int index, boolean sendAfterResponse) throws SipException, InvalidArgumentException, ParseException { private void sendCatalogResponse(List<CommonGbChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag, int index, boolean sendAfterResponse) throws SipException, InvalidArgumentException, ParseException {
if (index >= channels.size()) { if (index >= channels.size()) {
return; return;
} }
List<DeviceChannel> deviceChannels; List<CommonGbChannel> channelList;
if (index + parentPlatform.getCatalogGroup() < channels.size()) { if (index + parentPlatform.getCatalogGroup() < channels.size()) {
deviceChannels = channels.subList(index, index + parentPlatform.getCatalogGroup()); channelList = channels.subList(index, index + parentPlatform.getCatalogGroup());
}else { }else {
deviceChannels = channels.subList(index, channels.size()); channelList = channels.subList(index, channels.size());
} }
String catalogXml = getCatalogXml(deviceChannels, sn, parentPlatform, channels.size()); String catalogXml = getCatalogXml(channelList, sn, parentPlatform, channels.size());
// callid // callid
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());
@ -616,7 +563,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (channels.size() > 0) { if (channels.size() > 0) {
for (CommonGbChannel channel : channels) { for (CommonGbChannel channel : channels) {
catalogXml.append("<Item>\r\n"); catalogXml.append("<Item>\r\n");
// 行政区划分组只需要这两项就可以
catalogXml.append("<DeviceID>" + channel.getCommonGbDeviceID() + "</DeviceID>\r\n"); catalogXml.append("<DeviceID>" + channel.getCommonGbDeviceID() + "</DeviceID>\r\n");
catalogXml.append("<Name>" + channel.getCommonGbName() + "</Name>\r\n"); catalogXml.append("<Name>" + channel.getCommonGbName() + "</Name>\r\n");
if (!ObjectUtils.isEmpty(channel.getCommonGbManufacturer())) { if (!ObjectUtils.isEmpty(channel.getCommonGbManufacturer())) {

View File

@ -138,7 +138,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId);
} }
try { try {
logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); logger.info("[停止点播] {}", sendRtpItem.getChannelId());
cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null);
} catch (InvalidArgumentException | ParseException | SipException | } catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) { SsrcTransactionNotFoundException e) {

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
@ -68,6 +69,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired @Autowired
private IVideoManagerStorage storager; private IVideoManagerStorage storager;
@Autowired
private IPlatformChannelService platformChannelService;
@Autowired @Autowired
private IStreamPushService streamPushService; private IStreamPushService streamPushService;
@Autowired @Autowired
@ -76,9 +80,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired @Autowired
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
@Autowired
private IInviteStreamService inviteStreamService;
@Autowired @Autowired
private SSRCFactory ssrcFactory; private SSRCFactory ssrcFactory;
@ -91,6 +92,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired @Autowired
private IPlayService playService; private IPlayService playService;
@Autowired
private Map<String, IResourceService> resourceServiceMap;
@Autowired @Autowired
private SIPSender sipSender; private SIPSender sipSender;
@ -147,7 +151,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
return; return;
} }
// 查询请求是否来自上级平台\设备 // 查询请求是否来自上级平台\设备
ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
if (platform == null) { if (platform == null) {
@ -155,83 +158,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} else { } else {
// 查询平台下是否有该通道 // 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); CommonGbChannel channel = platformChannelService.queryChannelByPlatformIdAndChannelDeviceId(platform.getId(), channelId);
GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); if (channel == null) {
PlatformCatalog catalog = storager.getCatalog(requesterId, channelId); logger.info("[国标级联] 上级点播 通道不存在返回404: {}", channelId);
MediaServerItem mediaServerItem = null;
StreamPush streamPushItem = null;
StreamProxy proxyByAppAndStream =null;
// 不是通道可能是直播流
if (channel != null && gbStream == null) {
// 通道存在发100TRYING
try {
responseAck(request, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite TRYING: {}", e.getMessage());
}
} else if (channel == null && gbStream != null) {
String mediaServerId = gbStream.getMediaServerId();
mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
if ("proxy".equals(gbStream.getStreamType())) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
try {
responseAck(request, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
} else {
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem != null) {
mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
}
if (mediaServerItem == null) {
mediaServerItem = mediaServerService.getDefaultMediaServer();
}
}
} else {
if ("push".equals(gbStream.getStreamType())) {
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem == null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
try {
responseAck(request, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
}
}else if("proxy".equals(gbStream.getStreamType())){
proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
if (proxyByAppAndStream == null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
try {
responseAck(request, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
}
}
}
try {
responseAck(request, Response.CALL_IS_BEING_FORWARDED);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}", e.getMessage());
}
} else if (catalog != null) {
try {
// 目录不支持点播
responseAck(request, Response.BAD_REQUEST, "catalog channel can not play");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage());
}
return;
} else {
logger.info("通道不存在返回404: {}", channelId);
try { try {
// 通道不存在发404资源不存在 // 通道不存在发404资源不存在
responseAck(request, Response.NOT_FOUND); responseAck(request, Response.NOT_FOUND);
@ -240,59 +169,30 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
return; return;
} }
IResourceService resourceService = resourceServiceMap.get(channel.getType());
if (resourceService == null) {
logger.info("[国标级联] 上级点播 未找到类型{}的处理类: {}", channel.getType(), channelId);
try {
// 通道不存在发404资源不存在
responseAck(request, Response.NOT_IMPLEMENTED);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道不存在: {}", e.getMessage());
}
return;
}
// 通道存在发100TRYING
try {
responseAck(request, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite TRYING: {}", e.getMessage());
}
// 解析sdp消息, 使用jainsip 自带的sdp解析方式 // 解析sdp消息, 使用jainsip 自带的sdp解析方式
String contentString = new String(request.getRawContent()); String contentString = new String(request.getRawContent());
Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString);
SessionDescription sdp = gb28181Sdp.getBaseSdb();
String sessionName = sdp.getSessionName().getValue();
Long startTime = null; if (gb28181Sdp.getSdpPort() == 0) {
Long stopTime = null;
Instant start = null;
Instant end = null;
if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) {
TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0));
TimeField startTimeFiled = (TimeField) timeDescription.getTime();
startTime = startTimeFiled.getStartTime();
stopTime = startTimeFiled.getStopTime();
start = Instant.ofEpochSecond(startTime);
end = Instant.ofEpochSecond(stopTime);
}
// 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
// 查看是否支持PS 负载96
//String ip = null;
int port = -1;
boolean mediaTransmissionTCP = false;
Boolean tcpActive = null;
for (Object description : mediaDescriptions) {
MediaDescription mediaDescription = (MediaDescription) description;
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("96")) {
port = media.getMediaPort();
//String mediaType = media.getMediaType();
String protocol = media.getProtocol();
// 区分TCP发流还是udp 当前默认udp
if ("TCP/RTP/AVP".equalsIgnoreCase(protocol)) {
String setup = mediaDescription.getAttribute("setup");
if (setup != null) {
mediaTransmissionTCP = true;
if ("active".equalsIgnoreCase(setup)) {
tcpActive = true;
} else if ("passive".equalsIgnoreCase(setup)) {
tcpActive = false;
}
}
}
break;
}
}
if (port == -1) {
logger.info("不支持的媒体格式返回415"); logger.info("不支持的媒体格式返回415");
// 回复不支持的格式 // 回复不支持的格式
try { try {
@ -303,60 +203,46 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
return; return;
} }
String username = sdp.getOrigin().getUsername(); String sessionName = gb28181Sdp.getBaseSdb().getSessionName().getValue();
String addressStr = sdp.getConnection().getAddress(); String streamTypeStr = "UDP";
if (gb28181Sdp.isTcp()) {
if (gb28181Sdp.isTcpActive()) {
Device device = null;
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
if (channel != null) {
device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
if (device == null) {
logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
try {
responseAck(request, Response.SERVER_INTERNAL_ERROR);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage());
}
return;
}
mediaServerItem = playService.getNewMediaServerItem(device);
if (mediaServerItem == null) {
logger.warn("未找到可用的zlm");
try {
responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage());
}
return;
}
String ssrc;
if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
}else {
ssrc = gb28181Sdp.getSsrc();
}
String streamTypeStr = null;
if (mediaTransmissionTCP) {
if (tcpActive) {
streamTypeStr = "TCP-ACTIVE"; streamTypeStr = "TCP-ACTIVE";
}else { }else {
streamTypeStr = "TCP-PASSIVE"; streamTypeStr = "TCP-PASSIVE";
} }
}else {
streamTypeStr = "UDP";
} }
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}",
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, sessionName, gb28181Sdp.getUsername(), channelId, gb28181Sdp.getAddressStr(),
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); gb28181Sdp.getSdpPort(), streamTypeStr, gb28181Sdp.getSsrc());
if (tcpActive != null) { IResourcePlayCallback callback = (commonGbChannel, mediaServerItem, code, message, streamInfo) -> {
sendRtpItem.setTcpActive(tcpActive); if (code != 0) {
logger.info("[上级Invite] 获取资源流失败。{}, {}/{}", message, streamInfo.getApp(), streamInfo.getStream());
try {
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
} }
return;
}
try {
String ssrc;
logger.info("[上级Invite] 收到资源推流。 回复200OK(SDP) {}/{}", streamInfo.getApp(), streamInfo.getStream());
if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
ssrc = "Play".equalsIgnoreCase(sessionName) ?
ssrcFactory.getPlaySsrc(mediaServerItem.getId()) :
ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
}else {
ssrc = gb28181Sdp.getSsrc();
}
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem,
gb28181Sdp.getAddressStr(), gb28181Sdp.getSdpPort(), ssrc, requesterId,
channelId, gb28181Sdp.isTcp(), platform.isRtcp());
if (sendRtpItem == null) { if (sendRtpItem == null) {
logger.warn("服务器端口资源不足"); logger.warn("[上级Invite] 获取发流端口资源失败 服务器端口资源可能不足");
try { try {
responseAck(request, Response.BUSY_HERE); responseAck(request, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
@ -364,48 +250,38 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
return; return;
} }
sendRtpItem.setTcpActive(gb28181Sdp.isTcpActive());
sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); if ("Play".equalsIgnoreCase(sessionName)) {
sendRtpItem.setPlayType(InviteStreamType.PLAY);
Long finalStartTime = startTime; }else if ("Playback".equalsIgnoreCase(sessionName)) {
Long finalStopTime = stopTime; sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
ErrorCallback<Object> hookEvent = (code, msg, data) -> { }else if ("Download".equalsIgnoreCase(sessionName)) {
StreamInfo streamInfo = (StreamInfo)data; sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()); }
logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP) {}/{}", streamInfo.getApp(), streamInfo.getStream());
// * 0 等待设备推流上来
// * 1 下级已经推流等待上级平台回复ack
// * 2 推流中
sendRtpItem.setStatus(1); sendRtpItem.setStatus(1);
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
content.append("s=" + sessionName + "\r\n"); content.append("s=" + sessionName + "\r\n");
content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
if ("Playback".equalsIgnoreCase(sessionName)) { if (!"Play".equalsIgnoreCase(sessionName)) {
content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); content.append("t=" + gb28181Sdp.getStartTime() + " " + gb28181Sdp.getStopTime() + "\r\n");
} else { } else {
content.append("t=0 0\r\n"); content.append("t=0 0\r\n");
} }
int localPort = sendRtpItem.getLocalPort(); content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n");
if (localPort == 0) {
// 非严格模式端口不统一, 增加兼容性修改为一个不为0的端口
localPort = new Random().nextInt(65535) + 1;
}
content.append("m=video " + localPort + " RTP/AVP 96\r\n");
content.append("a=sendonly\r\n"); content.append("a=sendonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n"); content.append("a=rtpmap:96 PS/90000\r\n");
content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); content.append("y=" + sendRtpItem.getSsrc() + "\r\n");
content.append("f=\r\n"); content.append("f=\r\n");
try {
// 超时未收到Ack应该回复bye,当前等待时间为10秒 // 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask.startDelay(callIdHeader.getCallId(), () -> { dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
logger.info("Ack 等待超时"); logger.info("Ack 等待超时");
mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()); mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
// 回复bye // 回复bye
try { try {
cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
@ -414,7 +290,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
}, 60 * 1000); }, 60 * 1000);
responseSdpAck(request, content.toString(), platform); responseSdpAck(request, content.toString(), platform);
// tcp主动模式回复sdp后开启监听
if (sendRtpItem.isTcpActive()) { if (sendRtpItem.isTcpActive()) {
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>(12); Map<String, Object> param = new HashMap<>(12);
@ -428,7 +303,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} }
String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
param.put("is_udp", is_Udp); param.put("is_udp", is_Udp);
param.put("src_port", localPort); param.put("src_port", sendRtpItem.getLocalPort());
param.put("pt", sendRtpItem.getPt()); param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
@ -445,125 +320,199 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
logger.error("[命令发送失败] 国标级联 回复SdpAck", e); logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
} }
}; };
ErrorCallback<Object> errorEvent = ((statusCode, msg, data) -> { if ("Play".equalsIgnoreCase(sessionName)) {
// 未知错误。直接转发设备点播的错误 resourceService.startPlay(channel, callback);
try { }else if ("Playback".equalsIgnoreCase(sessionName)) {
if (statusCode > 0) { resourceService.startPlayback(channel, gb28181Sdp.getStartTime(), gb28181Sdp.getStopTime(), callback);
Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
}
} catch (ParseException | SipException e) {
logger.error("未处理的异常 ", e);
}
});
sendRtpItem.setApp("rtp");
if ("Playback".equalsIgnoreCase(sessionName)) {
sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
String startTimeStr = DateUtil.urlFormatter.format(start);
String endTimeStr = DateUtil.urlFormatter.format(end);
String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
DateUtil.formatter.format(end),
(code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()){
hookEvent.run(code, msg, data);
}else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
logger.info("[录像回放]超时, 用户:{} 通道:{}", username, channelId);
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
errorEvent.run(code, msg, data);
}else {
errorEvent.run(code, msg, data);
}
});
}else if ("Download".equalsIgnoreCase(sessionName)) { }else if ("Download".equalsIgnoreCase(sessionName)) {
// 获取指定的下载速度 resourceService.startDownload(channel, gb28181Sdp.getStartTime(), gb28181Sdp.getStopTime(),
Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true); gb28181Sdp.getDownloadSpeed(), callback);
MediaDescription mediaDescription = null;
String downloadSpeed = "1";
if (sdpMediaDescriptions.size() > 0) {
mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0);
}
if (mediaDescription != null) {
downloadSpeed = mediaDescription.getAttribute("downloadspeed");
}
sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed),
(code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
hookEvent.run(code, msg, data);
} else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
logger.info("[录像下载]超时, 用户:{} 通道:{}", username, channelId);
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
errorEvent.run(code, msg, data);
} else {
errorEvent.run(code, msg, data);
}
});
} else {
SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
hookEvent.run(code, msg, data);
} else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
logger.info("[上级点播]超时, 用户:{} 通道:{}", username, channelId);
redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
errorEvent.run(code, msg, data);
} else {
errorEvent.run(code, msg, data);
}
}));
sendRtpItem.setPlayType(InviteStreamType.PLAY);
String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
sendRtpItem.setStreamId(streamId);
sendRtpItem.setSsrc(ssrcInfo.getSsrc());
redisCatchStorage.updateSendRTPSever(sendRtpItem);
}
} else if (gbStream != null) {
String ssrc;
if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
}else {
ssrc = gb28181Sdp.getSsrc();
}
if("push".equals(gbStream.getStreamType())) {
if (streamPushItem != null && streamPushItem.isPushIng()) {
// 推流状态
pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
// 未推流 拉起
notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
}else if ("proxy".equals(gbStream.getStreamType())){
if (null != proxyByAppAndStream) {
if(proxyByAppAndStream.isStatus()){
pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}else{
//开启代理拉流
notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
} }
}
}
//
// Device device = null;
// // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
// if (channel != null) {
//
// ErrorCallback<Object> hookEvent = (code, msg, data) -> {
// StreamInfo streamInfo = (StreamInfo)data;
// MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId());
// logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP) {}/{}", streamInfo.getApp(), streamInfo.getStream());
// // * 0 等待设备推流上来
// // * 1 下级已经推流等待上级平台回复ack
// // * 2 推流中
// sendRtpItem.setStatus(1);
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
//
//
//
//
// try {
// // 超时未收到Ack应该回复bye,当前等待时间为10秒
// dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
// logger.info("Ack 等待超时");
// mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc());
// // 回复bye
// try {
// cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
// } catch (SipException | InvalidArgumentException | ParseException e) {
// logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
// }
// }, 60 * 1000);
// responseSdpAck(request, content.toString(), platform);
// // tcp主动模式回复sdp后开启监听
// if (sendRtpItem.isTcpActive()) {
// MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
// Map<String, Object> param = new HashMap<>(12);
// param.put("vhost","__defaultVhost__");
// param.put("app",sendRtpItem.getApp());
// param.put("stream",sendRtpItem.getStreamId());
// param.put("ssrc", sendRtpItem.getSsrc());
// if (!sendRtpItem.isTcpActive()) {
// param.put("dst_url",sendRtpItem.getIp());
// param.put("dst_port", sendRtpItem.getPort());
// }
// String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
// param.put("is_udp", is_Udp);
// param.put("src_port", localPort);
// param.put("pt", sendRtpItem.getPt());
// param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
// param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
// if (!sendRtpItem.isTcp()) {
// // 开启rtcp保活
// param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
// }
// JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpStreamForPassive(mediaInfo, param);
// if (startSendRtpStreamResult != null) {
// startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
// }
// }
// } catch (SipException | InvalidArgumentException | ParseException e) {
// logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
// }
// };
// ErrorCallback<Object> errorEvent = ((statusCode, msg, data) -> {
// // 未知错误。直接转发设备点播的错误
// try {
// if (statusCode > 0) {
// Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
// sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
// }
// } catch (ParseException | SipException e) {
// logger.error("未处理的异常 ", e);
// }
// });
// sendRtpItem.setApp("rtp");
// if ("Playback".equalsIgnoreCase(sessionName)) {
// sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
// String startTimeStr = DateUtil.urlFormatter.format(start);
// String endTimeStr = DateUtil.urlFormatter.format(end);
// String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr;
// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
// sendRtpItem.setStreamId(ssrcInfo.getStream());
// // 写入redis 超时时回复
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
// playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
// DateUtil.formatter.format(end),
// (code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()){
// hookEvent.run(code, msg, data);
// }else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()){
// logger.info("[录像回放]超时, 用户:{} 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
// errorEvent.run(code, msg, data);
// }else {
// errorEvent.run(code, msg, data);
// }
// });
// } else if ("Download".equalsIgnoreCase(sessionName)) {
// // 获取指定的下载速度
// Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true);
// MediaDescription mediaDescription = null;
// String downloadSpeed = "1";
// if (sdpMediaDescriptions.size() > 0) {
// mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0);
// }
// if (mediaDescription != null) {
// downloadSpeed = mediaDescription.getAttribute("downloadspeed");
// }
//
// sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam());
// sendRtpItem.setStreamId(ssrcInfo.getStream());
// // 写入redis 超时时回复
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
// playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
// DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed),
// (code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()) {
// hookEvent.run(code, msg, data);
// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
// logger.info("[录像下载]超时, 用户:{} 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
// errorEvent.run(code, msg, data);
// } else {
// errorEvent.run(code, msg, data);
// }
// });
// } else {
//
// SSRCInfo ssrcInfo = playService.play(mediaServerItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()) {
// hookEvent.run(code, msg, data);
// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
// logger.info("[上级点播]超时, 用户:{} 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
// errorEvent.run(code, msg, data);
// } else {
// errorEvent.run(code, msg, data);
// }
// }));
// sendRtpItem.setPlayType(InviteStreamType.PLAY);
// String streamId = String.format("%s_%s", device.getDeviceId(), channelId);
// sendRtpItem.setStreamId(streamId);
// sendRtpItem.setSsrc(ssrcInfo.getSsrc());
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
//
// }
// } else if (gbStream != null) {
//
// String ssrc;
// if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
// // 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
// ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
// }else {
// ssrc = gb28181Sdp.getSsrc();
// }
//
// if("push".equals(gbStream.getStreamType())) {
// if (streamPushItem != null && streamPushItem.isPushIng()) {
// // 推流状态
// pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
// } else {
// // 未推流 拉起
// notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
// }
// }else if ("proxy".equals(gbStream.getStreamType())){
// if (null != proxyByAppAndStream) {
// if(proxyByAppAndStream.isStatus()){
// pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
// }else{
// //开启代理拉流
// notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
// mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
// }
// }
//
//
// }
// }
} }
} catch (SdpParseException e) { } catch (SdpParseException e) {
logger.error("sdp解析错误", e); logger.error("sdp解析错误", e);

View File

@ -245,8 +245,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
logger.warn("[ NotifyCatalog ] event not found {}", event ); logger.warn("[ NotifyCatalog ] event not found {}", event );
} }
// 转发变化信息
eventPublisher.catalogEventPublish(null, channel, event);
if (updateChannelMap.keySet().size() > 0 if (updateChannelMap.keySet().size() > 0
|| addChannelMap.keySet().size() > 0 || addChannelMap.keySet().size() > 0

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
@ -42,7 +43,7 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
private IVideoManagerStorage storager; private IVideoManagerStorage storager;
@Autowired @Autowired
private SIPCommanderFroPlatform cmderFroPlatform; private ISIPCommanderForPlatform cmderFroPlatform;
@Autowired @Autowired
private IPlatformChannelService platformChannelService; private IPlatformChannelService platformChannelService;

View File

@ -1,11 +1,10 @@
package com.genersoft.iot.vmp.gb28181.utils; package com.genersoft.iot.vmp.gb28181.utils;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.Gb28181CodeType;
import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp;
import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.GitUtil; import com.genersoft.iot.vmp.utils.GitUtil;
import gov.nist.javax.sdp.TimeDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Subject; import gov.nist.javax.sip.header.Subject;
@ -15,21 +14,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import javax.sdp.SdpFactory; import javax.sdp.*;
import javax.sdp.SdpParseException; import javax.sip.InvalidArgumentException;
import javax.sdp.SessionDescription;
import javax.sip.PeerUnavailableException; import javax.sip.PeerUnavailableException;
import javax.sip.SipException;
import javax.sip.SipFactory; import javax.sip.SipFactory;
import javax.sip.header.FromHeader; import javax.sip.header.FromHeader;
import javax.sip.header.Header; import javax.sip.header.Header;
import javax.sip.header.UserAgentHeader; import javax.sip.header.UserAgentHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeParseException; import java.time.format.DateTimeParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.Vector;
/** /**
* @author panlinlin * @author panlinlin
@ -203,7 +205,7 @@ public class SipUtils {
return deviceChannel; return deviceChannel;
} }
public static Gb28181Sdp parseSDP(String sdpStr) throws SdpParseException { public static Gb28181Sdp parseSDP(String sdpStr) throws SdpException {
// jainSip不支持y= f=字段, 移除以解析。 // jainSip不支持y= f=字段, 移除以解析。
int ssrcIndex = sdpStr.indexOf("y="); int ssrcIndex = sdpStr.indexOf("y=");

View File

@ -413,18 +413,8 @@ public class ZLMHttpHookListener {
type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType(); type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream()); redisCatchStorage.removeStream(mediaInfo.getId(), type, param.getApp(), param.getStream());
} }
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) {
// eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
}
zlmMediaListManager.streamOffline(param.getApp(), param.getStream()); zlmMediaListManager.streamOffline(param.getApp(), param.getStream());
} }
GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
if (gbStream != null) {
if (userSetting.isUsePushingAsStatus()) {
eventPublisher.catalogEventPublishForStream(null, gbStream, param.isRegist()?CatalogEvent.ON:CatalogEvent.OFF);
}
}
if (type != null) { if (type != null) {
// 发送流变化redis消息 // 发送流变化redis消息
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();

View File

@ -162,7 +162,7 @@ public class ZLMServerFactory {
* @return SendRtpItem * @return SendRtpItem
*/ */
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String deviceId, String channelId, boolean tcp, boolean rtcp){ String channelId, boolean tcp, boolean rtcp){
int localPort = sendRtpPortManager.getNextPort(serverItem); int localPort = sendRtpPortManager.getNextPort(serverItem);
if (localPort == 0) { if (localPort == 0) {
@ -173,7 +173,6 @@ public class ZLMServerFactory {
sendRtpItem.setPort(port); sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc); sendRtpItem.setSsrc(ssrc);
sendRtpItem.setPlatformId(platformId); sendRtpItem.setPlatformId(platformId);
sendRtpItem.setDeviceId(deviceId);
sendRtpItem.setChannelId(channelId); sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp); sendRtpItem.setTcp(tcp);
sendRtpItem.setRtcp(rtcp); sendRtpItem.setRtcp(rtcp);

View File

@ -0,0 +1,8 @@
package com.genersoft.iot.vmp.media.zlm.service;
/**
* rtp
*/
public interface IReceiveRtpService {
}

View File

@ -0,0 +1,7 @@
package com.genersoft.iot.vmp.media.zlm.service;
/**
* rtp
*/
public interface ISendRtpService {
}

View File

@ -0,0 +1,50 @@
package com.genersoft.iot.vmp.media.zlm.service.impl;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.service.ISendRtpService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SendRtpServiceImpl implements ISendRtpService {
@Autowired
private SendRtpPortManager sendRtpPortManager;
/**
*
* @param serverItem 使
* @param dstIp IP
* @param dstPort
* @param ssrc SSRC
* @param sourceId
* @param callId InvitecallId
* @param tcp 使TCP
* @param rtcp RTCPtcpfalse
* @return SendRtpItem
*/
public SendRtpItem createSendRtpInfo(MediaServerItem serverItem, String dstIp, int dstPort, String ssrc, String sourceId,
String callId, boolean tcp, boolean rtcp){
// int localPort = sendRtpPortManager.getNextPort(serverItem);
// if (localPort == 0) {
// return null;
// }
// SendRtpItem sendRtpItem = new SendRtpItem();
// sendRtpItem.setIp(ip);
// sendRtpItem.setPort(port);
// sendRtpItem.setSsrc(ssrc);
// sendRtpItem.setPlatformId(platformId);
// sendRtpItem.setChannelId(channelId);
// sendRtpItem.setTcp(tcp);
// sendRtpItem.setRtcp(rtcp);
// sendRtpItem.setApp("rtp");
// sendRtpItem.setLocalPort(localPort);
// sendRtpItem.setServerId(userSetting.getServerId());
// sendRtpItem.setMediaServerId(serverItem.getId());
// return sendRtpItem;
return null;
}
}

View File

@ -87,4 +87,6 @@ public interface ICommonGbChannelService {
void deleteByIdList(List<Integer> commonChannelIdList); void deleteByIdList(List<Integer> commonChannelIdList);
void offlineForList(List<Integer> onlinePushers); void offlineForList(List<Integer> onlinePushers);
void onlineForList(List<Integer> commonChannelIdList);
} }

View File

@ -30,4 +30,9 @@ public interface IPlatformChannelService {
* *
*/ */
List<CommonGbChannel> queryChannelList(ParentPlatform platform); List<CommonGbChannel> queryChannelList(ParentPlatform platform);
/**
*
*/
CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(Integer platformId, String channelId);
} }

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.CommonGbChannel; import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
/** /**
* *
@ -10,10 +11,6 @@ public interface IResourcePlayCallback {
/** /**
* *
* @param commonGbChannel
* @param code
* @param message
* @param streamInfo
*/ */
void call(CommonGbChannel commonGbChannel, int code, String message, StreamInfo streamInfo); void call(CommonGbChannel commonGbChannel, MediaServerItem mediaServerItem, int code, String message, StreamInfo streamInfo);
} }

View File

@ -42,4 +42,14 @@ public interface IResourceService {
* 线 * 线
*/ */
void streamOffline(String app, String streamId); void streamOffline(String app, String streamId);
/**
*
*/
void startPlayback(CommonGbChannel channel, Long startTime, Long stopTime, IResourcePlayCallback callback);
/**
*
*/
void startDownload(CommonGbChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, IResourcePlayCallback playCallback);
} }

View File

@ -784,4 +784,9 @@ public class CommonGbChannelServiceImpl implements ICommonGbChannelService {
// TODO 向国标级联发送catalog // TODO 向国标级联发送catalog
} }
@Override
public void onlineForList(List<Integer> commonChannelIdList) {
// TODO 向国标级联发送catalog
}
} }

View File

@ -265,12 +265,18 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Override @Override
public void batchAddChannel(List<DeviceChannel> channels) { public void batchAddChannel(List<DeviceChannel> channels) {
List<CommonGbChannel> commonGbChannelList = new ArrayList<>();
channels.stream().forEach(channel->{
commonGbChannelList.add(CommonGbChannel.getInstance(null, channel));
});
channelMapper.batchAdd(channels); channelMapper.batchAdd(channels);
for (DeviceChannel channel : channels) { for (DeviceChannel channel : channels) {
if (channel.getParentId() != null) { if (channel.getParentId() != null) {
channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId());
} }
} }
} }
@Override @Override

View File

@ -146,8 +146,11 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
} }
} }
return null;
}
@Override
public CommonGbChannel queryChannelByPlatformIdAndChannelDeviceId(Integer platformId, String channelId) {
return null; return null;
} }
} }

View File

@ -395,20 +395,34 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override @Override
public void offline(List<StreamPushItemFromRedis> offlineStreams) { public void offline(List<StreamPushItemFromRedis> offlineStreams) {
List<StreamPush> streamPushList = streamPushMapper.getListIn(offlineStreams);
List<Integer> commonChannelIdList = new ArrayList<>();
streamPushList.stream().forEach(streamPush -> {
commonChannelIdList.add(streamPush.getCommonGbChannelId());
});
// 更新部分设备离线 // 更新部分设备离线
List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams); streamPushMapper.offline(streamPushList);
streamPushMapper.offline(offlineStreams); if (!commonChannelIdList.isEmpty()) {
// 发送通知 commonGbChannelService.offlineForList(commonChannelIdList);
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); }
} }
@Override @Override
public void online(List<StreamPushItemFromRedis> onlineStreams) { public void online(List<StreamPushItemFromRedis> onlineStreams) {
// 更新部分设备上线streamPushService List<StreamPush> streamPushList = streamPushMapper.getListIn(onlineStreams);
List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams); List<Integer> commonChannelIdList = new ArrayList<>();
streamPushMapper.online(onlineStreams); streamPushList.stream().forEach(streamPush -> {
// 发送通知 commonChannelIdList.add(streamPush.getCommonGbChannelId());
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); });
// 更新部分设备离线
streamPushMapper.offline(streamPushList);
if (!commonChannelIdList.isEmpty()) {
commonGbChannelService.onlineForList(commonChannelIdList);
}
} }
@Override @Override

View File

@ -142,12 +142,12 @@ public interface StreamPushMapper {
List<GbStream> getOnlinePusherForGbInList(List<StreamPushItemFromRedis> offlineStreams); List<GbStream> getOnlinePusherForGbInList(List<StreamPushItemFromRedis> offlineStreams);
@Update("<script> "+ @Update("<script> "+
"UPDATE wvp_stream_push SET status=0 where (app, stream) in (" + "UPDATE wvp_stream_push SET status=0 where id in (" +
"<foreach collection='offlineStreams' item='item' separator=','>" + "<foreach collection='offlineStreams' item='item' separator=','>" +
"(#{item.app}, #{item.stream}) " + "#{item.id} " +
"</foreach>" + "</foreach>" +
")</script>") ")</script>")
void offline(List<StreamPushItemFromRedis> offlineStreams); void offline(List<StreamPush> offlineStreams);
@Select("<script> "+ @Select("<script> "+
"SELECT * FROM wvp_stream_push sp left join wvp_gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream " + "SELECT * FROM wvp_stream_push sp left join wvp_gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream " +
@ -186,12 +186,12 @@ public interface StreamPushMapper {
int getAllOnline(Boolean usePushingAsStatus); int getAllOnline(Boolean usePushingAsStatus);
@Select("<script> " + @Select("<script> " +
"select app, stream from wvp_stream_push where (app, stream) in " + "select * from wvp_stream_push where (app, stream) in " +
"<foreach collection='streamPushItems' item='item' separator=','>" + "<foreach collection='streamPushItems' item='item' separator=','>" +
"(#{item.app}, #{item.stream}) " + "(#{item.app}, #{item.stream}) " +
"</foreach>" + "</foreach>" +
"</script>") "</script>")
List<StreamPush> getListIn(List<StreamPush> streamPushItems); List<StreamPush> getListIn(@Param("streamPushItems") List<StreamPushItemFromRedis> streamPushItems);
@Select("select* from wvp_stream_push where id = #{id}") @Select("select* from wvp_stream_push where id = #{id}")
StreamPush query(@Param("id") Integer id); StreamPush query(@Param("id") Integer id);