优化点播超时的日志以及代码

结构优化
648540858 2023-12-20 23:55:03 +08:00
parent 8a605ad722
commit 52c56bfdf2
7 changed files with 149 additions and 48 deletions

View File

@ -38,13 +38,13 @@ public class PresetDataCatch {
} }
} }
public void put(int sn, int total, Device device, List<PresetItem> deviceChannelList) { public void put(int sn, int total, List<PresetItem> presetItemList) {
PresetData presetData = data.get(sn); PresetData presetData = data.get(sn);
if (presetData == null) { if (presetData == null) {
presetData = new PresetData(); presetData = new PresetData();
presetData.setSn(sn); presetData.setSn(sn);
presetData.setTotal(total); presetData.setTotal(total);
presetData.setPresetItemList(deviceChannelList); presetData.setPresetItemList(presetItemList);
presetData.setStatus(DataStatus.runIng); presetData.setStatus(DataStatus.runIng);
presetData.setLastTime(Instant.now()); presetData.setLastTime(Instant.now());
data.put(sn, presetData); data.put(sn, presetData);
@ -55,7 +55,7 @@ public class PresetDataCatch {
} }
presetData.setTotal(total); presetData.setTotal(total);
presetData.setStatus(DataStatus.runIng); presetData.setStatus(DataStatus.runIng);
presetData.getPresetItemList().addAll(deviceChannelList); presetData.getPresetItemList().addAll(presetItemList);
presetData.setLastTime(Instant.now()); presetData.setLastTime(Instant.now());
} }
} }

View File

