Browse Source

考试任务调用知识点簇聚类

master
luoweijian 1 month ago
parent
commit
01dc41c925
  1. 5
      pom.xml
  2. 2
      src/main/java/com/project/ding/config/DingSyncThreadPoolConfig.java
  3. 7
      src/main/java/com/project/information/domain/dto/KnowledgePointDTO.java
  4. 3
      src/main/java/com/project/information/domain/entity/InformationEntity.java
  5. 7
      src/main/java/com/project/information/domain/service/KnowledgePointBaseService.java
  6. 11
      src/main/java/com/project/information/domain/service/impl/KnowledgePointBaseServiceImpl.java
  7. 10
      src/main/java/com/project/interactor/application/AlgorithmApplicationService.java
  8. 19
      src/main/java/com/project/interactor/application/impl/AlgorithmApplicationServiceImpl.java
  9. 41
      src/main/java/com/project/interactor/config/WebClientConfig.java
  10. 13
      src/main/java/com/project/interactor/domain/dto/ClusterQueryDTO.java
  11. 9
      src/main/java/com/project/interactor/domain/dto/ClusterResultDTO.java
  12. 9
      src/main/java/com/project/interactor/domain/service/PostToClusteringDomainService.java
  13. 54
      src/main/java/com/project/interactor/domain/service/impl/PostToClusteringDomainServiceImpl.java
  14. 45
      src/main/java/com/project/task/config/TaskAsyncThreadPoolConfig.java
  15. 8
      src/main/java/com/project/task/domain/service/InitTaskDomainService.java
  16. 115
      src/main/java/com/project/task/domain/service/impl/InitTaskDomainServiceImpl.java

5
pom.xml

@ -183,7 +183,10 @@
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId> <artifactId>commons-pool2</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- milvus向量数据库 --> <!-- milvus向量数据库 -->
<dependency> <dependency>
<groupId>io.milvus</groupId> <groupId>io.milvus</groupId>

2
src/main/java/com/project/ding/config/SyncThreadPoolConfig.java → src/main/java/com/project/ding/config/DingSyncThreadPoolConfig.java

