版本
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
1.加锁
每次获取锁时会直接从本地缓存中先获取锁的元数据,如果存在,则在原有的计数器基础上+1,直接返回;否则尝试加锁
// InterProcessMutex.java
@Override
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
尝试加锁:
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
: 创建临时顺序节点,例如节点为 /lock/_c_3299a72a-4474-4477-beda-c5865a458e8d-lock-0000001201
(c + UUID + - + lock)
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
:这里主要是循环获取锁的过程,首先是判断是否实现了revocable接口,如果实现了那么就对这个path设置监听,否则的话通过 StandardLockInternalsDriver 尝试得到 PredicateResults (主要是否得到锁及需要监视的目录的两个属性);
然后判断 PredicateResults 中的 pathToWatch (主要保存 sequenceNode )是否是最小的节点,如果是,则得到锁,getsTheLock为true,否则得到该序列的前一个节点,设为 pathToWatch,并监控起来;再判断获取锁的时间是否超时,超时则删除节点,不竞争下次锁,否则,睡眠等待获取锁;最后把获取的锁对象的锁路径等信息封装成 LockData 存储在本地缓存中.
// LockInternals.java
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
2.释放锁
从本地缓存中拿到锁对象,计数器-1,只有到那个计数器=0的时候才会去执 internals.releaseLock(lockData.lockPath);
// InterProcessMutex.java
@Override
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}
首先移除watcher监听,这个监听可能是在循环获取锁的时候创建的,然后取消动作时触发动作时间置空,最后就是删除path
// LockInternals.java
final void releaseLock(String lockPath) throws Exception
{
client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}
3.总结
- curator的InterProcessLock接口提供了多种锁机制,互斥锁,读写锁,以及可定时数的互斥锁的机制(这个大家具体问题具体分析).
- 所有申请锁都会创建临时顺序节点,保证了都能够有机会去获取锁.
- 内部用了线程的wait()和notifyAll()这种等待机制,可以及时的唤醒最渴望得到锁的线程.避免常规利用Thread.sleep()这种无用的间隔等待机制.
- 利用redis做锁的时候,一般都需要做锁的有效时间限定。而curator则利用了zookeeper的临时顺序节点特性,一旦客户端失去连接后,则就会自动清除该节点.
栗子
private static Integer inventory = 1000;
private static final int NUM = 1000;
@Test
public void testLock() {
String zkServerAddress = "127.0.0.1:2181";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
String lockPath = "/lock";
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
inventory,
inventory,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
CyclicBarrier cyclicBarrier = new CyclicBarrier(100);
long start = System.currentTimeMillis();
for (int i = 0; i < NUM; i++) {
threadPoolExecutor.execute(() -> {
try {
cyclicBarrier.await();
lock.acquire();
inventory--;
System.out.println(inventory);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
while (threadPoolExecutor.getActiveCount() > 0) {
Thread.yield();
}
long end = System.currentTimeMillis();
System.out.println(String.format("线程执行数:%s 总耗时:%s 库存书:%s",
NUM,
(end - start),
inventory));
threadPoolExecutor.shutdown();
CloseableUtils.closeQuietly(client);
}