Browse Source

增加队列

master
luogw 4 weeks ago
parent
commit
bd251b90ee
  1. 5
      src/main/java/com/project/exam/domain/service/impl/AssemblePaperDomainServiceImpl.java
  2. 12
      src/main/java/com/project/exam/domain/service/impl/SubmitPaperDomainServiceImpl.java
  3. 49
      src/main/java/com/project/interaction/application/impl/GenerateQuestionQueueSchedulerService.java
  4. 11
      src/main/java/com/project/interaction/controller/InteractionController.java
  5. 48
      src/main/java/com/project/interaction/domain/dto/GenerateQuestionQueueDTO.java
  6. 50
      src/main/java/com/project/interaction/domain/service/GenerateQuestionQueueService.java
  7. 10
      src/main/java/com/project/interaction/domain/service/PostToGenerateQuestionDomainService.java
  8. 181
      src/main/java/com/project/interaction/domain/service/impl/GenerateQuestionQueueServiceImpl.java
  9. 208
      src/main/java/com/project/interaction/domain/service/impl/PostToGenerateQuestionDomainServiceImpl.java
  10. 5
      src/main/java/com/project/task/domain/service/impl/DeleteTaskDomainServiceImpl.java
  11. 6
      src/main/resources/application-dev.yml
  12. 10
      src/main/resources/application-test.yml

5
src/main/java/com/project/exam/domain/service/impl/AssemblePaperDomainServiceImpl.java

