您现在的位置是:首页 >其他 >Zookeeper整理网站首页其他

Zookeeper整理

山间小僧 2024-08-23 00:01:02
简介Zookeeper整理

Zookeeper是什么

zookeeper: 分布式应用程序的分布式协调服务,主要用来解决分布式集群中应用系统的一致性问题。

详细可以阅读官网:https://zookeeper.apache.org/doc/current/zookeeperOver.html

这里只做一下重点整理

  • 特点
    • 结构简单:znodes结构简单,类似于文件和目录
    • 构建集群简单:内存中会维护状态,以及持久存储中的事务日志和快照。只要大多数服务器可用,ZooKeeper 服务就可用。image.png
    • 消息有序:看上图,从节点有个箭头给主节点,所有增删改都是主节点处理,从而保证有序性
    • 读取快:它在“读主导”工作负载中特别快。

important points

CAP

  • 一致性(Consistency):等同于所有节点访问同一份最新的数据副本
  • 可用性(Availability):每次请求都能获取到非错的响应——但是不保证获取的数据为最新数据
  • 分区容错性(Partition tolerance):(以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择

角色

  • leader:选举产生,对外提供读写服务,且必须在有leader的情况下,zooker集群才能对外提供服务
  • follower:简单的理解为从节点,对外提供读服务,当收到客户端的写请求,会转发到Leader
  • observer:follower弱版本,没有投票权
    zookeeper保证的偏向CP多一点

协议

  • Paxos协议
    1. 提议阶段:在这个阶段,Proposer向Acceptor发送一个提议编号(Proposal Number),以请求Acceptor承诺不再接受任何小于该编号的提案。每个Acceptor会比较提议编号并返回承诺(Promise),承诺不再接受编号较小的提案。
    2. 接收阶段:如果Proposer收到了大多数(超过半数)Acceptor的承诺,它就会发送一个接收请求,包含提议编号和一个值。Acceptor在接收到接收请求后,会验证提议编号是否大于等于其已承诺的最高编号,并接受该提议。
    3. 决议阶段:一旦Proposer收到了大多数Acceptor的接收请求,它就可以发送一个决议请求,将最终的值广播给所有节点。接收到决议请求的节点将该值作为最终决议。

ZAB 协议是为分布式协调服务ZooKeeper专门设计的一种支持崩溃恢复的一致性协议。基于该协议,ZooKeeper 实现了一种主从模式的系统架构来保持集群中各个副本之间的数据一致性。一旦Leader节点故障无法工作,ZAB协议能够自动从Follower节点中重新选择出一个合适的替代者,这个过程被称为选主,选主也是ZAB协议中最为重要和复杂的过程。本文主要描述ZAB协议的选主过程以及在Zookeeper中的实现。

  • ZAB协议:Zookeeper Atomic Broadcase,原子广播协议
    1. 领导者选举:在ZooKeeper集群启动或Leader节点失效时,ZAB协议负责进行领导者选举。选举过程中节点拒绝对外提供服务,ZooKeeper节点会相互通信,通过提议和投票的方式选择出新的Leader节点。选举结果保证了只有一个节点成为Leader,其他节点成为Follower。
    2. 原子广播:一旦Leader节点选举出来,ZAB协议会通过原子广播机制将写操作的请求广播给集群中的所有节点。然后比较Follower的Zxid和Leader的Zxid,由于Leader的Zxid总是最大的可以防止已经提交的提案不丢失。Leader 服务器会为每个 Follower 服务器都各自分配一个单独的队列,然后将需要广播的事务依次放入这些队列中去,并根据 FIFO 策略进行消息的发送。每个Follower 服务器在接收到后都会将其以事务日志的形式写入到本地磁盘中,并且在成功写入后返回 Leader 服务器一个 ACK 响应。当有超过半数的服务器 ACK 响应后,Leader 就会广播一个 Commit 消息给所有的 Follower 服务器,Follower 接收到后就完成对事务的提交操作。

选举

ZAB协议的核心应该就是选举了,所有的客户端更新都发往Leader,Leader写入本地日志后再复制到所有的Follower节点。Leader就一个,因为Zookeeper的设计目标就是保证一致性,Zooker选主期间是无法对外提供服务的,崩溃回复(选主)过程也显得至关重要。官网上是说200ms就可以恢复

image.png

  • Follower发现无法与Leader保持通信时,改为对外不提供服务的状态
  • 各个Follower发起投票选举信息
  • Follower转变为候选者,先自投,并向其他节点发送选举请求(称为投票请求)
  • 候选者的选举请求中包含了节点ID和最后一条已知的事务ID(ZXID)
  • 通过事务ID(ZXID)投票,事务ID相同比较节点ID
  • 每一轮投票,都会统计每台节点收到的投票信息,判断是否有过半的节点收到了相同的投票信息,超过就选举为新Leader
  • 选出 Leader 后,其他节点就变成 Follower 角色,并向 Leader 发送自己服务器的最大 zxid ,Leader 服务器收到后会和自己本地的提议缓存队列进行比较,使用策略进行同步

api

  • connect host:port :连接指定zk
  • get path [watch] :命令用于获取节点数据和状态信息
  • ls path [watch] :查看某个路径下目录列表
  • set path data [version] :修改节点存储的数据
  • rmr path :删除节点及子节点 过期
  • delquota [-n|-b] path:删除配额
  • quit:退出
  • printwatches on|off:节点监控打开
  • create [-s] [-e] path data acl :创建 -e临时节点 -s顺序节点
  • stat path [watch] :命令用于查看节点状态信息
  • close:关闭当前连接
  • ls2 path [watch] :查看某个目录包含的所有文件
  • history:查看历史执行指令
  • listquota path :查看配额,以及节点的配额状
  • setAcl path acl:设置节点权限
  • getAcl path:获取节点权限
  • sync path:强制同步
  • redo cmdno:再执行一次
  • addauth scheme auth:输入认证授权信息
  • delete path [version] :删除
  • setquota -n|-b val path :配额,给节点限制值,比如限制子节点个数、节点数据的长度 -n:限制子节点个数 -b:限制值的长度

session

会话ID,用来唯一标识一个会话,每次客户端创建会话的时候,zookeeper 都会为其分配一个全局唯一的 sessionID,临时节点生命周期是跟着session生命周期的,session没了,它创建的临时节点也没了。

watch

读取时可以设置watcher,watcher可以理解为回调,当有时间触发,服务端会向指定客户端发送一个事件通知来实现通知功能,注意:一次性的

分布式锁

下面是一个简单的分布式锁的实现,主要步骤是创建临时顺序节点,如果是第一个节点直接获取锁,后续节点订阅前一个节点,前一个节点消失(释放或断开连接)再尝试获取锁

public class DistributedLock {

    private static final String ZOOKEEPER_CONNECT_STRING = "127.0.0.1:2181";
    private static final int SESSION_TIMEOUT = 5000;
    public static final String LOCK_NODE_PREFIX = "lock_";

    private ZooKeeper zooKeeper;
    private String lockPath;
    private String lockNode;
    private CountDownLatch lockAcquiredSignal;
    private String lockName = "/locks";


    public DistributedLock(String lockName) throws Exception {
        zooKeeper = new ZooKeeper(ZOOKEEPER_CONNECT_STRING, SESSION_TIMEOUT, null);
        this.lockName=lockName;
        createRootPath();
    }

    private void createRootPath() throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists(lockName, false);
        if (stat == null) {
            zooKeeper.create(lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    public void acquireLock() throws Exception {
        lockNode = zooKeeper.create(lockName + "/" + LOCK_NODE_PREFIX, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        lockPath = lockNode.substring(lockNode.lastIndexOf("/") + 1);
        while (true) {
            List<String> children = zooKeeper.getChildren(lockName, false);
            Collections.sort(children);

            int index = children.indexOf(lockPath);
            if (index == 0) {
                return;
            } else {
                String previousLockPath = lockName + "/" + children.get(index - 1);
                Stat stat = zooKeeper.exists(previousLockPath, new LockWatcher());
                if (stat == null) {
                    continue;
                }
                lockAcquiredSignal = new CountDownLatch(1);
                lockAcquiredSignal.await();
            }
        }
    }

    public void releaseLock() throws Exception {
        zooKeeper.delete(lockNode, -1);
        zooKeeper.close();
    }

    private class LockWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted) {
                lockAcquiredSignal.countDown();
            }
        }
    }


    public String getLockNode() {
        return lockNode;
    }

    public static void main(String[] args) {

        ExecutorService executors=Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            executors.submit(()->{
                try {
                    DistributedLock lock = new DistributedLock("/myLock");
                    lock.acquireLock();

                    System.out.println("lock start"+lock.getLockNode());
                    Thread.sleep(1);
                    System.out.println("lock over"+lock.getLockNode());

                    lock.releaseLock();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }


    }
}

docker脚本

version: '3.3'

# 给zk集群配置一个网络,网络名为zk-net
networks:
  zk-net:
    name: zk-net

# 配置zk集群的
# container services下的每一个子配置都对应一个zk节点的docker container
services:
  zk1:
    # docker container所使用的docker image
    image: wurstmeister/zookeeper
    hostname: zk1
    container_name: zk1
    # 配置docker container和宿主机的端口映射
    ports:
      - 2181:2181
      - 8081:8080
    # 配置docker container的环境变量
    environment:
      # 当前zk实例的id
      ZOO_MY_ID: 1
      # 整个zk集群的机器、端口列表
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    # 将docker container上的路径挂载到宿主机上 实现宿主机和docker container的数据共享
    volumes:
      - ./zk1/data:/data
      - ./zk1/datalog:/datalog
    # 当前docker container加入名为zk-net的隔离网络
    networks:
      - zk-net

  zk2:
    image: wurstmeister/zookeeper
    hostname: zk2
    container_name: zk2
    ports:
      - 2182:2181
      - 8082:8080
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - ./zk2/data:/data
      - ./zk2/datalog:/datalog
    networks:
      - zk-net

  zk3:
    image: wurstmeister/zookeeper
    hostname: zk3
    container_name: zk3
    ports:
      - 2183:2181
      - 8083:8080
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
    volumes:
      - ./zk3/data:/data
      - ./zk3/datalog:/datalog
    networks:
      - zk-net

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。