package com.mosquito.project.job; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.mosquito.project.config.AppConfig; import com.mosquito.project.domain.Activity; import com.mosquito.project.persistence.entity.ActivityEntity; import com.mosquito.project.persistence.entity.RewardJobEntity; import com.mosquito.project.persistence.entity.ShortLinkEntity; import com.mosquito.project.persistence.entity.UserRewardEntity; import com.mosquito.project.persistence.repository.ActivityRepository; import com.mosquito.project.persistence.repository.RewardJobRepository; import com.mosquito.project.persistence.repository.ShortLinkRepository; import com.mosquito.project.persistence.repository.UserRewardRepository; import com.mosquito.project.service.CouponRewardService; import com.mosquito.project.service.RewardService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.math.BigDecimal; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.List; import java.util.Map; /** * 奖励任务消费处理器 * 定时处理奖励队列中的待发放任务 * 使用tracking_id进行精确归因 * 按活动规则计算奖励值 */ @Component @ConditionalOnProperty(value="app.reward-job.enabled", havingValue="true", matchIfMissing=true) public class RewardJobProcessor { private static final Logger log = LoggerFactory.getLogger(RewardJobProcessor.class); private static final int MAX_RETRY_COUNT = 3; private final RewardJobRepository rewardJobRepository; private final ShortLinkRepository shortLinkRepository; private final UserRewardRepository userRewardRepository; private final ActivityRepository activityRepository; private final ObjectMapper objectMapper; private final RewardDistributor rewardDistributor; private final CouponRewardService couponRewardService; private final AppConfig appConfig; public RewardJobProcessor(RewardJobRepository rewardJobRepository, ShortLinkRepository shortLinkRepository, UserRewardRepository userRewardRepository, ActivityRepository activityRepository, ObjectMapper objectMapper, RewardDistributor rewardDistributor, CouponRewardService couponRewardService, AppConfig appConfig) { this.rewardJobRepository = rewardJobRepository; this.shortLinkRepository = shortLinkRepository; this.userRewardRepository = userRewardRepository; this.activityRepository = activityRepository; this.objectMapper = objectMapper; this.rewardDistributor = rewardDistributor; this.couponRewardService = couponRewardService; this.appConfig = appConfig; } @Scheduled(fixedDelay = 5000) // 每5秒执行一次 @Transactional public void processRewardJobs() { if (!appConfig.getRewardJob().isEnabled()) { return; // 测试环境禁用 } OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC); List pendingJobs = rewardJobRepository .findTop10ByStatusAndNextRunAtLessThanEqualOrderByCreatedAtAsc("pending", now); if (pendingJobs.isEmpty()) { return; } log.info("开始处理 {} 个奖励任务", pendingJobs.size()); for (RewardJobEntity job : pendingJobs) { try { processRewardJob(job); } catch (Exception e) { log.error("处理奖励任务 {} 失败: {}", job.getId(), e.getMessage()); handleJobFailure(job); } } log.info("奖励任务处理完成"); } /** * 处理单个奖励任务 * 通过tracking_id精确定位邀请关系并发奖 * PRD要求:去掉"按externalUserId查最近邀请记录"的隐式归因 */ private void processRewardJob(RewardJobEntity job) { String trackingId = job.getTrackingId(); if (trackingId == null || trackingId.isEmpty()) { log.warn("奖励任务 {} 缺少tracking_id", job.getId()); job.setStatus("failed"); job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC)); rewardJobRepository.save(job); return; } // 通过tracking_id精确查找对应的短链接记录(包含邀请关系) var shortLinkOpt = shortLinkRepository.findByTrackingId(trackingId); if (shortLinkOpt.isEmpty()) { log.warn("找不到tracking_id {} 对应的邀请记录", trackingId); job.setStatus("failed"); job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC)); rewardJobRepository.save(job); return; } ShortLinkEntity shortLink = shortLinkOpt.get(); Long inviterUserId = shortLink.getInviterUserId(); Long activityId = shortLink.getActivityId(); if (inviterUserId == null) { log.warn("tracking_id {} 对应的邀请记录没有邀请人", trackingId); job.setStatus("failed"); job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC)); rewardJobRepository.save(job); return; } // 根据活动配置计算奖励值和奖励类型 int points = calculateRewardPoints(activityId); String rewardType = calculateRewardType(activityId); // 从活动配置获取奖励类型 // 处理优惠券奖励 if ("COUPON".equals(rewardType)) { // 优惠券奖励处理:先创建积分记录(状态为APPROVED),再调用优惠券服务发放 processCouponReward(job, shortLink, inviterUserId, activityId, trackingId, points); return; } // 调用外部发放适配器进行奖励发放 try { boolean success = rewardDistributor.distribute(inviterUserId, activityId, trackingId, points, rewardType); if (!success) { log.warn("奖励发放失败: userId={}, activityId={}, trackingId={}", inviterUserId, activityId, trackingId); handleJobFailure(job); return; } } catch (Exception e) { log.error("奖励发放异常: {}", e.getMessage()); handleJobFailure(job); return; } // 创建用户奖励记录 UserRewardEntity reward = new UserRewardEntity(); reward.setUserId(inviterUserId); reward.setActivityId(activityId); reward.setPoints(points); reward.setType(rewardType); reward.setStatus(RewardService.STATUS_GRANTED); reward.setCreatedAt(OffsetDateTime.now(ZoneOffset.UTC)); reward.setTrackingId(trackingId); // 记录归因的tracking_id // 从活动获取部门ID,用于数据权限过滤 var activityOpt = activityRepository.findById(activityId); if (activityOpt.isPresent()) { reward.setDepartmentId(activityOpt.get().getDepartmentId()); log.debug("设置奖励departmentId={} from activityId={}", activityOpt.get().getDepartmentId(), activityId); } userRewardRepository.save(reward); // 更新任务状态为已完成 job.setStatus("completed"); job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC)); rewardJobRepository.save(job); log.info("成功处理奖励任务 {},通过tracking_id {} 为用户 {} 发放 {} 积分", job.getId(), trackingId, inviterUserId, points); } /** * 根据活动配置计算奖励积分 */ private int calculateRewardPoints(Long activityId) { // 默认奖励值 int defaultPoints = 10; try { var activityOpt = activityRepository.findById(activityId); if (activityOpt.isEmpty()) { log.warn("找不到活动 {} 的配置,使用默认奖励", activityId); return defaultPoints; } ActivityEntity activity = activityOpt.get(); String calculationMode = activity.getRewardCalculationMode(); // 如果有阶梯奖励配置,按阶梯计算 if (activity.getRewardTiersConfig() != null && !activity.getRewardTiersConfig().isEmpty()) { return calculateTieredReward(activity.getRewardTiersConfig()); } // 根据计算模式返回奖励 if ("FIXED".equals(calculationMode)) { return defaultPoints; } // 默认返回固定奖励 return defaultPoints; } catch (Exception e) { log.error("计算奖励失败: {}", e.getMessage()); return defaultPoints; } } /** * 计算阶梯奖励 */ private int calculateTieredReward(String tiersConfig) { try { @SuppressWarnings("unchecked") List> tiers = objectMapper.readValue(tiersConfig, List.class); if (tiers == null || tiers.isEmpty()) { return 10; } // 取第一个阶梯的奖励值 Map firstTier = tiers.get(0); Object points = firstTier.get("points"); if (points instanceof Number) { return ((Number) points).intValue(); } return 10; } catch (JsonProcessingException e) { log.error("解析阶梯奖励配置失败: {}", e.getMessage()); return 10; } } /** * 从活动配置中获取奖励类型 */ private String calculateRewardType(Long activityId) { try { var activityOpt = activityRepository.findById(activityId); if (activityOpt.isEmpty()) { return "POINTS"; // 默认积分奖励 } ActivityEntity activity = activityOpt.get(); // 从活动配置的 rewardTiersConfig 中获取奖励类型 if (activity.getRewardTiersConfig() != null && !activity.getRewardTiersConfig().isEmpty()) { @SuppressWarnings("unchecked") List> tiers = objectMapper.readValue(activity.getRewardTiersConfig(), List.class); if (tiers != null && !tiers.isEmpty()) { Map firstTier = tiers.get(0); Object type = firstTier.get("type"); if (type != null) { return type.toString(); } } } return "POINTS"; // 默认积分奖励 } catch (Exception e) { log.error("获取奖励类型失败: {}", e.getMessage()); return "POINTS"; } } /** * 处理优惠券奖励发放 */ private void processCouponReward(RewardJobEntity job, ShortLinkEntity shortLink, Long inviterUserId, Long activityId, String trackingId, int points) { try { // 1. 先创建用户奖励记录(状态为APPROVED待发放) UserRewardEntity reward = new UserRewardEntity(); reward.setUserId(inviterUserId); reward.setActivityId(activityId); reward.setPoints(points); reward.setType("COUPON"); reward.setStatus("APPROVED"); // 待发放状态 reward.setCreatedAt(OffsetDateTime.now(ZoneOffset.UTC)); reward.setTrackingId(trackingId); // 从活动获取部门ID和优惠券批次ID var activityOpt = activityRepository.findById(activityId); if (activityOpt.isPresent()) { ActivityEntity activity = activityOpt.get(); reward.setDepartmentId(activity.getDepartmentId()); // 从活动配置中获取优惠券批次ID if (activity.getRewardTiersConfig() != null) { try { @SuppressWarnings("unchecked") List> tiers = objectMapper.readValue(activity.getRewardTiersConfig(), List.class); if (tiers != null && !tiers.isEmpty()) { Object couponBatchId = tiers.get(0).get("couponBatchId"); if (couponBatchId != null) { reward.setCouponBatchId(couponBatchId.toString()); } } } catch (Exception e) { log.warn("解析优惠券批次ID失败: {}", e.getMessage()); } } } UserRewardEntity savedReward = userRewardRepository.save(reward); // 2. 调用优惠券服务发放优惠券 if (savedReward.getCouponBatchId() != null) { CouponRewardService.CouponGrantResult result = couponRewardService.grantCoupon( savedReward.getId(), savedReward.getCouponBatchId(), inviterUserId ); if (result.isSuccess()) { log.info("优惠券发放成功: rewardId={}, couponCode={}", savedReward.getId(), result.getCouponCode()); } else { log.warn("优惠券发放失败: rewardId={}, reason={}", savedReward.getId(), result.getMessage()); // 优惠券发放失败不算任务失败,因为奖励记录已创建 } } // 3. 更新任务状态为已完成 job.setStatus("completed"); job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC)); rewardJobRepository.save(job); log.info("成功处理优惠券奖励任务 {},通过tracking_id {} 为用户 {} 发放优惠券", job.getId(), trackingId, inviterUserId); } catch (Exception e) { log.error("处理优惠券奖励任务 {} 失败: {}", job.getId(), e.getMessage()); handleJobFailure(job); } } /** * 处理任务失败,重试或标记为失败 */ private void handleJobFailure(RewardJobEntity job) { int retryCount = job.getRetryCount() != null ? job.getRetryCount() : 0; if (retryCount >= MAX_RETRY_COUNT) { // 超过最大重试次数,标记为失败 job.setStatus("failed"); log.warn("奖励任务 {} 超过最大重试次数,标记为失败", job.getId()); } else { // 增加重试次数,安排下次执行 job.setRetryCount(retryCount + 1); job.setNextRunAt(OffsetDateTime.now(ZoneOffset.UTC).plusMinutes((long) Math.pow(2, retryCount))); job.setStatus("pending"); log.info("奖励任务 {} 重试次数 {}, 下次执行时间: {}", job.getId(), retryCount + 1, job.getNextRunAt()); } job.setUpdatedAt(OffsetDateTime.now(ZoneOffset.UTC)); rewardJobRepository.save(job); } /** * 奖励发放接口(适配器模式) */ public interface RewardDistributor { /** * 发放奖励 * @param userId 用户ID * @param activityId 活动ID * @param trackingId 追踪ID * @param points 积分数量 * @param rewardType 奖励类型 * @return 是否发放成功 */ boolean distribute(Long userId, Long activityId, String trackingId, int points, String rewardType); } }