redis 分布式锁 RedissonLock
版本
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.4.3</version>
</dependency>
1.加锁原理
加锁其实是通过一段 lua 脚本实现的
加锁:主要是判断锁是否存在,存在就设置过期时间,如果锁已经存在了,那对比一下线程,线程是一个那就证明可以重入,锁在了,但是不是当前线程,证明别人还没释放,那就把剩余时间返回,加锁失败
redis 存放的是用户指定的 key,key 的 类型是 hash,hash 的 field 是 (UUID : ThreadId),value 是 1,过期时间默认 30s
相当于执行:
hset redlock 0654df0e-25c4-4e08-a586-cf46d40fd62b:35 1
pexpire redlock 30000
源码:
// RedissonLock.java
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
2.锁互斥机制
流程:
- 尝试获取锁,返回 null 则说明加锁成功,返回一个数值,则说明已经存在该锁,ttl 为锁的剩余存活时间。
- 如果此时客户端 2 进程获取锁失败,那么使用客户端 2 的线程 id(其实本质上就是进程 id)通过 Redis 的 channel 订阅锁释放的事件。如果等待的过程中一直未等到锁的释放事件通知,当超过最大等待时间则获取锁失败,返回 false。如果等到了锁的释放事件的通知,则开始进入一个不断重试获取锁的循环。
- 循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。如果在重试中拿到了锁,则直接返回。如果锁当前还是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 的信号量 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。
注意:分布式锁的一个关键点,当锁正在被占用时,等待获取锁的进程并不是通过一个 while(true) 死循环去获取锁,而是利用了 Redis 的发布订阅机制,通过 await 方法阻塞等待锁的进程,有效的解决了无效的锁申请浪费资源的问题。
3.锁的续期机制
Redisson 提供了一个续期机制, 只要线程 一旦加锁成功,就会启动一个 Watch Dog
Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果线程还持有锁 key(判断线程是否还持有 key,其实根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。
注意:如果服务宕机了,Watch Dog 机制线程也就没有了,此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。
// RedissonLock.java
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
4.可重入加锁机制
申请锁时,记录锁数量,并续期
相当于执行的是:
hexists redlock 0654df0e-25c4-4e08-a586-cf46d40fd62b:35
hincrby redlock 0654df0e-25c4-4e08-a586-cf46d40fd62b:35 1
pexpire redlock 3000
源码:
// RedissonLock.java
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
5.释放锁
- 删除锁 (查看当前锁重入次数,并减1,如果重入次数为0,则删除锁)
- 广播释放锁的消息,通知阻塞等待的进程(向通道名为
redisson_lock__channel publish
一条unlockMessage 信息
,例如通道redisson_lock__channel:{redlock}
),重入次数为0,才广播释放锁消息, - 取消 Watch Dog 机制,即将 RedissonLock.expirationRenewalMap 里面的线程 id 删除,并且取消定时任务
// RedissonLock.java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
void cancelExpirationRenewal() {
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
task.cancel();
}
}
栗子
RLock 是 RedissonLock
private static Integer inventory = 1000;
private static final int NUM = 1000;
@Test
public void testRedissonLock() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("redlock");
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
inventory,
inventory,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
CyclicBarrier cyclicBarrier = new CyclicBarrier(100);
long start = System.currentTimeMillis();
for (int i = 0; i < NUM; i++) {
threadPoolExecutor.execute(() -> {
try {
cyclicBarrier.await();
lock.lock();
inventory--;
System.out.println(inventory);
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
});
}
while (threadPoolExecutor.getActiveCount() > 0) {
Thread.yield();
}
long end = System.currentTimeMillis();
System.out.println(String.format("线程执行数:%s 总耗时:%s 库存书:%s",
NUM,
(end - start),
inventory));
threadPoolExecutor.shutdown();
}
非常好的博客!感谢