Redisson的强大之处在于完美的实现了分布式锁和同步器,不需要我们再考虑怎么设计分布式锁的可重入?怎么保证分布式锁的公平性?如何实现一个分布式读写锁?怎么实现分布式的信号量和闭锁?这些在Redisson中都已经帮我们实现好了。先看一下最常用的lock的使用:
@RestController
public class TestController {
@Autowired
private RedissonClient client;
@RequestMapping("/test")
public String test(){
RLock anyLock = client.getLock("anyLock");
anyLock.lock();
return "success";
}
}
上面的demo获取到一个lock不去释放。我们打开一个浏览器请求这个controller返回success后,再打开一个窗口重新请求,发现一直等待无法返回结果。查看redis:
10.150.27.139:6380> hgetall anyLock
1) "c5745dc6-3105-4d60-9d5d-e39258714c31:38"
2) "1"
删除了这个key后就可以成功执行了。在设计分布式锁我们一般都要考虑锁的释放。因为如果获取到锁而线程出现异常或者系统故障,会导致这个锁无法释放。自己实现redis的锁的话会给这个key一个过期时间以避免死锁的发生。Redisson默认的锁的过期时间为30s。如果这个期间任务并没有执行完,而锁已经过期了不就出问题了吗?Redisson这里有一个watch dog,看一下lock()方法的代码:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
看一下tryAcquireAsync方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
如果lock指定过期时间,那么直接执行tryLockInnerAsync,tryLockInnerAsync方法是一段lua脚本,如下:
eval "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 anyLock 30000 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31
先判断anyLock 这个key是否存在,不存在则执行hset anyLock 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31 1结束。否则判断anyLock这个hash中4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31元素是否存在,如果存在则说明是重入锁,累加重入次数,重置key的失效时间为30s,结束。否则说明anyLock已经被其他线程获取,这里直接返回anyLock的失效时间。该方法是一个基于Future的异步方法。这里类似于JS通过Promise来实现异步操作的模式。在onComplete中执行了一个BiConsumer,这个函数会启动失效检查:
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
上面代码会将该线程放入到一个concurrentmap中,并执行renewExpiration方法。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
上面的方法会生成一个timertask来检查concurrentmap中的key是否存在,如果存在说明该线程还没有释放掉锁,则会更新锁的过期时间,该方法以一种异步递归的方式循环执行。
返回到lock方法,如果返回的ttl>0,则会进入while循环中一直尝试获取,达到了阻塞的目的。