您现在的位置是:首页 >学无止境 >云原生之深入解析Kubernetes如何使用Leader选举机制来实现自己的HA应用网站首页学无止境

云原生之深入解析Kubernetes如何使用Leader选举机制来实现自己的HA应用

╰つ栺尖篴夢ゞ 2024-06-19 06:01:02
简介云原生之深入解析Kubernetes如何使用Leader选举机制来实现自己的HA应用

一、背景

  • 在 Kubernetes 的 kube-controller-manager、kube-scheduler,以及使用 Operator 的底层实现 controller-rumtime 都支持高可用系统中的 leader 选举,那么 controller-rumtime(底层的实现是 client-go) 中的 leader 选举以在 kubernetes controller 中是如何实现的呢?
  • 在运行 kube-controller-manager 时,是有一些参数提供给 cm 进行 leader 选举使用的,可以参考 官方文档提供的“参数” 来了解相关的参数。
--leader-elect                               Default: true
--leader-elect-renew-deadline duration       Default: 10s
--leader-elect-resource-lock string          Default: "leases"
--leader-elect-resource-name string       Default: "kube-controller-manager"
--leader-elect-resource-namespace string     Default: "kube-system"
--leader-elect-retry-period duration         Default: 2s
...
  • 本身以为这些组件的选举动作时通过 etcd 进行的,但是后面对 controller-runtime 学习时,发现并没有配置其相关的 etcd 相关参数,这就引起了对选举机制的好奇。有关于 kubernetes 的选举机制,官方的说明如下:
通过阅读文章得知,kubernetes API 提供了一中选举机制,只要运行在集群内的容器,都是可以实现选举功能的。
Kubernetes API 通过提供了两个属性来完成选举动作的
ResourceVersions:每个 API 对象唯一一个 ResourceVersion
Annotations:每个 API 对象都可以对这些 key 进行注释
注:这种选举会增加 APIServer 的压力。也就对 etcd 会产生影响
  • 那么有了这些信息之后,来看一下,在 Kubernetes 集群中,谁是 cm 的 leader(这里集群只有一个节点,所以本节点就是 leader)。在 Kubernetes 中所有启用了 leader 选举的服务都会生成一个 EndPoint ,在这个 EndPoint 中会有上面提到的 label(Annotations)来标识谁是 leader:
$ kubectl get ep -n kube-system
NAME                      ENDPOINTS   AGE
kube-controller-manager   <none>      3d4h
kube-dns                              3d4h
kube-scheduler            <none>      3d4h
  • 以 kube-controller-manager 为例,来看下这个 EndPoint 有什么信息?
[root@master-machine ~]# kubectl describe ep kube-controller-manager -n kube-system
Name:         kube-controller-manager
Namespace:    kube-system
Labels:       <none>
Annotations:  control-plane.alpha.kubernetes.io/leader:
                {"holderIdentity":"master-machine_06730140-a503-487d-850b-1fe1619f1fe1","leaseDurationSeconds":15,"acquireTime":"2022-06-27T15:30:46Z","re...
Subsets:
Events:
  Type    Reason          Age    From                     Message
  ----    ------          ----   ----                     -------
  Normal  LeaderElection  2d22h  kube-controller-manager  master-machine_76aabcb5-49ff-45ff-bd18-4afa61fbc5af became leader
  Normal  LeaderElection  9m     kube-controller-manager  master-machine_06730140-a503-487d-850b-1fe1619f1fe1 became leader
  • 可以看出 Annotations: control-plane.alpha.kubernetes.io/leader: 标出了哪个 node 是 leader。

二、election in controller-runtime

  • controller-runtime 有关 leader 选举的部分在 pkg/leaderelection 里面,总共 100 行代码,现在来看下它做了些什么?可以看到,它只提供了创建资源锁的一些选项:
type Options struct {
 // 在manager启动时,决定是否进行选举
 LeaderElection bool
 // 使用那种资源锁 默认为租用 lease
 LeaderElectionResourceLock string
 // 选举发生的名称空间
 LeaderElectionNamespace string
 // 该属性将决定持有leader锁资源的名称
 LeaderElectionID string
}
  • 通过 NewResourceLock 可以看到,这里是执行的是 client-go/tools/leaderelection ,而这个 leaderelection的学习,这里推荐一个 leader-election 来学习如何使用它。可以看到,进入选举的入口是一个 RunOrDie() 的函数:
// 使用一个lease锁,注释中说愿意为集群中存在lease的监听较少
lock := &resourcelock.LeaseLock{
    LeaseMeta: metav1.ObjectMeta{
        Name:      leaseLockName,
        Namespace: leaseLockNamespace,
    },
    Client: client.CoordinationV1(),
    LockConfig: resourcelock.ResourceLockConfig{
        Identity: id,
    },
}

// 开启选举循环
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock: lock,
    // 这里必须保证拥有的租约在调用cancel()前终止,否则会仍有一个loop在运行
    ReleaseOnCancel: true,
    LeaseDuration:   60 * time.Second,
    RenewDeadline:   15 * time.Second,
    RetryPeriod:     5 * time.Second,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            // 这里填写你的代码,
            // usually put your code
            run(ctx)
        },
        OnStoppedLeading: func() {
            // 这里清理你的lease
            klog.Infof("leader lost: %s", id)
            os.Exit(0)
        },
        OnNewLeader: func(identity string) {
            // we're notified when new leader elected
            if identity == id {
                // I just got the lock
                return
            }
            klog.Infof("new leader elected: %s", identity)
        },
    },
})
  • 看到这里,了解了锁的概念和如何启动一个锁,来看下 client-go 都提供了那些锁。在代码 tools/leaderelection/resourcelock/interface.go 定义了一个锁抽象,interface 提供了一个通用接口,用于锁定 leader 选举中使用的资源:
