在一些甲方需求中,需要打包或者导出海量数据,这类操作无法在短时间内完成,因此需要在后台异步执行,再把执行结果保存到数据库中并返回给用户,比如返回一个下载地址,让用户在【任务中心】下载。
本例子使用MySql数据库来管理任务的状态、进度以及返回的信息。
1.SpringTaskExecutor
SpringBoot使用异步任务时,需要通过@EnableAsync开启异步任务支持
/**
 * Configuration class that enables {@code @Async} processing and supplies the
 * thread pool on which asynchronous methods are executed.
 */
@Configuration
@EnableAsync // enable asynchronous task support
public class SpringTaskExecutor implements AsyncConfigurer {

    /** Number of core threads in the pool. */
    private static final int CORE_POOL_SIZE = 5;

    /** Upper bound on the number of threads in the pool. */
    private static final int MAX_POOL_SIZE = 10;

    /** Capacity of the queue that holds tasks waiting for a thread. */
    private static final int QUEUE_CAPACITY = 20;

    /**
     * Creates and initializes the executor used for asynchronous method calls.
     *
     * @return an initialized {@link ThreadPoolTaskExecutor} with 5 core threads,
     *         at most 10 threads and a queue capacity of 20
     */
    @Override
    public Executor getAsyncExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(CORE_POOL_SIZE);
        pool.setMaxPoolSize(MAX_POOL_SIZE);
        pool.setQueueCapacity(QUEUE_CAPACITY);
        // The executor is created by hand here, so it is initialized explicitly.
        pool.initialize();
        return pool;
    }

    /**
     * Supplies no custom handler for exceptions escaping asynchronous methods.
     *
     * @return always {@code null}
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // NOTE(review): Spring is expected to fall back to its default handler
        // when null is returned here — confirm against the Spring version in use.
        return null;
    }
}
2.AsyncTaskExecutor
/**
 * Component whose {@link #executor} method is annotated with {@code @Async} and
 * simply runs the supplied task.
 */
@Component
public class AsyncTaskExecutor {
    // Declared final so the shared logger reference cannot be reassigned.
    private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskExecutor.class);

    /**
     * Logs the task id and then invokes the given task.
     *
     * @param asyncTaskGenerator the task to run; its {@code async()} method is called once
     * @param taskId             identifier of the task; in this method it is only used for logging
     */
    @Async
    public void executor(AsyncTaskConstructor asyncTaskGenerator, String taskId) {
        LOG.info("AsyncTaskExecutor is executing async task:{}", taskId);
        asyncTaskGenerator.async();
    }
}
3.AsyncTaskConstructor
/**
 * A unit of work to be run asynchronously.
 *
 * <p>The interface has exactly one abstract method, so it can be implemented with a
 * lambda expression; {@code @FunctionalInterface} makes the compiler enforce that.
 */
@FunctionalInterface
public interface AsyncTaskConstructor {
    /**
     * Performs the work of the task.
     */
    void async();
}
4.AsyncTaskMonitor
/**
 * Aspect that monitors asynchronous tasks.
 *
 * <p>It wraps every method of {@code AsyncTaskExecutor}: the task record is marked as
 * running before the advised method executes, and its final status (success or
 * failure) and update time are stored afterwards.
 */
@Component
@Aspect
public class AsyncTaskMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskMonitor.class);

    @Autowired
    AsyncTaskManager manager;

    /**
     * Runs the advised method and records the task's status around it.
     *
     * <p>Any {@link Throwable} thrown by the advised method is logged and recorded as a
     * failed status; it is not rethrown.
     *
     * @param pjp join point of the advised call; its second argument must be the task id
     * @return the task record after the final status has been stored, or {@code null}
     *         if the record can no longer be found once the task has run
     * @throws IllegalArgumentException if the advised call has no non-null second argument
     * @throws IllegalStateException    if no task record exists for the task id before the run
     */
    @Around("execution(* com.kelongyin.asyncTask.service.AsyncTaskExecutor.*(..))")
    public PotAsyncTask taskHandle(ProceedingJoinPoint pjp) {
        // Extract the task id; fail with a clear message instead of an opaque
        // ArrayIndexOutOfBoundsException / NullPointerException.
        Object[] args = pjp.getArgs();
        if (args == null || args.length < 2 || args[1] == null) {
            throw new IllegalArgumentException(
                    "AsyncTaskMonitor: advised method must receive a non-null taskId as its second argument");
        }
        String taskId = args[1].toString();

        // Load the task record and mark it as running.
        PotAsyncTask taskInfo = manager.getTaskInfo(taskId);
        if (taskInfo == null) {
            throw new IllegalStateException("AsyncTaskMonitor: no task record found for taskId " + taskId);
        }
        LOG.info("AsyncTaskMonitor is monitoring async task:{}", taskId);
        taskInfo.setTaskStatus(TaskStatusEnum.RUNNING.getState());
        manager.setTaskInfo(taskInfo);

        TaskStatusEnum status;
        try {
            pjp.proceed();
            status = TaskStatusEnum.SUCCESS;
        } catch (Throwable throwable) {
            status = TaskStatusEnum.FAILED;
            // Pass the throwable as the last argument so the logger records the stack trace.
            LOG.error("AsyncTaskMonitor:async task {} is failed.Error info:{}",
                    taskId, throwable.getMessage(), throwable);
        }

        // Re-read the record so that changes stored while the task ran are not overwritten.
        taskInfo = manager.getTaskInfo(taskId);
        if (taskInfo == null) {
            // The task has already run; there is nothing left to update.
            LOG.error("AsyncTaskMonitor:async task {} finished with state {} but its record was not found",
                    taskId, status.getState());
            return null;
        }
        taskInfo.setUpdateTime(new Date());
        taskInfo.setTaskStatus(status.getState());
        manager.setTaskInfo(taskInfo);
        return taskInfo;
    }
}
5.AsyncTaskManager
/**
 * Manager for asynchronous tasks: submits tasks for execution and reads/writes
 * their records through {@code IPotAsyncTaskService}.
 */
