临时提交

pull/1642/head
648540858 2024-08-13 18:03:34 +08:00
parent ad240ba9a4
commit d5621f9489
3 changed files with 62 additions and 50 deletions

View File

@ -79,5 +79,5 @@ public interface IGbChannelService {
CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId); CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId);
void start(CommonGBChannel channel, ErrorCallback<CommonChannelPlayInfo> callback); void start(CommonGBChannel channel, InviteInfo inviteInfo, ErrorCallback<CommonChannelPlayInfo> callback);
} }

View File

@ -85,7 +85,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送通知 // 发送通知
eventPublisher.catalogEventPublish(null, channel, CatalogEvent.DEL); eventPublisher.catalogEventPublish(null, channel, CatalogEvent.DEL);
}catch (Exception e) { } catch (Exception e) {
log.warn("[通道移除通知] 发送失败,{}", channel.getGbDeviceId(), e); log.warn("[通道移除通知] 发送失败,{}", channel.getGbDeviceId(), e);
} }
} }
@ -102,7 +102,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送通知 // 发送通知
eventPublisher.catalogEventPublish(null, channelListInDb, CatalogEvent.DEL); eventPublisher.catalogEventPublish(null, channelListInDb, CatalogEvent.DEL);
}catch (Exception e) { } catch (Exception e) {
log.warn("[通道移除通知] 发送失败,{}条", channelListInDb.size(), e); log.warn("[通道移除通知] 发送失败,{}条", channelListInDb.size(), e);
} }
} }
@ -120,7 +120,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送通知 // 发送通知
eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[更新通道通知] 发送失败,{}", commonGBChannel.getGbDeviceId(), e); log.warn("[更新通道通知] 发送失败,{}", commonGBChannel.getGbDeviceId(), e);
} }
} }
@ -138,7 +138,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送通知 // 发送通知
eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.OFF); eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.OFF);
}catch (Exception e) { } catch (Exception e) {
log.warn("[通道离线通知] 发送失败,{}", commonGBChannel.getGbDeviceId(), e); log.warn("[通道离线通知] 发送失败,{}", commonGBChannel.getGbDeviceId(), e);
} }
} }
@ -167,14 +167,14 @@ public class GbChannelServiceImpl implements IGbChannelService {
} }
result += commonGBChannelMapper.updateStatusForListById(onlineChannelList.subList(i, toIndex), 0); result += commonGBChannelMapper.updateStatusForListById(onlineChannelList.subList(i, toIndex), 0);
} }
}else { } else {
result += commonGBChannelMapper.updateStatusForListById(onlineChannelList, 0); result += commonGBChannelMapper.updateStatusForListById(onlineChannelList, 0);
} }
if (result > 0) { if (result > 0) {
try { try {
// 发送catalog // 发送catalog
eventPublisher.catalogEventPublish(null, onlineChannelList, CatalogEvent.OFF); eventPublisher.catalogEventPublish(null, onlineChannelList, CatalogEvent.OFF);
}catch (Exception e) { } catch (Exception e) {
log.warn("[多个通道离线] 发送失败,数量:{}", onlineChannelList.size(), e); log.warn("[多个通道离线] 发送失败,数量:{}", onlineChannelList.size(), e);
} }
} }
@ -192,7 +192,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送通知 // 发送通知
eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.ON); eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.ON);
}catch (Exception e) { } catch (Exception e) {
log.warn("[通道上线通知] 发送失败,{}", commonGBChannel.getGbDeviceId(), e); log.warn("[通道上线通知] 发送失败,{}", commonGBChannel.getGbDeviceId(), e);
} }
} }
@ -222,14 +222,14 @@ public class GbChannelServiceImpl implements IGbChannelService {
} }
result += commonGBChannelMapper.updateStatusForListById(offlineChannelList.subList(i, toIndex), 1); result += commonGBChannelMapper.updateStatusForListById(offlineChannelList.subList(i, toIndex), 1);
} }
}else { } else {
result += commonGBChannelMapper.updateStatusForListById(offlineChannelList, 1); result += commonGBChannelMapper.updateStatusForListById(offlineChannelList, 1);
} }
if (result > 0) { if (result > 0) {
try { try {
// 发送catalog // 发送catalog
eventPublisher.catalogEventPublish(null, offlineChannelList, CatalogEvent.ON); eventPublisher.catalogEventPublish(null, offlineChannelList, CatalogEvent.ON);
}catch (Exception e) { } catch (Exception e) {
log.warn("[多个通道上线] 发送失败,数量:{}", offlineChannelList.size(), e); log.warn("[多个通道上线] 发送失败,数量:{}", offlineChannelList.size(), e);
} }
} }
@ -255,7 +255,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
} }
result += commonGBChannelMapper.batchAdd(commonGBChannels.subList(i, toIndex)); result += commonGBChannelMapper.batchAdd(commonGBChannels.subList(i, toIndex));
} }
}else { } else {
result += commonGBChannelMapper.batchAdd(commonGBChannels); result += commonGBChannelMapper.batchAdd(commonGBChannels);
} }
log.warn("[新增多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); log.warn("[新增多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
@ -278,7 +278,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
} }
result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex)); result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex));
} }
}else { } else {
result += commonGBChannelMapper.batchUpdate(commonGBChannels); result += commonGBChannelMapper.batchUpdate(commonGBChannels);
} }
log.warn("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); log.warn("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
@ -286,7 +286,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送通知 // 发送通知
eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[更新多个通道] 发送失败,{}个", commonGBChannels.size(), e); log.warn("[更新多个通道] 发送失败,{}个", commonGBChannels.size(), e);
} }
} }
@ -308,7 +308,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
} }
result += commonGBChannelMapper.updateStatus(commonGBChannels.subList(i, toIndex)); result += commonGBChannelMapper.updateStatus(commonGBChannels.subList(i, toIndex));
} }
}else { } else {
result += commonGBChannelMapper.updateStatus(commonGBChannels); result += commonGBChannelMapper.updateStatus(commonGBChannels);
} }
log.warn("[更新多个通道状态] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); log.warn("[更新多个通道状态] 通道数量为{},成功保存:{}", commonGBChannels.size(), result);
@ -316,7 +316,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送通知 // 发送通知
eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[更新多个通道] 发送失败,{}个", commonGBChannels.size(), e); log.warn("[更新多个通道] 发送失败,{}个", commonGBChannels.size(), e);
} }
} }
@ -398,7 +398,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
public void reset(int id) { public void reset(int id) {
log.info("[重置国标通道] id: {}", id); log.info("[重置国标通道] id: {}", id);
CommonGBChannel channel = getOne(id); CommonGBChannel channel = getOne(id);
if (channel == null ) { if (channel == null) {
log.warn("[重置国标通道] 未找到对应Id的通道: id: {}", id); log.warn("[重置国标通道] 未找到对应Id的通道: id: {}", id);
throw new ControllerException(ErrorCode.ERROR400); throw new ControllerException(ErrorCode.ERROR400);
} }
@ -440,7 +440,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送catalog // 发送catalog
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[多个通道添加行政区划] 发送失败,数量:{}", channelList.size(), e); log.warn("[多个通道添加行政区划] 发送失败,数量:{}", channelList.size(), e);
} }
} }
@ -508,7 +508,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送catalog // 发送catalog
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[多个通道添加行政区划] 发送失败,数量:{}", channelList.size(), e); log.warn("[多个通道添加行政区划] 发送失败,数量:{}", channelList.size(), e);
} }
} }
@ -555,7 +555,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
// 发送catalog // 发送catalog
try { try {
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[多个通道业务分组] 发送失败,数量:{}", channelList.size(), e); log.warn("[多个通道业务分组] 发送失败,数量:{}", channelList.size(), e);
} }
} }
@ -576,7 +576,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
// 发送catalog // 发送catalog
try { try {
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[多个通道业务分组] 发送失败,数量:{}", channelList.size(), e); log.warn("[多个通道业务分组] 发送失败,数量:{}", channelList.size(), e);
} }
} }
@ -598,7 +598,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送catalog // 发送catalog
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[多个通道添加行政区划] 发送失败,数量:{}", channelList.size(), e); log.warn("[多个通道添加行政区划] 发送失败,数量:{}", channelList.size(), e);
} }
} }
@ -633,7 +633,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
try { try {
// 发送catalog // 发送catalog
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE);
}catch (Exception e) { } catch (Exception e) {
log.warn("[多个通道添加行政区划] 发送失败,数量:{}", channelList.size(), e); log.warn("[多个通道添加行政区划] 发送失败,数量:{}", channelList.size(), e);
} }
} }
@ -654,34 +654,46 @@ public class GbChannelServiceImpl implements IGbChannelService {
} }
@Override @Override
public void start(CommonGBChannel channel, ErrorCallback<CommonChannelPlayInfo> callback) { public void start(CommonGBChannel channel, InviteInfo inviteInfo, ErrorCallback<CommonChannelPlayInfo> callback) {
log.info("[点播通用通道] 通道: {}({})", channel.getGbName(), channel.getGbDeviceId()); if (channel == null || inviteInfo == null || callback == null) {
if (channel.getGbDeviceDbId() > 0) { log.warn("[通用通道点播] 参数异常, channel: {}, inviteInfo: {}, callback: {}", channel != null, inviteInfo != null, callback != null);
// 国标通道
Device device = deviceService.getDevice(channel.getGbDeviceDbId());
if (device == null) {
log.warn("[点播] 未找到通道{}的设备信息", channel);
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
}
MediaServer mediaServer = playService.getNewMediaServerItem(device);
if (mediaServer == null) {
log.warn("[点播] 未找到可用媒体节点");
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
}
playService.play(mediaServer, device.getDeviceId(), channel.getGbDeviceId(), null, (code, msg, data) -> {
if (callback != null) {
callback.run(code, msg, CommonChannelPlayInfo.build(mediaServer, data));
}
});
}else if (channel.getStreamProxyId() > 0){
// 拉流代理
}else if (channel.getStreamPushId() > 0) {
// 推流
}else {
// 通道数据异常
log.error("[点播通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error"); throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
} }
log.info("[点播通用通道] 类型:{} 通道: {}({})", inviteInfo.getSessionName(), channel.getGbName(), channel.getGbDeviceId());
if ("Play".equalsIgnoreCase(inviteInfo.getSessionName())) {
if (channel.getGbDeviceDbId() > 0) {
// 国标通道
Device device = deviceService.getDevice(channel.getGbDeviceDbId());
if (device == null) {
log.warn("[点播] 未找到通道{}的设备信息", channel);
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
}
MediaServer mediaServer = playService.getNewMediaServerItem(device);
if (mediaServer == null) {
log.warn("[点播] 未找到可用媒体节点");
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
}
playService.play(mediaServer, device.getDeviceId(), channel.getGbDeviceId(), null, (code, msg, data) -> {
if (callback != null) {
callback.run(code, msg, CommonChannelPlayInfo.build(mediaServer, data));
}
});
} else if (channel.getStreamProxyId() > 0) {
// 拉流代理
} else if (channel.getStreamPushId() > 0) {
// 推流
} else {
// 通道数据异常
log.error("[点播通用通道] 通道数据异常,无法识别通道来源: {}({})", channel.getGbName(), channel.getGbDeviceId());
throw new PlayException(Response.SERVER_INTERNAL_ERROR, "server internal error");
}
}else if ("Playback".equals(inviteInfo.getSessionName())) {
}else if ("Download".equals(inviteInfo.getSessionName())) {
}else {
}
} }
} }

View File

@ -184,7 +184,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
log.error("[命令发送失败] 上级Invite TRYING: {}", e.getMessage()); log.error("[命令发送失败] 上级Invite TRYING: {}", e.getMessage());
} }
channelService.start(channel, ((code, msg, commonChannelPlayInfo) -> { channelService.start(channel, inviteInfo, ((code, msg, commonChannelPlayInfo) -> {
if (code != Response.OK) { if (code != Response.OK) {
try { try {
responseAck(request, code, msg); responseAck(request, code, msg);