博主
258
258
258
258
专辑

第五节 Redisson分布式锁与同步器

亮子 2023-09-27 08:23:22 6545 0 0 0

Redisson的强大之处在于完美的实现了分布式锁和同步器,不需要我们再考虑怎么设计分布式锁的可重入?怎么保证分布式锁的公平性?如何实现一个分布式读写锁?怎么实现分布式的信号量和闭锁?这些在Redisson中都已经帮我们实现好了。先看一下最常用的lock的使用:

@RestController
public class TestController {
    @Autowired
    private RedissonClient client;

    @RequestMapping("/test")
    public String test(){
        RLock anyLock = client.getLock("anyLock");
        anyLock.lock();
        return "success";
    }

}

上面的demo获取到一个lock不去释放。我们打开一个浏览器请求这个controller返回success后,再打开一个窗口重新请求,发现一直等待无法返回结果。查看redis:

10.150.27.139:6380> hgetall anyLock
1) "c5745dc6-3105-4d60-9d5d-e39258714c31:38"
2) "1"

删除了这个key后就可以成功执行了。在设计分布式锁我们一般都要考虑锁的释放。因为如果获取到锁而线程出现异常或者系统故障,会导致这个锁无法释放。自己实现redis的锁的话会给这个key一个过期时间以避免死锁的发生。Redisson默认的锁的过期时间为30s。如果这个期间任务并没有执行完,而锁已经过期了不就出问题了吗?Redisson这里有一个watch dog,看一下lock()方法的代码:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

看一下tryAcquireAsync方法

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

如果lock指定过期时间,那么直接执行tryLockInnerAsync,tryLockInnerAsync方法是一段lua脚本,如下:

eval "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 anyLock 30000 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31

先判断anyLock 这个key是否存在,不存在则执行hset anyLock 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31 1结束。否则判断anyLock这个hash中4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31元素是否存在,如果存在则说明是重入锁,累加重入次数,重置key的失效时间为30s,结束。否则说明anyLock已经被其他线程获取,这里直接返回anyLock的失效时间。该方法是一个基于Future的异步方法。这里类似于JS通过Promise来实现异步操作的模式。在onComplete中执行了一个BiConsumer,这个函数会启动失效检查:

private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }

上面代码会将该线程放入到一个concurrentmap中,并执行renewExpiration方法。

private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

上面的方法会生成一个timertask来检查concurrentmap中的key是否存在,如果存在说明该线程还没有释放掉锁,则会更新锁的过期时间,该方法以一种异步递归的方式循环执行。
返回到lock方法,如果返回的ttl>0,则会进入while循环中一直尝试获取,达到了阻塞的目的。

参考文章