Merge branch 'refs/heads/master' into dev/数据库统合2

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceConfig.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/controller/GBRecordController.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelPlayService.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelPlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java
#	src/main/resources/配置详情.yml
master
lin 2025-02-11 19:30:20 +08:00
commit b6c3f42a1f
43 changed files with 370 additions and 180 deletions

View File

@ -35,10 +35,12 @@ public class InviteInfo {
private Long createTime;
private Boolean record;
public static InviteInfo getInviteInfo(String deviceId, Integer channelId, String stream, SSRCInfo ssrcInfo, String mediaServerId,
String receiveIp, Integer receivePort, String streamMode,
InviteSessionType type, InviteSessionStatus status) {
InviteSessionType type, InviteSessionStatus status, Boolean record) {
InviteInfo inviteInfo = new InviteInfo();
inviteInfo.setDeviceId(deviceId);
inviteInfo.setChannelId(channelId);
@ -50,6 +52,7 @@ public class InviteInfo {
inviteInfo.setType(type);
inviteInfo.setStatus(status);
inviteInfo.setMediaServerId(mediaServerId);
inviteInfo.setRecord(record);
return inviteInfo;
}

View File

@ -180,6 +180,11 @@ public class UserSetting {
*/
private long loginTimeout = 30;
/**
* jwk使resourcesjwk.json
*/
private String jwkFile = "classpath:jwk.json";
/**
* wvpwvpwvp
*/

View File

@ -21,12 +21,16 @@ import org.jose4j.jwt.consumer.JwtConsumer;
import org.jose4j.jwt.consumer.JwtConsumerBuilder;
import org.jose4j.lang.JoseException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
@ -92,8 +96,46 @@ public class JwtUtils implements InitializingBean {
*/
private RsaJsonWebKey generateRsaJsonWebKey() throws JoseException {
RsaJsonWebKey rsaJsonWebKey = null;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(getClass().getClassLoader().getResourceAsStream("/jwk.json"), StandardCharsets.UTF_8))) {
String jwkJson = reader.readLine();
try {
String jwkFile = userSetting.getJwkFile();
InputStream inputStream = null;
if (jwkFile.startsWith("classpath:")){
String filePath = jwkFile.substring("classpath:".length());
ClassPathResource civilCodeFile = new ClassPathResource(filePath);
if (civilCodeFile.exists()) {
inputStream = civilCodeFile.getInputStream();
}
}else {
File civilCodeFile = new File(userSetting.getCivilCodeFile());
if (civilCodeFile.exists()) {
inputStream = Files.newInputStream(civilCodeFile.toPath());
}
}
if (inputStream == null ) {
log.warn("[API AUTH] 读取jwk.json失败文件不存在将使用新生成的随机RSA密钥对");
// 生成一个RSA密钥对该密钥对将用于JWT的签名和验证包装在JWK中
rsaJsonWebKey = RsaJwkGenerator.generateJwk(2048);
// 给JWK一个密钥ID
rsaJsonWebKey.setKeyId(keyId);
return rsaJsonWebKey;
}
BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
int index = -1;
String line;
StringBuilder content = new StringBuilder();
while ((line = inputStreamReader.readLine()) != null) {
content.append(line);
index ++;
if (index == 0) {
continue;
}
}
inputStreamReader.close();
inputStream.close();
String jwkJson = content.toString();
JsonWebKeySet jsonWebKeySet = new JsonWebKeySet(jwkJson);
List<JsonWebKey> jsonWebKeys = jsonWebKeySet.getJsonWebKeys();
if (!jsonWebKeys.isEmpty()) {
@ -102,14 +144,15 @@ public class JwtUtils implements InitializingBean {
rsaJsonWebKey = (RsaJsonWebKey) jsonWebKey;
}
}
} catch (Exception e) {
// ignored
}
} catch (Exception ignore) {}
if (rsaJsonWebKey == null) {
log.warn("[API AUTH] 读取jwk.json失败获取内容失败将使用新生成的随机RSA密钥对");
// 生成一个RSA密钥对该密钥对将用于JWT的签名和验证包装在JWK中
rsaJsonWebKey = RsaJwkGenerator.generateJwk(2048);
// 给JWK一个密钥ID
rsaJsonWebKey.setKeyId(keyId);
}else {
log.info("[API AUTH] 读取jwk.json成功");
}
return rsaJsonWebKey;
}

View File

@ -148,8 +148,10 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
corsConfiguration.setAllowCredentials(true);
corsConfiguration.setAllowedOrigins(userSetting.getAllowedOrigins());
}else {
corsConfiguration.setAllowCredentials(false);
corsConfiguration.setAllowedOrigins(Collections.singletonList(CorsConfiguration.ALL));
// 在SpringBoot 2.4及以上版本处理跨域时遇到错误提示当allowCredentials为true时allowedOrigins不能包含特殊值"*"。
// 解决方法是明确指定allowedOrigins或使用allowedOriginPatterns。
corsConfiguration.setAllowCredentials(true);
corsConfiguration.addAllowedOriginPattern(CorsConfiguration.ALL); // 默认全部允许所有跨域
}
corsConfiguration.setExposedHeaders(Arrays.asList(JwtUtils.getHeader()));

View File

@ -104,7 +104,21 @@ public class Device {
*
*/
@Schema(description = "心跳间隔")
private int keepaliveIntervalTime;
private Integer heartBeatInterval;
/**
*
*/
@Schema(description = "心跳超时次数")
private Integer heartBeatCount;
/**
*
*/
@Schema(description = "定位功能支持情况。取值:0-不支持;1-支持 GPS定位;2-支持北斗定位(可选,默认取值为0")
private Integer positionCapability;
/**
*

View File

@ -27,6 +27,11 @@ public class SsrcTransaction {
*/
private String callId;
/**
*
*/
private String app;
/**
* ID
*/
@ -52,12 +57,13 @@ public class SsrcTransaction {
*/
private InviteSessionType type;
public static SsrcTransaction buildForDevice(String deviceId, Integer channelId, String callId, String stream,
public static SsrcTransaction buildForDevice(String deviceId, Integer channelId, String callId, String app, String stream,
String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type) {
SsrcTransaction ssrcTransaction = new SsrcTransaction();
ssrcTransaction.setDeviceId(deviceId);
ssrcTransaction.setChannelId(channelId);
ssrcTransaction.setCallId(callId);
ssrcTransaction.setApp(app);
ssrcTransaction.setStream(stream);
ssrcTransaction.setMediaServerId(mediaServerId);
ssrcTransaction.setSsrc(ssrc);
@ -65,13 +71,14 @@ public class SsrcTransaction {
ssrcTransaction.setType(type);
return ssrcTransaction;
}
public static SsrcTransaction buildForPlatform(String platformId, Integer channelId, String callId, String stream,
public static SsrcTransaction buildForPlatform(String platformId, Integer channelId, String callId, String app,String stream,
String ssrc, String mediaServerId, SIPResponse response, InviteSessionType type) {
SsrcTransaction ssrcTransaction = new SsrcTransaction();
ssrcTransaction.setPlatformId(platformId);
ssrcTransaction.setChannelId(channelId);
ssrcTransaction.setCallId(callId);
ssrcTransaction.setStream(stream);
ssrcTransaction.setApp(app);
ssrcTransaction.setMediaServerId(mediaServerId);
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo(response));

View File

@ -3,7 +3,10 @@ package com.genersoft.iot.vmp.gb28181.controller;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceType;
import com.genersoft.iot.vmp.gb28181.bean.IndustryCodeType;
import com.genersoft.iot.vmp.gb28181.bean.NetworkIdentificationType;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelToGroupByGbDeviceParam;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelToGroupParam;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelToRegionByGbDeviceParam;
@ -29,7 +32,6 @@ import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import javax.sip.message.Response;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
@ -261,7 +263,7 @@ public class CommonChannelController {
result.setResult(WVPResult.fail(code, msg));
}
};
channelPlayService.play(channel, null, callback);
channelPlayService.play(channel, null, userSetting.getRecordSip(), callback);
return result;
}
}

View File

@ -67,7 +67,7 @@ public class DeviceConfig {
@Parameter(name = "heartBeatInterval", description = "心跳间隔")
@Parameter(name = "heartBeatCount", description = "心跳计数")
public DeferredResult<String> homePositionApi(@PathVariable String deviceId,
String channelId,
@RequestParam(required = false) String channelId,
@RequestParam(required = false) String name,
@RequestParam(required = false) String expiration,
@RequestParam(required = false) String heartBeatInterval,
@ -109,6 +109,8 @@ public class DeviceConfig {
if (log.isDebugEnabled()) {
log.debug("设备状态查询API调用");
}
String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + (ObjectUtils.isEmpty(channelId) ? deviceId : deviceId + channelId);
String uuid = UUID.randomUUID().toString();
Device device = deviceService.getDeviceByDeviceId(deviceId);
Assert.notNull(device, "设备不存在");

View File

@ -67,7 +67,9 @@ public interface DeviceMapper {
"expires," +
"register_time," +
"keepalive_time," +
"keepalive_interval_time," +
"heart_beat_interval," +
"heart_beat_count," +
"position_capability," +
"create_time," +
"update_time," +
"charset," +
@ -98,7 +100,9 @@ public interface DeviceMapper {
"#{expires}," +
"#{registerTime}," +
"#{keepaliveTime}," +
"#{keepaliveIntervalTime}," +
"#{heartBeatInterval}," +
"#{heartBeatCount}," +
"#{positionCapability}," +
"#{createTime}," +
"#{updateTime}," +
"#{charset}," +
@ -131,7 +135,9 @@ public interface DeviceMapper {
"<if test=\"onLine != null\">, on_line=#{onLine}</if>" +
"<if test=\"registerTime != null\">, register_time=#{registerTime}</if>" +
"<if test=\"keepaliveTime != null\">, keepalive_time=#{keepaliveTime}</if>" +
"<if test=\"keepaliveIntervalTime != null\">, keepalive_interval_time=#{keepaliveIntervalTime}</if>" +
"<if test=\"heartBeatInterval != null\">, heart_beat_interval=#{heartBeatInterval}</if>" +
"<if test=\"positionCapability != null\">, position_capability=#{positionCapability}</if>" +
"<if test=\"heartBeatCount != null\">, heart_beat_count=#{heartBeatCount}</if>" +
"<if test=\"expires != null\">, expires=#{expires}</if>" +
"<if test=\"serverId != null\">, server_id=#{serverId}</if>" +
"WHERE device_id=#{deviceId}"+

View File

@ -192,4 +192,5 @@ public interface IDeviceService {
DeferredResult<WVPResult<String>> deviceStatus(Device device);
void updateDeviceHeartInfo(Device device);
}

View File

@ -14,12 +14,14 @@ public interface IGbChannelPlayService {
void stopPlay(InviteSessionType type, CommonGBChannel channel, String stream);
void play(CommonGBChannel channel, Platform platform, ErrorCallback<StreamInfo> callback);
void play(CommonGBChannel channel, Platform platform, Boolean record, ErrorCallback<StreamInfo> callback);
void playGbDeviceChannel(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
void playGbDeviceChannel(CommonGBChannel channel, Boolean record, ErrorCallback<StreamInfo> callback);
void stopPlayDeviceChannel(InviteSessionType type, CommonGBChannel channel, String stream);
void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
void playProxy(CommonGBChannel channel, Boolean record, ErrorCallback<StreamInfo> callback);
void stopPlayProxy(CommonGBChannel channel);

View File

@ -76,7 +76,7 @@ public interface IPlatformService {
/**
* BYE
*/
void stopBroadcast(Platform platform, CommonGBChannel channel, String stream, boolean sendBye, MediaServer mediaServerItem);
void stopBroadcast(Platform platform, CommonGBChannel channel, String app, String stream, boolean sendBye, MediaServer mediaServerItem);
void addSimulatedSubscribeInfo(Platform parentPlatform);

View File

@ -66,7 +66,7 @@ public interface IPlayService {
void stop(InviteInfo inviteInfo);
void play(CommonGBChannel channel, ErrorCallback<StreamInfo> callback);
void play(CommonGBChannel channel, Boolean record, ErrorCallback<StreamInfo> callback);
void stop(InviteSessionType inviteSessionType, CommonGBChannel channel, String stream);

View File

@ -50,6 +50,7 @@ import java.text.ParseException;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
@ -121,9 +122,12 @@ public class DeviceServiceImpl implements IDeviceService {
}
device.setUpdateTime(now);
device.setKeepaliveTime(now);
if (device.getKeepaliveIntervalTime() == 0) {
// 默认心跳间隔60
device.setKeepaliveIntervalTime(60);
if (device.getHeartBeatCount() == null) {
// 读取设备配置, 获取心跳间隔和心跳超时次数, 在次之前暂时设置为默认值
device.setHeartBeatCount(3);
device.setHeartBeatInterval(60);
device.setPositionCapability(0);
}
if (sipTransactionInfo != null) {
device.setSipTransactionInfo(sipTransactionInfo);
@ -143,6 +147,7 @@ public class DeviceServiceImpl implements IDeviceService {
redisCatchStorage.updateDevice(device);
try {
commander.deviceInfoQuery(device);
commander.deviceConfigQuery(device, null, "BasicParam", null);
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 查询设备信息: {}", e.getMessage());
}
@ -189,18 +194,8 @@ public class DeviceServiceImpl implements IDeviceService {
// 刷新过期任务
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
// 如果第一次注册那么必须在60 * 3时间内收到一个心跳否则设备离线
dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "首次注册后未能收到心跳"), device.getKeepaliveIntervalTime() * 1000 * 3);
//
// try {
// cmder.alarmSubscribe(device, 600, "0", "4", "0", "2023-7-27T00:00:00", "2023-7-28T00:00:00");
// } catch (InvalidArgumentException e) {
// throw new RuntimeException(e);
// } catch (SipException e) {
// throw new RuntimeException(e);
// } catch (ParseException e) {
// throw new RuntimeException(e);
// }
dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "三次心跳超时"),
device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount());
}
@ -213,7 +208,7 @@ public class DeviceServiceImpl implements IDeviceService {
return;
}
log.info("[设备离线] device{} 当前心跳间隔: {} 上次心跳时间:{} 上次注册时间: {}", deviceId,
device.getKeepaliveIntervalTime(), device.getKeepaliveTime(), device.getRegisterTime());
device.getHeartBeatInterval(), device.getKeepaliveTime(), device.getRegisterTime());
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + deviceId;
dynamicTask.stop(registerExpireTaskKey);
if (device.isOnLine()) {
@ -612,6 +607,26 @@ public class DeviceServiceImpl implements IDeviceService {
}
}
@Override
public void updateDeviceHeartInfo(Device device) {
Device deviceInDb = deviceMapper.query(device.getId());
if (deviceInDb == null) {
return;
}
if (!Objects.equals(deviceInDb.getHeartBeatCount(), device.getHeartBeatCount())
|| !Objects.equals(deviceInDb.getHeartBeatInterval(), device.getHeartBeatInterval())) {
// 刷新过期任务
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
// 如果第一次注册那么必须在60 * 3时间内收到一个心跳否则设备离线
dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "三次心跳超时"),
device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount());
deviceInDb.setHeartBeatCount(device.getHeartBeatCount());
deviceInDb.setHeartBeatInterval(device.getHeartBeatInterval());
deviceInDb.setPositionCapability(device.getPositionCapability());
updateDevice(deviceInDb);
}
}
@Override
public WVPResult<SyncStatus> devicesSync(Device device) {

View File

@ -5,13 +5,13 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.ServiceException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.PlayException;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.gb28181.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushPlayService;
import lombok.extern.slf4j.Slf4j;
@ -36,6 +36,9 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
@Autowired
private IStreamPushPlayService streamPushPlayService;
@Autowired
private UserSetting userSetting;
@Override
public void start(CommonGBChannel channel, InviteMessageInfo inviteInfo, Platform platform, ErrorCallback<StreamInfo> callback) {
@ -45,7 +48,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
}
log.info("[点播通用通道] 类型:{} 通道: {}({})", inviteInfo.getSessionName(), channel.getGbName(), channel.getGbDeviceId());
if ("Play".equalsIgnoreCase(inviteInfo.getSessionName())) {
play(channel, platform, callback);
play(channel, platform, userSetting.getRecordSip(), callback);
}else if ("Playback".equals(inviteInfo.getSessionName())) {
if (channel.getDataType() == ChannelDataType.GB28181.value) {
// 国标通道
@ -113,13 +116,13 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
}
@Override
public void play(CommonGBChannel channel, Platform platform, ErrorCallback<StreamInfo> callback) {
public void play(CommonGBChannel channel, Platform platform, Boolean record, ErrorCallback<StreamInfo> callback) {
if (channel.getDataType() == ChannelDataType.GB28181.value) {
// 国标通道
playGbDeviceChannel(channel, callback);
playGbDeviceChannel(channel, record, callback);
} else if (channel.getDataType() == ChannelDataType.STREAM_PROXY.value) {
// 拉流代理
playProxy(channel, callback);
playProxy(channel, record, callback);
} else if (channel.getDataType() == ChannelDataType.STREAM_PUSH.value) {
if (platform != null) {
// 推流
@ -136,10 +139,10 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
}
@Override
public void playGbDeviceChannel(CommonGBChannel channel, ErrorCallback<StreamInfo> callback){
public void playGbDeviceChannel(CommonGBChannel channel, Boolean record, ErrorCallback<StreamInfo> callback){
// 国标通道
try {
deviceChannelPlayService.play(channel, callback);
deviceChannelPlayService.play(channel, record, callback);
} catch (PlayException e) {
callback.run(e.getCode(), e.getMsg(), null);
} catch (Exception e) {
@ -159,10 +162,10 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
}
@Override
public void playProxy(CommonGBChannel channel, ErrorCallback<StreamInfo> callback){
public void playProxy(CommonGBChannel channel, Boolean record, ErrorCallback<StreamInfo> callback){
// 拉流代理通道
try {
streamProxyPlayService.start(channel.getDataDeviceId(), callback);
streamProxyPlayService.start(channel.getDataDeviceId(), record, callback);
}catch (Exception e) {
callback.run(Response.BUSY_HERE, "busy here", null);
}

View File

@ -637,7 +637,7 @@ public class PlatformServiceImpl implements IPlatformService {
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channel.getGbId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST,
InviteSessionStatus.ready);
InviteSessionStatus.ready, userSetting.getRecordSip());
inviteStreamService.updateInviteInfo(inviteInfo);
String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
@ -647,14 +647,14 @@ public class PlatformServiceImpl implements IPlatformService {
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getStream(), null, null);
commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getApp(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
}
}
@ -732,7 +732,7 @@ public class PlatformServiceImpl implements IPlatformService {
if (!result) {
try {
log.warn("[Invite 200OK] 更新ssrc失败停止喊话 {}/{}", platform.getServerGBId(), channel.getGbDeviceId());
commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getStream(), null, null);
commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getApp(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
}
@ -741,7 +741,7 @@ public class PlatformServiceImpl implements IPlatformService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
@ -781,12 +781,13 @@ public class PlatformServiceImpl implements IPlatformService {
if (ssrcInResponse != null) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(inviteInfo.getStream());
sessionManager.removeByStream(inviteInfo.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(ssrcInfo.getApp(), inviteInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
ssrcTransaction.setPlatformId(platform.getServerGBId());
ssrcTransaction.setChannelId(channel.getGbId());
ssrcTransaction.setApp(ssrcInfo.getApp());
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
@ -838,7 +839,7 @@ public class PlatformServiceImpl implements IPlatformService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
@ -849,11 +850,11 @@ public class PlatformServiceImpl implements IPlatformService {
}
@Override
public void stopBroadcast(Platform platform, CommonGBChannel channel, String stream, boolean sendBye, MediaServer mediaServerItem) {
public void stopBroadcast(Platform platform, CommonGBChannel channel, String app, String stream, boolean sendBye, MediaServer mediaServerItem) {
try {
if (sendBye) {
commanderForPlatform.streamByeCmd(platform, channel, stream, null, null);
commanderForPlatform.streamByeCmd(platform, channel, app, stream, null, null);
}
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getGbDeviceId() );
@ -865,7 +866,7 @@ public class PlatformServiceImpl implements IPlatformService {
mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc());
inviteStreamService.removeInviteInfo(inviteInfo);
}
sessionManager.removeByStream(stream);
sessionManager.removeByStream(app, stream);
}
}

View File

@ -190,7 +190,7 @@ public class PlayServiceImpl implements IPlayService {
DeviceChannel channel = deviceChannelService.getOneById(sendRtpInfo.getChannelId());
try {
if (device != null && channel != null) {
cmder.streamByeCmd(device, channel.getDeviceId(), event.getStream(), sendRtpInfo.getCallId());
cmder.streamByeCmd(device, channel.getDeviceId(), event.getApp(), event.getStream(), sendRtpInfo.getCallId(), null);
if (sendRtpInfo.getPlayType().equals(InviteStreamType.BROADCAST)
|| sendRtpInfo.getPlayType().equals(InviteStreamType.TALK)) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(channel.getId());
@ -322,11 +322,11 @@ public class PlayServiceImpl implements IPlayService {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到通道");
}
return play(mediaServerItem, device, channel, ssrc, callback);
return play(mediaServerItem, device, channel, ssrc, userSetting.getRecordSip(), callback);
}
private SSRCInfo play(MediaServer mediaServerItem, Device device, DeviceChannel channel, String ssrc,
ErrorCallback<StreamInfo> callback) {
private SSRCInfo play(MediaServer mediaServerItem, Device device, DeviceChannel channel, String ssrc, Boolean record,
ErrorCallback<StreamInfo> callback) {
if (mediaServerItem == null ) {
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
@ -339,7 +339,8 @@ public class PlayServiceImpl implements IPlayService {
InviteInfo inviteInfoInCatch = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
if (inviteInfoInCatch != null ) {
if (inviteInfoInCatch.getStreamInfo() == null) {
// 释放生成的ssrc使用上一次申请的
// 释放生成的ssrc使用上一次申请的322
ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, channel.getId(), null, callback);
@ -421,14 +422,14 @@ public class PlayServiceImpl implements IPlayService {
}
inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(streamId);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", streamId);
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(), streamId, null);
cmder.streamByeCmd(device, channel.getDeviceId(),"rtp", streamId, null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
sessionManager.removeByStream(streamId);
sessionManager.removeByStream("rtp", streamId);
}
}
}
@ -449,7 +450,13 @@ public class PlayServiceImpl implements IPlayService {
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready);
InviteSessionStatus.ready, userSetting.getRecordSip());
if (record != null) {
inviteInfo.setRecord(record);
}else {
inviteInfo.setRecord(userSetting.getRecordSip());
}
inviteStreamService.updateInviteInfo(inviteInfo);
try {
@ -460,7 +467,7 @@ public class PlayServiceImpl implements IPlayService {
log.info("[点播失败]{}:{} deviceId: {}, channelId:{}",event.statusCode, event.msg, device.getDeviceId(), channel.getDeviceId());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
if (callback != null) {
callback.run(event.statusCode, event.msg, null);
}
@ -472,7 +479,7 @@ public class PlayServiceImpl implements IPlayService {
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 点播消息: {}", e.getMessage());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getMsg(), null);
@ -523,13 +530,13 @@ public class PlayServiceImpl implements IPlayService {
timeoutCallback.run();
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
cmder.streamByeCmd(device, channel.getDeviceId(), stream, null);
cmder.streamByeCmd(device, channel.getDeviceId(), null, null, callId, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
sessionManager.removeByStream(sendRtpInfo.getStream());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
}
}, userSetting.getPlayTimeout());
@ -538,7 +545,7 @@ public class PlayServiceImpl implements IPlayService {
if (localPort == null || localPort <= 0) {
timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
sessionManager.removeByStream(sendRtpInfo.getStream());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
return;
}
sendRtpInfo.setPort(localPort);
@ -573,7 +580,7 @@ public class PlayServiceImpl implements IPlayService {
sendRtpInfo.setCallId(response.getCallIdHeader().getCallId());
sendRtpServerService.update(sendRtpInfo);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpInfo.getChannelId(), "talk",
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpInfo.getChannelId(), "talk", sendRtpInfo.getApp(),
sendRtpInfo.getStream(), sendRtpInfo.getSsrc(), sendRtpInfo.getMediaServerId(),
response, InviteSessionType.TALK);
@ -590,7 +597,7 @@ public class PlayServiceImpl implements IPlayService {
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
sessionManager.removeByStream(sendRtpInfo.getStream());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
errorEvent.response(event);
}, userSetting.getPlayTimeout().longValue());
} catch (InvalidArgumentException | SipException | ParseException e) {
@ -601,7 +608,7 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
sessionManager.removeByStream(sendRtpInfo.getStream());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
eventResult.statusCode = -1;
@ -644,7 +651,7 @@ public class PlayServiceImpl implements IPlayService {
if (!result) {
// 主动连接失败,结束流程, 清理数据
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
inviteStreamService.call(InviteSessionType.BROADCAST, channel.getId(), null,
@ -655,7 +662,7 @@ public class PlayServiceImpl implements IPlayService {
log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
@ -804,14 +811,14 @@ public class PlayServiceImpl implements IPlayService {
}
inviteStreamService.call(InviteSessionType.PLAYBACK, channel.getId(), null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, channel.getId());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", stream);
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(), stream, null);
cmder.streamByeCmd(device, channel.getDeviceId(),"rtp", stream, null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[录像回放] 发送BYE失败 {}", e.getMessage());
} finally {
sessionManager.removeByStream(stream);
sessionManager.removeByStream("rtp", stream);
}
}
}
@ -834,7 +841,7 @@ public class PlayServiceImpl implements IPlayService {
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
InviteSessionStatus.ready);
InviteSessionStatus.ready, userSetting.getRecordSip());
inviteStreamService.updateInviteInfo(inviteInfo);
try {
@ -850,7 +857,7 @@ public class PlayServiceImpl implements IPlayService {
}
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}, userSetting.getPlayTimeout().longValue());
} catch (InvalidArgumentException | SipException | ParseException e) {
@ -859,7 +866,7 @@ public class PlayServiceImpl implements IPlayService {
callback.run(InviteErrorCode.FAIL.getCode(), e.getMessage(), null);
}
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}
}
@ -905,7 +912,7 @@ public class PlayServiceImpl implements IPlayService {
if (!result) {
try {
log.warn("[Invite 200OK] 更新ssrc失败停止点播 {}/{}", device.getDeviceId(), channel.getDeviceId());
cmder.streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getStream(), null, null);
cmder.streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getApp(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
}
@ -913,7 +920,7 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
@ -939,13 +946,15 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcInResponse != null) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(inviteInfo.getStream());
sessionManager.removeByStream(inviteInfo.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", inviteInfo.getStream());
sessionManager.removeByStream("rtp", inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
ssrcTransaction.setDeviceId(device.getDeviceId());
ssrcTransaction.setChannelId(ssrcTransaction.getChannelId());
ssrcTransaction.setCallId(ssrcTransaction.getCallId());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setApp("rtp");
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse()));
ssrcTransaction.setType(inviteSessionType);
@ -1013,14 +1022,14 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.call(InviteSessionType.DOWNLOAD, channel.getId(), null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.DOWNLOAD, channel.getId());
if (result != null && result.getSsrcInfo() != null) {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(result.getSsrcInfo().getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(result.getSsrcInfo().getApp(), result.getSsrcInfo().getStream());
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(), ssrcTransaction.getStream(), null);
cmder.streamByeCmd(device, channel.getDeviceId(), ssrcTransaction.getApp(), ssrcTransaction.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[录像下载] 发送BYE失败 {}", e.getMessage());
} finally {
sessionManager.removeByStream(ssrcTransaction.getStream());
sessionManager.removeByStream(ssrcTransaction.getApp(), ssrcTransaction.getStream());
}
}
}
@ -1045,7 +1054,7 @@ public class PlayServiceImpl implements IPlayService {
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
InviteSessionStatus.ready);
InviteSessionStatus.ready, true);
inviteStreamService.updateInviteInfo(inviteInfo);
try {
@ -1054,7 +1063,7 @@ public class PlayServiceImpl implements IPlayService {
// 对方返回错误
callback.run(InviteErrorCode.FAIL.getCode(), String.format("录像下载失败, 错误码: %s, %s", eventResult.statusCode, eventResult.msg), null);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}, eventResult ->{
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
@ -1086,7 +1095,7 @@ public class PlayServiceImpl implements IPlayService {
log.error("[命令发送失败] 录像下载: {}", e.getMessage());
callback.run(InviteErrorCode.FAIL.getCode(),e.getMessage(), null);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}
}
@ -1224,8 +1233,8 @@ public class PlayServiceImpl implements IPlayService {
continue;
}
try {
cmder.streamByeCmd(device, deviceChannel.getDeviceId(),
ssrcTransaction.getStream(), null);
cmder.streamByeCmd(device, deviceChannel.getDeviceId(), ssrcTransaction.getApp(),
ssrcTransaction.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
log.error("[zlm离线]为正在使用此zlm的设备 发送BYE失败 {}", e.getMessage());
@ -1583,10 +1592,10 @@ public class PlayServiceImpl implements IPlayService {
ssrcFactory.releaseSsrc(mediaServerId, sendRtpInfo.getSsrc());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpInfo.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(), sendRtpInfo.getStream(), null);
cmder.streamByeCmd(device, channel.getDeviceId(), sendRtpInfo.getApp(), sendRtpInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.info("[语音对讲] 停止消息发送失败,可能已经停止");
}
@ -1652,7 +1661,7 @@ public class PlayServiceImpl implements IPlayService {
}
return;
}
inviteStreamService.removeInviteInfo(inviteInfo);
inviteStreamService.removeInviteInfo( "rtp", inviteInfo);
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
@ -1689,7 +1698,7 @@ public class PlayServiceImpl implements IPlayService {
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
cmder.streamByeCmd(device, channel.getDeviceId(), inviteInfo.getStream(), null, null);
cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());
}
@ -1704,7 +1713,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public void play(CommonGBChannel channel, ErrorCallback<StreamInfo> callback) {
public void play(CommonGBChannel channel, Boolean record, ErrorCallback<StreamInfo> callback) {
Device device = deviceService.getDevice(channel.getDataDeviceId());
if (device == null) {
log.warn("[点播] 未找到通道{}的设备信息", channel);

View File

@ -27,15 +27,15 @@ public class SipInviteSessionManager {
*/
public void put(SsrcTransaction ssrcTransaction){
redisTemplate.opsForHash().put(VideoManagerConstants.SIP_INVITE_SESSION_STREAM + userSetting.getServerId()
, ssrcTransaction.getStream(), ssrcTransaction);
, ssrcTransaction.getApp() + ssrcTransaction.getStream(), ssrcTransaction);
redisTemplate.opsForHash().put(VideoManagerConstants.SIP_INVITE_SESSION_CALL_ID + userSetting.getServerId()
, ssrcTransaction.getCallId(), ssrcTransaction);
}
public SsrcTransaction getSsrcTransactionByStream(String stream){
public SsrcTransaction getSsrcTransactionByStream(String app, String stream){
String key = VideoManagerConstants.SIP_INVITE_SESSION_STREAM + userSetting.getServerId();
return (SsrcTransaction)redisTemplate.opsForHash().get(key, stream);
return (SsrcTransaction)redisTemplate.opsForHash().get(key, app + stream);
}
public SsrcTransaction getSsrcTransactionByCallId(String callId){
@ -56,8 +56,8 @@ public class SipInviteSessionManager {
return result;
}
public void removeByStream(String stream) {
SsrcTransaction ssrcTransaction = getSsrcTransactionByStream(stream);
public void removeByStream(String app, String stream) {
SsrcTransaction ssrcTransaction = getSsrcTransactionByStream(app, stream);
if (ssrcTransaction == null ) {
return;
}

View File

@ -129,13 +129,10 @@ public interface ISIPCommander {
/**
*
*/
void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void streamByeCmd(Device device, String channelId, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException;
void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
/**

View File

@ -144,7 +144,7 @@ public interface ISIPCommanderForPlatform {
void streamByeCmd(Platform platform, SendRtpInfo sendRtpItem, CommonGBChannel channel) throws SipException, InvalidArgumentException, ParseException;
void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void streamByeCmd(Platform platform, CommonGBChannel channel, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void broadcastInviteCmd(Platform platform, CommonGBChannel channel, String sourceId, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,

View File

@ -332,14 +332,15 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
String callId = response.getCallIdHeader().getCallId();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), callId, stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(),
callId,ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
InviteSessionType.PLAY);
sessionManager.put(ssrcTransaction);
okEvent.response(e);
@ -435,7 +436,9 @@ public class SIPCommander implements ISIPCommander {
ResponseEvent responseEvent = (ResponseEvent) event.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(),
channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),
device.getTransport()).getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(),
mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
sessionManager.put(ssrcTransaction);
okEvent.response(event);
}, timeout);
@ -526,7 +529,9 @@ public class SIPCommander implements ISIPCommander {
SIPResponse response = (SIPResponse) responseEvent.getResponse();
String contentString =new String(response.getRawContent());
String ssrc = SipUtils.getSsrcFromSdp(contentString);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(),
response.getCallIdHeader().getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrc,
mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
sessionManager.put(ssrcTransaction);
okEvent.response(event);
}, timeout);
@ -586,32 +591,24 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(),
SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
sessionManager.removeByStream(sendRtpItem.getStream());
sessionManager.removeByStream(sendRtpItem.getApp(), sendRtpItem.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
errorEvent.response(e);
}), e -> {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), "talk", stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), "talk",sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
sessionManager.put(ssrcTransaction);
okEvent.response(e);
}, timeout);
}
/**
* , 使
*/
@Override
public void streamByeCmd(Device device, String channelId, String stream, String callId) throws InvalidArgumentException, ParseException, SipException, SsrcTransactionNotFoundException {
streamByeCmd(device, channelId, stream, callId, null);
}
/**
*
*/
@Override
public void streamByeCmd(Device device, String channelId, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
public void streamByeCmd(Device device, String channelId, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
if (device == null) {
log.warn("[发送BYE] device为null");
return;
@ -620,7 +617,7 @@ public class SIPCommander implements ISIPCommander {
if (callId != null) {
ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callId);
}else if (stream != null) {
ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
}
if (ssrcTransaction == null) {
@ -1362,7 +1359,7 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playbackControlCmd(Device device, DeviceChannel channel, String stream, String content, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", stream);
if (ssrcTransaction == null) {
log.info("[回放控制]未找到视频流信息,设备:{}, 流ID: {}", device.getDeviceId(), stream);
return;

View File

@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@ -644,13 +643,13 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
}
@Override
public void streamByeCmd(Platform platform, CommonGBChannel channel, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
public void streamByeCmd(Platform platform, CommonGBChannel channel, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException {
SsrcTransaction ssrcTransaction = null;
if (callId != null) {
ssrcTransaction = sessionManager.getSsrcTransactionByCallId(callId);
}else if (stream != null) {
ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
}
if (ssrcTransaction == null) {
throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channel.getGbDeviceId(), callId, stream);
@ -658,7 +657,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
sessionManager.removeByStream(ssrcTransaction.getStream());
sessionManager.removeByStream(ssrcTransaction.getApp(), ssrcTransaction.getStream());
Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo());
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), byteRequest, null, okEvent);
@ -743,14 +742,15 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), ssrcInfo.getSsrc(),
callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
sessionManager.removeByStream(ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
subscribe.removeSubscribe(hook);
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForPlatform(platform.getServerGBId(), channel.getGbId(), callIdHeader.getCallId(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForPlatform(platform.getServerGBId(), channel.getGbId(),
callIdHeader.getCallId(), ssrcInfo.getApp(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST);
sessionManager.put(ssrcTransaction);
okEvent.response(e);
});

View File

@ -169,7 +169,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
try {
log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId());
cmder.streamByeCmd(device, deviceChannel.getDeviceId(), streamId, null);
cmder.streamByeCmd(device, deviceChannel.getDeviceId(), sendRtpItem.getApp(), sendRtpItem.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
log.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage());
@ -196,7 +196,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
return;
}
String mediaServerId = ssrcTransaction.getMediaServerId();
platformService.stopBroadcast(platform, channel, ssrcTransaction.getStream(), false,
platformService.stopBroadcast(platform, channel, ssrcTransaction.getApp(), ssrcTransaction.getStream(), false,
mediaServerService.getOne(mediaServerId));
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(channel.getGbId());
Device device = deviceService.getDevice(channel.getDataDeviceId());

View File

@ -624,7 +624,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
audioBroadcastCatch.setStatus(AudioBroadcastCatchStatus.Ok);
audioBroadcastCatch.setSipTransactionInfoByRequest(sipResponse);
audioBroadcastManager.update(audioBroadcastCatch);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpItem.getChannelId(), request.getCallIdHeader().getCallId(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpItem.getChannelId(),
request.getCallIdHeader().getCallId(), sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc(), sendRtpItem.getMediaServerId(), sipResponse, InviteSessionType.BROADCAST);
sessionManager.put(ssrcTransaction);
// 开启发流大华在收到200OK后就会开始建立连接
if (!device.isBroadcastPushAfterAck()) {

View File

@ -151,7 +151,7 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
log.info("[国标级联] 语音喊话 设备正在使用中 platform {} channel: {}",
platform.getServerGBId(), channel.getGbDeviceId());
// 查看语音通道已经建立且已经占用 回复BYE
platformService.stopBroadcast(platform, channel, hookData.getStream(), true, hookData.getMediaServer());
platformService.stopBroadcast(platform, channel, hookData.getApp(), hookData.getStream(), true, hookData.getMediaServer());
}else {
// 查看语音通道已经建立但是未占用
broadcastCatch.setApp(hookData.getApp());

View File

@ -3,16 +3,18 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo;
import com.genersoft.iot.vmp.gb28181.bean.SipMsgInfo;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@ -95,10 +97,10 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
}
Device device = sipMsgInfo.getDevice();
SIPRequest request = (SIPRequest) evt.getRequest();
if (!ObjectUtils.isEmpty(device.getKeepaliveTime()) && DateUtil.getDifferenceForNow(device.getKeepaliveTime()) <= 3000L) {
log.info("[收到心跳] 心跳发送过于频繁,已忽略 device: {}, callId: {}", device.getDeviceId(), request.getCallIdHeader().getCallId());
return;
}
// if (!ObjectUtils.isEmpty(device.getKeepaliveTime()) && DateUtil.getDifferenceForNow(device.getKeepaliveTime()) <= 3000L) {
// log.info("[收到心跳] 心跳发送过于频繁,已忽略 device: {}, callId: {}", device.getDeviceId(), request.getCallIdHeader().getCallId());
// return;
// }
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
@ -114,14 +116,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
});
}
}
if (device.getKeepaliveTime() == null) {
device.setKeepaliveIntervalTime(60);
} else {
long lastTime = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(device.getKeepaliveTime());
if (System.currentTimeMillis() / 1000 - lastTime > 10) {
device.setKeepaliveIntervalTime(Long.valueOf(System.currentTimeMillis() / 1000 - lastTime).intValue());
}
}
device.setKeepaliveTime(DateUtil.getNow());
@ -138,7 +132,8 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
// 刷新过期任务
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
// 如果三次心跳失败,则设置设备离线
dynamicTask.startDelay(registerExpireTaskKey, () -> deviceService.offline(device.getDeviceId(), "三次心跳失败"), device.getKeepaliveIntervalTime() * 1000 * 3);
dynamicTask.startDelay(registerExpireTaskKey, () -> deviceService.offline(device.getDeviceId(), "三次心跳超时"),
device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount());
}
}

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@ -36,6 +37,9 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired
private IDeviceService deviceService;
@Override
public void afterPropertiesSet() throws Exception {
responseMessageHandler.addHandler(cmdType, this);
@ -45,7 +49,12 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
String channelId = getText(element, "DeviceID");
String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + device.getDeviceId() + channelId;
String key;
if (device.getDeviceId().equals(channelId)) {
key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + device.getDeviceId();
}else {
key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + device.getDeviceId() + channelId;
}
try {
// 回复200 OK
responseAck((SIPRequest) evt.getRequest(), Response.OK);
@ -58,6 +67,18 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
if (log.isDebugEnabled()) {
log.debug(json.toJSONString());
}
JSONObject basicParam = json.getJSONObject("BasicParam");
Integer heartBeatInterval = basicParam.getInteger("HeartBeatInterval");
Integer heartBeatCount = basicParam.getInteger("HeartBeatCount");
Integer positionCapability = basicParam.getInteger("PositionCapability");
device.setHeartBeatInterval(heartBeatInterval);
device.setHeartBeatCount(heartBeatCount);
device.setPositionCapability(positionCapability);
deviceService.updateDeviceHeartInfo(device);
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);

View File

@ -10,6 +10,8 @@ import lombok.Data;
import java.util.List;
import java.util.Map;
import org.springframework.util.ObjectUtils;
/**
*
*/
@ -82,6 +84,8 @@ public class MediaInfo {
Long bytesSpeed = jsonObject.getLong("bytesSpeed");
if (totalReaderCount != null) {
mediaInfo.setReaderCount(totalReaderCount);
} else {
mediaInfo.setReaderCount(0);
}
if (online != null) {
mediaInfo.setOnline(online);
@ -106,7 +110,7 @@ public class MediaInfo {
}
}
JSONArray jsonArray = jsonObject.getJSONArray("tracks");
if (!jsonArray.isEmpty()) {
if (!ObjectUtils.isEmpty(jsonArray)) {
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject trackJson = jsonArray.getJSONObject(i);
Integer channels = trackJson.getInteger("channels");

View File

@ -195,7 +195,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
} else {
rtpServerPort = mediaServer.getRtpProxyPort();
}
return new SSRCInfo(rtpServerPort, ssrc, streamId, null);
return new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, null);
}
@Override

View File

@ -7,12 +7,14 @@ public class SSRCInfo {
private int port;
private String ssrc;
private String app;
private String Stream;
private String timeOutTaskKey;
public SSRCInfo(int port, String ssrc, String stream, String timeOutTaskKey) {
public SSRCInfo(int port, String ssrc, String app, String stream, String timeOutTaskKey) {
this.port = port;
this.ssrc = ssrc;
this.app = app;
this.Stream = stream;
this.timeOutTaskKey = timeOutTaskKey;
}

View File

@ -129,17 +129,19 @@ public class MediaServiceImpl implements IMediaService {
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_audio(true);
// 是否录像
if ("rtp".equals(app)) {
result.setEnable_mp4(userSetting.getRecordSip());
} else {
result.setEnable_mp4(userSetting.getRecordPushLive());
}
// 国标流
if ("rtp".equals(app)) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
if (inviteInfo != null) {
result.setEnable_mp4(inviteInfo.getRecord());
}else {
result.setEnable_mp4(userSetting.getRecordSip());
}
result.setEnable_mp4(inviteInfo.getRecord());
// 单端口模式下修改流 ID
if (!mediaServer.isRtpEnable() && inviteInfo == null) {
String ssrc = String.format("%010d", Long.parseLong(stream, 16));
@ -152,7 +154,7 @@ public class MediaServiceImpl implements IMediaService {
}
// 设置音频信息及录制信息
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(stream);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
if (ssrcTransaction != null ) {
// 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
@ -190,8 +192,12 @@ public class MediaServiceImpl implements IMediaService {
}
} else if (app.equals("broadcast")) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
} else if (app.equals("talk")) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
}else {
result.setEnable_mp4(userSetting.getRecordPushLive());
}
if (app.equalsIgnoreCase("rtp")) {
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;

View File

@ -66,7 +66,7 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
return;
}
// 开启点播,
channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
channelPlayService.play(channel, null, true, ((code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
log.info("[录像] 流离开时拉起需要录像的流, 开启成功, 通道ID: {}", channel.getGbId());
recordStreamMap.put(channel.getGbId(), streamInfo);
@ -110,7 +110,7 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
// 查找是否已经开启录像, 如果没有则开启录像
for (CommonGBChannel channel : channelList) {
// 开启点播,
channelPlayService.play(channel, null, ((code, msg, streamInfo) -> {
channelPlayService.play(channel, null, true, ((code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && streamInfo != null) {
log.info("[录像] 开启成功, 通道ID: {}", channel.getGbId());
recordStreamMap.put(channel.getGbId(), streamInfo);

View File

@ -117,11 +117,11 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, streamId, timeOutTaskKey);
SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, timeOutTaskKey);
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", streamId, rtpServerParam.getMediaServerItem().getId());
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, ssrcInfo.getApp(), streamId, rtpServerParam.getMediaServerItem().getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 收流超时
// 释放ssrc

View File

@ -192,7 +192,7 @@ public class StreamProxyController {
@Parameter(name = "id", description = "代理Id", required = true)
public StreamContent start(int id){
log.info("播放代理: {}", id);
StreamInfo streamInfo = streamProxyPlayService.start(id);
StreamInfo streamInfo = streamProxyPlayService.start(id, null, null);
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {

View File

@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
public interface IStreamProxyPlayService {
StreamInfo start(int id);
StreamInfo start(int id, Boolean record, ErrorCallback<StreamInfo> callback);
void start(int id, ErrorCallback<StreamInfo> callback);

View File

@ -15,11 +15,6 @@ import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import javax.sip.message.Response;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
@ -28,6 +23,9 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import javax.sip.message.Response;
import java.util.concurrent.ConcurrentHashMap;
/**
*
*/
@ -96,11 +94,14 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
}
@Override
public StreamInfo start(int id) {
public StreamInfo start(int id, Boolean record, ErrorCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
if (record != null) {
streamProxy.setEnableMp4(record);
}
return startProxy(streamProxy);
}

View File

@ -244,7 +244,7 @@ public class ApiStreamController {
}
try {
cmder.streamByeCmd(device, code, inviteInfo.getStream(), null);
cmder.streamByeCmd(device, code, "rtp", inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
JSONObject result = new JSONObject();
result.put("error","发送BYE失败" + e.getMessage());

View File

@ -255,6 +255,8 @@ user-settings:
gb-device-online: 0
# 登录超时时间(分钟)
login-timeout: 30
# jwk文件路径若不指定则使用resources目录下的jwk.json
jwk-file: classpath:jwk.json
# wvp集群模式下如果注册向上级的wvp奔溃则自动选择一个其他wvp继续注册到上级
auto-register-platform: true

View File

@ -96,6 +96,8 @@
布防</el-dropdown-item>
<el-dropdown-item command="resetGuard" v-bind:disabled="!scope.row.onLine">
撤防</el-dropdown-item>
<el-dropdown-item command="syncBasicParam" v-bind:disabled="!scope.row.onLine">
基础配置同步</el-dropdown-item>
</el-dropdown-menu>
</el-dropdown>
</template>
@ -357,6 +359,8 @@ export default {
this.resetGuard(itemData)
}else if (command === "delete") {
this.deleteDevice(itemData)
}else if (command === "syncBasicParam") {
this.syncBasicParam(itemData)
}
},
setGuard: function (itemData) {
@ -464,6 +468,33 @@ export default {
message: error.message
})
});
},
syncBasicParam: function (data) {
console.log(data)
this.$axios({
method: 'get',
url: `/api/device/config/query/${data.deviceId}/BasicParam`,
params: {
// channelId: data.deviceId
}
}).then( (res)=> {
if (res.data.code === 0) {
this.$message.success({
showClose: true,
message: `配置已同步,当前心跳间隔: ${res.data.data.BasicParam.HeartBeatInterval} 心跳间隔:${res.data.data.BasicParam.HeartBeatCount}`
})
}else {
this.$message.error({
showClose: true,
message: res.data.msg
})
}
}).catch( (error)=> {
this.$message.error({
showClose: true,
message: error.message
})
});
},

View File

@ -31,7 +31,9 @@ create table wvp_device
local_ip character varying(50),
password character varying(255),
as_message_channel bool default false,
keepalive_interval_time integer,
heart_beat_interval integer,
heart_beat_count integer,
position_capability integer,
broadcast_push_after_ack bool default false,
server_id character varying(50),
constraint uk_device_device unique (device_id)

View File

@ -32,7 +32,9 @@ create table wvp_device
server_id character varying(50),
password character varying(255),
as_message_channel bool default false,
keepalive_interval_time integer,
heart_beat_interval integer,
heart_beat_count integer,
position_capability integer,
broadcast_push_after_ack bool default false,
server_id character varying(50),
constraint uk_device_device unique (device_id)

View File

@ -41,3 +41,10 @@ alter table wvp_stream_proxy add relates_media_server_id character varying(50);
drop index uk_stream_push_app_stream_path on wvp_cloud_record;
alter table wvp_cloud_record change folder folder varchar(500) null;
alter table wvp_cloud_record change file_path file_path varchar(500) null;
/*
* 20250211
*/
alter table wvp_device change keepalive_interval_time heart_beat_interval integer;
alter table wvp_device add heart_beat_count integer;
alter table wvp_device add position_capability integer;

View File

@ -40,3 +40,10 @@ alter table wvp_stream_proxy add relates_media_server_id character varying(50);
drop index uk_stream_push_app_stream_path on wvp_cloud_record;
alter table wvp_cloud_record change folder folder varchar(500) null;
alter table wvp_cloud_record change file_path file_path varchar(500) null;
/*
* 20250211
*/
alter table wvp_device change keepalive_interval_time heart_beat_interval integer;
alter table wvp_device add heart_beat_count integer;
alter table wvp_device add position_capability integer;