zk · 2021-09-13 0

zookeeper搭建及基本使用

一、创建zookeeper容器

1.拉取镜像

docker pull zookeeper:3.6.0

2.创建容器

docker run -itd -p 2181:2181 --name zookeeper-2181 zookeeper:3.6.0

3.进入容器

docker exec -itd zookeeper-2181 bash

二、命令操作zookeeper

连接zookeeper

./bin/zkCli.sh -server 127.0.0.1:2181

1.创建节点

create [-s] [-e] [-c] [-t ttl] path [data] [acl]

  • [-s] : 创建有序节点
  • [-e] : 创建临时节点
  • [-c] : 创建一个容器节点
  • [t ttl] : 创建一个TTL节点, -t 时间(单位毫秒)
  • path: 路径
  • [data]:节点的数据,可选,如果不使用时,节点数据就为null
  • [acl] :权限相关
# 创建持久化节点
create /node1  "11"

# 创建持久化有序节点
create -s /node2  "22"

# 创建临时节点
create -e /node3 "33"

# 创建临时有序节点
create -e -s /node "44"

2.查看节点

ls [-s] [-w] [-R] path

  • [-s] : 查看某一节点下的子节点加当前节点的元信息
  • [-w] :查看节点并为节点添加一个监听,当节点被修改时,该客户端会收到一个回调
ls /node1
ls -R /node1

3.获取值

get [-s] [-w] path

  • [-s] :查看节点数据加元信息
  • [-w] : 查看节点并为节点添加一个监听,当节点被修改时,该客户端会收到一个回调
get /node1

4.设置值

set [-s] [-v version] path data

  • [-s] :返回修改后节点的元信息
  • [-v version] :指定数据的版本,版本不符合时修改失败,类似关系型数据库的乐观锁
set /node1 "1234"

5.删除节点

delete [-v version] path

delete /node1

三、ZooKeeper操作zookeeper

maven引入

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

1.创建节点

@Test
public void testZKCreate() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);

    ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event);
            if (Event.KeeperState.SyncConnected == event.getState()) {
                latch.countDown();
            }
        }
    });

    // 等待zk连接成功,才继续执行
    latch.await();

    // 创建持久节点
    zk.create("/node1", "11".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    // 创建持久序列节点
    zk.create("/node2", "22".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    // 创建临死节点
    zk.create("/node3", "33".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    // 创建临死序列节点
    zk.create("/node4", "44".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

    zk.close();
}

2.获取值

@Test
public void testZKGet() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);

    ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (Event.KeeperState.SyncConnected == event.getState()) {
                latch.countDown();
            }
        }
    });

    // 等待zk连接成功,才继续执行
    latch.await();

    byte[] data = zk.getData("/node1", null, null);
    System.out.println(new String(data));

    zk.close();
}

3.设置值

@Test
public void testZKSet() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);

    ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (Event.KeeperState.SyncConnected == event.getState()) {
                latch.countDown();
            }
        }
    });

    // 等待zk连接成功,才继续执行
    latch.await();

    Stat stat = zk.setData("/node1", "1234".getBytes(), -1);
    System.out.println(stat);

    zk.close();
}

4.删除节点

@Test
public void testZKDelete() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);

    ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (Event.KeeperState.SyncConnected == event.getState()) {
                latch.countDown();
            }
        }
    });

    // 等待zk连接成功,才继续执行
    latch.await();

    zk.delete("/node1", -1);

    zk.close();
}

5.监听节点

一个Watch事件是一个一次性的触发器,当被设置了Watch的数据发生了改变的时候,则服务器将这个改变发送给设置了Watch的客户端,以便通知它们

Zookeeper有数据监视和子数据监视 getdata() and exists() 设置数据监视,getchildren()设置了子节点监视

  • EventType.NodeCreated: 节点创建事件
  • EventType.NodeDeleted: 节点删除事件
  • EventType.NodeDataChanged: 节点数据内容变更
  • EventType.NodeChildrenChanged: 子节点数量发生变化, 新增或删除子节点. 子节点内容发生变化, 不会触发.
