一、创建zookeeper容器
1.拉取镜像
docker pull zookeeper:3.6.0
2.创建容器
docker run -itd -p 2181:2181 --name zookeeper-2181 zookeeper:3.6.0
3.进入容器
docker exec -itd zookeeper-2181 bash
二、命令操作zookeeper
连接zookeeper
./bin/zkCli.sh -server 127.0.0.1:2181
1.创建节点
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
- [-s] : 创建有序节点
- [-e] : 创建临时节点
- [-c] : 创建一个容器节点
- [t ttl] : 创建一个TTL节点, -t 时间(单位毫秒)
- path: 路径
- [data]:节点的数据,可选,如果不使用时,节点数据就为null
- [acl] :权限相关
# 创建持久化节点
create /node1 "11"
# 创建持久化有序节点
create -s /node2 "22"
# 创建临时节点
create -e /node3 "33"
# 创建临时有序节点
create -e -s /node "44"
2.查看节点
ls [-s] [-w] [-R] path
- [-s] : 查看某一节点下的子节点加当前节点的元信息
- [-w] :查看节点并为节点添加一个监听,当节点被修改时,该客户端会收到一个回调
ls /node1
ls -R /node1
3.获取值
get [-s] [-w] path
- [-s] :查看节点数据加元信息
- [-w] : 查看节点并为节点添加一个监听,当节点被修改时,该客户端会收到一个回调
get /node1
4.设置值
set [-s] [-v version] path data
- [-s] :返回修改后节点的元信息
- [-v version] :指定数据的版本,版本不符合时修改失败,类似关系型数据库的乐观锁
set /node1 "1234"
5.删除节点
delete [-v version] path
delete /node1
三、ZooKeeper操作zookeeper
maven引入
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
1.创建节点
@Test
public void testZKCreate() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event);
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
// 等待zk连接成功,才继续执行
latch.await();
// 创建持久节点
zk.create("/node1", "11".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 创建持久序列节点
zk.create("/node2", "22".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
// 创建临死节点
zk.create("/node3", "33".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 创建临死序列节点
zk.create("/node4", "44".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
zk.close();
}
2.获取值
@Test
public void testZKGet() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
// 等待zk连接成功,才继续执行
latch.await();
byte[] data = zk.getData("/node1", null, null);
System.out.println(new String(data));
zk.close();
}
3.设置值
@Test
public void testZKSet() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
// 等待zk连接成功,才继续执行
latch.await();
Stat stat = zk.setData("/node1", "1234".getBytes(), -1);
System.out.println(stat);
zk.close();
}
4.删除节点
@Test
public void testZKDelete() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
// 等待zk连接成功,才继续执行
latch.await();
zk.delete("/node1", -1);
zk.close();
}
5.监听节点
一个Watch事件是一个一次性的触发器,当被设置了Watch的数据发生了改变的时候,则服务器将这个改变发送给设置了Watch的客户端,以便通知它们
Zookeeper有数据监视和子数据监视 getdata() and exists() 设置数据监视,getchildren()设置了子节点监视
- EventType.NodeCreated: 节点创建事件
- EventType.NodeDeleted: 节点删除事件
- EventType.NodeDataChanged: 节点数据内容变更
- EventType.NodeChildrenChanged: 子节点数量发生变化, 新增或删除子节点. 子节点内容发生变化, 不会触发.
@Test
public void testZKWatch() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接状态
Event.KeeperState keeperState = event.getState();
// 事件类型
Event.EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();
System.out.println(event);
if (Event.KeeperState.Disconnected == keeperState) {
System.out.println("与ZK服务器断开连接");
}
if (Event.KeeperState.AuthFailed == keeperState) {
System.out.println("权限检查失败");
}
if (Event.KeeperState.Expired == keeperState) {
System.out.println("会话失效");
}
if (Event.KeeperState.Closed == keeperState) {
System.out.println("客户端关闭");
}
if (Event.KeeperState.SyncConnected == keeperState) {
if (Event.EventType.None == eventType) {
System.out.println("成功连接上ZK服务器");
latch.countDown();
}
if (Event.EventType.NodeCreated == eventType) {
System.out.println("节点创建");
}
if (Event.EventType.NodeDataChanged == eventType) {
System.out.println("节点数据更新");
}
if (Event.EventType.NodeChildrenChanged == eventType) {
System.out.println("子节点变更");
}
if (Event.EventType.NodeDeleted == eventType) {
System.out.println("节点 " + path + " 被删除");
}
}
}
});
// 等待zk连接成功,才继续执行
latch.await();
zk.exists("/node1", true);
zk.setData("/node1", "1234".getBytes(), -1);
zk.exists("/node1/node11", true);
zk.create("/node1/node11", "1234".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.getChildren("/node1", true);
zk.create("/node1/node12", "1234".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.exists("/node1/node11", true);
zk.delete("/node1/node11", -1);
zk.getChildren("/node1", true);
zk.delete("/node1/node12", -1);
zk.close();
}
四、ZkClient操作zookeeper
maven引入
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
1.创建节点
@Test
public void testZKClientCreate() {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 40000, new BytesPushThroughSerializer());
// 创建持久节点
zkClient.createPersistent("/node1", "11".getBytes());
// 创建持久序列节点
zkClient.createPersistentSequential("/node2", "22".getBytes());
// 创建临死节点
zkClient.createEphemeral("/node3", "33".getBytes());
// 创建临死序列节点
zkClient.createEphemeralSequential("/node4", "44".getBytes());
zkClient.close();
}
2.获取值
@Test
public void testZKClientGet() {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 40000, new BytesPushThroughSerializer());
byte[] data = zkClient.readData("/node1");
System.out.println(new String(data));
zkClient.close();
}
3.设置值
@Test
public void testZKClientSet() {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 40000, new BytesPushThroughSerializer());
zkClient.writeData("/node1", "1234".getBytes());
zkClient.close();
}
4.删除节点
@Test
public void testZKClientDelete() {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 40000, new BytesPushThroughSerializer());
zkClient.delete("/node1");
zkClient.close();
}
5.监听节点
@Test
public void testZKClientWatch() throws Exception {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000, 10000, new BytesPushThroughSerializer());
// 可以循环被调用
zkClient.subscribeDataChanges("/node1", new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println(String.format("1 监听到配置信息被修改 %s %s", s, new String((byte[])o)));
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("1 监听到配置信息被删除 " + s);
}
});
// 可以循环被调用
zkClient.subscribeDataChanges("/node1/node11", new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println(String.format("2 监听到配置信息被修改 %s %s", s, new String((byte[])o)));
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("2 监听到配置信息被删除 " + s);
}
});
zkClient.subscribeChildChanges("/node1", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("子节点变更");
}
});
zkClient.writeData("/node1", "1234".getBytes());
zkClient.createPersistent("/node1/node11");
zkClient.delete("/node1/node11");
TimeUnit.SECONDS.sleep(1);
// 会打断事件线程
zkClient.close();
}