@ -569,9 +569,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setToTag(response.getToTag()); sendRtpItem.setToTag(response.getToTag());
} }
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
} }
} }
private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform, private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPush streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem, CallIdHeader callIdHeader, MediaServerItem mediaServerItem,

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.respon
import com.genersoft.iot.vmp.gb28181.bean.PresetQuerySipReq; import com.genersoft.iot.vmp.gb28181.bean.PresetQuerySipReq;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.PresetDataCatch;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
@ -43,6 +44,9 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
@Autowired @Autowired
private DeferredResultHolder deferredResultHolder; private DeferredResultHolder deferredResultHolder;
@Autowired
private PresetDataCatch presetDataCatch;
@Override @Override
@ -68,14 +72,14 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
return; return;
} }
Element presetListNumElement = rootElement.element("PresetList"); Element presetListNumElement = rootElement.element("PresetList");
String sn = getText(rootElement, "SN"); String snStr = getText(rootElement, "SN");
//该字段可能为通道或则设备的id //该字段可能为通道或则设备的id
String channelId = getText(rootElement, "DeviceID"); String channelId = getText(rootElement, "DeviceID");
if (channelId == null) { if (channelId == null) {
} }
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId + channelId; String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId + channelId;
if (sn == null || presetListNumElement == null) { if (snStr == null || presetListNumElement == null) {
try { try {
responseAck(request, Response.BAD_REQUEST, "xml error"); responseAck(request, Response.BAD_REQUEST, "xml error");
} catch (InvalidArgumentException | ParseException | SipException e) { } catch (InvalidArgumentException | ParseException | SipException e) {
@ -84,29 +88,34 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
return; return;
} }
int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num")); int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num"));
List<PresetQuerySipReq> presetQuerySipReqList = new ArrayList<>(); int sn = Integer.parseInt(snStr);
if (sumNum > 0) { List<PresetItem> presetItems = new ArrayList<>();
if (sumNum == 0) {
presetDataCatch.setChannelSyncEnd(sn, null );
}else {
for (Iterator<Element> presetIterator = presetListNumElement.elementIterator(); presetIterator.hasNext(); ) { for (Iterator<Element> presetIterator = presetListNumElement.elementIterator(); presetIterator.hasNext(); ) {
Element itemListElement = presetIterator.next(); Element itemListElement = presetIterator.next();
PresetQuerySipReq presetQuerySipReq = new PresetQuerySipReq(); PresetItem presetItem = new PresetItem();
for (Iterator<Element> itemListIterator = itemListElement.elementIterator(); itemListIterator.hasNext(); ) { for (Iterator<Element> itemListIterator = itemListElement.elementIterator(); itemListIterator.hasNext(); ) {
// 遍历item // 遍历item
Element itemOne = itemListIterator.next(); Element itemOne = itemListIterator.next();
String name = itemOne.getName(); String name = itemOne.getName();
String textTrim = itemOne.getTextTrim(); String textTrim = itemOne.getTextTrim();
if ("PresetID".equalsIgnoreCase(name)) { if ("PresetID".equalsIgnoreCase(name)) {
presetQuerySipReq.setPresetId(textTrim); presetItem.setPresetID(Integer.parseInt(textTrim));
} else { } else {
presetQuerySipReq.setPresetName(textTrim); presetItem.setPresetName(textTrim);
} }
} }
presetQuerySipReqList.add(presetQuerySipReq); }
presetDataCatch.put(sn, sumNum, presetItems);
if (sumNum == presetDataCatch.get(sn).size()) {
RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key);
requestMessage.setData(presetDataCatch.get(sn));
deferredResultHolder.invokeAllResult(requestMessage);
} }
} }
RequestMessage requestMessage = new RequestMessage();
requestMessage.setKey(key);
requestMessage.setData(presetQuerySipReqList);
deferredResultHolder.invokeAllResult(requestMessage);
try { try {
responseAck(request, Response.OK); responseAck(request, Response.OK);
} catch (InvalidArgumentException | ParseException | SipException e) { } catch (InvalidArgumentException | ParseException | SipException e) {

View File

@ -206,41 +206,41 @@ public class PlayServiceImpl implements IPlayService {
// 超时处理 // 超时处理
String timeOutTaskKey = UUID.randomUUID().toString(); String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> { dynamicTask.startDelay(timeOutTaskKey, () -> {
// 取消订阅消息监听
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId); InviteInfo inviteInfoForTimeOut = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStreamInfo() == null) { int code;
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", String msg;
if (inviteInfoForTimeOut == null || inviteInfoForTimeOut.getStatus().equals(InviteSessionStatus.ready)) {
logger.info("[点播超时] 信令超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}",
device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
ssrcInfo.getPort(), ssrcInfo.getSsrc()); ssrcInfo.getPort(), ssrcInfo.getSsrc());
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); code = InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode();
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, msg = InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg();
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
logger.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// 取消订阅消息监听
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
subscribe.removeSubscribe(hookSubscribe);
}
}else { }else {
logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}", logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {},码流类型:{},端口:{}, SSRC: {}",
device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流",
ssrcInfo.getPort(), ssrcInfo.getSsrc()); ssrcInfo.getPort(), ssrcInfo.getSsrc());
code = InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode();
msg = InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg();
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); try {
cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null);
mediaServerService.closeRTPServer(mediaServerItem.getId(), ssrcInfo.getStream()); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); logger.error("[点播超时]发送BYE失败 {}", e.getMessage());
}
} }
callback.run(code, msg, null);
inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, device.getDeviceId(), channelId);
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
}, userSetting.getPlayTimeout()); }, userSetting.getPlayTimeout());
try { try {
@ -592,9 +592,11 @@ public class PlayServiceImpl implements IPlayService {
Device device, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback, Device device, String channelId, String timeOutTaskKey, ErrorCallback<Object> callback,
InviteInfo inviteInfo, InviteSessionType inviteSessionType){ InviteInfo inviteInfo, InviteSessionType inviteSessionType){
inviteInfo.setStatus(InviteSessionStatus.ok); inviteInfo.setStatus(InviteSessionStatus.ok);
inviteStreamService.updateInviteInfo(inviteInfo);
ResponseEvent responseEvent = (ResponseEvent) eventResult.event; ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
String contentString = new String(responseEvent.getResponse().getRawContent()); String contentString = new String(responseEvent.getResponse().getRawContent());
String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString); String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString);
logger.info("[点播收到回复OK] deviceId: {}, channelId: {} 发流SSRC {}", device.getDeviceId(), channelId, ssrcInResponse);
// 兼容回复的消息中缺少ssrc(y字段)的情况 // 兼容回复的消息中缺少ssrc(y字段)的情况
if (ssrcInResponse == null) { if (ssrcInResponse == null) {
ssrcInResponse = ssrcInfo.getSsrc(); ssrcInResponse = ssrcInfo.getSsrc();

View File

@ -0,0 +1,45 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.service.IResourcePlayCallback;
import com.genersoft.iot.vmp.service.IResourceService;
import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
import org.springframework.stereotype.Service;
@Service(CommonGbChannelType.PROXY)
public class StreamProxyResourceServiceImpl implements IResourceService {
@Override
public boolean deleteChannel(CommonGbChannel commonGbChannel) {
return false;
}
@Override
public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) {
}
@Override
public void stopPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) {
}
@Override
public boolean ptzControl(CommonGbChannel commonGbChannel, String command, Integer horizonSpeed, Integer verticalSpeed, Integer zoomSpeed) {
return false;
}
@Override
public void streamOffline(String app, String streamId) {
}
@Override
public void startPlayback(CommonGbChannel channel, Long startTime, Long stopTime, IResourcePlayCallback callback) {
}
@Override
public void startDownload(CommonGbChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, IResourcePlayCallback playCallback) {
}
}

View File

@ -0,0 +1,45 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.CommonGbChannel;
import com.genersoft.iot.vmp.service.IResourcePlayCallback;
import com.genersoft.iot.vmp.service.IResourceService;
import com.genersoft.iot.vmp.service.bean.CommonGbChannelType;
import org.springframework.stereotype.Service;
@Service(CommonGbChannelType.PUSH)
public class StreamPushResourceServiceImpl implements IResourceService {
@Override
public boolean deleteChannel(CommonGbChannel commonGbChannel) {
return false;
}
@Override
public void startPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) {
}
@Override
public void stopPlay(CommonGbChannel commonGbChannel, IResourcePlayCallback callback) {
}
@Override
public boolean ptzControl(CommonGbChannel commonGbChannel, String command, Integer horizonSpeed, Integer verticalSpeed, Integer zoomSpeed) {
return false;
}
@Override
public void streamOffline(String app, String streamId) {
}
@Override
public void startPlayback(CommonGbChannel channel, Long startTime, Long stopTime, IResourcePlayCallback callback) {
}
@Override
public void startDownload(CommonGbChannel channel, Long startTime, Long stopTime, Integer downloadSpeed, IResourcePlayCallback playCallback) {
}
}

