基于Java和Redis实现Feature Store中的热点特征计算防穿透分布式锁


项目紧急上线了一套实时特征平台(Feature Store)的核心计算服务,最初几天流量平稳,一切看起来都很完美。然而,随着运营活动引流,某个核心用户群体的活跃度瞬间飙升,监控系统开始疯狂告警:计算集群的CPU利用率触顶,底层数据源(一个MySQL集群)的连接池被打满,QPS响应时间从50ms飙升到3000ms以上。

问题出乎意料地严重。排查日志发现,对于同一个用户的同一个特征(例如,“用户近1小时内购买总金额”),在缓存失效的瞬间,网关会将大量并发请求路由到不同的计算服务实例上。这导致了多个实例同时从数据库拉取原始数据,执行完全相同的聚合计算,然后几乎同时回写缓存。这种计算资源的极大浪费和对数据源的重复冲击,就是我们面临的“计算风暴”。

最初版的特征计算逻辑大致如下,它存在着典型的并发计算竞争问题:

// V1 - Naive Implementation with Race Condition
@Service
public class FeatureComputationServiceV1 {

    private static final Logger log = LoggerFactory.getLogger(FeatureComputationServiceV1.class);

    @Autowired
    private FeatureCacheClient featureCacheClient;

    @Autowired
    private UserBehaviorRepository userBehaviorRepository;

    public FeatureValue computeUserHourlyPurchase(String userId) {
        String featureKey = "feature:user_hourly_purchase:" + userId;

        // 1. 尝试从缓存获取
        Optional<FeatureValue> cachedValue = featureCacheClient.get(featureKey);
        if (cachedValue.isPresent()) {
            return cachedValue.get();
        }

        // 2. 缓存未命中,执行昂贵的计算
        // 这是问题的核心:多个线程会同时执行到这里
        log.warn("Cache miss for feature key: {}. Triggering expensive computation.", featureKey);
        List<PurchaseRecord> records = userBehaviorRepository.findRecentPurchases(userId, Duration.ofHours(1));
        
        BigDecimal totalAmount = records.stream()
                .map(PurchaseRecord::getAmount)
                .reduce(BigDecimal.ZERO, BigDecimal::add);

        FeatureValue result = new FeatureValue(userId, "user_hourly_purchase", totalAmount, System.currentTimeMillis());

        // 3. 将结果回写缓存
        featureCacheClient.set(featureKey, result, Duration.ofMinutes(5));

        return result;
    }
}

这段代码在低并发下工作正常,但在高并发场景下,log.warn 这行日志会在同一毫秒内被不同实例上的多个线程打印出来,每一个打印都代表着一次对数据库的昂贵查询和重复计算。

一次 Code Review 引发的重构

在问题复盘的 Code Review 会议上,团队迅速定位了问题根源。有人提出用本地锁 synchronizedReentrantLock 解决,但这立刻被否决。我们的计算服务是无状态且水平扩展的,部署在多个 Pod 中,本地锁只能保证单个 JVM 内部的线程安全,无法解决跨实例的竞争。

这时,引入分布式锁的方案被提上日程。技术选型讨论主要围绕 ZooKeeper 和 Redis。

  • ZooKeeper: 强一致性,通过创建临时有序节点实现锁。优点是可靠性极高,具备 watch 机制,能有效避免死锁。缺点是性能开销相对较大,其复杂的CP模型对于我们这种高吞吐、低延迟的锁场景来说,有点“杀鸡用牛刀”。
  • Redis: 基于其单线程模型和原子操作(如 SETNX)实现。优点是性能极高,部署轻量,非常适合用作高并发场景下的锁服务。缺点是实现一个健壮的锁需要考虑很多细节,比如锁超时、原子性、可重入性等。

考虑到我们的技术栈中已经重度使用了 Redis 作为缓存,且场景追求的是极致性能而非绝对的强一致性(一次计算失败或短暂的锁获取失败可以接受,由下一个请求重试即可),我们最终选择了基于 Redis 实现分布式锁。