@ -267,7 +267,10 @@ public class AssemblePaperDomainServiceImpl implements AssemblePaperDomainServic
int kp1 = CollUtil.count(q.getKpIdList(), k -> !coveredKpPool.contains(k)); int kp1 = CollUtil.count(q.getKpIdList(), k -> !coveredKpPool.contains(k));
int kp2 = q.getKpIdList().size() - kp1; int kp2 = q.getKpIdList().size() - kp1;
return kp1 - kp2; return kp1 - kp2;
})).map(entity -> entity.toDTO(QuestionDTO::new)) })).map(entity ->{
coveredKpPool.addAll(entity.getKpIdList());
return entity.toDTO(QuestionDTO::new);
})
.orElse(null); .orElse(null);
// 选题成功 // 选题成功
if (Objects.nonNull(bestQuestionDTO)) { if (Objects.nonNull(bestQuestionDTO)) {

12
src/main/java/com/project/exam/domain/service/impl/SubmitPaperDomainServiceImpl.java

@ -58,18 +58,26 @@ public class SubmitPaperDomainServiceImpl implements SubmitPaperDomainService {
// 判分 // 判分
double finalScore = 0.0; double finalScore = 0.0;
List<ExamRecordEntity.QuestionSnapshot> snapshotList = record.getAnswerSnapshot(); List<ExamRecordEntity.QuestionSnapshot> snapshotList = record.getAnswerSnapshot();
//记录对的题目
int rightCount = 0;
for (ExamRecordEntity.QuestionSnapshot snapshot : snapshotList) { for (ExamRecordEntity.QuestionSnapshot snapshot : snapshotList) {
// 比对答案 // 比对答案
if (StrUtil.equals(formatAnswer(snapshot.getUserAnswer()) , formatAnswer(snapshot.getRightAnswer()))) { if (StrUtil.equals(formatAnswer(snapshot.getUserAnswer()) , formatAnswer(snapshot.getRightAnswer()))) {
snapshot.setIsRight(true); snapshot.setIsRight(true);
// 根据题型获取任务创建时算好的单题分值 // 根据题型获取任务创建时算好的单题分值
finalScore += getQuestionWeight(snapshot.getType(), task); finalScore += getQuestionWeight(snapshot.getType(), task);
rightCount ++;
} else { } else {
snapshot.setIsRight(false); snapshot.setIsRight(false);
} }
} }
// 四舍五入处理(不能超过100分) if (rightCount == snapshotList.size()) {
finalScore = new BigDecimal(finalScore).setScale(2, RoundingMode.HALF_UP).min(new BigDecimal("100")).doubleValue(); //全对,直接给满分(避免因为小数点导致的误差)
finalScore = new BigDecimal("100").doubleValue();
}else{
// 四舍五入处理(不能超过100分)
finalScore = new BigDecimal(finalScore).setScale(2, RoundingMode.HALF_UP).min(new BigDecimal("100")).doubleValue();
}
boolean isPassed = (finalScore >= task.getPassScore()); boolean isPassed = (finalScore >= task.getPassScore());
record.setScore(finalScore); record.setScore(finalScore);
record.setPass(isPassed); record.setPass(isPassed);

49
src/main/java/com/project/interaction/application/impl/GenerateQuestionQueueSchedulerService.java

@ -0,0 +1,49 @@
package com.project.interaction.application.impl;
import com.project.interaction.domain.service.PostToGenerateQuestionDomainService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* 题目生成队列定时处理服务
* 定时从队列获取被拒绝的题目生成请求进行重试
*/
@Service
@Slf4j
public class GenerateQuestionQueueSchedulerService {
@Autowired
private PostToGenerateQuestionDomainService postToGenerateQuestionDomainService;
/**
* 定时处理队列中的待重试项
* 每分钟执行一次
*/
@Scheduled(fixedRate = 60000)
public void processQueuedItems() {
try {
log.debug(">>> [定时任务] 开始处理队列中的待重试题目生成请求...");
postToGenerateQuestionDomainService.processQueuedItems();
} catch (Exception e) {
log.error(">>> [定时任务] 处理队列时发生异常, 错误: {}", e.getMessage(), e);
}
}
/**
* 定时监控队列状态
* 每5分钟执行一次监控队列大小和消费状态
*/
@Scheduled(fixedRate = 300000) // 每300秒执行一次(300000毫秒)
public void monitorQueueStatus() {
try {
log.debug(">>> [定时任务] 开始监控队列状态...");
// 这里可以添加队列监控逻辑,如记录队列大小、消费暂停/恢复状态等
log.debug(">>> [定时任务] 队列监控完成");
} catch (Exception e) {
log.error(">>> [定时任务] 监控队列时发生异常, 错误: {}", e.getMessage(), e);
}
}
}

11
src/main/java/com/project/interaction/controller/InteractionController.java

@ -6,6 +6,7 @@ import com.project.base.domain.result.Result;
import com.project.interaction.application.AlgorithmApplicationService; import com.project.interaction.application.AlgorithmApplicationService;
import com.project.interaction.domain.dto.ClusterCallbackDTO; import com.project.interaction.domain.dto.ClusterCallbackDTO;
import com.project.interaction.domain.dto.QuestionCallBackDTO; import com.project.interaction.domain.dto.QuestionCallBackDTO;
import com.project.interaction.domain.service.GenerateQuestionQueueService;
import com.project.task.application.TaskApplicationService; import com.project.task.application.TaskApplicationService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
@ -18,6 +19,8 @@ public class InteractionController {
private AlgorithmApplicationService algorithmApplicationService; private AlgorithmApplicationService algorithmApplicationService;
@Autowired @Autowired
private TaskApplicationService taskApplicationService; private TaskApplicationService taskApplicationService;
@Autowired
private GenerateQuestionQueueService generateQuestionQueueService;
// @PostMapping("/saveCluster") // @PostMapping("/saveCluster")
@ -44,4 +47,12 @@ public class InteractionController {
public Result<Boolean> isDeletedTask(Long taskId){ public Result<Boolean> isDeletedTask(Long taskId){
return taskApplicationService.selectById(taskId); return taskApplicationService.selectById(taskId);
} }
/**
* 获取当前队列大小
*/
@GetMapping("/getQueueSize")
public Result<Integer> getQueueSize(){
return Result.success(generateQuestionQueueService.getQueueSize());
}
} }

48
src/main/java/com/project/interaction/domain/dto/GenerateQuestionQueueDTO.java

@ -0,0 +1,48 @@
package com.project.interaction.domain.dto;
import com.project.question.domain.dto.TaskKnowledgePointDTO;
import com.project.task.domain.enums.QuestionTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
/**
* 生成题目队列项 - 存储被拒绝的题目生成请求等待重试
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GenerateQuestionQueueDTO implements Serializable {
private static final long serialVersionUID = 1L;
/** 唯一标识符(用于去重) */
private String itemId;
/** 知识点DTO列表 */
private List<TaskKnowledgePointDTO> dtoList;
/* 考试任务ID */
private Long taskId;
/** 题目类型 */
private QuestionTypeEnum questionType;
/** 生成数量 */
private Integer num;
/** 添加到队列的时间 */
private LocalDateTime addTime;
/** 重试次数 */
private Integer retryCount;
/** 最后一次重试的时间 */
private LocalDateTime lastRetryTime;
}

50
src/main/java/com/project/interaction/domain/service/GenerateQuestionQueueService.java

@ -0,0 +1,50 @@
package com.project.interaction.domain.service;
import com.project.interaction.domain.dto.GenerateQuestionQueueDTO;
import java.util.List;
/**
* 生成题目队列服务接口 - 管理被拒绝的题目生成请求
*/
public interface GenerateQuestionQueueService {
/**
* 添加队列项
* @param item 队列项
* @return 是否添加成功
*/
boolean addItem(GenerateQuestionQueueDTO item);
/**
* 获取待重试的队列项
* @return 待重试的队列项列表
*/
List<GenerateQuestionQueueDTO> getRetryItems();
/**
* 移除队列项
* @param itemId 项的唯一标识
* @return 是否移除成功
*/
boolean removeItem(String itemId);
/**
* 获取队列大小
* @return 队列中的项数
*/
int getQueueSize();
/**
* 重新入队用于重试失败后
* @param item 队列项
*/
void requeue(GenerateQuestionQueueDTO item);
/**
* 删除队列数据
* @param id 考试任务ID
*/
void removeByTaskId(Long id);
}

10
src/main/java/com/project/interaction/domain/service/PostToGenerateQuestionDomainService.java

@ -6,5 +6,15 @@ import com.project.task.domain.enums.QuestionTypeEnum;
import java.util.List; import java.util.List;
public interface PostToGenerateQuestionDomainService { public interface PostToGenerateQuestionDomainService {
/**
* 生成题目
*/
void postToGenerateQuestion(List<TaskKnowledgePointDTO> dtoList, QuestionTypeEnum questionType, int num) throws Exception; void postToGenerateQuestion(List<TaskKnowledgePointDTO> dtoList, QuestionTypeEnum questionType, int num) throws Exception;
/**
* 处理队列中的待重试项
* 由定时任务调用
*/
void processQueuedItems();
} }

181
src/main/java/com/project/interaction/domain/service/impl/GenerateQuestionQueueServiceImpl.java

@ -0,0 +1,181 @@
package com.project.interaction.domain.service.impl;
import com.project.interaction.domain.dto.GenerateQuestionQueueDTO;
import com.project.interaction.domain.service.GenerateQuestionQueueService;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@Service
@Slf4j
public class GenerateQuestionQueueServiceImpl implements GenerateQuestionQueueService {
@Value("${question.queue.retry-interval:60}")
private Long retryInterval;
@Value("${question.queue.max-retry:10}")
private Integer maxRetry;
private PriorityBlockingQueue<GenerateQuestionQueueDTO> queue;
@PostConstruct
public void init() {
this.queue = new PriorityBlockingQueue<>(
10000,
Comparator.comparing(GenerateQuestionQueueDTO::getAddTime)
.thenComparing(GenerateQuestionQueueDTO::getItemId)
);
}
private final Map<String, GenerateQuestionQueueDTO> itemMap = new ConcurrentHashMap<>();
private final AtomicInteger currentSize = new AtomicInteger(0);
private final ReentrantLock capacityLock = new ReentrantLock();
@Override
public boolean addItem(GenerateQuestionQueueDTO item) {
capacityLock.lock();
try {
if (item.getItemId() == null) {
item.setItemId(UUID.randomUUID().toString());
}
if (item.getAddTime() == null) {
item.setAddTime(LocalDateTime.now());
}
if (item.getRetryCount() == null) {
item.setRetryCount(0);
}
itemMap.put(item.getItemId(), item);
queue.offer(item);
currentSize.incrementAndGet();
log.info(">>> [队列管理] 成功添加队列项, ItemId: {}, 当前队列大小: {}, 添加时间: {}",
item.getItemId(), currentSize.get(), item.getAddTime());
return true;
} finally {
capacityLock.unlock();
}
}
@Override
public List<GenerateQuestionQueueDTO> getRetryItems() {
List<GenerateQuestionQueueDTO> retryItems = new ArrayList<>();
List<GenerateQuestionQueueDTO> unRetryItems = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
while (!queue.isEmpty()) {
GenerateQuestionQueueDTO item = queue.poll();
if (item == null) {
break;
}
GenerateQuestionQueueDTO latestItem = itemMap.get(item.getItemId());
if (latestItem == null) {
currentSize.decrementAndGet();
continue;
}
if (canRetry(latestItem, now)) {
retryItems.add(latestItem);
} else {
if (latestItem.getRetryCount() >= maxRetry) {
itemMap.remove(latestItem.getItemId());
currentSize.decrementAndGet();
} else {
unRetryItems.add(latestItem);
}
}
}
for (GenerateQuestionQueueDTO item : unRetryItems) {
queue.offer(item);
}
return retryItems;
}
private boolean canRetry(GenerateQuestionQueueDTO item, LocalDateTime now) {
if (item.getRetryCount() >= maxRetry) {
log.warn(">>> [队列管理] 队列项重试次数已达上限, ItemId: {}, 重试次数: {}, 最大次数: {}",
item.getItemId(), item.getRetryCount(), maxRetry);
itemMap.remove(item.getItemId());
currentSize.decrementAndGet();
return false;
}
if (item.getLastRetryTime() != null) {
long secondsElapsed = java.time.temporal.ChronoUnit.SECONDS
.between(item.getLastRetryTime(), now);
if (secondsElapsed < retryInterval) {
return false;
}
}
return true;
}
@Override
public boolean removeItem(String itemId) {
GenerateQuestionQueueDTO removed = itemMap.remove(itemId);
if (removed != null) {
currentSize.decrementAndGet();
log.info(">>> [队列管理] 成功移除队列项, ItemId: {}, 当前队列大小: {}", itemId, currentSize.get());
return true;
}
return false;
}
@Override
public int getQueueSize() {
return currentSize.get();
}
public void requeue(GenerateQuestionQueueDTO item) {
if (item == null || item.getItemId() == null) {
return;
}
capacityLock.lock();
try {
itemMap.put(item.getItemId(), item);
queue.offer(item);
} finally {
capacityLock.unlock();
}
}
@Override
public void removeByTaskId(Long taskId) {
if (taskId == null) {
return;
}
capacityLock.lock();
try {
List<String> keysToRemove = new ArrayList<>();
for (Map.Entry<String, GenerateQuestionQueueDTO> entry : itemMap.entrySet()) {
if (taskId.equals(entry.getValue().getTaskId())) {
keysToRemove.add(entry.getKey());
}
}
for (String key : keysToRemove) {
itemMap.remove(key);
currentSize.decrementAndGet();
}
if (!keysToRemove.isEmpty()) {
log.info(">>> [队列管理] 根据TaskId移除队列项, TaskId: {}, 移除数量: {}", taskId, keysToRemove.size());
}
} finally {
capacityLock.unlock();
}
}
}

208
src/main/java/com/project/interaction/domain/service/impl/PostToGenerateQuestionDomainServiceImpl.java

@ -1,10 +1,13 @@
package com.project.interaction.domain.service.impl; package com.project.interaction.domain.service.impl;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.RandomUtil;
import com.project.information.domain.entity.KnowledgePointEntity; import com.project.information.domain.entity.KnowledgePointEntity;
import com.project.information.domain.service.KnowledgePointBaseService; import com.project.information.domain.service.KnowledgePointBaseService;
import com.project.interaction.domain.dto.GenerateQuestionQueueDTO;
import com.project.interaction.domain.dto.GenerateQuestionRequestDTO; import com.project.interaction.domain.dto.GenerateQuestionRequestDTO;
import com.project.interaction.domain.dto.GenerateQuestionResponseDTO; import com.project.interaction.domain.dto.GenerateQuestionResponseDTO;
import com.project.interaction.domain.service.GenerateQuestionQueueService;
import com.project.interaction.domain.service.PostToGenerateQuestionDomainService; import com.project.interaction.domain.service.PostToGenerateQuestionDomainService;
import com.project.interaction.utils.NotifyUtil; import com.project.interaction.utils.NotifyUtil;
import com.project.question.domain.dto.QuestionDTO; import com.project.question.domain.dto.QuestionDTO;
@ -19,8 +22,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -46,6 +53,9 @@ public class PostToGenerateQuestionDomainServiceImpl implements PostToGenerateQu
@Autowired @Autowired
private KnowledgePointBaseService knowledgePointBaseService; private KnowledgePointBaseService knowledgePointBaseService;
@Autowired
private GenerateQuestionQueueService generateQuestionQueueService;
@Autowired @Autowired
private NotifyUtil notifyUtil; private NotifyUtil notifyUtil;
@ -55,6 +65,10 @@ public class PostToGenerateQuestionDomainServiceImpl implements PostToGenerateQu
private final List<Integer> list = Arrays.asList(0, 1, 2, 3); private final List<Integer> list = Arrays.asList(0, 1, 2, 3);
/**
* 算法服务拒绝状态码
*/
private static final Integer REJECT_STATUS = 429;
@Value("${question.generation.downgrade:true}") @Value("${question.generation.downgrade:true}")
private Boolean downgrade; private Boolean downgrade;
@ -62,7 +76,8 @@ public class PostToGenerateQuestionDomainServiceImpl implements PostToGenerateQu
/** /**
* 生成题目 * 生成题目
* 1. 先尝试调用算法服务生成真实题目 * 1. 先尝试调用算法服务生成真实题目
* 2. 如果算法服务失败使用熔断降级方案生成默认题目 * 2. 如果算法服务被拒绝将请求加入队列等待定时任务重试
* 3. 如果算法服务失败使用熔断降级方案生成默认题目
*/ */
@Override @Override
public void postToGenerateQuestion(List<TaskKnowledgePointDTO> dtoList, QuestionTypeEnum questionType, int num) throws Exception { public void postToGenerateQuestion(List<TaskKnowledgePointDTO> dtoList, QuestionTypeEnum questionType, int num) throws Exception {
@ -100,13 +115,29 @@ public class PostToGenerateQuestionDomainServiceImpl implements PostToGenerateQu
return; return;
} }
} catch (Exception e) { }catch (Exception e) {
log.warn(">>> [题目生成] 调用算法服务失败,启动熔断降级方案, TaskId: {}, 错误: {}", if (e instanceof WebClientResponseException exception) {
taskId, e.getMessage()); int statusCode = exception.getStatusCode().value();
if (statusCode == REJECT_STATUS) {
log.warn(">>> [题目生成] 算法服务拒绝请求, TaskId: {}, 状态: {}, 信息: {}, 将加入队列等待重试",
taskId, statusCode, e.getMessage());
GenerateQuestionQueueDTO generateQuestionQueueItem = new GenerateQuestionQueueDTO();
generateQuestionQueueItem.setQuestionType(questionType);
generateQuestionQueueItem.setNum(num);
generateQuestionQueueItem.setDtoList(dtoList);
generateQuestionQueueItem.setTaskId(dtoList.get(0).getTaskId());
generateQuestionQueueService.addItem(generateQuestionQueueItem);
return;
}
}
} }
//发送预警通知 log.warn(">>> [题目生成] 调用算法服务失败, TaskId: {}",
notifyUtil.notify(taskId,clusterId); taskId);
//消息提醒
// notifyUtil.notify(taskId, clusterId);
} }
/** /**
@ -137,11 +168,135 @@ public class PostToGenerateQuestionDomainServiceImpl implements PostToGenerateQu
return generateQuestions(requestDTO); return generateQuestions(requestDTO);
} }
private GenerateQuestionResponseDTO generateQuestions(GenerateQuestionRequestDTO requestDTO) {
try {
log.info(">>> [题目生成] 向算法服务发送请求, TaskId: {}, ClusterId: {}, 生成数量: {}",
requestDTO.getCluster().getTaskId(),
requestDTO.getCluster().getClusterId(),
requestDTO.getNumQuestions());
// 发送请求到算法服务
GenerateQuestionResponseDTO response = algorithmWebClient.post()
.uri(apiUrl+generateQuestionUrl)
.bodyValue(requestDTO)
.retrieve()
.onStatus(
status -> !status.is2xxSuccessful(),
resp -> resp.bodyToMono(String.class)
.flatMap(body -> {
return Mono.error(WebClientResponseException.create(
resp.statusCode().value(),
"",
resp.headers().asHttpHeaders(),
body.getBytes(),
StandardCharsets.UTF_8
));
})
)
.bodyToMono(GenerateQuestionResponseDTO.class)
.timeout(Duration.ofSeconds(30)) // 30秒超时
.retry(2) // 失败重试2次
.block(); // 同步等待结果
log.info(">>> [题目生成] 算法服务成功接收任务, TaskId: {}, AlgoTaskId: {}, 状态: {}",
requestDTO.getCluster().getTaskId(),
response != null ? response.getTaskId() : "null",
response != null ? response.getStatus() : "null");
return response;
} catch (Exception e) {
throw e;
}
}
/**
* 处理队列中的待重试项
* 由定时任务调用一般间隔1分钟调用一次
*/
@Override
public void processQueuedItems() {
// 获取待重试的队列项
List<GenerateQuestionQueueDTO> retryItems = generateQuestionQueueService.getRetryItems();
if (retryItems.isEmpty()) {
log.debug(">>> [队列管理] 当前没有待重试的项");
return;
}
log.info(">>> [队列管理] 获取到 {} 个待重试的项", retryItems.size());
for (GenerateQuestionQueueDTO item : retryItems) {
boolean shouldRequeue = retryGenerateQuestion(item);
if (shouldRequeue) {
generateQuestionQueueService.requeue(item);
}
}
log.info(">>> [队列管理] 本次队列处理完成");
}
/**
* 重试生成题目
* @return true 如果需要重新入队等待下次重试false 如果处理完成成功或达到最大重试次数
*/
private boolean retryGenerateQuestion(GenerateQuestionQueueDTO item){
log.info(">>> [队列管理] 开始重试题目生成, 重试次数: {},ItemId: {}, TaskId: {}",
item.getRetryCount() + 1, item.getItemId(),item.getTaskId());
try {
long waitStart = System.currentTimeMillis();
rateLimiter.acquire();
long waitTime = System.currentTimeMillis() - waitStart;
if (waitTime > 100) {
log.info(">>> [队列管理] 重试因限流等待了 {} ms, TaskID: {}", waitTime, item.getTaskId());
}
KnowledgePointEntity knowledgePoint = knowledgePointBaseService.getById(
item.getDtoList().get(0).getAtomId());
GenerateQuestionResponseDTO response = callAlgorithmService(
item.getDtoList(),
item.getQuestionType(),
item.getNum(),
knowledgePoint.getParseName());
if (response != null && StringUtils.isNotBlank(response.getMessage()) &&
response.getMessage().contains("success")) {
log.info(">>> [队列管理] 重试成功!ItemId: {}, TaskId: {}", item.getItemId(),
item.getTaskId());
generateQuestionQueueService.removeItem(item.getItemId());
return false;
}
} catch (Exception e) {
if (e instanceof WebClientResponseException exception) {
int statusCode = exception.getStatusCode().value();
if (statusCode == REJECT_STATUS) {
log.warn(">>> [队列管理] 重试仍被拒绝, ItemId: {}",
item.getItemId(), exception.getMessage());
item.setLastRetryTime(LocalDateTime.now());
item.setRetryCount(item.getRetryCount() + 1);
return true;
}
}
}
log.error(">>> [队列管理] 重试生成题目异常, ItemId: {},TaskId: {}",
item.getItemId(),item.getTaskId());
item.setLastRetryTime(LocalDateTime.now());
item.setRetryCount(item.getRetryCount() + 1);
notifyUtil.notify(item.getTaskId(), item.getDtoList().get(0).getClusterId());
return true;
}
/** /**
* 熔断降级生成硬编码的默认题目当算法服务失败时使用 * 熔断降级生成硬编码的默认题目当算法服务失败时使用
*/ */
private void generateFallbackQuestions(List<TaskKnowledgePointDTO> dtoList, private void generateFallbackQuestions(List<TaskKnowledgePointDTO> dtoList,
QuestionTypeEnum questionType, int num) throws Exception { QuestionTypeEnum questionType, int num) throws Exception {
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
QuestionDTO questionDTO = new QuestionDTO(); QuestionDTO questionDTO = new QuestionDTO();
questionDTO.setKpIdList(dtoList.stream().map(TaskKnowledgePointDTO::getId).collect(Collectors.toList())); questionDTO.setKpIdList(dtoList.stream().map(TaskKnowledgePointDTO::getId).collect(Collectors.toList()));
@ -195,45 +350,6 @@ public class PostToGenerateQuestionDomainServiceImpl implements PostToGenerateQu
saveQuestionDomainService.save(questionDTO); saveQuestionDomainService.save(questionDTO);
} }
} }
private GenerateQuestionResponseDTO generateQuestions(GenerateQuestionRequestDTO requestDTO) {
try {
log.info(">>> [题目生成] 向算法服务发送请求, TaskId: {}, ClusterId: {}, 生成数量: {}",
requestDTO.getCluster().getTaskId(),
requestDTO.getCluster().getClusterId(),
requestDTO.getNumQuestions());
// 发送请求到算法服务
GenerateQuestionResponseDTO response = algorithmWebClient.post()
.uri(apiUrl+generateQuestionUrl)
.bodyValue(requestDTO)
.retrieve()
.onStatus(
status -> !status.is2xxSuccessful(),
resp -> resp.bodyToMono(String.class)
.flatMap(body -> reactor.core.publisher.Mono.error(
new RuntimeException("算法服务返回错误: " + body)
))
)
.bodyToMono(GenerateQuestionResponseDTO.class)
.timeout(Duration.ofSeconds(30)) // 30秒超时
.retry(2) // 失败重试2次
.block(); // 同步等待结果
log.info(">>> [题目生成] 算法服务成功接收任务, TaskId: {}, AlgoTaskId: {}, 状态: {}",
requestDTO.getCluster().getTaskId(),
response != null ? response.getTaskId() : "null",
response != null ? response.getStatus() : "null");
return response;
} catch (Exception e) {
log.error(">>> [题目生成] 调用算法服务异常, TaskId: {}, 错误: {}",
requestDTO.getCluster().getTaskId(),
e.getMessage(), e);
throw new RuntimeException("调用算法服务生成题目失败: " + e.getMessage(), e);
}
}
} }

