临时提交

dev/多线程性能优化
648540858 2024-11-27 17:19:56 +08:00
parent 3b24a6c8ca
commit 81aadec467
6 changed files with 31 additions and 30 deletions

View File

@ -6,7 +6,6 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Instant; import java.time.Instant;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
@ -23,17 +22,18 @@ import java.util.concurrent.TimeUnit;
@Component @Component
public class DynamicTask { public class DynamicTask {
private ThreadPoolTaskScheduler threadPoolTaskScheduler; private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>(); private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>();
@PostConstruct
public void DynamicTask() { public DynamicTask() {
threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(300); threadPoolTaskScheduler.setPoolSize(300);
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(10); threadPoolTaskScheduler.setAwaitTerminationSeconds(10);
threadPoolTaskScheduler.setThreadNamePrefix("dynamicTask-");
threadPoolTaskScheduler.initialize(); threadPoolTaskScheduler.initialize();
} }
@ -42,7 +42,6 @@ public class DynamicTask {
* @param key ID * @param key ID
* @param task * @param task
* @param cycleForCatalog * @param cycleForCatalog
* @return
*/ */
public void startCron(String key, Runnable task, int cycleForCatalog) { public void startCron(String key, Runnable task, int cycleForCatalog) {
if(ObjectUtils.isEmpty(key)) { if(ObjectUtils.isEmpty(key)) {
@ -74,7 +73,6 @@ public class DynamicTask {
* @param key ID * @param key ID
* @param task * @param task
* @param delay / * @param delay /
* @return
*/ */
public void startDelay(String key, Runnable task, int delay) { public void startDelay(String key, Runnable task, int delay) {
if(ObjectUtils.isEmpty(key)) { if(ObjectUtils.isEmpty(key)) {

View File

@ -1,30 +1,24 @@
package com.genersoft.iot.vmp.conf; package com.genersoft.iot.vmp.conf;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* "@Scheduled"Spring线 * "@Scheduled"Spring线
* 线线 * 线线
*/ */
@Configuration @Component
public class ScheduleConfig implements SchedulingConfigurer { public class ScheduleConfig implements SchedulingConfigurer {
public static final int cpuNum = Runtime.getRuntime().availableProcessors(); @Qualifier("taskExecutor")
private ThreadPoolTaskExecutor taskExecutor;
private static final int corePoolSize = cpuNum;
private static final String threadNamePrefix = "scheduled-task-pool-%d";
@Override @Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(new ScheduledThreadPoolExecutor(corePoolSize, taskRegistrar.setScheduler(taskExecutor);
new BasicThreadFactory.Builder().namingPattern(threadNamePrefix).daemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy()));
} }
} }

View File

@ -45,12 +45,9 @@ public class ThreadPoolTaskConfig {
/** /**
* 线 * 线
*/ */
private static final String threadNamePrefix = "wvp-"; private static final String threadNamePrefix = "async-";
/**
*
* @return
*/
@Bean("taskExecutor") // bean的名称默认为首字母小写的方法名 @Bean("taskExecutor") // bean的名称默认为首字母小写的方法名
public ThreadPoolTaskExecutor taskExecutor() { public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

View File

@ -2,12 +2,14 @@ package com.genersoft.iot.vmp.conf;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
@Slf4j
public class WVPTimerTask { public class WVPTimerTask {
@Autowired @Autowired
@ -21,6 +23,7 @@ public class WVPTimerTask {
@Scheduled(fixedDelay = 2 * 1000) //每3秒执行一次 @Scheduled(fixedDelay = 2 * 1000) //每3秒执行一次
public void execute(){ public void execute(){
log.info("[更新服务信息]");
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put("ip", sipConfig.getShowIp()); jsonObject.put("ip", sipConfig.getShowIp());
jsonObject.put("port", serverPort); jsonObject.put("port", serverPort);

View File

@ -167,7 +167,7 @@ public class DeviceServiceImpl implements IDeviceService {
deviceMapper.update(device); deviceMapper.update(device);
redisCatchStorage.updateDevice(device); redisCatchStorage.updateDevice(device);
} }
if (deviceChannelMapper.queryChannelsByDeviceDbId(device.getId()).isEmpty()) { if (deviceChannelMapper.queryChannelsByDeviceDbId(device.getId()).isEmpty() && !catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
log.info("[设备上线]: {}通道数为0,查询通道信息", device.getDeviceId()); log.info("[设备上线]: {}通道数为0,查询通道信息", device.getDeviceId());
sync(device); sync(device);
} }
@ -221,9 +221,14 @@ public class DeviceServiceImpl implements IDeviceService {
sessionManager.removeByCallId(ssrcTransaction.getCallId()); sessionManager.removeByCallId(ssrcTransaction.getCallId());
} }
} }
if(device.getSubscribeCycleForCatalog() > 0) {
// 移除订阅 // 移除订阅
removeCatalogSubscribe(device, null); removeCatalogSubscribe(device, null);
}
if(device.getSubscribeCycleForMobilePosition() > 0) {
// 移除订阅
removeMobilePositionSubscribe(device, null); removeMobilePositionSubscribe(device, null);
}
List<AudioBroadcastCatch> audioBroadcastCatches = audioBroadcastManager.getByDeviceId(deviceId); List<AudioBroadcastCatch> audioBroadcastCatches = audioBroadcastManager.getByDeviceId(deviceId);
if (!audioBroadcastCatches.isEmpty()) { if (!audioBroadcastCatches.isEmpty()) {

View File

@ -66,11 +66,15 @@ public class SipRunner implements CommandLineRunner {
for (Device device : deviceList) { for (Device device : deviceList) {
if (deviceService.expire(device)){ if (deviceService.expire(device)){
if (device.isOnLine()) {
deviceService.offline(device.getDeviceId(), "注册已过期"); deviceService.offline(device.getDeviceId(), "注册已过期");
}
}else { }else {
if (!device.isOnLine()) {
deviceService.online(device, null); deviceService.online(device, null);
} }
} }
}
// 重置cseq计数 // 重置cseq计数
redisCatchStorage.resetAllCSEQ(); redisCatchStorage.resetAllCSEQ();
// 清理redis // 清理redis