@Test
public void testZKWatch() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);

    ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // 连接状态
            Event.KeeperState keeperState = event.getState();
            // 事件类型
            Event.EventType eventType = event.getType();
            // 受影响的path
            String path = event.getPath();

            System.out.println(event);

            if (Event.KeeperState.Disconnected == keeperState) {
                System.out.println("与ZK服务器断开连接");
            }

            if (Event.KeeperState.AuthFailed == keeperState) {
                System.out.println("权限检查失败");
            }

            if (Event.KeeperState.Expired == keeperState) {
                System.out.println("会话失效");
            }

            if (Event.KeeperState.Closed == keeperState) {
                System.out.println("客户端关闭");
            }

            if (Event.KeeperState.SyncConnected == keeperState) {
                if (Event.EventType.None == eventType) {
                    System.out.println("成功连接上ZK服务器");
                    latch.countDown();
                }

                if (Event.EventType.NodeCreated == eventType) {
                    System.out.println("节点创建");
                }

                if (Event.EventType.NodeDataChanged == eventType) {
                    System.out.println("节点数据更新");
                }

                if (Event.EventType.NodeChildrenChanged == eventType) {
                    System.out.println("子节点变更");
                }

                if (Event.EventType.NodeDeleted == eventType) {
                    System.out.println("节点 " + path + " 被删除");
                }
            }
        }
    });

    // 等待zk连接成功,才继续执行
    latch.await();

    zk.exists("/node1", true);
    zk.setData("/node1", "1234".getBytes(),  -1);

    zk.exists("/node1/node11", true);
    zk.create("/node1/node11", "1234".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    zk.getChildren("/node1", true);
    zk.create("/node1/node12", "1234".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    zk.exists("/node1/node11", true);
    zk.delete("/node1/node11", -1);

    zk.getChildren("/node1", true);
    zk.delete("/node1/node12", -1);

    zk.close();
}

四、ZkClient操作zookeeper

maven引入

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

1.创建节点

@Test
public void testZKClientCreate() {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 40000, new BytesPushThroughSerializer());

    // 创建持久节点
    zkClient.createPersistent("/node1", "11".getBytes());
    // 创建持久序列节点
    zkClient.createPersistentSequential("/node2", "22".getBytes());
    // 创建临死节点
    zkClient.createEphemeral("/node3", "33".getBytes());
    // 创建临死序列节点
    zkClient.createEphemeralSequential("/node4", "44".getBytes());

    zkClient.close();
}

2.获取值

@Test
public void testZKClientGet() {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 40000, new BytesPushThroughSerializer());

    byte[] data = zkClient.readData("/node1");
    System.out.println(new String(data));

    zkClient.close();
}

3.设置值

@Test
public void testZKClientSet() {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 40000, new BytesPushThroughSerializer());

    zkClient.writeData("/node1", "1234".getBytes());

    zkClient.close();
}

4.删除节点

@Test
public void testZKClientDelete() {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 40000, new BytesPushThroughSerializer());

    zkClient.delete("/node1");

    zkClient.close();
}

5.监听节点

@Test
public void testZKClientWatch() throws Exception {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 10000, new BytesPushThroughSerializer());

    // 可以循环被调用
    zkClient.subscribeDataChanges("/node1", new IZkDataListener() {
        @Override
        public void handleDataChange(String s, Object o) throws Exception {
            System.out.println(String.format("1 监听到配置信息被修改 %s %s", s, new String((byte[])o)));
        }

        @Override
        public void handleDataDeleted(String s) throws Exception {
            System.out.println("1 监听到配置信息被删除 " + s);
        }
    });

    // 可以循环被调用
    zkClient.subscribeDataChanges("/node1/node11", new IZkDataListener() {
        @Override
        public void handleDataChange(String s, Object o) throws Exception {
            System.out.println(String.format("2 监听到配置信息被修改 %s %s", s, new String((byte[])o)));
        }

        @Override
        public void handleDataDeleted(String s) throws Exception {
            System.out.println("2 监听到配置信息被删除 " + s);
        }
    });

    zkClient.subscribeChildChanges("/node1", new IZkChildListener() {
        @Override
        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            System.out.println("子节点变更");
        }
    });

    zkClient.writeData("/node1", "1234".getBytes());
    zkClient.createPersistent("/node1/node11");
    zkClient.delete("/node1/node11");

    TimeUnit.SECONDS.sleep(1);

    // 会打断事件线程
    zkClient.close();
}