zk · 2022-04-03 0

zookeeper 分布式锁

版本

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>

1.加锁

每次获取锁时会直接从本地缓存中先获取锁的元数据,如果存在,则在原有的计数器基础上+1,直接返回;否则尝试加锁

// InterProcessMutex.java
/**
 * Acquires the mutex, delegating to {@link #internalLock(long, TimeUnit)} with
 * {@code -1} and a {@code null} unit (no time limit is passed down).
 *
 * @throws IOException if {@code internalLock} reports that the lock was not obtained
 * @throws Exception   any error propagated from the locking internals
 */
@Override
public void acquire() throws Exception
{
    final boolean acquired = internalLock(-1, null);
    if ( acquired )
    {
        return;
    }

    throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}

/**
 * Obtains the lock for the calling thread, treating a repeated call by the
 * current holder as a re-entry.
 *
 * @param time timeout value forwarded to {@code internals.attemptLock}
 * @param unit unit of {@code time}; forwarded unchanged (may be {@code null})
 * @return {@code true} if the calling thread holds the lock on return,
 *         {@code false} if {@code attemptLock} returned no lock path
 * @throws Exception any error propagated from {@code attemptLock}
 */
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */

    final Thread owner = Thread.currentThread();
    final LockData existing = threadData.get(owner);

    // re-entering: this thread already has an entry, so only bump its counter
    if ( existing != null )
    {
        existing.lockCount.incrementAndGet();
        return true;
    }

    final String acquiredPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( acquiredPath == null )
    {
        return false;
    }

    // first acquisition by this thread: remember the lock path for release()
    threadData.put(owner, new LockData(owner, acquiredPath));
    return true;
}

尝试加锁:

ourPath = driver.createsTheLock(client, path, localLockNodeBytes);: 创建临时顺序节点,例如节点为 /lock/_c_3299a72a-4474-4477-beda-c5865a458e8d-lock-0000001201 (即 _c_ + UUID + -lock- + 顺序号)

hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);:这里主要是循环获取锁的过程。首先判断是否设置了 revocable(可撤销监听),如果设置了,就对 ourPath 这个节点设置监听;然后(无论是否设置了 revocable)进入循环,通过 StandardLockInternalsDriver 尝试得到 PredicateResults (主要包含"是否得到锁"以及"需要监视的节点路径"两个属性);

然后判断 PredicateResults 中的 pathToWatch (主要保存 sequenceNode )是否是最小的节点,如果是,则得到锁,getsTheLock为true,否则得到该序列的前一个节点,设为 pathToWatch,并监控起来;再判断获取锁的时间是否超时,超时则删除节点,不竞争下次锁,否则,睡眠等待获取锁;最后把获取的锁对象的锁路径等信息封装成 LockData 存储在本地缓存中.

// LockInternals.java
/**
 * Creates this client's lock node and runs the lock loop until the lock is
 * obtained or the attempt ends without it.
 *
 * @param time          timeout value; only used when {@code unit} is non-null
 * @param unit          unit of {@code time}, or {@code null} to pass no time limit
 *                      to the lock loop
 * @param lockNodeBytes payload for the lock node; replaced by an empty array when a
 *                      revocable listener is set
 * @return the path of the created lock node when the lock was obtained, otherwise {@code null}
 * @throws Exception the {@code NoNodeException} once the retry policy refuses another
 *                   attempt, or any other error from the driver / lock loop
 */
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long startMillis = System.currentTimeMillis();

    final Long millisToWait;
    if ( unit == null )
    {
        millisToWait = null;
    }
    else
    {
        millisToWait = unit.toMillis(time);
    }

    final byte[] localLockNodeBytes;
    if ( revocable.get() != null )
    {
        localLockNodeBytes = new byte[0];
    }
    else
    {
        localLockNodeBytes = lockNodeBytes;
    }

    int attempts = 0;
    String ourPath = null;
    boolean hasTheLock = false;

    while ( true )
    {
        try
        {
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            break;
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( !client.getZookeeperClient().getRetryPolicy().allowRetry(attempts++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                throw e;
            }
        }
    }

    return hasTheLock ? ourPath : null;
}

