修复代理访问跨域问题,和ScheduledFuture取消任务时指令重排异常,不结束正在运行的任务。防止 Command interrupted

pull/996/head
chenzhangyue 2023-08-10 20:00:43 +08:00
parent 6106bda151
commit dd2ae65786
12 changed files with 68 additions and 17 deletions

View File

@ -12,7 +12,10 @@ import org.springframework.core.annotation.Order;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.ObjectUtils;
import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.util.Map;

View File

@ -111,7 +111,7 @@ public class DynamicTask {
}
boolean result = false;
if (!ObjectUtils.isEmpty(futureMap.get(key)) && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) {
result = futureMap.get(key).cancel(true);
result = futureMap.get(key).cancel(false);
futureMap.remove(key);
runnableMap.remove(key);
}
@ -143,7 +143,8 @@ public class DynamicTask {
public void execute(){
if (futureMap.size() > 0) {
for (String key : futureMap.keySet()) {
if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) {
ScheduledFuture<?> future = futureMap.get(key);
if (future.isDone() || future.isCancelled()) {
futureMap.remove(key);
runnableMap.remove(key);
}

View File

@ -18,6 +18,7 @@ import org.springframework.util.ObjectUtils;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.ConnectException;
@ -64,6 +65,18 @@ public class ProxyServletConfig {
return queryStr;
}
@Override
protected HttpResponse doExecute(HttpServletRequest servletRequest, HttpServletResponse servletResponse,
HttpRequest proxyRequest) throws IOException {
HttpResponse response = super.doExecute(servletRequest, servletResponse, proxyRequest);
response.removeHeaders("Access-Control-Allow-Origin");
response.setHeader("Access-Control-Allow-Credentials","true");
response.removeHeaders("Access-Control-Allow-Credentials");
return response;
}
/**
*
*/
@ -181,6 +194,18 @@ public class ProxyServletConfig {
return queryStr;
}
@Override
protected HttpResponse doExecute(HttpServletRequest servletRequest, HttpServletResponse servletResponse,
HttpRequest proxyRequest) throws IOException {
HttpResponse response = super.doExecute(servletRequest, servletResponse, proxyRequest);
String origin = servletRequest.getHeader("origin");
response.setHeader("Access-Control-Allow-Origin",origin);
response.setHeader("Access-Control-Allow-Credentials","true");
return response;
}
/**
*
*/

View File

@ -35,7 +35,7 @@ public class JwtUtils {
/**
* token()
*/
public static final long expirationTime = 30;
public static final long expirationTime = 30 * 24 * 60;
public static String createToken(String username, String password, Integer roleId) {
try {

View File

@ -20,6 +20,8 @@ public interface ISIPCommanderForPlatform {
void register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException;
void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws InvalidArgumentException, ParseException, SipException;
void register(ParentPlatform parentPlatform, SipTransactionInfo sipTransactionInfo, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean isRegister) throws SipException, InvalidArgumentException, ParseException;
/**

View File

@ -132,7 +132,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
if (CmdType.CATALOG.equals(cmd)) {
logger.info("接收到Catalog通知");
// processNotifyCatalogList(take.getEvt());
processNotifyCatalogList(take.getEvt());
notifyRequestForCatalogProcessor.process(take.getEvt());
} else if (CmdType.ALARM.equals(cmd)) {
logger.info("接收到Alarm通知");

View File

@ -85,7 +85,11 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
Response response = null;
boolean passwordCorrect = false;
// 注册标志
boolean registerFlag;
boolean registerFlag = true;
if (request.getExpires().getExpires() == 0) {
// 注销成功
registerFlag = false;
}
FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
@ -96,11 +100,12 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request,
userSetting.getSipUseSourceIpAsRemoteAddress());
String requestAddress = remoteAddressInfo.getIp() + ":" + remoteAddressInfo.getPort();
logger.info("[注册请求] 设备:{}, 开始处理: {}", deviceId, requestAddress);
String title = registerFlag ? "[注册请求]": "[注销请求]";
logger.info(title + "设备:{}, 开始处理: {}", deviceId, requestAddress);
if (device != null &&
device.getSipTransactionInfo() != null &&
request.getCallIdHeader().getCallId().equals(device.getSipTransactionInfo().getCallId())) {
logger.info("[注册请求] 设备:{}, 注册续订: {}",device.getDeviceId(), device.getDeviceId());
logger.info(title + "设备:{}, 注册续订: {}",device.getDeviceId(), device.getDeviceId());
device.setExpires(request.getExpires().getExpires());
device.setIp(remoteAddressInfo.getIp());
device.setPort(remoteAddressInfo.getPort());
@ -120,7 +125,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
String password = (device != null && !ObjectUtils.isEmpty(device.getPassword()))? device.getPassword() : sipConfig.getPassword();
AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
if (authHead == null && !ObjectUtils.isEmpty(password)) {
logger.info("[注册请求] 设备:{}, 回复401: {}",deviceId, requestAddress);
logger.info(title + " 设备:{}, 回复401: {}",deviceId, requestAddress);
response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain());
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
@ -135,7 +140,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 注册失败
response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
response.setReasonPhrase("wrong password");
logger.info("[注册请求] 设备:{}, 密码/SIP服务器ID错误, 回复403: {}", deviceId, requestAddress);
logger.info(title + " 设备:{}, 密码/SIP服务器ID错误, 回复403: {}", deviceId, requestAddress);
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
return;
}

View File

@ -108,6 +108,7 @@ public class DeviceServiceImpl implements IDeviceService {
inviteStreamService.clearInviteInfo(device.getDeviceId());
}
device.setUpdateTime(now);
device.setKeepaliveTime(now);
if (device.getKeepaliveIntervalTime() == 0) {
// 默认心跳间隔60
device.setKeepaliveIntervalTime(60);
@ -209,7 +210,7 @@ public class DeviceServiceImpl implements IDeviceService {
redisCatchStorage.updateDevice(device);
deviceMapper.update(device);
//进行通道离线
// deviceChannelMapper.offlineByDeviceId(deviceId);
deviceChannelMapper.offlineByDeviceId(deviceId);
// 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null);
if (ssrcTransactions != null && ssrcTransactions.size() > 0) {

View File

@ -234,7 +234,6 @@ public class PlatformServiceImpl implements IPlatformService {
// 设置平台离线,并重新注册
logger.info("[国标级联] 三次心跳超时, 平台{}({})离线", parentPlatform.getName(), parentPlatform.getServerGBId());
offline(parentPlatform, false);
}
}else {
@ -249,6 +248,7 @@ public class PlatformServiceImpl implements IPlatformService {
platformCatch.setKeepAliveReply(0);
redisCatchStorage.updatePlatformCatchInfo(platformCatch);
}
logger.info("[发送心跳] 国标级联 发送心跳, code {}, msg: {}", eventResult.statusCode, eventResult.msg);
});
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage());

View File

@ -193,7 +193,7 @@ public interface DeviceChannelMapper {
@Update(value = {"UPDATE wvp_device_channel SET status=false WHERE device_id=#{deviceId} AND channel_id=#{channelId}"})
void offline(String deviceId, String channelId);
@Update(value = {"UPDATE wvp_device_channel SET status=fasle WHERE device_id=#{deviceId}"})
@Update(value = {"UPDATE wvp_device_channel SET status=false WHERE device_id=#{deviceId}"})
void offlineByDeviceId(String deviceId);
@Insert("<script> " +

View File

@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.JsonUtil;
@ -40,6 +41,9 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Autowired
private DeviceChannelMapper deviceChannelMapper;
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private UserSetting userSetting;
@ -375,7 +379,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
for (Object o : keys) {
String key = (String) o;
Device device = JsonUtil.redisJsonToObject(redisTemplate, key, Device.class);
if (Objects.nonNull(device)) { // 只取没有存过得
if (Objects.nonNull(device)) {
// 只取没有存过得
result.add(JsonUtil.redisJsonToObject(redisTemplate, key, Device.class));
}
}
@ -386,14 +391,22 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override
public Device getDevice(String deviceId) {
String key = VideoManagerConstants.DEVICE_PREFIX + userSetting.getServerId() + "_" + deviceId;
return JsonUtil.redisJsonToObject(redisTemplate, key, Device.class);
Device device = JsonUtil.redisJsonToObject(redisTemplate, key, Device.class);
if (device == null){
device = deviceMapper.getDeviceByDeviceId(deviceId);
if (device != null) {
updateDevice(device);
}
}
return device;
}
@Override
public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) {
String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_" + gpsMsgInfo.getId();
Duration duration = Duration.ofSeconds(60L);
redisTemplate.opsForValue().set(key, gpsMsgInfo, duration); // 默认GPS消息保存1分钟
redisTemplate.opsForValue().set(key, gpsMsgInfo, duration);
// 默认GPS消息保存1分钟
}
@Override

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.utils.redis;
import com.google.common.collect.Lists;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
@ -38,7 +39,7 @@ public class RedisUtil {
return keys;
});
return new ArrayList<>(resultKeys);
return Lists.newArrayList(resultKeys);
}
}