12 changed files with 250 additions and 4 deletions
@ -0,0 +1,24 @@ |
|||||
|
package com.project.ding.config; |
||||
|
|
||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
import java.util.concurrent.ExecutorService; |
||||
|
import java.util.concurrent.LinkedBlockingQueue; |
||||
|
import java.util.concurrent.ThreadPoolExecutor; |
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
|
||||
|
@Configuration |
||||
|
public class SyncThreadPoolConfig { |
||||
|
@Bean(name = "dingUserSyncExecutor") |
||||
|
public ExecutorService dingUserSyncExecutor() { |
||||
|
return new ThreadPoolExecutor( |
||||
|
1, 1, |
||||
|
0L, TimeUnit.MILLISECONDS, |
||||
|
new LinkedBlockingQueue<>(1), |
||||
|
new ThreadFactoryBuilder().setNameFormat("ding-user-sync-%d").build(), |
||||
|
new ThreadPoolExecutor.DiscardPolicy() |
||||
|
); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,37 @@ |
|||||
|
package com.project.ding.domain.entity; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.annotation.TableName; |
||||
|
import com.project.base.domain.entity.BaseEntity; |
||||
|
import jakarta.persistence.Column; |
||||
|
import jakarta.persistence.Entity; |
||||
|
import jakarta.persistence.Table; |
||||
|
import lombok.Data; |
||||
|
import lombok.EqualsAndHashCode; |
||||
|
import org.hibernate.annotations.Comment; |
||||
|
|
||||
|
import java.util.Date; |
||||
|
|
||||
|
|
||||
|
@Data |
||||
|
@Table(name = "evaluator_sync_log" ) |
||||
|
@Entity |
||||
|
@TableName("evaluator_sync_log") |
||||
|
@EqualsAndHashCode(callSuper = true) |
||||
|
public class SyncLogEntity extends BaseEntity { |
||||
|
|
||||
|
@Comment("同步开始时间") |
||||
|
@Column(name = "start_time") |
||||
|
private Date startTime; |
||||
|
|
||||
|
@Comment("同步结束时间") |
||||
|
@Column(name = "end_time") |
||||
|
private Date endTime; |
||||
|
|
||||
|
@Column(name = "status") |
||||
|
@Comment("任务状态") |
||||
|
private Integer status; // 0-进行中, 1-成功, 2-失败
|
||||
|
|
||||
|
@Column(name = "error_msg" , columnDefinition="TEXT comment '错误信息'") |
||||
|
private String errorMsg; |
||||
|
|
||||
|
} |
||||
@ -0,0 +1,7 @@ |
|||||
|
package com.project.ding.domain.service; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.extension.service.IService; |
||||
|
import com.project.ding.domain.entity.UserEntity; |
||||
|
|
||||
|
public interface UserBaseService extends IService<UserEntity> { |
||||
|
} |
||||
@ -0,0 +1,11 @@ |
|||||
|
package com.project.ding.domain.service.impl; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
||||
|
import com.project.ding.domain.entity.UserEntity; |
||||
|
import com.project.ding.domain.service.UserBaseService; |
||||
|
import com.project.ding.mapper.UserMapper; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
|
||||
|
@Service |
||||
|
public class UserBaseServiceImpl extends ServiceImpl<UserMapper , UserEntity> implements UserBaseService { |
||||
|
} |
||||
@ -0,0 +1,9 @@ |
|||||
|
package com.project.ding.mapper; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
import com.project.ding.domain.entity.DepartmentEntity; |
||||
|
import org.apache.ibatis.annotations.Mapper; |
||||
|
|
||||
|
@Mapper |
||||
|
public interface DepartmentMapper extends BaseMapper<DepartmentEntity> { |
||||
|
} |
||||
@ -0,0 +1,9 @@ |
|||||
|
package com.project.ding.mapper; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
import com.project.ding.domain.entity.SyncLogEntity; |
||||
|
import org.apache.ibatis.annotations.Mapper; |
||||
|
|
||||
|
@Mapper |
||||
|
public interface SyncLogMapper extends BaseMapper<SyncLogEntity> { |
||||
|
} |
||||
@ -0,0 +1,7 @@ |
|||||
|
package com.project.ding.mapper; |
||||
|
|
||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
import com.project.ding.domain.entity.UserEntity; |
||||
|
|
||||
|
public interface UserMapper extends BaseMapper<UserEntity> { |
||||
|
} |
||||
@ -0,0 +1,108 @@ |
|||||
|
package com.project.ding.utils; |
||||
|
|
||||
|
|
||||
|
import com.project.ding.domain.dto.UserDTO; |
||||
|
import com.project.ding.domain.entity.SyncLogEntity; |
||||
|
import com.project.ding.domain.entity.UserEntity; |
||||
|
import com.project.ding.domain.service.UserBaseService; |
||||
|
import com.project.ding.mapper.SyncLogMapper; |
||||
|
import jakarta.annotation.Resource; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.redis.core.StringRedisTemplate; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.Date; |
||||
|
import java.util.List; |
||||
|
import java.util.concurrent.CompletableFuture; |
||||
|
import java.util.concurrent.ExecutorService; |
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
import java.util.stream.Collectors; |
||||
|
|
||||
|
@Component |
||||
|
@Slf4j |
||||
|
public class DingUserSyncUtil { |
||||
|
@Resource(name = "dingUserSyncExecutor") |
||||
|
private ExecutorService executor; |
||||
|
|
||||
|
@Autowired |
||||
|
private StringRedisTemplate redisTemplate; |
||||
|
|
||||
|
@Autowired |
||||
|
private UserBaseService userBaseService; |
||||
|
|
||||
|
@Autowired |
||||
|
private DingUtil dingUtil; |
||||
|
|
||||
|
@Autowired |
||||
|
private SyncLogMapper syncLogMapper; |
||||
|
|
||||
|
private static final String LOCK_KEY = "lock:ding:user_sync"; |
||||
|
private static final String LAST_SYNC_KEY = "cache:ding:last_sync_time"; |
||||
|
|
||||
|
/** |
||||
|
* 触发同步任务 |
||||
|
* @param force 是否强制触发(跳过120分钟冷却检查) |
||||
|
*/ |
||||
|
public String triggerSync(boolean force) { |
||||
|
// 1. 抢占 Redis 锁(防止多个实例同时跑全量)
|
||||
|
Boolean acquired = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "true", 1, TimeUnit.HOURS); |
||||
|
if (Boolean.FALSE.equals(acquired)) { |
||||
|
return "同步任务已在运行中,请勿重复触发"; |
||||
|
} |
||||
|
|
||||
|
// 2. 提交异步任务(不使用 @Async)
|
||||
|
CompletableFuture.runAsync(() -> { |
||||
|
try { |
||||
|
// 3. 冷却检查
|
||||
|
if (!force) { |
||||
|
String lastSync = redisTemplate.opsForValue().get(LAST_SYNC_KEY); |
||||
|
if (lastSync != null && (System.currentTimeMillis() - Long.parseLong(lastSync) < 120 * 60 * 1000)) { |
||||
|
log.info("处于冷却期,跳过全量同步"); |
||||
|
return; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
log.info(">>> 开始钉钉用户全量同步 (force={})", force); |
||||
|
runActualSyncTask(); |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("全量同步发生严重异常", e); |
||||
|
} finally { |
||||
|
// 4. 释放锁
|
||||
|
redisTemplate.delete(LOCK_KEY); |
||||
|
} |
||||
|
}, executor); |
||||
|
|
||||
|
return "同步任务已启动,请查看系统日志或更新时间"; |
||||
|
} |
||||
|
private void runActualSyncTask() { |
||||
|
// 插入一条同步日志
|
||||
|
SyncLogEntity syncLogEntity = new SyncLogEntity(); |
||||
|
syncLogEntity.setStartTime(new Date()); |
||||
|
syncLogEntity.setStatus(0); |
||||
|
syncLogMapper.insert(syncLogEntity); |
||||
|
|
||||
|
// 开始同步
|
||||
|
try { |
||||
|
List<UserDTO> allUserDTOList = dingUtil.getAllUserDTO(); |
||||
|
userBaseService.saveOrUpdateBatch(allUserDTOList.stream() |
||||
|
.map(dto -> dto.toEntity(UserEntity::new)) |
||||
|
.collect(Collectors.toList()) , 300); |
||||
|
syncLogEntity.setStatus(1); |
||||
|
syncLogEntity.setEndTime(new Date()); |
||||
|
syncLogMapper.updateById(syncLogEntity); |
||||
|
|
||||
|
redisTemplate.opsForValue().set(LAST_SYNC_KEY, String.valueOf(System.currentTimeMillis())); |
||||
|
log.info(">>> 全量同步完成"); |
||||
|
} catch (Exception e) { |
||||
|
// 同步失败更新
|
||||
|
syncLogEntity.setEndTime(new Date()); |
||||
|
syncLogEntity.setStatus(2); // 失败
|
||||
|
syncLogEntity.setErrorMsg(e.getMessage()); |
||||
|
syncLogMapper.updateById(syncLogEntity); |
||||
|
log.error(">>> 全量同步过程报错", e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
Loading…
Reference in new issue