/**
 * Loops until the driver reports that {@code ourPath} holds the lock, the wait
 * budget is exhausted, or the client is no longer in the STARTED state.
 *
 * @param startMillis  timestamp from which the elapsed wait time is measured
 * @param millisToWait remaining wait budget in milliseconds, or {@code null} to wait without a limit
 * @param ourPath      full path of the lock node created for this attempt
 * @return {@code true} if the lock was obtained, {@code false} otherwise
 * @throws Exception any error from the client or driver; the lock node is deleted before rethrowing
 */
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean     haveTheLock = false;
    boolean     doDelete = false;   // set when our node must be removed in the finally block
    try
    {
        // when a revocable listener is set, watch our own node with revocableWatcher
        if ( revocable.get() != null )
        {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            List<String>        children = getSortedChildren();
            String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

            // the driver decides whether our node holds the lock and, if not, which node to watch
            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() )
            {
                haveTheLock = true;
            }
            else
            {
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                // wait() below requires holding this object's monitor
                // NOTE(review): presumably `watcher` notifies this monitor when the watched node changes - confirm
                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            // charge the time elapsed since the last check against the budget, then restart the clock
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            wait(millisToWait);
                        }
                        else
                        {
                            // no time limit: block until notified
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        // any failure: let ThreadUtils inspect the exception for interruption, then clean up our node and rethrow
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

2.释放锁

从本地缓存中拿到锁对象,计数器-1,只有当计数器=0的时候才会去执行 internals.releaseLock(lockData.lockPath);

// InterProcessMutex.java
/**
 * Releases one hold of the mutex for the calling thread. The underlying lock
 * node is only released when the per-thread hold count reaches zero.
 *
 * @throws IllegalMonitorStateException if the calling thread has no lock entry,
 *                                      or the hold count drops below zero
 * @throws Exception                    any error propagated from {@code internals.releaseLock}
 */
@Override
public void release() throws Exception
{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
     */

    final Thread owner = Thread.currentThread();
    final LockData held = threadData.get(owner);
    if ( held == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    final int remaining = held.lockCount.decrementAndGet();
    if ( remaining > 0 )
    {
        // still held re-entrantly by this thread
        return;
    }
    else if ( remaining < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }

    // count reached zero: release the node, and always forget the thread's entry
    try
    {
        internals.releaseLock(held.lockPath);
    }
    finally
    {
        threadData.remove(owner);
    }
}

首先移除watcher监听,这个监听可能是在循环获取锁的时候创建的;然后把 revocable(可撤销监听的引用)置空;最后就是删除path

// LockInternals.java
/**
 * Releases the lock represented by the given node path.
 *
 * @param lockPath path of the lock node to delete
 * @throws Exception any error raised while removing watchers or deleting the node
 */
final void releaseLock(String lockPath) throws Exception
{
    // drop the watchers registered through this client
    client.removeWatchers();
    // clear the revocable listener reference
    revocable.set(null);
    // finally delete the lock node itself
    deleteOurPath(lockPath);
}

3.总结

  1. curator的InterProcessLock接口提供了多种锁机制,互斥锁,读写锁,以及可定时数的互斥锁的机制(这个大家具体问题具体分析).
  2. 所有申请锁都会创建临时顺序节点,保证了都能够有机会去获取锁.
  3. 内部用了线程的wait()和notifyAll()这种等待机制,可以及时的唤醒最渴望得到锁的线程.避免常规利用Thread.sleep()这种无用的间隔等待机制.
  4. 利用redis做锁的时候,一般都需要做锁的有效时间限定。而curator则利用了zookeeper的临时顺序节点特性,一旦客户端失去连接后,则就会自动清除该节点.

栗子

// Shared stock counter; each worker task in testLock() decrements it after acquiring the lock.
private static Integer inventory = 1000;

// Number of worker tasks submitted by testLock().
private static final int NUM = 1000;

/**
 * Demo: NUM concurrent tasks each decrement the shared {@code inventory} counter
 * while holding an {@link InterProcessMutex} on {@code /lock}.
 *
 * Requires a ZooKeeper server listening on 127.0.0.1:2181.
 */
@Test
public void testLock() {
    String zkServerAddress = "127.0.0.1:2181";
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(zkServerAddress)
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(5000)
            .retryPolicy(retryPolicy)
            .build();

    // Size the pool by the number of tasks (not by the mutable inventory counter):
    // every task needs its own thread because the barrier below blocks until
    // 100 tasks are running at the same time.
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(
                    NUM,
                    NUM,
                    10L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>());

    try {
        client.start();

        String lockPath = "/lock";
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(100);
        long start = System.currentTimeMillis();

        for (int i = 0; i < NUM; i++) {
            threadPoolExecutor.execute(() -> {
                try {
                    cyclicBarrier.await();
                    lock.acquire();
                    // Only release what was actually acquired: release() throws
                    // IllegalMonitorStateException when this thread does not own the lock.
                    try {
                        inventory--;
                        System.out.println(inventory);
                    } finally {
                        lock.release();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        // Stop accepting tasks and block until all submitted tasks have finished.
        // getActiveCount() is only approximate and can read 0 before workers start,
        // so it is not a reliable completion signal.
        threadPoolExecutor.shutdown();
        while (!threadPoolExecutor.awaitTermination(1L, TimeUnit.SECONDS)) {
            // keep waiting; the original also waited without an upper bound
        }

        long end = System.currentTimeMillis();
        System.out.println(String.format("线程执行数:%s 总耗时:%s 库存书:%s",
                NUM,
                (end - start),
                inventory));
    } catch (InterruptedException e) {
        // restore the interrupt flag so the test runner can observe it
        Thread.currentThread().interrupt();
    } finally {
        // no-op after a normal termination; cancels outstanding work on failure
        threadPoolExecutor.shutdownNow();
        CloseableUtils.closeQuietly(client);
    }
}