|
|
|
@ -21,9 +21,6 @@ public class GenerateQuestionQueueServiceImpl implements GenerateQuestionQueueSe |
|
|
|
@Value("${question.queue.retry-interval:60}") |
|
|
|
private Long retryInterval; |
|
|
|
|
|
|
|
@Value("${question.queue.max-retry:10}") |
|
|
|
private Integer maxRetry; |
|
|
|
|
|
|
|
private PriorityBlockingQueue<GenerateQuestionQueueDTO> queue; |
|
|
|
|
|
|
|
/*按照权限降序、时间升序*/ |
|
|
|
@ -72,9 +69,9 @@ public class GenerateQuestionQueueServiceImpl implements GenerateQuestionQueueSe |
|
|
|
|
|
|
|
@Override |
|
|
|
public List<GenerateQuestionQueueDTO> getRetryItems() { |
|
|
|
capacityLock.lock(); |
|
|
|
try { |
|
|
|
List<GenerateQuestionQueueDTO> retryItems = new ArrayList<>(); |
|
|
|
List<GenerateQuestionQueueDTO> unRetryItems = new ArrayList<>(); |
|
|
|
LocalDateTime now = LocalDateTime.now(); |
|
|
|
|
|
|
|
while (!queue.isEmpty()) { |
|
|
|
GenerateQuestionQueueDTO item = queue.poll(); |
|
|
|
@ -87,48 +84,19 @@ public class GenerateQuestionQueueServiceImpl implements GenerateQuestionQueueSe |
|
|
|
currentSize.decrementAndGet(); |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
|
if (canRetry(latestItem, now)) { |
|
|
|
retryItems.add(latestItem); |
|
|
|
} else { |
|
|
|
if (latestItem.getRetryCount() >= maxRetry) { |
|
|
|
itemMap.remove(latestItem.getItemId()); |
|
|
|
currentSize.decrementAndGet(); |
|
|
|
} else { |
|
|
|
unRetryItems.add(latestItem); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
for (GenerateQuestionQueueDTO item : unRetryItems) { |
|
|
|
queue.offer(item); |
|
|
|
} |
|
|
|
|
|
|
|
return retryItems; |
|
|
|
} finally { |
|
|
|
capacityLock.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
private boolean canRetry(GenerateQuestionQueueDTO item, LocalDateTime now) { |
|
|
|
if (item.getRetryCount() >= maxRetry) { |
|
|
|
log.warn(">>> [队列管理] 队列项重试次数已达上限, ItemId: {}, 重试次数: {}, 最大次数: {}", |
|
|
|
item.getItemId(), item.getRetryCount(), maxRetry); |
|
|
|
itemMap.remove(item.getItemId()); |
|
|
|
currentSize.decrementAndGet(); |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
if (item.getLastRetryTime() != null) { |
|
|
|
long secondsElapsed = java.time.temporal.ChronoUnit.SECONDS |
|
|
|
.between(item.getLastRetryTime(), now); |
|
|
|
if (secondsElapsed < retryInterval) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public boolean removeItem(String itemId) { |
|
|
|
capacityLock.lock(); |
|
|
|
try { |
|
|
|
GenerateQuestionQueueDTO removed = itemMap.remove(itemId); |
|
|
|
if (removed != null) { |
|
|
|
currentSize.decrementAndGet(); |
|
|
|
@ -136,6 +104,9 @@ public class GenerateQuestionQueueServiceImpl implements GenerateQuestionQueueSe |
|
|
|
return true; |
|
|
|
} |
|
|
|
return false; |
|
|
|
} finally { |
|
|
|
capacityLock.unlock(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -149,6 +120,9 @@ public class GenerateQuestionQueueServiceImpl implements GenerateQuestionQueueSe |
|
|
|
} |
|
|
|
capacityLock.lock(); |
|
|
|
try { |
|
|
|
if (!itemMap.containsKey(item.getItemId())) { |
|
|
|
currentSize.incrementAndGet(); |
|
|
|
} |
|
|
|
itemMap.put(item.getItemId(), item); |
|
|
|
queue.offer(item); |
|
|
|
} finally { |
|
|
|
|