在一些甲方需求中,有打包或者导出海量数据,无法在短时间内进行导出,需要异步在后台执行,然后把结果存在数据库中,返回给用户,比如返回用户下载地址,让用户在【任务中心】下载
本例子使用MySql数据库来管理任务的状态、进度以及返回的信息。
1.SpringTaskExecutor
SpringBoot视同异步需要使用@EnableAsync开启异步任务支持
@Configuration @EnableAsync //开启异步任务支持 public class SpringTaskExecutor implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(20); taskExecutor.initialize(); return taskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
2.AsyncTaskExecutor
@Component public class AsyncTaskExecutor { private static Logger LOG = LoggerFactory.getLogger(AsyncTaskExecutor.class); @Async public void executor(AsyncTaskConstructor asyncTaskGenerator, String taskId) { LOG.info("AsyncTaskExecutor is executing async task:{}", taskId); asyncTaskGenerator.async(); } }
3.AsyncTaskConstructor
public interface AsyncTaskConstructor { public void async(); }
4.AsyncTaskMonitor
/** * 异步任务监控 */ @Component @Aspect public class AsyncTaskMonitor { @Autowired AsyncTaskManager manager; private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskMonitor.class); @Around("execution(* com.kelongyin.asyncTask.service.AsyncTaskExecutor.*(..))") public PotAsyncTask taskHandle(ProceedingJoinPoint pjp) { //获取taskId String taskId = pjp.getArgs()[1].toString(); //获取任务信息 PotAsyncTask taskInfo = manager.getTaskInfo(taskId); LOG.info("AsyncTaskMonitor is monitoring async task:{}", taskId); taskInfo.setTaskStatus(TaskStatusEnum.RUNNING.getState()); manager.setTaskInfo(taskInfo); TaskStatusEnum status; try { pjp.proceed(); status = TaskStatusEnum.SUCCESS; } catch (Throwable throwable) { throwable.printStackTrace(); status = TaskStatusEnum.FAILED; LOG.error("AsyncTaskMonitor:async task {} is failed.Error info:{}", taskId, throwable.getMessage()); } taskInfo = manager.getTaskInfo(taskId); taskInfo.setUpdateTime(new Date()); taskInfo.setTaskStatus(status.getState()); manager.setTaskInfo(taskInfo); return taskInfo; } }
5.AsyncTaskManager
/** * 异步任务管理器 */ @Component public class AsyncTaskManager { @Autowired AsyncTaskExecutor asyncTaskExecutor; //MySql管理任务的表 @Autowired private IPotAsyncTaskService ppmAsyncTaskService; /** * 初始化任务 * * @param asyncTaskConstructor 异步任务构造器 * @return taskInfo */ public PotAsyncTask submit(AsyncTaskConstructor asyncTaskConstructor, Long asyncTaskId) { PotAsyncTask info = ppmAsyncTaskService.selectPotAsyncTaskById(asyncTaskId); if (info == null) { return null; } asyncTaskExecutor.executor(asyncTaskConstructor, info.getTaskId()); return info; } /** * 保存任务信息 * * @param taskInfo 任务信息 */ public void setTaskInfo(PotAsyncTask taskInfo) { ppmAsyncTaskService.updatePotAsyncTask(taskInfo); } /** * 获取任务信息 * * @param taskId 任务ID * @return */ public PotAsyncTask getTaskInfo(String taskId) { return ppmAsyncTaskService.selectPotAsyncTaskByTaskId(taskId); } }
6.TaskInfo
public class TaskInfo { private String taskId; private TaskStatusEnum status; private Date startTime; private Date endTime; private String totalTime; }
7.TaskStatusEnum
/** * 任务状态枚举 */ public enum TaskStatusEnum { STARTED("0", "任务已经启动"), RUNNING("1", "任务正在运行"), SUCCESS("2", "任务执行成功"), FAILED("9", "任务执行失败"); private final String state; private final String stateInfo; TaskStatusEnum(String state, String stateInfo) { this.state = state; this.stateInfo = stateInfo; } public String getState() { return state; } public String getStateInfo() { return stateInfo; } }
8.具体调用
//调用任务管理器中的submit去提交一个异步任务 //potAsyncTask 是用于管理异步任务的实体,先创建好任务,然后再submit异步执行,如果失败或者成功通过AsyncTaskId变更表中的状态 PotAsyncTask taskInfo = asyncTaskManager.submit(() -> { // task code }, potAsyncTask.getAsyncTaskId());