type Interface interface {
 // Get 返回选举记录
 Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

 // Create 创建一个LeaderElectionRecord
 Create(ctx context.Context, ler LeaderElectionRecord) error

 // Update will update and existing LeaderElectionRecord
 Update(ctx context.Context, ler LeaderElectionRecord) error

 // RecordEvent is used to record events
 RecordEvent(string)

 // Identity 返回锁的标识
 Identity() string

 // Describe is used to convert details on current resource lock into a string
 Describe() string
}
  • 那么实现这个抽象接口的就是,实现的资源锁,可以看到,client-go 提供了四种资源锁:leaselock、configmaplock、multilock、endpointlock。

① leaselock

  • Lease 是 kubernetes 控制平面中的通过 ETCD 来实现的一个 Leases 的资源,主要为了提供分布式租约的一种控制机制。而关于对这个 API 的描述可以参考:Lease
  • 在 Kubernetes 集群中,可以使用如下命令来查看对应的 lease:
$ kubectl get leases -A
NAMESPACE         NAME                      HOLDER                                                AGE
kube-node-lease   master-machine            master-machine                                        3d19h
kube-system       kube-controller-manager   master-machine_06730140-a503-487d-850b-1fe1619f1fe1   3d19h
kube-system       kube-scheduler            master-machine_1724e2d9-c19c-48d7-ae47-ee4217b27073   3d19h

$ kubectl describe leases kube-controller-manager -n kube-system
Name:         kube-controller-manager
Namespace:    kube-system
Labels:       <none>
Annotations:  <none>
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  Creation Timestamp:  2022-06-24T11:01:51Z
  Managed Fields:
    API Version:  coordination.k8s.io/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        f:acquireTime:
        f:holderIdentity:
        f:leaseDurationSeconds:
        f:leaseTransitions:
        f:renewTime:
    Manager:         kube-controller-manager
    Operation:       Update
    Time:            2022-06-24T11:01:51Z
  Resource Version:  56012
  Self Link:         /apis/coordination.k8s.io/v1/namespaces/kube-system/leases/kube-controller-manager
  UID:               851a32d2-25dc-49b6-a3f7-7a76f152f071
Spec:
  Acquire Time:            2022-06-27T15:30:46.000000Z
  Holder Identity:         master-machine_06730140-a503-487d-850b-1fe1619f1fe1
  Lease Duration Seconds:  15
  Lease Transitions:       2
  Renew Time:              2022-06-28T06:09:26.837773Z
Events:                    <none>
  • 再来看下 leaselock 的实现,leaselock 会实现了作为资源锁的抽象:
type LeaseLock struct {
 // LeaseMeta 就是类似于其他资源类型的属性,包含name ns 以及其他关于lease的属性
 LeaseMeta  metav1.ObjectMeta
 Client     coordinationv1client.LeasesGetter // Client 就是提供了informer中的功能
 // lockconfig包含上面通过 describe 看到的 Identity与recoder用于记录资源锁的更改
    LockConfig ResourceLockConfig
    // lease 就是 API中的Lease资源,可以参考下上面给出的这个API的使用
 lease      *coordinationv1.Lease
}
  • 那么 leaselock 实现了哪些方法呢?

(A)Get

  • Ge 是从 spec 中返回选举的记录:
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
 var err error
 ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
 if err != nil {
  return nil, nil, err
 }
 record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
 recordByte, err := json.Marshal(*record)
 if err != nil {
  return nil, nil, err
 }
 return record, recordByte, nil
}

// 可以看出是返回这个资源spec里面填充的值
func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
 var r LeaderElectionRecord
 if spec.HolderIdentity != nil {
  r.HolderIdentity = *spec.HolderIdentity
 }
 if spec.LeaseDurationSeconds != nil {
  r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
 }
 if spec.LeaseTransitions != nil {
  r.LeaderTransitions = int(*spec.LeaseTransitions)
 }
 if spec.AcquireTime != nil {
  r.AcquireTime = metav1.Time{spec.AcquireTime.Time}
 }
 if spec.RenewTime != nil {
  r.RenewTime = metav1.Time{spec.RenewTime.Time}
 }
 return &r
}

(B)Create

  • Create 是在 kubernetes 集群中尝试去创建一个租约,可以看到,Client 就是 API 提供的对应资源的 REST 客户端,结果会在 Kubernetes 集群中创建这个 Lease:
func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
 var err error
 ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
  ObjectMeta: metav1.ObjectMeta{
   Name:      ll.LeaseMeta.Name,
   Namespace: ll.LeaseMeta.Namespace,
  },
  Spec: LeaderElectionRecordToLeaseSpec(&ler),
 }, metav1.CreateOptions{})
 return err
}

(C)Update

  • Update 是更新 Lease 的 spec:
func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
 if ll.lease == nil {
  return errors.New("lease not initialized, call get or create first")
 }
 ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)

 lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
 if err != nil {
  return err
 }

 ll.lease = lease
 return nil
}

(D)RecordEvent

  • RecordEvent 是记录选举时出现的事件,在 kubernetes 集群中查看 ep 的信息时,可以看到的 event 中存在 became leader 的事件,这里就是将产生的这个 event 添加到 meta-data 中:
func (ll *LeaseLock) RecordEvent(s string) {
   if ll.LockConfig.EventRecorder == nil {
      return
   }
   events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
   subject := &coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}
   // Populate the type meta, so we don't have to get it from the schema
   subject.Kind = "Lease"
   subject.APIVersion = coordinationv1.SchemeGroupVersion.String()
   ll.LockConfig.EventRecorder.Eventf(subject, corev1.EventTypeNormal, "LeaderElection", events)
}

② election workflow

  • 选举的代码入口是在 leaderelection.go ,这里继续上面的例子向下分析整个选举的过程。
  • 前面看到进入选举的入口是一个 RunOrDie() 的函数,进入 RunOrDie,看到其实也只有几行而已,大致上了解到了 RunOrDie 会使用提供的配置来启动选举的客户端,之后会阻塞,直到 ctx 退出,或停止持有 leader 的租约。
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
 le, err := NewLeaderElector(lec)
 if err != nil {
  panic(err)
 }
 if lec.WatchDog != nil {
  lec.WatchDog.SetLeaderElection(le)
 }
 le.Run(ctx)
}
  • 再来看下 NewLeaderElector 做了些什么?LeaderElector 是一个结构体,这里只是创建它,这个结构体提供了选举中所需要的一切(LeaderElector 就是 RunOrDie 创建的选举客户端)。