我们的目标是:在计算前,先获取一个与“用户ID+特征名”绑定的锁。只有成功获取锁的那个线程,才能执行计算逻辑。其他线程要么快速失败,要么短暂等待后重试。

构建一个生产级的 Redis 分布式锁

直接使用 SETNX 是不够的。一个生产级的分布式锁必须解决以下几个核心问题:

  1. 原子性: 获取锁(SETNX)和设置过期时间(EXPIRE``)必须是原子操作,否则 SETNX` 成功后服务崩溃,锁将永不释放。
  2. 防误删: 锁的 value 应该是一个唯一的标识(如 UUID + 线程ID),解锁时必须验证这个标识,防止线程A误删了线程B的锁。
  3. 防死锁: 必须设置合理的过期时间,即使服务宕机,锁也能在一定时间后自动释放。
  4. 高可用: Redis 本身需要是高可用的(如 Sentinel 或 Cluster 模式)。

基于这些原则,我们设计了DistributedLock接口和其RedisDistributedLock实现。

// Lock Interface
public interface DistributedLock {
    /**
     * 尝试获取锁
     * @param lockKey 锁的唯一标识
     * @param requestId 请求ID,用于防止误解锁
     * @param expireTimeMillis 锁的过期时间(毫秒)
     * @return true 如果成功获取锁,否则 false
     */
    boolean tryLock(String lockKey, String requestId, long expireTimeMillis);

    /**
     * 释放锁
     * @param lockKey 锁的唯一标识
     * @param requestId 请求ID,必须与加锁时一致
     * @return true 如果成功释放锁,否则 false
     */
    boolean unlock(String lockKey, String requestId);
}

使用 Lua 脚本是保证原子性的最佳实践,因为 Redis 会保证单个 Lua 脚本的执行是原子的。

// RedisDistributedLock Implementation
@Component
public class RedisDistributedLock implements DistributedLock {

    private static final Logger log = LoggerFactory.getLogger(RedisDistributedLock.class);

    private final StringRedisTemplate stringRedisTemplate;
    private final DefaultRedisScript<Long> unlockScript;

    // LUA script for atomic unlock operation.
    // It checks if the key exists and its value matches the provided requestId.
    private static final String UNLOCK_LUA_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";

    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.unlockScript = new DefaultRedisScript<>(UNLOCK_LUA_SCRIPT, Long.class);
    }

    @Override
    public boolean tryLock(String lockKey, String requestId, long expireTimeMillis) {
        try {
            // Redis SET command with NX and PX options provides atomicity for lock acquisition and expiration setting.
            // SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
            // NX -- Only set the key if it does not already exist.
            // PX -- Set the specified expire time, in milliseconds.
            Boolean success = stringRedisTemplate.execute(
                (RedisConnection connection) -> connection.set(
                        lockKey.getBytes(StandardCharsets.UTF_8),
                        requestId.getBytes(StandardCharsets.UTF_8),
                        Expiration.from(expireTimeMillis, TimeUnit.MILLISECONDS),
                        RedisStringCommands.SetOption.ifAbsent()
                ),
                true // `execute` in transaction, though for a single command it's not strictly necessary but good practice
            );
            return Boolean.TRUE.equals(success);
        } catch (Exception e) {
            log.error("Error acquiring distributed lock for key: {}", lockKey, e);
            // In case of Redis connection issues, we treat it as lock acquisition failure.
            // This prevents the application from proceeding without a lock.
            return false;
        }
    }

    @Override
    public boolean unlock(String lockKey, String requestId) {
        try {
            // Execute the Lua script to ensure atomic check-and-delete.
            Long result = stringRedisTemplate.execute(
                    unlockScript,
                    Collections.singletonList(lockKey),
                    requestId
            );
            // result == 1 means the key was deleted, 0 means it was not (either key not found or value mismatch).
            return Long.valueOf(1L).equals(result);
        } catch (Exception e) {
            log.error("Error releasing distributed lock for key: {}", lockKey, e);
            // If unlock fails due to Redis issues, the lock will eventually expire.
            // The main risk here is a slight delay in another process acquiring the lock.
            return false;
        }
    }
}

服务发现与架构整合

我们的特征计算服务本身是作为微服务存在的,通过服务发现机制(我们使用的是 Nacos)注册自身,并由上游的网关进行路由。引入分布式锁后,整个架构的交互流程变得更加清晰。

graph TD
    A[Client Request] --> B{API Gateway};
    B --> C[Feature Computation Service Instance 1];
    B --> D[Feature Computation Service Instance 2];
    B --> E[... Instance N];

    subgraph Feature Computation Cluster
        C -- Registers with --> F((Nacos));
        D -- Registers with --> F;
        E -- Registers with --> F;
    end
    
    C -- 1. tryLock(feature_key) --> G((Redis));
    D -- 2. tryLock(feature_key) fails --> G;
    
    G -- Lock acquired --> C;
    C -- 3. Compute feature --> H[Database];
    H -- Data --> C;
    C -- 4. Write to cache & unlock --> G;
    C --> B;
    
    D -- Waits or returns empty --> B;

这个架构中,Nacos 负责服务实例的注册与发现,保证了计算服务的高可用和可扩展性。Redis 则扮演了“协调者”的角色,确保在任意时刻,只有一个服务实例在为特定特征执行计算。

重构后的特征计算服务

我们将 RedisDistributedLock 注入到 FeatureComputationService 中,并重构了核心计算逻辑。

// V2 - Refactored with Distributed Lock
@Service
public class FeatureComputationServiceV2 {

    private static final Logger log = LoggerFactory.getLogger(FeatureComputationServiceV2.class);
    
    private static final long LOCK_EXPIRE_MILLIS = 10_000; // 10 seconds expiration
    private static final long LOCK_WAIT_MILLIS = 100; // Wait 100ms before retrying cache

    @Autowired
    private FeatureCacheClient featureCacheClient;

    @Autowired
    private UserBehaviorRepository userBehaviorRepository;

    @Autowired
    private DistributedLock distributedLock;

    public FeatureValue computeUserHourlyPurchase(String userId) {
        String featureName = "user_hourly_purchase";
        String featureKey = "feature:" + featureName + ":" + userId;
        String lockKey = "lock:feature_computation:" + featureName + ":" + userId;

        // 1. 尝试从缓存获取
        Optional<FeatureValue> cachedValue = featureCacheClient.get(featureKey);
        if (cachedValue.isPresent()) {
            return cachedValue.get();
        }

        // 2. 缓存未命中,尝试获取分布式锁
        String requestId = UUID.randomUUID().toString();
        boolean lockAcquired = distributedLock.tryLock(lockKey, requestId, LOCK_EXPIRE_MILLIS);

        if (lockAcquired) {
            // 2a. 成功获取锁
            log.info("Lock acquired for key: {}. Starting feature computation.", lockKey);
            try {
                // Double-check cache, in case another thread finished computation just before we got the lock.
                // This is a crucial optimization to avoid redundant work.
                cachedValue = featureCacheClient.get(featureKey);
                if (cachedValue.isPresent()) {
                    log.info("Feature found in cache after acquiring lock. Computation skipped.");
                    return cachedValue.get();
                }

                // Execute expensive computation
                List<PurchaseRecord> records = userBehaviorRepository.findRecentPurchases(userId, Duration.ofHours(1));
                BigDecimal totalAmount = records.stream()
                        .map(PurchaseRecord::getAmount)
                        .reduce(BigDecimal.ZERO, BigDecimal::add);
                FeatureValue result = new FeatureValue(userId, featureName, totalAmount, System.currentTimeMillis());

                // Write result to cache
                featureCacheClient.set(featureKey, result, Duration.ofMinutes(5));
                return result;

            } finally {
                // 3. 无论如何,必须释放锁
                if (!distributedLock.unlock(lockKey, requestId)) {
                    // This could happen if the lock expired due to long computation.
                    // It's a signal that LOCK_EXPIRE_MILLIS might be too short.
                    log.warn("Failed to unlock or lock expired for key: {}. RequestId: {}", lockKey, requestId);
                } else {
                    log.info("Lock released for key: {}", lockKey);
                }
            }
        } else {
            // 2b. 获取锁失败,说明有其他实例正在计算
            log.warn("Failed to acquire lock for key: {}. Another instance is likely computing.", lockKey);
            // Strategy: wait for a short period and then try reading from cache again.
            // This is a simple polling mechanism. For more complex scenarios, a pub/sub model could be used.
            try {
                Thread.sleep(LOCK_WAIT_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return featureCacheClient.get(featureKey).orElse(FeatureValue.EMPTY); // Return empty or a default value if still not available
        }
    }
}

这段重构后的代码有几个关键改进:

  1. 加锁保护: 整个计算和缓存回写的过程被分布式锁保护起来。
  2. 双重检查锁定 (Double-Checked Locking): 获取锁后,再次检查缓存。这是因为在线程A等待锁的过程中,持有锁的线程B可能已经完成了计算并填充了缓存。这一步避免了不必要的重复计算。
  3. finally 块中释放锁: 保证了即使计算过程中抛出异常,锁也能被正确释放,避免死锁。
  4. 锁获取失败策略: 当获取锁失败时,我们没有直接返回失败,而是选择等待一小段时间后再次尝试从缓存读取。这是一种优雅降级,虽然会增加一点点响应延迟,但大大提高了数据获取的成功率,避免了将计算压力传递给调用方。
  5. 唯一请求ID: 使用 UUID 作为 requestId,确保了解锁操作的安全性,防止一个线程释放了另一个线程持有的锁(例如,由于GC暂停导致锁过期后,旧的解锁命令才到达Redis)。

遗留的挑战与未来优化路径

当前的实现已经解决了最核心的计算风暴问题,让系统在高并发下恢复了稳定。但在真实项目中,没有完美的方案,只有不断的权衡与迭代。

  1. 锁的公平性与性能: 当前的实现是非公平锁,任何客户端都可以抢占。在高竞争环境下,可能导致某些请求“饿死”。同时,失败的请求会进行短暂的Thread.sleep,这在超高并发下会消耗大量线程资源。更优化的方式可以引入异步模型,或者使用 Redis 的 Pub/Sub 机制,让等待的线程订阅一个 channel,计算完成后由持锁线程发布消息来唤醒它们,避免无效轮询。

  2. 锁的可重入性: 我们实现的锁是不可重入的。如果未来的业务逻辑变得复杂,一个持有锁的方法需要调用另一个也需要相同锁的方法,就会导致死锁。实现可重入锁需要维护一个持有计数,这会增加锁实现的复杂度。

  3. Redisson 的启发: 像 Redisson 这样的成熟框架提供了更完善的分布式锁实现,例如它内置了“看门狗”(Watchdog)机制。客户端在获取锁后,会有一个后台线程定时延长锁的过期时间,只要客户端存活,锁就不会因为业务执行时间过长而过期。这解决了手动估算过期时间的难题。在未来的迭代中,与其重复造轮子,不如考虑直接引入并封装 Redisson,将团队精力聚焦于业务逻辑本身。

  4. 服务发现与负载均衡: 虽然 Nacos 解决了服务注册发现,但流量均匀地打到各个实例上,反而加剧了锁的竞争。未来可以探索一种更智能的路由策略,例如基于用户ID的哈希,将同一用户的请求尽可能路由到同一个实例,从而将分布式锁的竞争降级为本地锁,进一步提升性能。但这会破坏服务的无状态性,需要仔细评估其带来的复杂性。


  目录