@Component
public class AsyncTaskManager {

    @Autowired
    AsyncTaskExecutor asyncTaskExecutor;

    // Service backing the MySQL table that stores the task records.
    @Autowired
    private IPotAsyncTaskService ppmAsyncTaskService;

    /**
     * Submits a task for execution.
     *
     * <p>The task record is looked up by {@code asyncTaskId}; only when it exists is the
     * task handed to the executor together with the record's task id.
     *
     * @param asyncTaskConstructor the task to execute
     * @param asyncTaskId          id used to look up the existing task record
     * @return the task record, or {@code null} when no record exists for {@code asyncTaskId}
     */
    public PotAsyncTask submit(AsyncTaskConstructor asyncTaskConstructor, Long asyncTaskId) {
        final PotAsyncTask task = ppmAsyncTaskService.selectPotAsyncTaskById(asyncTaskId);
        if (task != null) {
            asyncTaskExecutor.executor(asyncTaskConstructor, task.getTaskId());
        }
        return task;
    }

    /**
     * Stores the given task information.
     *
     * @param taskInfo the task record to update
     */
    public void setTaskInfo(PotAsyncTask taskInfo) {
        ppmAsyncTaskService.updatePotAsyncTask(taskInfo);
    }

    /**
     * Loads the task information for a task id.
     *
     * @param taskId the task id
     * @return whatever the service returns for {@code taskId}
     */
    public PotAsyncTask getTaskInfo(String taskId) {
        final PotAsyncTask found = ppmAsyncTaskService.selectPotAsyncTaskByTaskId(taskId);
        return found;
    }
}
6.TaskInfo
/**
 * Plain data holder describing a task: its id, status and timing information.
 *
 * <p>All fields are private, so accessors are provided to make the state readable
 * and writable from outside the class.
 */
public class TaskInfo {
    /** Identifier of the task. */
    private String taskId;
    /** Current status of the task. */
    private TaskStatusEnum status;
    /** Start time of the task. */
    private Date startTime;
    /** End time of the task. */
    private Date endTime;
    /** Total time as text. NOTE(review): format/unit is not visible here — confirm. */
    private String totalTime;

    /** @return the task id */
    public String getTaskId() {
        return taskId;
    }

    /** @param taskId the task id to set */
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    /** @return the task status */
    public TaskStatusEnum getStatus() {
        return status;
    }

    /** @param status the task status to set */
    public void setStatus(TaskStatusEnum status) {
        this.status = status;
    }

    /** @return the start time */
    public Date getStartTime() {
        return startTime;
    }

    /** @param startTime the start time to set */
    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    /** @return the end time */
    public Date getEndTime() {
        return endTime;
    }

    /** @param endTime the end time to set */
    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    /** @return the total time text */
    public String getTotalTime() {
        return totalTime;
    }

    /** @param totalTime the total time text to set */
    public void setTotalTime(String totalTime) {
        this.totalTime = totalTime;
    }
}
7.TaskStatusEnum
/**
 * Enumeration of task statuses. Each constant carries a short state code and a
 * human-readable description of that state.
 */
public enum TaskStatusEnum {
    /** The task has been started. */
    STARTED("0", "任务已经启动"),
    /** The task is currently running. */
    RUNNING("1", "任务正在运行"),
    /** The task finished successfully. */
    SUCCESS("2", "任务执行成功"),
    /** The task failed. */
    FAILED("9", "任务执行失败");

    /** State code of this status. */
    private final String code;

    /** Description of this status. */
    private final String description;

    TaskStatusEnum(String code, String description) {
        this.code = code;
        this.description = description;
    }

    /**
     * @return the state code of this status
     */
    public String getState() {
        return this.code;
    }

    /**
     * @return the description of this status
     */
    public String getStateInfo() {
        return this.description;
    }
}
8.具体调用
// Call submit on the task manager to submit an asynchronous task.
// potAsyncTask is the entity used to manage the asynchronous task: create the task record first, then submit it for asynchronous execution; its status in the table is updated when the task succeeds or fails.
PotAsyncTask taskInfo = asyncTaskManager.submit(() -> {
// task code
}, potAsyncTask.getAsyncTaskId());