5
src/main/java/com/project/task/domain/service/impl/DeleteTaskDomainServiceImpl.java

@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.project.base.domain.enums.StatusEnum; import com.project.base.domain.enums.StatusEnum;
import com.project.base.domain.exception.BusinessErrorException; import com.project.base.domain.exception.BusinessErrorException;
import com.project.base.domain.result.Result; import com.project.base.domain.result.Result;
import com.project.interaction.domain.service.GenerateQuestionQueueService;
import com.project.task.domain.entity.TaskEntity; import com.project.task.domain.entity.TaskEntity;
import com.project.task.domain.entity.TaskUserEntity; import com.project.task.domain.entity.TaskUserEntity;
import com.project.task.domain.service.DeleteTaskDomainService; import com.project.task.domain.service.DeleteTaskDomainService;
@ -24,6 +25,9 @@ public class DeleteTaskDomainServiceImpl implements DeleteTaskDomainService {
@Autowired @Autowired
private TaskUserBaseService taskUserBaseService; private TaskUserBaseService taskUserBaseService;
@Autowired
private GenerateQuestionQueueService generateQuestionQueueService;
@Override @Override
public Result<String> batchDelete(List<Long> ids) throws Exception { public Result<String> batchDelete(List<Long> ids) throws Exception {
if (CollUtil.isEmpty(ids)) { if (CollUtil.isEmpty(ids)) {
@ -49,6 +53,7 @@ public class DeleteTaskDomainServiceImpl implements DeleteTaskDomainService {
LambdaQueryWrapper<TaskUserEntity> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<TaskUserEntity> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TaskUserEntity::getTaskId , id); queryWrapper.eq(TaskUserEntity::getTaskId , id);
taskUserBaseService.remove(queryWrapper); taskUserBaseService.remove(queryWrapper);
generateQuestionQueueService.removeByTaskId(id);
} }
return Result.success("删除成功"); return Result.success("删除成功");

6
src/main/resources/application-dev.yml

@ -87,5 +87,11 @@ question:
# 限流速率:每秒允许的API请求数 # 限流速率:每秒允许的API请求数
rate-limit: 20 rate-limit: 20
downgrade: false downgrade: false
# 队列配置
queue:
# 重试间隔(秒)
retry-interval: 60
# 最大重试次数
max-retry: 100
scheduled-task: scheduled-task:
owner: test owner: test

10
src/main/resources/application-test.yml

@ -59,7 +59,7 @@ mybatis-plus:
type-aliases-package: com.proposal.**.domain.entity type-aliases-package: com.proposal.**.domain.entity
milvus: milvus:
host: 8.129.84.155 host: 172.16.204.50
port: 19530 port: 19530
analysis: analysis:
host: https://107pm707566hq.vicp.fun host: https://107pm707566hq.vicp.fun
@ -83,8 +83,14 @@ jwt:
question: question:
generation: generation:
# 限流速率:每秒允许的API请求数 # 限流速率:每秒允许的API请求数
rate-limit: 5 rate-limit: 1
downgrade: false downgrade: false
# 队列配置
queue:
# 重试间隔(秒)
retry-interval: 60
# 最大重试次数
max-retry: 100
scheduled-task: scheduled-task:
owner: test owner: test
Loading…
Cancel
Save