View File

@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.PresetQuerySipReq;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx; import com.genersoft.iot.vmp.vmanager.bean.DeferredResultEx;
@ -193,15 +194,16 @@ public class ApiDeviceController {
} }
Device device = storager.queryVideoDevice(serial); Device device = storager.queryVideoDevice(serial);
String uuid = UUID.randomUUID().toString(); int sn = SipUtils.getNewSn();
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + serial + code; String msgId = sn + "";
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + sn;
DeferredResult<Object> result = new DeferredResult<> (timeout * 1000L); DeferredResult<Object> result = new DeferredResult<> (timeout * 1000L);
DeferredResultEx<Object> deferredResultEx = new DeferredResultEx<>(result); DeferredResultEx<Object> deferredResultEx = new DeferredResultEx<>(result);
result.onTimeout(()->{ result.onTimeout(()->{
logger.warn("<模拟接口> 获取设备预置位超时"); logger.warn("<模拟接口> 获取设备预置位超时");
// 释放rtpserver // 释放rtpserver
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setId(uuid); msg.setId(msgId);
msg.setKey(key); msg.setKey(key);
msg.setData("wait for presetquery timeout["+timeout+"s]"); msg.setData("wait for presetquery timeout["+timeout+"s]");
resultHolder.invokeResult(msg); resultHolder.invokeResult(msg);
@ -228,12 +230,12 @@ public class ApiDeviceController {
return resultMap; return resultMap;
}); });
resultHolder.put(key, uuid, deferredResultEx); resultHolder.put(key, msgId, deferredResultEx);
try { try {
cmder.presetQuery(device, code, event -> { cmder.presetQuery(device, code, sn, event -> {
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setId(uuid); msg.setId(msgId);
msg.setKey(key); msg.setKey(key);
msg.setData(String.format("获取设备预置位失败,错误码: %s, %s", event.statusCode, event.msg)); msg.setData(String.format("获取设备预置位失败,错误码: %s, %s", event.statusCode, event.msg));
resultHolder.invokeResult(msg); resultHolder.invokeResult(msg);