diff --git a/pom.xml b/pom.xml index 32b9b26..37468fc 100644 --- a/pom.xml +++ b/pom.xml @@ -183,7 +183,10 @@ org.apache.commons commons-pool2 - + + org.springframework.boot + spring-boot-starter-webflux + io.milvus diff --git a/src/main/java/com/project/ding/config/SyncThreadPoolConfig.java b/src/main/java/com/project/ding/config/DingSyncThreadPoolConfig.java similarity index 95% rename from src/main/java/com/project/ding/config/SyncThreadPoolConfig.java rename to src/main/java/com/project/ding/config/DingSyncThreadPoolConfig.java index b10c9ba..f430fda 100644 --- a/src/main/java/com/project/ding/config/SyncThreadPoolConfig.java +++ b/src/main/java/com/project/ding/config/DingSyncThreadPoolConfig.java @@ -10,7 +10,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Configuration -public class SyncThreadPoolConfig { +public class DingSyncThreadPoolConfig { @Bean(name = "dingUserSyncExecutor") public ExecutorService dingUserSyncExecutor() { return new ThreadPoolExecutor( diff --git a/src/main/java/com/project/information/domain/dto/KnowledgePointDTO.java b/src/main/java/com/project/information/domain/dto/KnowledgePointDTO.java new file mode 100644 index 0000000..9f20f5b --- /dev/null +++ b/src/main/java/com/project/information/domain/dto/KnowledgePointDTO.java @@ -0,0 +1,7 @@ +package com.project.information.domain.dto; + +import lombok.Data; + +@Data +public class KnowledgePointDTO { +} diff --git a/src/main/java/com/project/information/domain/entity/InformationEntity.java b/src/main/java/com/project/information/domain/entity/InformationEntity.java index f95571a..0bf0771 100644 --- a/src/main/java/com/project/information/domain/entity/InformationEntity.java +++ b/src/main/java/com/project/information/domain/entity/InformationEntity.java @@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.project.base.domain.entity.BaseEntity; +import com.project.information.domain.enums.InformationParseStatusEnum; import jakarta.persistence.Column; import jakarta.persistence.Entity; import jakarta.persistence.Id; @@ -46,5 +47,5 @@ public class InformationEntity extends BaseEntity { @Column(name = "parse_status") @TableField("parse_status") @Comment("文件处理状态") - private Integer parseStatus; + private Integer parseStatus = InformationParseStatusEnum.NotStart.getValue(); } diff --git a/src/main/java/com/project/information/domain/service/KnowledgePointBaseService.java b/src/main/java/com/project/information/domain/service/KnowledgePointBaseService.java new file mode 100644 index 0000000..4d2472e --- /dev/null +++ b/src/main/java/com/project/information/domain/service/KnowledgePointBaseService.java @@ -0,0 +1,7 @@ +package com.project.information.domain.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.project.information.domain.entity.KnowledgePointEntity; + +public interface KnowledgePointBaseService extends IService { +} diff --git a/src/main/java/com/project/information/domain/service/impl/KnowledgePointBaseServiceImpl.java b/src/main/java/com/project/information/domain/service/impl/KnowledgePointBaseServiceImpl.java new file mode 100644 index 0000000..556c22e --- /dev/null +++ b/src/main/java/com/project/information/domain/service/impl/KnowledgePointBaseServiceImpl.java @@ -0,0 +1,11 @@ +package com.project.information.domain.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.project.information.domain.entity.KnowledgePointEntity; +import com.project.information.domain.service.KnowledgePointBaseService; +import com.project.information.mapper.KnowledgePointMapper; +import org.springframework.stereotype.Service; + +@Service +public class KnowledgePointBaseServiceImpl extends ServiceImpl implements KnowledgePointBaseService { +} diff --git a/src/main/java/com/project/interactor/application/AlgorithmApplicationService.java b/src/main/java/com/project/interactor/application/AlgorithmApplicationService.java new file mode 100644 index 0000000..e0f5637 --- /dev/null +++ b/src/main/java/com/project/interactor/application/AlgorithmApplicationService.java @@ -0,0 +1,10 @@ +package com.project.interactor.application; + +import com.project.information.domain.dto.KnowledgePointDTO; + +import java.util.List; + +public interface AlgorithmApplicationService { + + void postToClustering(Long taskId, List kpList); +} diff --git a/src/main/java/com/project/interactor/application/impl/AlgorithmApplicationServiceImpl.java b/src/main/java/com/project/interactor/application/impl/AlgorithmApplicationServiceImpl.java new file mode 100644 index 0000000..9d3978a --- /dev/null +++ b/src/main/java/com/project/interactor/application/impl/AlgorithmApplicationServiceImpl.java @@ -0,0 +1,19 @@ +package com.project.interactor.application.impl; + +import com.project.information.domain.dto.KnowledgePointDTO; +import com.project.interactor.application.AlgorithmApplicationService; +import com.project.interactor.domain.service.PostToClusteringDomainService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +public class AlgorithmApplicationServiceImpl implements AlgorithmApplicationService { + @Autowired + private PostToClusteringDomainService postToClusteringDomainService; + @Override + public void postToClustering(Long taskId, List kpList) { + postToClusteringDomainService.postToClustering(taskId , kpList); + } +} diff --git a/src/main/java/com/project/interactor/config/WebClientConfig.java b/src/main/java/com/project/interactor/config/WebClientConfig.java new file mode 100644 index 0000000..553275a --- /dev/null +++ b/src/main/java/com/project/interactor/config/WebClientConfig.java @@ -0,0 +1,41 @@ +package com.project.interactor.config; + +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +@Configuration +public class WebClientConfig { + @Value("${algo.baseUrl}") + private String baseUrl; + + @Bean(name = "algorithmWebClient") + public WebClient algorithmWebClient() { + // 配置HTTP客户端,设置长超时以支持流式响应 + HttpClient httpClient = HttpClient.create() + .responseTimeout(Duration.ofSeconds(5)) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + .doOnConnected(conn -> conn + // 2. 读超时:与全局一致,60秒。 + .addHandlerLast(new ReadTimeoutHandler(60, TimeUnit.SECONDS)) + // 3. 写超时:30秒。 + // 因为你推送的是大量 Word 里的文本知识点,网络传输压力比普通接口大。 + .addHandlerLast(new WriteTimeoutHandler(30, TimeUnit.SECONDS)) + ); + + return WebClient.builder() + .baseUrl(baseUrl) // 设置基础URL + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .build(); + } + +} diff --git a/src/main/java/com/project/interactor/domain/dto/ClusterQueryDTO.java b/src/main/java/com/project/interactor/domain/dto/ClusterQueryDTO.java new file mode 100644 index 0000000..176de9f --- /dev/null +++ b/src/main/java/com/project/interactor/domain/dto/ClusterQueryDTO.java @@ -0,0 +1,13 @@ +package com.project.interactor.domain.dto; + + +import com.project.information.domain.dto.KnowledgePointDTO; +import lombok.Data; + +import java.util.List; + +@Data +public class ClusterQueryDTO { + public ClusterQueryDTO(Long taskId, List kpList) { + } +} diff --git a/src/main/java/com/project/interactor/domain/dto/ClusterResultDTO.java b/src/main/java/com/project/interactor/domain/dto/ClusterResultDTO.java new file mode 100644 index 0000000..db351a7 --- /dev/null +++ b/src/main/java/com/project/interactor/domain/dto/ClusterResultDTO.java @@ -0,0 +1,9 @@ +package com.project.interactor.domain.dto; + +import lombok.Data; + +@Data +public class ClusterResultDTO { + private Boolean success; + private String message; +} diff --git a/src/main/java/com/project/interactor/domain/service/PostToClusteringDomainService.java b/src/main/java/com/project/interactor/domain/service/PostToClusteringDomainService.java new file mode 100644 index 0000000..bd777fb --- /dev/null +++ b/src/main/java/com/project/interactor/domain/service/PostToClusteringDomainService.java @@ -0,0 +1,9 @@ +package com.project.interactor.domain.service; + +import com.project.information.domain.dto.KnowledgePointDTO; + +import java.util.List; + +public interface PostToClusteringDomainService { + void postToClustering(Long taskId, List kpList); +} diff --git a/src/main/java/com/project/interactor/domain/service/impl/PostToClusteringDomainServiceImpl.java b/src/main/java/com/project/interactor/domain/service/impl/PostToClusteringDomainServiceImpl.java new file mode 100644 index 0000000..d9cd40c --- /dev/null +++ b/src/main/java/com/project/interactor/domain/service/impl/PostToClusteringDomainServiceImpl.java @@ -0,0 +1,54 @@ +package com.project.interactor.domain.service.impl; + +import com.project.information.domain.dto.KnowledgePointDTO; +import com.project.interactor.domain.dto.ClusterQueryDTO; +import com.project.interactor.domain.dto.ClusterResultDTO; +import com.project.interactor.domain.service.PostToClusteringDomainService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; + +import java.util.List; + +@Service +@Slf4j +public class PostToClusteringDomainServiceImpl implements PostToClusteringDomainService { + + @Resource(name = "algorithmWebClient") + private WebClient algorithmWebClient; + + @Value("${algo.clusterUrl:/api/algorithm/v1/cluster}") + private String clusterUrl; + + + @Override + public void postToClustering(Long taskId, List kpList) { + // 构建请求 DTO (AlgoRequestDTO) + ClusterQueryDTO request = new ClusterQueryDTO(taskId, kpList); + + try { + log.info(">>> [Interactor] 正在向算法端发送请求, TaskId: {}", taskId); + algorithmWebClient.post() + .uri(clusterUrl) + .bodyValue(request) + .retrieve() + .bodyToMono(ClusterResultDTO.class)// 错误处理 + .doOnError(error -> log.error(">>> [Interactor] 算法端推送异常, TaskId: {}, 原因: {}", + taskId, error.getMessage())) + // 结果处理(非阻塞订阅) + .subscribe(result -> { + if (result != null && result.getSuccess()) { + log.info(">>> [Interactor] 算法端已成功接收任务: {}", taskId); + } else { + log.warn(">>> [Interactor] 算法端拒绝了请求, TaskId: {}, 消息: {}", + taskId, result != null ? result.getMessage() : "无返回消息"); + } + }); + + } catch (Exception e) { + log.error(">>> [Interactor] 算法端通信异常", e); + } + } +} diff --git a/src/main/java/com/project/question/domain/entity/QuestionAnswerEntity.java b/src/main/java/com/project/question/domain/entity/QuestionAnswerEntity.java new file mode 100644 index 0000000..2298ad3 --- /dev/null +++ b/src/main/java/com/project/question/domain/entity/QuestionAnswerEntity.java @@ -0,0 +1,54 @@ +package com.project.question.domain.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.project.base.domain.entity.BaseEntity; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.hibernate.annotations.Comment; + +@Data +@Table(name = "evaluator_question_answer" ) +@Entity +@TableName(value = "evaluator_question_answer", autoResultMap = true) +@EqualsAndHashCode(callSuper = true) +public class QuestionAnswerEntity extends BaseEntity { + @TableId(type = IdType.ASSIGN_ID) + @Id + private Long id; + + @Column(name = "exam_record_id") + @TableField("exam_record_id") + @Comment("所属考试记录ID") + private Long examRecordId; + + @Column(name = "user_id") + @TableField("user_id") + @Comment("所属用户ID") + private Long userId; + + @Column(name = "question_id") + @TableField("question_id") + @Comment("问题ID") + private Long questionId; + + @Column(name = "user_answer" , columnDefinition="varchar(255) comment '用户回答答案'") + @TableField("user_answer") + private String userAnswer; + + @Column(name = "is_right") + @TableField("is_right") + @Comment("是否正确") + private Boolean isRight; + + @Column(name = "score") + @TableField("score") + @Comment("得分") + private Double score; +} diff --git a/src/main/java/com/project/question/domain/entity/QuestionEntity.java b/src/main/java/com/project/question/domain/entity/QuestionEntity.java new file mode 100644 index 0000000..3f482fb --- /dev/null +++ b/src/main/java/com/project/question/domain/entity/QuestionEntity.java @@ -0,0 +1,73 @@ +package com.project.question.domain.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler; +import com.project.base.domain.entity.BaseEntity; +import com.project.question.domain.enums.QuestionUseStatusEnum; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.hibernate.annotations.Comment; +import org.hibernate.annotations.JdbcTypeCode; +import org.hibernate.type.SqlTypes; + +import java.util.List; +import java.util.Map; + + +@Data +@Table(name = "evaluator_question" ) +@Entity +@TableName(value = "evaluator_question", autoResultMap = true) +@EqualsAndHashCode(callSuper = true) +public class QuestionEntity extends BaseEntity { + @TableId(type = IdType.ASSIGN_ID) + @Id + private Long id; + + @Comment("来源:0-单KP出题, 1-簇出题") + @Column(name = "source_type") + @TableField("source_type") + private Integer sourceType; + + + @Comment("是否已被使用:0-未用, 1-已用") + @Column(name = "use_status") + @TableField("use_status") + private Boolean useStatus = QuestionUseStatusEnum.Not_Use.getValue(); + + @TableField(value = "kp_id_list" , typeHandler = JacksonTypeHandler.class) + @JdbcTypeCode(SqlTypes.JSON) + @Column(name = "kp_id_list", columnDefinition = "json comment '覆盖知识点ID列表'") + private List kpIdList; + + + + @Column(name = "unique_hash" , unique = true, length = 64) + @TableField("unique_hash") + private String uniqueHash; + + @Column(name = "question_type", columnDefinition = "comment '题目类型'") + @TableField("question_type") + private Integer questionType; + + @TableField(value = "question_detail" , typeHandler = JacksonTypeHandler.class) + @JdbcTypeCode(SqlTypes.JSON) + @Column(name = "question_detail", columnDefinition = "json comment '题目内容'") + private QuestionDetail questionDetail; + + @Data + public static class QuestionDetail { + private String questionContent; // 题干 + private Integer type; // 题目类型 + private Map options; // 选项 + private String rightAnswer; // 正确选项 + private String analysis; // AI解析 + } +} diff --git a/src/main/java/com/project/question/domain/enums/QuestionSourceTypeEnum.java b/src/main/java/com/project/question/domain/enums/QuestionSourceTypeEnum.java new file mode 100644 index 0000000..d07a9c7 --- /dev/null +++ b/src/main/java/com/project/question/domain/enums/QuestionSourceTypeEnum.java @@ -0,0 +1,15 @@ +package com.project.question.domain.enums; + +import com.project.base.domain.enums.HasValueEnum; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter +public enum QuestionSourceTypeEnum implements HasValueEnum { + + Single_Concept(0 , "单一知识点") , + Multi_Concept(1 , "多知识点/复合"); + private final Integer value; + private final String desc; +} diff --git a/src/main/java/com/project/question/domain/enums/QuestionUseStatusEnum.java b/src/main/java/com/project/question/domain/enums/QuestionUseStatusEnum.java new file mode 100644 index 0000000..36d6c34 --- /dev/null +++ b/src/main/java/com/project/question/domain/enums/QuestionUseStatusEnum.java @@ -0,0 +1,13 @@ +package com.project.question.domain.enums; + +import com.project.base.domain.enums.HasValueEnum; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter +public enum QuestionUseStatusEnum implements HasValueEnum { + Not_Use(Boolean.FALSE) , + Used(Boolean.TRUE); + private final Boolean value; +} diff --git a/src/main/java/com/project/task/config/TaskAsyncThreadPoolConfig.java b/src/main/java/com/project/task/config/TaskAsyncThreadPoolConfig.java new file mode 100644 index 0000000..9d278e7 --- /dev/null +++ b/src/main/java/com/project/task/config/TaskAsyncThreadPoolConfig.java @@ -0,0 +1,45 @@ +package com.project.task.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration + +public class TaskAsyncThreadPoolConfig { + /** + * 1. 任务初始化池 (低频、轮询专用) + */ + @Bean(name = "taskInternalExecutor") + public Executor taskInternalExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(1); + executor.setMaxPoolSize(4); + executor.setQueueCapacity(20); + executor.setThreadNamePrefix("task-init-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + + /** + * 2. 题目生成池 (高并发、IO密集型) + * 核心:允许并行处理不同的知识点 + */ + @Bean(name = "questionGenExecutor") + public Executor questionGenExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 允许 5-10 个知识点同时进行 AI 出题 + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(500); + executor.setThreadNamePrefix("question-gen-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + +} diff --git a/src/main/java/com/project/task/domain/entity/TaskKnowledgeClusterEntity.java b/src/main/java/com/project/task/domain/entity/TaskKnowledgeClusterEntity.java new file mode 100644 index 0000000..5aeefb4 --- /dev/null +++ b/src/main/java/com/project/task/domain/entity/TaskKnowledgeClusterEntity.java @@ -0,0 +1,41 @@ +package com.project.task.domain.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.project.base.domain.entity.BaseEntity; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.hibernate.annotations.Comment; + +@Data +@Table(name = "evaluator_task_knowledge_cluster" ) +@Entity +@TableName(value = "evaluator_task_knowledge_cluster", autoResultMap = true) +@EqualsAndHashCode(callSuper = true) +public class TaskKnowledgeClusterEntity extends BaseEntity { + @TableId(value = "id" , type = IdType.ASSIGN_ID) + @Id + private Long id; + + @Column(name = "task_id") + @TableField("task_id") + @Comment("所属考试任务ID") + private Long taskId; + + @Column(name = "cluster_topic", length = 512) + @Comment("簇主题/标签(由AI聚类后总结提取)") + @TableField("cluster_topic") + + private String clusterTopic; + + @Column(name = "cluster_size") + @TableField("cluster_size") + @Comment("簇大小(W):该簇包含的知识点个数") + private Integer clusterSize; +} diff --git a/src/main/java/com/project/task/domain/entity/TaskKnowledgePointEntity.java b/src/main/java/com/project/task/domain/entity/TaskKnowledgePointEntity.java new file mode 100644 index 0000000..9dbb7c5 --- /dev/null +++ b/src/main/java/com/project/task/domain/entity/TaskKnowledgePointEntity.java @@ -0,0 +1,48 @@ +package com.project.task.domain.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.project.base.domain.entity.BaseEntity; +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.hibernate.annotations.Comment; + +@Data +@Table(name = "evaluator_task_knowledge_point" ) +@Entity +@TableName(value = "evaluator_task_knowledge_point", autoResultMap = true) +@EqualsAndHashCode(callSuper = true) +public class TaskKnowledgePointEntity extends BaseEntity { + @TableId(type = IdType.ASSIGN_ID) + @Id + private Long id; + + @Column(name = "task_id") + @TableField("task_id") + @Comment("所属考试任务ID") + private Long taskId; + + @Column(name = "cluster_id") + @TableField("cluster_id") + + @Comment("所属簇ID(关联 evaluator_task_knowledge_cluster)") + private Long clusterId; + + @Column(name = "atom_id") + @TableField("atom_id") + + @Comment("关联原始原子知识点ID(对应原始资料解析出的ID)") + private Long atomId; + + @Column(name = "content", columnDefinition = "TEXT") + @Comment("知识点原文文本") + private String content; + + +} diff --git a/src/main/java/com/project/task/domain/enums/TaskStatusEnum.java b/src/main/java/com/project/task/domain/enums/TaskStatusEnum.java index bfba517..d603afd 100644 --- a/src/main/java/com/project/task/domain/enums/TaskStatusEnum.java +++ b/src/main/java/com/project/task/domain/enums/TaskStatusEnum.java @@ -4,12 +4,12 @@ import com.project.base.domain.enums.HasValueEnum; import lombok.Getter; import lombok.RequiredArgsConstructor; +@Getter @RequiredArgsConstructor public enum TaskStatusEnum implements HasValueEnum { Not_Start("未开考") , In_Progress("已开考") , Cut_Off("已截止"); - @Getter private final String value; } diff --git a/src/main/java/com/project/task/domain/service/InitTaskDomainService.java b/src/main/java/com/project/task/domain/service/InitTaskDomainService.java new file mode 100644 index 0000000..a5402b9 --- /dev/null +++ b/src/main/java/com/project/task/domain/service/InitTaskDomainService.java @@ -0,0 +1,8 @@ +package com.project.task.domain.service; + +import java.util.List; + +public interface InitTaskDomainService { + + void asyncInitialize(Long taskId, List relatedDocIds); +} diff --git a/src/main/java/com/project/task/domain/service/impl/InitTaskDomainServiceImpl.java b/src/main/java/com/project/task/domain/service/impl/InitTaskDomainServiceImpl.java new file mode 100644 index 0000000..76e37a3 --- /dev/null +++ b/src/main/java/com/project/task/domain/service/impl/InitTaskDomainServiceImpl.java @@ -0,0 +1,115 @@ +package com.project.task.domain.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.project.information.domain.dto.KnowledgePointDTO; +import com.project.information.domain.entity.InformationEntity; +import com.project.information.domain.entity.KnowledgePointEntity; +import com.project.information.domain.enums.InformationParseStatusEnum; +import com.project.information.domain.service.InformationBaseService; +import com.project.information.domain.service.KnowledgePointBaseService; +import com.project.interactor.application.AlgorithmApplicationService; +import com.project.task.domain.service.InitTaskDomainService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class InitTaskDomainServiceImpl implements InitTaskDomainService { + + @Resource(name = "taskInternalExecutor") + private Executor taskInternalExecutor; + + @Autowired + private InformationBaseService informationService; + + @Autowired + private KnowledgePointBaseService knowledgePointBaseService; + + @Autowired + private AlgorithmApplicationService algorithmApplicationService; + + @Override + public void asyncInitialize(Long taskId, List relatedDocIds) { + CompletableFuture.runAsync(() -> { + try { + log.info(">>> 收到考试任务[{}]初始化请求,开始异步处理...", taskId); + handle(taskId, relatedDocIds); + } catch (Exception e) { + log.error(">>> 考试任务[{}]异步初始化过程发生严重异常", taskId, e); + } + }, taskInternalExecutor); + } + + public void handle(Long taskId, List docIds) { + boolean isReady = pollUntilDocsParsed(taskId, docIds); + + if (!isReady) { + log.error(">>> 任务[{}]初始化中断:关联文档解析超时或存在失败状态", taskId); + // 这里可以更新 TaskEntity 状态为 "初始化失败" + return; + } + log.info(">>> 任务[{}]所有文档解析完成,开始提取知识点...", taskId); + List kpList = knowledgePointBaseService.list(new LambdaQueryWrapper() + .in(KnowledgePointEntity::getInformationId, docIds)) + .stream().map(entity -> entity.toDTO(KnowledgePointDTO::new)) + .collect(Collectors.toList()); + if (kpList.isEmpty()) { + log.warn(">>> 任务[{}]选取的文档中未发现任何知识点,流程终止", taskId); + return; + } + algorithmApplicationService.postToClustering(taskId , kpList); + log.info(">>> [任务初始化] 任务[{}]知识点已成功送往算法端,当前异步线程任务结束", taskId); + } + + /** + * 轮询检查文档解析状态 + */ + private boolean pollUntilDocsParsed(Long taskId, List docIds) { + // 每个考试任务最多重试十次 + int maxRetries = 10; + int retryCount = 0; + + while (retryCount < maxRetries) { + // 资料未解析成功数量 + long unfinishedCount = informationService.count(new LambdaQueryWrapper() + .in(InformationEntity::getId , docIds) + .ne(InformationEntity::getParseStatus , InformationParseStatusEnum.Success.getValue())); + + + if (unfinishedCount == 0) { + return true; // 全部解析完成 + } + + // 检查是否存在解析失败(status=2)的文件,如果有,直接判定为不可用 + long errorCount = informationService.count(new LambdaQueryWrapper() + .in(InformationEntity::getId , docIds) + .eq(InformationEntity::getParseStatus , InformationParseStatusEnum.Fail.getValue())); + if (errorCount > 0) { + log.error(">>> 任务[{}]关联的文档中存在解析失败的文件,无法继续", taskId); + return false; + } + + log.info(">>> 任务[{}]等待文档解析中({}份未完成),第{}次重试,线程将睡眠20秒", + taskId, unfinishedCount, retryCount + 1); + + try { + // 线程等待策略:主动睡眠,让出 CPU + Thread.sleep(20000); + } catch (InterruptedException e) { + log.error(">>> 轮询线程被意外中断", e); + Thread.currentThread().interrupt(); + return false; + } + retryCount++; + } + return false; + } + +} diff --git a/src/main/java/com/project/task/mapper/TaskKnowledgeClusterMapper.java b/src/main/java/com/project/task/mapper/TaskKnowledgeClusterMapper.java new file mode 100644 index 0000000..0419de4 --- /dev/null +++ b/src/main/java/com/project/task/mapper/TaskKnowledgeClusterMapper.java @@ -0,0 +1,10 @@ +package com.project.task.mapper; + +import com.project.task.domain.entity.TaskKnowledgeClusterEntity; +import org.apache.ibatis.annotations.Mapper; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + + +@Mapper +public interface TaskKnowledgeClusterMapper extends BaseMapper{ +} diff --git a/src/main/java/com/project/task/mapper/TaskKnowledgePointMapper.java b/src/main/java/com/project/task/mapper/TaskKnowledgePointMapper.java new file mode 100644 index 0000000..9b6a218 --- /dev/null +++ b/src/main/java/com/project/task/mapper/TaskKnowledgePointMapper.java @@ -0,0 +1,9 @@ +package com.project.task.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.project.task.domain.entity.TaskKnowledgePointEntity; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface TaskKnowledgePointMapper extends BaseMapper { +}