func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
 if lec.LeaseDuration <= lec.RenewDeadline {
  return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
 }
 if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
  return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
 }
 if lec.LeaseDuration < 1 {
  return nil, fmt.Errorf("leaseDuration must be greater than zero")
 }
 if lec.RenewDeadline < 1 {
  return nil, fmt.Errorf("renewDeadline must be greater than zero")
 }
 if lec.RetryPeriod < 1 {
  return nil, fmt.Errorf("retryPeriod must be greater than zero")
 }
 if lec.Callbacks.OnStartedLeading == nil {
  return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
 }
 if lec.Callbacks.OnStoppedLeading == nil {
  return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
 }

 if lec.Lock == nil {
  return nil, fmt.Errorf("Lock must not be nil.")
 }
 le := LeaderElector{
  config:  lec,
  clock:   clock.RealClock{},
  metrics: globalMetricsFactory.newLeaderMetrics(),
 }
 le.metrics.leaderOff(le.config.Name)
 return &le, nil
}
type LeaderElector struct {
 config LeaderElectionConfig // 这个的配置,包含一些时间参数,健康检查
 // recoder相关属性
 observedRecord    rl.LeaderElectionRecord
 observedRawRecord []byte
 observedTime      time.Time
 // used to implement OnNewLeader(), may lag slightly from the
 // value observedRecord.HolderIdentity if the transition has
 // not yet been reported.
 reportedLeader string
 // clock is wrapper around time to allow for less flaky testing
 clock clock.Clock
 // 锁定 observedRecord
 observedRecordLock sync.Mutex
 metrics leaderMetricsAdapter
}
  • 可以看到 Run 实现的选举逻辑就是在初始化客户端时传入的 三个 callback:
func (le *LeaderElector) Run(ctx context.Context) {
 defer runtime.HandleCrash()
 defer func() { // 退出时执行callbacke的OnStoppedLeading
  le.config.Callbacks.OnStoppedLeading()
 }()

 if !le.acquire(ctx) {
  return
 }
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 go le.config.Callbacks.OnStartedLeading(ctx) // 选举时,执行 OnStartedLeading
 le.renew(ctx)
}
  • 在 Run 中调用了 acquire,这个是通过一个 loop 去调用 tryAcquireOrRenew,直到 ctx 传递过来结束信号:
func (le *LeaderElector) acquire(ctx context.Context) bool {
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 succeeded := false
 desc := le.config.Lock.Describe()
 klog.Infof("attempting to acquire leader lease %v...", desc)
    // jitterUntil是执行定时的函数 func() 是定时任务的逻辑
    // RetryPeriod是周期间隔
    // JitterFactor 是重试系数,类似于延迟队列中的系数 (duration + maxFactor * duration)
    // sliding 逻辑是否计算在时间内
    // 上下文传递
 wait.JitterUntil(func() {
  succeeded = le.tryAcquireOrRenew(ctx)
  le.maybeReportTransition()
  if !succeeded {
   klog.V(4).Infof("failed to acquire lease %v", desc)
   return
  }
  le.config.Lock.RecordEvent("became leader")
  le.metrics.leaderOn(le.config.Name)
  klog.Infof("successfully acquired lease %v", desc)
  cancel()
 }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
 return succeeded
}
  • 实际上选举动作在 tryAcquireOrRenew 中,tryAcquireOrRenew 是尝试获得一个 leader 租约,如果已经获得到了,则更新租约;否则可以得到租约则为 true,反之 false:
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 now := metav1.Now() // 时间
 leaderElectionRecord := rl.LeaderElectionRecord{ // 构建一个选举record
  HolderIdentity:       le.config.Lock.Identity(), // 选举人的身份特征,ep与主机名有关
  LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), // 默认15s
  RenewTime:            now, // 重新获取时间
  AcquireTime:          now, // 获得时间
 }

 // 1. 从API获取或创建一个recode,如果可以拿到则已经有租约,反之创建新租约
 oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
 if err != nil {
  if !errors.IsNotFound(err) {
   klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
   return false
  }
  // 创建租约的动作就是新建一个对应的resource,这个lock就是leaderelection提供的四种锁,
  // 看你在runOrDie中初始化传入了什么锁
  if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
   klog.Errorf("error initially creating leader election record: %v", err)
   return false
  }
  // 到了这里就已经拿到或者创建了租约,然后记录其一些属性,LeaderElectionRecord
  le.setObservedRecord(&leaderElectionRecord)

  return true
 }

 // 2. 获取记录检查身份和时间
 if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
  le.setObservedRecord(oldLeaderElectionRecord)

  le.observedRawRecord = oldLeaderElectionRawRecord
 }
 if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
  le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
  !le.IsLeader() { // 不是leader,进行HolderIdentity比较,再加上时间,这个时候没有到竞选其,跳出
  klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
  return false
 }

 // 3.我们将尝试更新。 在这里leaderElectionRecord设置为默认值。让我们在更新之前更正它。
 if le.IsLeader() { // 到这就说明是leader,修正他的时间
  leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
  leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
 } else { // LeaderTransitions 就是指leader调整(转变为其他)了几次,如果是,
  // 则为发生转变,保持原有值
  // 反之,则+1
  leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
 }
 // 完事之后更新APIServer中的锁资源,也就是更新对应的资源的属性信息
 if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
  klog.Errorf("Failed to update lock: %v", err)
  return false
 }
 // setObservedRecord 是通过一个新的record来更新这个锁中的record
 // 操作是安全的,会上锁保证临界区仅可以被一个线程/进程操作
 le.setObservedRecord(&leaderElectionRecord)
 return true
}
  • 至此已经完整知道利用 kubernetes 进行选举的流程都是什么了;简单回顾下,上述 leader 选举所有的步骤:
    • 首选创建的服务就是该服务的 leader,锁可以为 lease , endpoint 等资源进行上锁;
    • 已经是 leader 的实例会不断续租,租约的默认值是 15 秒 (leaseDuration);leader 在租约满时更新租约时间(renewTime);
    • 其它的 follower,会不断检查对应资源锁的存在,如果已经有 leader,那么则检查 renewTime,如果超过了租用时间(),则表明 leader 存在问题需要重新启动选举,直到有 follower 提升为 leader;
    • 而为了避免资源被抢占,Kubernetes API 使用了 ResourceVersion 来避免被重复修改(如果版本号与请求版本号不一致,则表示已经被修改了,那么 APIServer 将返回错误)。

