Home > Archives > 【分布式锁实战】Zookeeper分布式锁实现剖析

【分布式锁实战】Zookeeper分布式锁实现剖析

Published on

在单进程应用中,如果多个线程同时访问共享变量时,在Java中我们通常会使用内置锁(synchronized)或显示锁(Lock)来协调多个线程的访问。但如果多个进程想要同时访问某个共享资源,但同一时刻只允许某个进程去访问该资源,这时候我们就需要一个外部协调者,来管理这些进程的操作,也就是我们下面需要提到的分布式锁。

Zookeeper高级应用中就提到了分布式锁的实现,下面我们以写锁(write lock)实现为例,通过具体代码来讲解整个过程。

具体实现

首先我们需要注意的是,每一个ZookeeperClient是和对应的进程绑定的,也就是进程获取锁和释放锁的过程都是由对应的ZookeeperClient来实现的。在实际实现中,我只使用了主线程,所以也就是主线程去负责获取锁和释放锁。

写锁(write lock)一般也称为排它锁(mutex),所以某一个时刻只能有一个进程获取该锁,而其它进程则要等待持有锁的进程释放之后才能再次尝试获取。

在Zookeeper中主要是基于Znode去实现的,总结起来就是在创建的Znode中总是index(其实就是EPHEMERAL_SEQUENTIAL源码中提到的monotonically increasing number)最小的那个获得锁,由于是瞬时节点,所以导致各个进程轮流获取锁。

获取锁的过程

// 核心代码
public String attemptLock() throws Exception {
    boolean hasTheLock = false;
    boolean doDeletePath = false;
    
    String path = String.format("%s/%s", basePath, lockName);
    String ourPath = null;
    
    try {
      // /_locknode_/lock-0000000001
      ourPath = createsTheLock(path);
    
      // try to get the lock until we can not    
      while (!hasTheLock) {
        List<String> children = zooKeeper.getChildren(basePath, false);
        LockCheckResult lockCheckResult = getLockCheckResult(children, ourPath);
    
        if (lockCheckResult.isLocked()) {
          System.out.println(
              String.format(
                  "Thread %s in %s get lock on path",
                  Thread.currentThread().getName(), nameForTest, ourPath
              )
          );
          return ourPath;
        } else {
          String nextPathToWatch = lockCheckResult.getPathToWatch();
          System.out.println(" nextPathToWatch " + nextPathToWatch);
    
          // remember we have to get the object monitor before call wait()
          synchronized (this) {
            try {
              zooKeeper.getData(nextPathToWatch, watcher, null);
              String msg =
                  String.format(
                      "Thread %s in %s enters lock-waiting pool",
                      Thread.currentThread().getName(), nameForTest
                  );
              System.out.println(msg);
              wait();
            } catch (KeeperException.NoNodeException e) {
              // If the nextPathToWatch is gone, try next round of lock
            }
          }
        }
      }
      return ourPath;
    } catch (Exception e) {
      doDeletePath = true;
      throw e;
    } finally {
      if (doDeletePath) {
        deletePath(ourPath);
      }
    }
}

释放锁的过程

释放锁就相对简单一些,拥有锁的进程直接将对应的路径删除或者是对应的ZookeeperClient退出、Session过期时,路径会被Zookeeper Quoram自动删掉都会导致锁释放。无论如何,一点要确保锁被释放,以避免发生死锁。

本文实现的完整版,可参考DistributedMutexLock.java

实现过程中需要注意的问题

1.在各种Zookeeper节点操作的时候一定要注意org.apache.zookeeper.KeeperException的处理

2.由于当前进程(在本例中就是主线程)需要被挂起,等待唤醒。一定要记得先获取对象监视器锁,然后再进行操作,也就是上面的synchronized (this)

3.上面强调过路径删除(即锁释放)的重要性,所以要确保无论是异常抛出还是人为终止(如Ctrl + C),都需要确保删除操作执行; 异常可以通过finally来解决,人为终止可以通过注册ShutdownHook来解决,不过这种情况只能关闭ZookeeperClient,不能释放锁,因为ShutdownHook是在另外的线程中执行的。

private static AtomicBoolean zkClosed = new AtomicBoolean(false);

private static void registerShutdownHook(ZooKeeper zk) {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      System.out.println("Clean up in the shut down hook !!!");
      
      // make sure we do not run zk.close twice
      if (zkClosed.compareAndSet(false, true)) {
        if (zk != null) {
          try {
            zk.close();
          } catch (Exception e) {
            // just ignore
          }
        }
      }
    }));
}

本质上,自己借助Zookeeper实现分布式锁,只是为了理解官网提到的实现机制。在实际开发中,如果遇到需要使用分布式锁的场景,不妨尝试一下apache curator,Zookeeper的客户端框架,有点类似于Guava之于Java。另外需要注意,Curator和Zookeeper版本兼容问题

参考

> 分布式锁实现

> twitter commons 通过Zookeeper实现分布式锁

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方