@ -10,7 +10,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Configuration @Configuration
public class SyncThreadPoolConfig { public class DingSyncThreadPoolConfig {
@Bean(name = "dingUserSyncExecutor") @Bean(name = "dingUserSyncExecutor")
public ExecutorService dingUserSyncExecutor() { public ExecutorService dingUserSyncExecutor() {
return new ThreadPoolExecutor( return new ThreadPoolExecutor(

7
src/main/java/com/project/information/domain/dto/KnowledgePointDTO.java

@ -0,0 +1,7 @@
package com.project.information.domain.dto;
import lombok.Data;
@Data
public class KnowledgePointDTO {
}

3
src/main/java/com/project/information/domain/entity/InformationEntity.java

@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.project.base.domain.entity.BaseEntity; import com.project.base.domain.entity.BaseEntity;
import com.project.information.domain.enums.InformationParseStatusEnum;
import jakarta.persistence.Column; import jakarta.persistence.Column;
import jakarta.persistence.Entity; import jakarta.persistence.Entity;
import jakarta.persistence.Id; import jakarta.persistence.Id;
@ -46,5 +47,5 @@ public class InformationEntity extends BaseEntity {
@Column(name = "parse_status") @Column(name = "parse_status")
@TableField("parse_status") @TableField("parse_status")
@Comment("文件处理状态") @Comment("文件处理状态")
private Integer parseStatus; private Integer parseStatus = InformationParseStatusEnum.NotStart.getValue();
} }

7
src/main/java/com/project/information/domain/service/KnowledgePointBaseService.java

@ -0,0 +1,7 @@
package com.project.information.domain.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.project.information.domain.entity.KnowledgePointEntity;
public interface KnowledgePointBaseService extends IService<KnowledgePointEntity> {
}

11
src/main/java/com/project/information/domain/service/impl/KnowledgePointBaseServiceImpl.java

@ -0,0 +1,11 @@
package com.project.information.domain.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.project.information.domain.entity.KnowledgePointEntity;
import com.project.information.domain.service.KnowledgePointBaseService;
import com.project.information.mapper.KnowledgePointMapper;
import org.springframework.stereotype.Service;
@Service
public class KnowledgePointBaseServiceImpl extends ServiceImpl<KnowledgePointMapper, KnowledgePointEntity> implements KnowledgePointBaseService {
}

10
src/main/java/com/project/interactor/application/AlgorithmApplicationService.java

@ -0,0 +1,10 @@
package com.project.interactor.application;
import com.project.information.domain.dto.KnowledgePointDTO;
import java.util.List;
public interface AlgorithmApplicationService {
void postToClustering(Long taskId, List<KnowledgePointDTO> kpList);
}

19
src/main/java/com/project/interactor/application/impl/AlgorithmApplicationServiceImpl.java

@ -0,0 +1,19 @@
package com.project.interactor.application.impl;
import com.project.information.domain.dto.KnowledgePointDTO;
import com.project.interactor.application.AlgorithmApplicationService;
import com.project.interactor.domain.service.PostToClusteringDomainService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class AlgorithmApplicationServiceImpl implements AlgorithmApplicationService {
@Autowired
private PostToClusteringDomainService postToClusteringDomainService;
@Override
public void postToClustering(Long taskId, List<KnowledgePointDTO> kpList) {
postToClusteringDomainService.postToClustering(taskId , kpList);
}
}

41
src/main/java/com/project/interactor/config/WebClientConfig.java

@ -0,0 +1,41 @@
package com.project.interactor.config;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@Configuration
public class WebClientConfig {
@Value("${algo.baseUrl}")
private String baseUrl;
@Bean(name = "algorithmWebClient")
public WebClient algorithmWebClient() {
// 配置HTTP客户端,设置长超时以支持流式响应
HttpClient httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(5))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.doOnConnected(conn -> conn
// 2. 读超时:与全局一致,60秒。
.addHandlerLast(new ReadTimeoutHandler(60, TimeUnit.SECONDS))
// 3. 写超时:30秒。
// 因为你推送的是大量 Word 里的文本知识点,网络传输压力比普通接口大。
.addHandlerLast(new WriteTimeoutHandler(30, TimeUnit.SECONDS))
);
return WebClient.builder()
.baseUrl(baseUrl) // 设置基础URL
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
}
}

13
src/main/java/com/project/interactor/domain/dto/ClusterQueryDTO.java

@ -0,0 +1,13 @@
package com.project.interactor.domain.dto;
import com.project.information.domain.dto.KnowledgePointDTO;
import lombok.Data;
import java.util.List;
@Data
public class ClusterQueryDTO {
public ClusterQueryDTO(Long taskId, List<KnowledgePointDTO> kpList) {
}
}

9
src/main/java/com/project/interactor/domain/dto/ClusterResultDTO.java

@ -0,0 +1,9 @@
package com.project.interactor.domain.dto;
import lombok.Data;
@Data
public class ClusterResultDTO {
private Boolean success;
private String message;
}

9
src/main/java/com/project/interactor/domain/service/PostToClusteringDomainService.java

@ -0,0 +1,9 @@
package com.project.interactor.domain.service;
import com.project.information.domain.dto.KnowledgePointDTO;
import java.util.List;
public interface PostToClusteringDomainService {
void postToClustering(Long taskId, List<KnowledgePointDTO> kpList);
}

54
src/main/java/com/project/interactor/domain/service/impl/PostToClusteringDomainServiceImpl.java

@ -0,0 +1,54 @@
package com.project.interactor.domain.service.impl;
import com.project.information.domain.dto.KnowledgePointDTO;
import com.project.interactor.domain.dto.ClusterQueryDTO;
import com.project.interactor.domain.dto.ClusterResultDTO;
import com.project.interactor.domain.service.PostToClusteringDomainService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.List;
@Service
@Slf4j
public class PostToClusteringDomainServiceImpl implements PostToClusteringDomainService {
@Resource(name = "algorithmWebClient")
private WebClient algorithmWebClient;
@Value("${algo.clusterUrl:/api/algorithm/v1/cluster}")
private String clusterUrl;
@Override
public void postToClustering(Long taskId, List<KnowledgePointDTO> kpList) {
// 构建请求 DTO (AlgoRequestDTO)
ClusterQueryDTO request = new ClusterQueryDTO(taskId, kpList);
try {
log.info(">>> [Interactor] 正在向算法端发送请求, TaskId: {}", taskId);
algorithmWebClient.post()
.uri(clusterUrl)
.bodyValue(request)
.retrieve()
.bodyToMono(ClusterResultDTO.class)// 错误处理
.doOnError(error -> log.error(">>> [Interactor] 算法端推送异常, TaskId: {}, 原因: {}",
taskId, error.getMessage()))
// 结果处理(非阻塞订阅)
.subscribe(result -> {
if (result != null && result.getSuccess()) {
log.info(">>> [Interactor] 算法端已成功接收任务: {}", taskId);
} else {
log.warn(">>> [Interactor] 算法端拒绝了请求, TaskId: {}, 消息: {}",
taskId, result != null ? result.getMessage() : "无返回消息");
}
});
} catch (Exception e) {
log.error(">>> [Interactor] 算法端通信异常", e);
}
}
}

45
src/main/java/com/project/task/config/TaskAsyncThreadPoolConfig.java

@ -0,0 +1,45 @@
package com.project.task.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class TaskAsyncThreadPoolConfig {
/**
* 1. 任务初始化池 (低频轮询专用)
*/
@Bean(name = "taskInternalExecutor")
public Executor taskInternalExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(20);
executor.setThreadNamePrefix("task-init-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 2. 题目生成池 (高并发IO密集型)
* 核心允许并行处理不同的知识点
*/
@Bean(name = "questionGenExecutor")
public Executor questionGenExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 允许 5-10 个知识点同时进行 AI 出题
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("question-gen-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}

8
src/main/java/com/project/task/domain/service/InitTaskDomainService.java

@ -0,0 +1,8 @@
package com.project.task.domain.service;
import java.util.List;
public interface InitTaskDomainService {
void asyncInitialize(Long taskId, List<Long> relatedDocIds);
}

115
src/main/java/com/project/task/domain/service/impl/InitTaskDomainServiceImpl.java

@ -0,0 +1,115 @@
package com.project.task.domain.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.project.information.domain.dto.KnowledgePointDTO;
import com.project.information.domain.entity.InformationEntity;
import com.project.information.domain.entity.KnowledgePointEntity;
import com.project.information.domain.enums.InformationParseStatusEnum;
import com.project.information.domain.service.InformationBaseService;
import com.project.information.domain.service.KnowledgePointBaseService;
import com.project.interactor.application.AlgorithmApplicationService;
import com.project.task.domain.service.InitTaskDomainService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@Service
@Slf4j
public class InitTaskDomainServiceImpl implements InitTaskDomainService {
@Resource(name = "taskInternalExecutor")
private Executor taskInternalExecutor;
@Autowired
private InformationBaseService informationService;
@Autowired
private KnowledgePointBaseService knowledgePointBaseService;
@Autowired
private AlgorithmApplicationService algorithmApplicationService;
@Override
public void asyncInitialize(Long taskId, List<Long> relatedDocIds) {
CompletableFuture.runAsync(() -> {
try {
log.info(">>> 收到考试任务[{}]初始化请求,开始异步处理...", taskId);
handle(taskId, relatedDocIds);
} catch (Exception e) {
log.error(">>> 考试任务[{}]异步初始化过程发生严重异常", taskId, e);
}
}, taskInternalExecutor);
}
public void handle(Long taskId, List<Long> docIds) {
boolean isReady = pollUntilDocsParsed(taskId, docIds);
if (!isReady) {
log.error(">>> 任务[{}]初始化中断:关联文档解析超时或存在失败状态", taskId);
// 这里可以更新 TaskEntity 状态为 "初始化失败"
return;
}
log.info(">>> 任务[{}]所有文档解析完成,开始提取知识点...", taskId);
List<KnowledgePointDTO> kpList = knowledgePointBaseService.list(new LambdaQueryWrapper<KnowledgePointEntity>()
.in(KnowledgePointEntity::getInformationId, docIds))
.stream().map(entity -> entity.toDTO(KnowledgePointDTO::new))
.collect(Collectors.toList());
if (kpList.isEmpty()) {
log.warn(">>> 任务[{}]选取的文档中未发现任何知识点,流程终止", taskId);
return;
}
algorithmApplicationService.postToClustering(taskId , kpList);
log.info(">>> [任务初始化] 任务[{}]知识点已成功送往算法端,当前异步线程任务结束", taskId);
}
/**
* 轮询检查文档解析状态
*/
private boolean pollUntilDocsParsed(Long taskId, List<Long> docIds) {
// 每个考试任务最多重试十次
int maxRetries = 10;
int retryCount = 0;
while (retryCount < maxRetries) {
// 资料未解析成功数量
long unfinishedCount = informationService.count(new LambdaQueryWrapper<InformationEntity>()
.in(InformationEntity::getId , docIds)
.ne(InformationEntity::getParseStatus , InformationParseStatusEnum.Success.getValue()));
if (unfinishedCount == 0) {
return true; // 全部解析完成
}
// 检查是否存在解析失败(status=2)的文件,如果有,直接判定为不可用
long errorCount = informationService.count(new LambdaQueryWrapper<InformationEntity>()
.in(InformationEntity::getId , docIds)
.eq(InformationEntity::getParseStatus , InformationParseStatusEnum.Fail.getValue()));
if (errorCount > 0) {
log.error(">>> 任务[{}]关联的文档中存在解析失败的文件,无法继续", taskId);
return false;
}
log.info(">>> 任务[{}]等待文档解析中({}份未完成),第{}次重试,线程将睡眠20秒",
taskId, unfinishedCount, retryCount + 1);
try {
// 线程等待策略:主动睡眠,让出 CPU
Thread.sleep(20000);
} catch (InterruptedException e) {
log.error(">>> 轮询线程被意外中断", e);
Thread.currentThread().interrupt();
return false;
}
retryCount++;
}
return false;
}
}
Loading…
Cancel
Save