三、利用 Leader 机制实现 HA 应用

① 代码实现

  • 如果仅仅是使用 Kubernetes 中的锁,实现的代码也只有几行而已:
package main

import (
 "context"
 "flag"
 "fmt"
 "os"
 "os/signal"
 "syscall"
 "time"

 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 clientset "k8s.io/client-go/kubernetes"
 "k8s.io/client-go/rest"
 "k8s.io/client-go/tools/clientcmd"
 "k8s.io/client-go/tools/leaderelection"
 "k8s.io/client-go/tools/leaderelection/resourcelock"
 "k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
 if kubeconfig != "" {
  cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
  if err != nil {
   return nil, err
  }
  return cfg, nil
 }

 cfg, err := rest.InClusterConfig()
 if err != nil {
  return nil, err
 }
 return cfg, nil
}

func main() {
 klog.InitFlags(nil)

 var kubeconfig string
 var leaseLockName string
 var leaseLockNamespace string
 var id string
 // 初始化客户端的部分
 flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
 flag.StringVar(&id, "id", "", "the holder identity name")
 flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
 flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
 flag.Parse()

 if leaseLockName == "" {
  klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
 }
 if leaseLockNamespace == "" {
  klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
 }
 config, err := buildConfig(kubeconfig)
 if err != nil {
  klog.Fatal(err)
 }
 client := clientset.NewForConfigOrDie(config)

 run := func(ctx context.Context) {
  // 实现的业务逻辑,这里仅仅为实验,就直接打印了
  klog.Info("Controller loop...")

  for {
   fmt.Println("I am leader, I was working.")
   time.Sleep(time.Second * 5)
  }
 }

 // use a Go context so we can tell the leaderelection code when we
 // want to step down
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()

 // 监听系统中断
 ch := make(chan os.Signal, 1)
 signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
 go func() {
  <-ch
  klog.Info("Received termination, signaling shutdown")
  cancel()
 }()

 // 创建一个资源锁
 lock := &resourcelock.LeaseLock{
  LeaseMeta: metav1.ObjectMeta{
   Name:      leaseLockName,
   Namespace: leaseLockNamespace,
  },
  Client: client.CoordinationV1(),
  LockConfig: resourcelock.ResourceLockConfig{
   Identity: id,
  },
 }

 // 开启一个选举的循环
 leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
  Lock:            lock,
  ReleaseOnCancel: true,
  LeaseDuration:   60 * time.Second,
  RenewDeadline:   15 * time.Second,
  RetryPeriod:     5 * time.Second,
  Callbacks: leaderelection.LeaderCallbacks{
   OnStartedLeading: func(ctx context.Context) {
    // 当选举为leader后所运行的业务逻辑
    run(ctx)
   },
   OnStoppedLeading: func() {
    // we can do cleanup here
    klog.Infof("leader lost: %s", id)
    os.Exit(0)
   },
   OnNewLeader: func(identity string) { // 申请一个选举时的动作
    if identity == id {
     return
    }
    klog.Infof("new leader elected: %s", identity)
   },
  },
 })
}
  • 注意:这种 lease 锁只能在 in-cluster 模式下运行,如果需要类似二进制部署的程序,可以选择 endpoint 类型的资源锁。

