luoweijian 1 week ago
parent
commit
08870683d6
  1. 2
      src/main/java/com/project/interaction/config/WebClientConfig.java
  2. 17
      src/main/java/com/project/interaction/domain/dto/ClusterQueryDTO.java
  3. 24
      src/main/java/com/project/interaction/domain/service/impl/PostToClusteringDomainServiceImpl.java

2
src/main/java/com/project/interaction/config/WebClientConfig.java

@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit;
@Configuration
public class WebClientConfig {
@Value("algo.baseUrl:http://172.16.204.50:8002")
@Value("${algo.baseUrl:http://172.16.204.50:8002}")
private String baseUrl;
@Bean(name = "algorithmWebClient")

17
src/main/java/com/project/interaction/domain/dto/ClusterQueryDTO.java

@ -8,20 +8,21 @@ import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Data
@NoArgsConstructor
public class ClusterQueryDTO {
private Long taskId;
private Map<String , Object> data;
private List<Map<String, Object>> data;
public ClusterQueryDTO(Long taskId, List<KnowledgePointDTO> kpList) {
ClusterQueryDTO dto = new ClusterQueryDTO();
dto.setTaskId(taskId);
kpList.forEach(kp -> {
Map<String , Object> data = new HashMap<>();
data.put("id" , kp.getId());
data.put("text" , kp.getContent());
});
this.taskId = taskId;
this.data = kpList.stream().map(kp -> {
Map<String, Object> item = new HashMap<>();
item.put("id", kp.getId());
item.put("text", kp.getContent()); // 对应 JSON 里的 text
return item;
}).collect(Collectors.toList());
}

24
src/main/java/com/project/interaction/domain/service/impl/PostToClusteringDomainServiceImpl.java

@ -12,6 +12,7 @@ import org.springframework.web.reactive.function.client.WebClient;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
@ -27,26 +28,27 @@ public class PostToClusteringDomainServiceImpl implements PostToClusteringDomain
@Override
public void postToClustering(Long taskId, List<KnowledgePointDTO> kpList) {
// 构建请求 DTO (AlgoRequestDTO)
List<ClusterQueryDTO> request = Collections.singletonList(new ClusterQueryDTO(taskId, kpList));
Map<String, Object> requestBody = Collections.singletonMap("tasks",
Collections.singletonList(new ClusterQueryDTO(taskId, kpList)));
try {
log.info(">>> [Interactor] 正在向算法端发送请求, TaskId: {}", taskId);
algorithmWebClient.post()
.uri(clusterUrl)
.bodyValue(request)
.bodyValue(requestBody)
.retrieve()
.bodyToMono(ClusterResultDTO.class)// 错误处理
.doOnError(error -> log.error(">>> [Interactor] 算法端推送异常, TaskId: {}, 原因: {}",
taskId, error.getMessage()))
// 结果处理(非阻塞订阅)
.subscribe(result -> {
if (result != null) {
log.info(">>> [Interactor] 算法端已成功接收任务: {}", taskId);
} else {
// log.warn(">>> [Interactor] 算法端拒绝了请求, TaskId: {}, 消息: {}",
// taskId, result != null ? result.getMessage() : "无返回消息");
}
});
.subscribe(
result -> {
log.info(">>> [Interactor] 算法端已成功接收任务: {}", taskId);
},
error -> {
// 这里处理订阅过程中的报错,防止异常被丢弃
log.error(">>> [Interactor] 任务 {} 订阅执行失败", taskId, error);
}
);
} catch (Exception e) {
log.error(">>> [Interactor] 算法端通信异常", e);

Loading…
Cancel
Save