diff --git a/pom.xml b/pom.xml index 32b9b26..37468fc 100644 --- a/pom.xml +++ b/pom.xml @@ -183,7 +183,10 @@ org.apache.commons commons-pool2 - + + org.springframework.boot + spring-boot-starter-webflux + io.milvus diff --git a/src/main/java/com/project/ding/config/SyncThreadPoolConfig.java b/src/main/java/com/project/ding/config/DingSyncThreadPoolConfig.java similarity index 95% rename from src/main/java/com/project/ding/config/SyncThreadPoolConfig.java rename to src/main/java/com/project/ding/config/DingSyncThreadPoolConfig.java index b10c9ba..f430fda 100644 --- a/src/main/java/com/project/ding/config/SyncThreadPoolConfig.java +++ b/src/main/java/com/project/ding/config/DingSyncThreadPoolConfig.java @@ -10,7 +10,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Configuration -public class SyncThreadPoolConfig { +public class DingSyncThreadPoolConfig { @Bean(name = "dingUserSyncExecutor") public ExecutorService dingUserSyncExecutor() { return new ThreadPoolExecutor( diff --git a/src/main/java/com/project/information/domain/dto/KnowledgePointDTO.java b/src/main/java/com/project/information/domain/dto/KnowledgePointDTO.java new file mode 100644 index 0000000..9f20f5b --- /dev/null +++ b/src/main/java/com/project/information/domain/dto/KnowledgePointDTO.java @@ -0,0 +1,7 @@ +package com.project.information.domain.dto; + +import lombok.Data; + +@Data +public class KnowledgePointDTO { +} diff --git a/src/main/java/com/project/information/domain/entity/InformationEntity.java b/src/main/java/com/project/information/domain/entity/InformationEntity.java index f95571a..0bf0771 100644 --- a/src/main/java/com/project/information/domain/entity/InformationEntity.java +++ b/src/main/java/com/project/information/domain/entity/InformationEntity.java @@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.project.base.domain.entity.BaseEntity; +import com.project.information.domain.enums.InformationParseStatusEnum; import jakarta.persistence.Column; import jakarta.persistence.Entity; import jakarta.persistence.Id; @@ -46,5 +47,5 @@ public class InformationEntity extends BaseEntity { @Column(name = "parse_status") @TableField("parse_status") @Comment("文件处理状态") - private Integer parseStatus; + private Integer parseStatus = InformationParseStatusEnum.NotStart.getValue(); } diff --git a/src/main/java/com/project/information/domain/service/KnowledgePointBaseService.java b/src/main/java/com/project/information/domain/service/KnowledgePointBaseService.java new file mode 100644 index 0000000..4d2472e --- /dev/null +++ b/src/main/java/com/project/information/domain/service/KnowledgePointBaseService.java @@ -0,0 +1,7 @@ +package com.project.information.domain.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.project.information.domain.entity.KnowledgePointEntity; + +public interface KnowledgePointBaseService extends IService { +} diff --git a/src/main/java/com/project/information/domain/service/impl/KnowledgePointBaseServiceImpl.java b/src/main/java/com/project/information/domain/service/impl/KnowledgePointBaseServiceImpl.java new file mode 100644 index 0000000..556c22e --- /dev/null +++ b/src/main/java/com/project/information/domain/service/impl/KnowledgePointBaseServiceImpl.java @@ -0,0 +1,11 @@ +package com.project.information.domain.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.project.information.domain.entity.KnowledgePointEntity; +import com.project.information.domain.service.KnowledgePointBaseService; +import com.project.information.mapper.KnowledgePointMapper; +import org.springframework.stereotype.Service; + +@Service +public class KnowledgePointBaseServiceImpl extends ServiceImpl implements KnowledgePointBaseService { +} diff --git a/src/main/java/com/project/interactor/application/AlgorithmApplicationService.java b/src/main/java/com/project/interactor/application/AlgorithmApplicationService.java new file mode 100644 index 0000000..e0f5637 --- /dev/null +++ b/src/main/java/com/project/interactor/application/AlgorithmApplicationService.java @@ -0,0 +1,10 @@ +package com.project.interactor.application; + +import com.project.information.domain.dto.KnowledgePointDTO; + +import java.util.List; + +public interface AlgorithmApplicationService { + + void postToClustering(Long taskId, List kpList); +} diff --git a/src/main/java/com/project/interactor/application/impl/AlgorithmApplicationServiceImpl.java b/src/main/java/com/project/interactor/application/impl/AlgorithmApplicationServiceImpl.java new file mode 100644 index 0000000..9d3978a --- /dev/null +++ b/src/main/java/com/project/interactor/application/impl/AlgorithmApplicationServiceImpl.java @@ -0,0 +1,19 @@ +package com.project.interactor.application.impl; + +import com.project.information.domain.dto.KnowledgePointDTO; +import com.project.interactor.application.AlgorithmApplicationService; +import com.project.interactor.domain.service.PostToClusteringDomainService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +public class AlgorithmApplicationServiceImpl implements AlgorithmApplicationService { + @Autowired + private PostToClusteringDomainService postToClusteringDomainService; + @Override + public void postToClustering(Long taskId, List kpList) { + postToClusteringDomainService.postToClustering(taskId , kpList); + } +} diff --git a/src/main/java/com/project/interactor/config/WebClientConfig.java b/src/main/java/com/project/interactor/config/WebClientConfig.java new file mode 100644 index 0000000..553275a --- /dev/null +++ b/src/main/java/com/project/interactor/config/WebClientConfig.java @@ -0,0 +1,41 @@ +package com.project.interactor.config; + +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +@Configuration +public class WebClientConfig { + @Value("${algo.baseUrl}") + private String baseUrl; + + @Bean(name = "algorithmWebClient") + public WebClient algorithmWebClient() { + // 配置HTTP客户端,设置长超时以支持流式响应 + HttpClient httpClient = HttpClient.create() + .responseTimeout(Duration.ofSeconds(5)) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + .doOnConnected(conn -> conn + // 2. 读超时:与全局一致,60秒。 + .addHandlerLast(new ReadTimeoutHandler(60, TimeUnit.SECONDS)) + // 3. 写超时:30秒。 + // 因为你推送的是大量 Word 里的文本知识点,网络传输压力比普通接口大。 + .addHandlerLast(new WriteTimeoutHandler(30, TimeUnit.SECONDS)) + ); + + return WebClient.builder() + .baseUrl(baseUrl) // 设置基础URL + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .build(); + } + +} diff --git a/src/main/java/com/project/interactor/domain/dto/ClusterQueryDTO.java b/src/main/java/com/project/interactor/domain/dto/ClusterQueryDTO.java new file mode 100644 index 0000000..176de9f --- /dev/null +++ b/src/main/java/com/project/interactor/domain/dto/ClusterQueryDTO.java @@ -0,0 +1,13 @@ +package com.project.interactor.domain.dto; + + +import com.project.information.domain.dto.KnowledgePointDTO; +import lombok.Data; + +import java.util.List; + +@Data +public class ClusterQueryDTO { + public ClusterQueryDTO(Long taskId, List kpList) { + } +} diff --git a/src/main/java/com/project/interactor/domain/dto/ClusterResultDTO.java b/src/main/java/com/project/interactor/domain/dto/ClusterResultDTO.java new file mode 100644 index 0000000..db351a7 --- /dev/null +++ b/src/main/java/com/project/interactor/domain/dto/ClusterResultDTO.java @@ -0,0 +1,9 @@ +package com.project.interactor.domain.dto; + +import lombok.Data; + +@Data +public class ClusterResultDTO { + private Boolean success; + private String message; +} diff --git a/src/main/java/com/project/interactor/domain/service/PostToClusteringDomainService.java b/src/main/java/com/project/interactor/domain/service/PostToClusteringDomainService.java new file mode 100644 index 0000000..bd777fb --- /dev/null +++ b/src/main/java/com/project/interactor/domain/service/PostToClusteringDomainService.java @@ -0,0 +1,9 @@ +package com.project.interactor.domain.service; + +import com.project.information.domain.dto.KnowledgePointDTO; + +import java.util.List; + +public interface PostToClusteringDomainService { + void postToClustering(Long taskId, List kpList); +} diff --git a/src/main/java/com/project/interactor/domain/service/impl/PostToClusteringDomainServiceImpl.java b/src/main/java/com/project/interactor/domain/service/impl/PostToClusteringDomainServiceImpl.java new file mode 100644 index 0000000..d9cd40c --- /dev/null +++ b/src/main/java/com/project/interactor/domain/service/impl/PostToClusteringDomainServiceImpl.java @@ -0,0 +1,54 @@ +package com.project.interactor.domain.service.impl; + +import com.project.information.domain.dto.KnowledgePointDTO; +import com.project.interactor.domain.dto.ClusterQueryDTO; +import com.project.interactor.domain.dto.ClusterResultDTO; +import com.project.interactor.domain.service.PostToClusteringDomainService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.List; + +@Service +@Slf4j +public class PostToClusteringDomainServiceImpl implements PostToClusteringDomainService { + + @Resource(name = "algorithmWebClient") + private WebClient algorithmWebClient; + + @Value("${algo.clusterUrl:/api/algorithm/v1/cluster}") + private String clusterUrl; + + + @Override + public void postToClustering(Long taskId, List kpList) { + // 构建请求 DTO (AlgoRequestDTO) + ClusterQueryDTO request = new ClusterQueryDTO(taskId, kpList); + + try { + log.info(">>> [Interactor] 正在向算法端发送请求, TaskId: {}", taskId); + algorithmWebClient.post() + .uri(clusterUrl) + .bodyValue(request) + .retrieve() + .bodyToMono(ClusterResultDTO.class)// 错误处理 + .doOnError(error -> log.error(">>> [Interactor] 算法端推送异常, TaskId: {}, 原因: {}", + taskId, error.getMessage())) + // 结果处理(非阻塞订阅) + .subscribe(result -> { + if (result != null && result.getSuccess()) { + log.info(">>> [Interactor] 算法端已成功接收任务: {}", taskId); + } else { + log.warn(">>> [Interactor] 算法端拒绝了请求, TaskId: {}, 消息: {}", + taskId, result != null ? result.getMessage() : "无返回消息"); + } + }); + + } catch (Exception e) { + log.error(">>> [Interactor] 算法端通信异常", e); + } + } +} diff --git a/src/main/java/com/project/task/config/TaskAsyncThreadPoolConfig.java b/src/main/java/com/project/task/config/TaskAsyncThreadPoolConfig.java new file mode 100644 index 0000000..9d278e7 --- /dev/null +++ b/src/main/java/com/project/task/config/TaskAsyncThreadPoolConfig.java @@ -0,0 +1,45 @@ +package com.project.task.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration + +public class TaskAsyncThreadPoolConfig { + /** + * 1. 任务初始化池 (低频、轮询专用) + */ + @Bean(name = "taskInternalExecutor") + public Executor taskInternalExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(1); + executor.setMaxPoolSize(4); + executor.setQueueCapacity(20); + executor.setThreadNamePrefix("task-init-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + + /** + * 2. 题目生成池 (高并发、IO密集型) + * 核心:允许并行处理不同的知识点 + */ + @Bean(name = "questionGenExecutor") + public Executor questionGenExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 允许 5-10 个知识点同时进行 AI 出题 + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(500); + executor.setThreadNamePrefix("question-gen-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + +} diff --git a/src/main/java/com/project/task/domain/service/InitTaskDomainService.java b/src/main/java/com/project/task/domain/service/InitTaskDomainService.java new file mode 100644 index 0000000..a5402b9 --- /dev/null +++ b/src/main/java/com/project/task/domain/service/InitTaskDomainService.java @@ -0,0 +1,8 @@ +package com.project.task.domain.service; + +import java.util.List; + +public interface InitTaskDomainService { + + void asyncInitialize(Long taskId, List relatedDocIds); +} diff --git a/src/main/java/com/project/task/domain/service/impl/InitTaskDomainServiceImpl.java b/src/main/java/com/project/task/domain/service/impl/InitTaskDomainServiceImpl.java new file mode 100644 index 0000000..76e37a3 --- /dev/null +++ b/src/main/java/com/project/task/domain/service/impl/InitTaskDomainServiceImpl.java @@ -0,0 +1,115 @@ +package com.project.task.domain.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.project.information.domain.dto.KnowledgePointDTO; +import com.project.information.domain.entity.InformationEntity; +import com.project.information.domain.entity.KnowledgePointEntity; +import com.project.information.domain.enums.InformationParseStatusEnum; +import com.project.information.domain.service.InformationBaseService; +import com.project.information.domain.service.KnowledgePointBaseService; +import com.project.interactor.application.AlgorithmApplicationService; +import com.project.task.domain.service.InitTaskDomainService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class InitTaskDomainServiceImpl implements InitTaskDomainService { + + @Resource(name = "taskInternalExecutor") + private Executor taskInternalExecutor; + + @Autowired + private InformationBaseService informationService; + + @Autowired + private KnowledgePointBaseService knowledgePointBaseService; + + @Autowired + private AlgorithmApplicationService algorithmApplicationService; + + @Override + public void asyncInitialize(Long taskId, List relatedDocIds) { + CompletableFuture.runAsync(() -> { + try { + log.info(">>> 收到考试任务[{}]初始化请求,开始异步处理...", taskId); + handle(taskId, relatedDocIds); + } catch (Exception e) { + log.error(">>> 考试任务[{}]异步初始化过程发生严重异常", taskId, e); + } + }, taskInternalExecutor); + } + + public void handle(Long taskId, List docIds) { + boolean isReady = pollUntilDocsParsed(taskId, docIds); + + if (!isReady) { + log.error(">>> 任务[{}]初始化中断:关联文档解析超时或存在失败状态", taskId); + // 这里可以更新 TaskEntity 状态为 "初始化失败" + return; + } + log.info(">>> 任务[{}]所有文档解析完成,开始提取知识点...", taskId); + List kpList = knowledgePointBaseService.list(new LambdaQueryWrapper() + .in(KnowledgePointEntity::getInformationId, docIds)) + .stream().map(entity -> entity.toDTO(KnowledgePointDTO::new)) + .collect(Collectors.toList()); + if (kpList.isEmpty()) { + log.warn(">>> 任务[{}]选取的文档中未发现任何知识点,流程终止", taskId); + return; + } + algorithmApplicationService.postToClustering(taskId , kpList); + log.info(">>> [任务初始化] 任务[{}]知识点已成功送往算法端,当前异步线程任务结束", taskId); + } + + /** + * 轮询检查文档解析状态 + */ + private boolean pollUntilDocsParsed(Long taskId, List docIds) { + // 每个考试任务最多重试十次 + int maxRetries = 10; + int retryCount = 0; + + while (retryCount < maxRetries) { + // 资料未解析成功数量 + long unfinishedCount = informationService.count(new LambdaQueryWrapper() + .in(InformationEntity::getId , docIds) + .ne(InformationEntity::getParseStatus , InformationParseStatusEnum.Success.getValue())); + + + if (unfinishedCount == 0) { + return true; // 全部解析完成 + } + + // 检查是否存在解析失败(status=2)的文件,如果有,直接判定为不可用 + long errorCount = informationService.count(new LambdaQueryWrapper() + .in(InformationEntity::getId , docIds) + .eq(InformationEntity::getParseStatus , InformationParseStatusEnum.Fail.getValue())); + if (errorCount > 0) { + log.error(">>> 任务[{}]关联的文档中存在解析失败的文件,无法继续", taskId); + return false; + } + + log.info(">>> 任务[{}]等待文档解析中({}份未完成),第{}次重试,线程将睡眠20秒", + taskId, unfinishedCount, retryCount + 1); + + try { + // 线程等待策略:主动睡眠,让出 CPU + Thread.sleep(20000); + } catch (InterruptedException e) { + log.error(">>> 轮询线程被意外中断", e); + Thread.currentThread().interrupt(); + return false; + } + retryCount++; + } + return false; + } + +}