② 生成镜像

FROM golang:alpine AS builder
MAINTAINER cylon
WORKDIR /election
COPY . /election
ENV GOPROXY https://goproxy.cn,direct
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o elector main.go

FROM alpine AS runner
WORKDIR /go/elector
COPY --from=builder /election/elector .
VOLUME ["/election"]
ENTRYPOINT ["./elector"]

③ 准备资源清单

  • 默认情况下,Kubernetes 运行的 pod 在请求 Kubernetes 集群内资源时,默认的账户是没有权限的,默认服务帐户无权访问协调 API,因此需要创建另一个 serviceaccount 并相应地设置对应的 RBAC 权限绑定;在清单中配置上这个 sa,此时所有的 pod 就会有协调锁的权限:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa-leaderelection
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: leaderelection
rules:
  - apiGroups:
      - coordination.k8s.io
    resources:
      - leases
    verbs:
      - '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: leaderelection
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: leaderelection
subjects:
  - kind: ServiceAccount
    name: sa-leaderelection
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: leaderelection
  name: leaderelection
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: leaderelection
  template:
    metadata:
      labels:
        app: leaderelection
    spec:
      containers:
        - image: cylonchau/leaderelection:v0.0.2
          imagePullPolicy: IfNotPresent
          command: ["./elector"]
          args:
          - "-id=$(POD_NAME)"
          - "-lease-lock-name=test"
          - "-lease-lock-namespace=default"
          env:
          - name: POD_NAME
            valueFrom:
              fieldRef:
                apiVersion: v1
                fieldPath: metadata.name
          name: elector
      serviceAccountName: sa-leaderelection

④ 集群中运行

  • 执行完清单后,当 pod 启动后,可以看到会创建出一个 lease:
$ kubectl get lease
NAME   HOLDER                            AGE
test   leaderelection-5644c5f84f-frs5n   1s


$ kubectl describe lease
Name:         test
Namespace:    default
Labels:       <none>
Annotations:  <none>
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  Creation Timestamp:  2022-06-28T16:39:45Z
  Managed Fields:
    API Version:  coordination.k8s.io/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        f:acquireTime:
        f:holderIdentity:
        f:leaseDurationSeconds:
        f:leaseTransitions:
        f:renewTime:
    Manager:         elector
    Operation:       Update
    Time:            2022-06-28T16:39:45Z
  Resource Version:  131693
  Self Link:         /apis/coordination.k8s.io/v1/namespaces/default/leases/test
  UID:               bef2b164-a117-44bd-bad3-3e651c94c97b
Spec:
  Acquire Time:            2022-06-28T16:39:45.931873Z
  Holder Identity:         leaderelection-5644c5f84f-frs5n
  Lease Duration Seconds:  60
  Lease Transitions:       0
  Renew Time:              2022-06-28T16:39:55.963537Z
Events:                    <none>
  • 通过其持有者的信息查看对应 pod(因为程序中对 holder Identity 设置的是 pod 的名称),实际上是工作的 pod。
  • 如上实例所述,这是利用 Kubernetes 集群完成的 leader 选举的方案,虽然这不是最完美解决方案,但这是一种简单的方法,因为可以无需在集群上部署更多东西或者进行大量的代码工作就可以利用 Kubernetes 集群来实现一个高可用的 HA 应用。
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。