您现在的位置是:首页 >学无止境 >Go Etcd网站首页学无止境
Go Etcd
基本操作
go get go.etcd.io/etcd/client/v3
# 此处使用的 版本是:
# go.etcd.io/etcd/client/v3 v3.5.8
这里使用的是
"go.etcd.io/etcd/client/v3"
而不是"go.etcd.io/etcd/clientv3"
我们不使用 etcd/clientv3,因为它与grpc 最新版本不兼容,官方最新推荐的方式 etcd/client/v3
package main
import (
"context"
"fmt"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
func main() {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalln(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// PUT = add + edit
key := "/ns/service" // key 的位置在 /ns/ 目录下的 名为 service 文件 中
value := "127.0.0.1:80001"
_, err = client.Put(ctx, key, value)
if err != nil {
log.Printf("etcd put error,%v
", err)
return
}
// GET
getResponse, err := client.Get(ctx, key)
if err != nil {
log.Printf("etcd GET error,%v
", err)
return
}
for _, kv := range getResponse.Kvs {
fmt.Printf("Key:%s ===> Val:%s
", kv.Key, kv.Value)
}
// DEL
_, err = client.Delete(ctx, key)
if err != nil {
// 处理错误
log.Printf("etcd GET error,%v
", err)
return
}
}
监听
键的变化
如果值一直没有变化,程序不会退出,而是一直等待。这是因为Watch()方法是一个阻塞方法,会一直等待etcd中指定键的值发生变化或者超时。
如果没有设置超时时间,程序将一直等待,直到键值对发生变化或者程序被强制退出。
如果需要设置超时时间,可以使用context.WithTimeout()
方法创建一个带有超时时间的上下文对象,例如:
key := "/ns/service" // key 的位置在 /ns/ 目录下的 名为 service 文件 中
// 发布者
go func() {
for {
time.Sleep(time// 使用cli.Txn()方法创建了一个事务
txn := client.Txn(context.Background())
// 事务包含了一个条件判断和两个操作:
// 条件判断使用 clientv3.Compare() 方法,判断键值对的值是否等于"old_value",
// 如果满足条件,则执行clientv3.OpPut()方法,将键值对的值修改为"new_value",
// 否则执行clientv3.OpGet()方法,读取键值对的值。
txn.If(clientv3.Compare(clientv3.Value("/wtt/key"), "=", "value")).
Then(clientv3.OpPut("/wtt/key", "new_value")).
Else(clientv3.OpGet("/wtt/key"))
// 最后,使用txn.Commit()方法提交事务,如果事务执行成功,则返回resp.Succeeded为true,否则为false。
resp, err := txn.Commit()
if err != nil {
fmt.Println(err)
return
}
if resp.Succeeded {
fmt.Println("事务执行成功")
} else {
fmt.Println("事务执行失败")
}.Second * 1)
_, err = client.Put(context.Background(), key, "value")
if err != nil {
log.Printf("etcd put error,%v
", err)
return
} else {
fmt.Println("have change")
}
}
}()
// 订阅者
rch := client.Watch(context.Background(), key)
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("%s %q : %q
", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
**注意: **
需要注意的是,如果Watch()方法返回错误,例如etcd server连接失败或者上下文对象超时,
程序也会退出。因此,在使用Watch()方法时,需要处理可能出现的错误,以避免程序异常退出
租约
etcd的租约是etcd中一种重要的机制,用于管理键值对的生命周期。
在etcd中,每个键值对都可以关联
一个租约,租约可以设置一个过期时间
,当租约过期时,etcd会自动删除
该键值对。
租约的作用类似于一个定时器,当租约到期时,etcd会自动删除与该租约关联的键值对。租约可以用于实现一些高级功能,例如:
- 实现分布式锁:使用租约来实现分布式锁,当一个客户端获取锁时,可以创建一个带有租约的键值对,当锁被释放时,租约到期,etcd会自动删除该键值对,从而释放锁。
- 实现健康检查:使用租约来实现健康检查,当一个服务启动时,可以创建一个带有租约的键值对,当服务停止时,租约到期,etcd会自动删除该键值对,从而通知其他服务该服务已经停止。
- 实现自动续租:使用租约来实现自动续租,当一个客户端获取租约时,可以定期向etcd发送心跳,从而保持租约的有效性,避免租约过期导致键值对被删除。
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalln(err)
}
// 创建租约
leaseResp, err := client.Grant(context.Background(), 30)
if err != nil {
fmt.Println(err)
return
}
// 绑定租约到键
_, err = client.Put(context.Background(), "/wtt/who", "whero", clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Println(err)
return
} else {
fmt.Println("绑定租约到键 成功")
}
time.Sleep(time.Second * 10)
// 续约租约 (相当于 给 绑定租约 的 键 重新绑定 租约)
ch, err := client.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
fmt.Println(err)
return
}
ka, ok := <-ch
if ok {
fmt.Println("ttl:", ka.TTL)
}
// 撤销租约
/*
当租约被撤销时,etcd会自动删除与该租约关联的键值对。
需要注意的是,撤销租约是一个异步操作,etcd会在后台删除与该租约关联的键值对,
因此,在撤销租约后,键值对可能不会立即被删除,需要等待一段时间才能生效。
*/
_, err = client.Revoke(context.Background(), leaseResp.ID)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Println("撤销租约 成功")
}
事务
etcd的事务是指一组操作,这些操作要么全部执行成功,要么全部执行失败,保证了数据的一致性和可靠性。
etcd的事务支持多个操作,包括读取、写入、修改和删除等操作,可以在一个事务中同时执行多个操作,
从而避免了多个操作之间的竞态条件和数据不一致的问题。
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalln(err)
}
_, err = client.Put(context.Background(), "/wtt/key", "value")
if err != nil {
log.Printf("etcd put error,%v
", err)
return
}
// 使用cli.Txn(etcd的分布式锁是一种基于etcd实现的分布式锁,可以用于多个进程或多台机器之间的协调。etcd的分布式锁使用了etcd的事务操作和租约机制,可以保证锁的正确性和高可用性。)方法创建了一个事务
txn := client.Txn(context.Background())
// 事务包含了一个条件判断和两个操作:
// 条件判断使用 clientv3.Compare() 方法,判断键值对的值是否等于"old_value",
// 如果满足条件,则执行clientv3.OpPut()方法,将键值对的值修改为"new_value",
// 否则执行clientv3.OpGet()方法,读取键值对的值。
txn.If(clientv3.Compare(clientv3.Value("/wtt/key"), "=", "value")).
Then(clientv3.OpPut("/wtt/key", "new_value")).
Else(clientv3.OpGet("/wtt/key"))
// 最后,使用txn.Commit()方法提交事务,如果事务执行成功,则返回resp.Succeeded为true,否则为false。
resp, err := txn.Commit()
if err != nil {
fmt.Println(err)
return
}
if resp.Succeeded {
fmt.Println("事务执行成功")
} else {
fmt.Println("事务执行失败")
}
需要注意的是,etcd的事务是原子性的,即要么全部执行成功,要么全部执行失败,不支持部分执行成功的情况。
因此,在使用etcd的事务时,需要仔细考虑每个操作的顺序和条件,避免出现竞态条件和数据不一致的问题。
分布式
分布式锁
etcd的分布式锁是一种基于etcd实现的分布式锁,可以用于 多个进程 或 多台机器 之间的协调。
etcd的分布式锁使用了etcd的 事务操作 和 租约机制,可以保证锁的正确性和高可用性。
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
func main() {
// 创建etcd客户端
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
defer client.Close()
go func() {
time.Sleep(time.Second * 2)
fmt.Println("son start")
// 创建一个session
session, err := concurrency.NewSession(client)
if err != nil {
panic(err)
}
defer session.Close()
// 创建一个锁
mutex := concurrency.NewMutex(session, "/my-lock")
// 加锁
err = mutex.Lock(context.Background())
if err != nil {
panic(err)
}
fmt.Println("son lock acquired")
// 执行业务逻辑
time.Sleep(5 * time.Second)
// 解锁
err = mutex.Unlock(context.Background())
if err != nil {
panic(err)
}
fmt.Println("son lock released")
}()
// 创建一个session
session, err := concurrency.NewSession(client)
if err != nil {
panic(err)
}
defer session.Close()
// 创建一个锁
mutex := concurrency.NewMutex(session, "/my-lock")
// 加锁
err = mutex.Lock(context.Background())
if err != nil {
panic(err)
}
fmt.Println("dad lock acquired")
// 执行业务逻辑
time.Sleep(5 * time.Second)
// 解锁
err = mutex.Unlock(context.Background())
if err != nil {
panic(err)
}
fmt.Println("dad lock released")
time.Sleep(time.Hour)
}
/*
输出:
dad lock acquired
son start
dad lock released
son lock acquired
son lock released
*/
分布式事务
分布式事务是指在分布式系统中,多个节点之间进行的事务操作。
在分布式系统中,不同的节点可能位于不同的物理机器上,它们之间通过网络进行通信。
由于网络通信的不确定性和不可靠性,分布式事务的实现比较复杂。
分布式事务需要满足ACID原则,即原子性、一致性、隔离性和持久性。
其中,原子性指事务是一个不可分割的操作序列,要么全部执行成功,要么全部执行失败;
一致性指事务执行前后,数据库的状态必须保持一致;
隔离性指多个事务之间应该相互隔离,互不干扰;
持久性指事务执行成功后,对数据库的修改应该永久保存。
常见的分布式事务实现方式包括两阶段提交
和 三阶段提交
。
两阶段提交是指在分布式系统中,事务的提交分为两个阶段,第一个阶段是准备阶段
,第二个阶段是提交阶段
;
三阶段提交是在两阶段提交的基础上,增加了一个预提交阶段
,用于解决两阶段提交中的 阻塞问题。