您现在的位置是:首页 >技术杂谈 >【go项目-geecache】动手写分布式缓存 day2 - 单机并发缓存网站首页技术杂谈
【go项目-geecache】动手写分布式缓存 day2 - 单机并发缓存
简介【go项目-geecache】动手写分布式缓存 day2 - 单机并发缓存
sync.Mutex 互斥锁
如果我们要是实现并发缓存,那么我们要引入sync.Mutex 互斥锁来保证多个协程不冲突,确保同一时间只有一个协程运行,我们在使用的时候使用Lock() 和unLock()来实现阻塞
实现并发读写
实现ByteView表示缓存值 1.go
package geecache
type ByteView struct {
b []byte //缓存值,byte是为了通用性
}
func (v ByteView) Len() int {
return len(v.b)
}
func (v ByteView) ByteSlice() []byte {
return cloneBytes(v.b)
}
func (v ByteView) String() string { // 返回一个字符串的拷贝
return string(v.b)
}
func cloneBytes(b []byte) []byte { // 返回一个byte的拷贝
c := make([]byte, len(b))
copy(c, b)
return c
}
- ByteView,b表示实际的缓存值
- Len()实现接口,表示所占的内存
- ByteSlice() 返回一个拷贝,因为ByteView不能修改
封装lru.Cache ,添加并发属性 2.go
实现cache数据结构
package geecache
import (
"geecache/lru"
"sync"
)
type cache struct {
mu sync.Mutex
lru *lru.Cache
cacheBytes int64
}
cache : 里面封装了lru的Cache,控制并发的mutex锁,和缓存大小
实现增加和get函数
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil)
}
c.lru.Add(key, value)
}
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}
return
}
add和get函数逻辑类似,首先使用mutex锁保证不冲突,然后查看cache已经存在,如果不存在则初始化,然后添加/获得数据
defer 的 使用
这里使用了defer,defer的作用就是令函数最后执行,所以虽然 c.mu.Lock()
defer c.mu.Unlock()写在一起,但是Unlock()是最后运行的,即保证协程不冲突,又提高代码可读性,不会忘记解锁
实现Group ,负责控制缓存值的存取 group.go
实现回调函数,在缓存不存在时获取数据
当我们在缓存中找不到数据时,此时我们需要从外界获取数据,由于数据来源的多种,我们不应该考虑这么多,为了拓展性和可读性,我们实现一个回调函数来通知用户我们需要数据,用户通过这个接口把数据传入
package geecache
import (
"fmt"
"log"
"sync"
)
type Getter interface {
Get(key string) ([]byte, error)
}
type GetterFunc func(key string) ([]byte, error)
func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}
这里定义了Getter接口,GetterFunc函数类型,并且为这个类型实现了Getter接口Get方法
在这里为什么使用接口而不用直接函数实现呢?
- 接口比函数更好的地方在于接口并不关心传入的数据类型,所以接口可以实现多态,更灵活也更节省代码,面对其他情况也能处理
- 而函数需要形参,对传入参数有要求,面对复杂场景无法处理
实现Group数据结构
type Group struct {
name string
getter Getter
mainCache cache
}
var (
mu sync.RWMutex
groups = make(map[string]*Group)
)```
- name 表示缓存的名字
- getter 回调函数,用于不命中缓存时从用户获取数据
- mainCache 缓存,实现lru算法,add/get等操作
除此以外还有两个全局变量
- mu RW锁 允许多个同时读,禁止读写和写写同时操作
- groups
#### 实例化函数
```go
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g
return g
}
首先还是判断回调函数是否正常(是否为空),正常则开启mutex锁,保证协程正常运行,然后实例化
实现Get函数和GetGroup函数
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}
- GetGroup函数通过缓存名字得到group
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}
if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}
return g.load(key)
}
func (g *Group) load(key string) (value ByteView, err error) {
return g.getLocally(key)
}
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}
- 实现Get函数,首先判断key是否存在实现过滤,然后查看缓存,找的到就返回,否则使用load函数去进一步查找
- load函数调用getLocally,在分布式场景会使用其他函数获取
- getLocally 首先使用回调函数,如果成功得到数据则将源数据添加到缓存 mainCache 中(通过 populateCache 方法)
- populateCache 将数据添加main_Cache缓存
收获总结:
- 了解接口的使用场景,它和函数之间的差别和优略势
- 测试文件要以_test结尾
- 系统设计要严谨,要考虑后期的拓展性和维护 ,比如load函数考虑到了分布式场景
- 数据结构之间的封装
实现代码和测试代码
1.go
package geecache
type ByteView struct {
b []byte //缓存值,byte是为了通用性
}
func (v ByteView) Len() int {
return len(v.b)
}
func (v ByteView) ByteSlice() []byte {
return cloneBytes(v.b)
}
func (v ByteView) String() string { // 返回一个字符串的拷贝
return string(v.b)
}
func cloneBytes(b []byte) []byte { // 返回一个byte的拷贝
c := make([]byte, len(b))
copy(c, b)
return c
}
2.go
package geecache
import (
"geecache/lru"
"sync"
)
type cache struct {
mu sync.Mutex
lru *lru.Cache
cacheBytes int64
}
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil)
}
c.lru.Add(key, value)
}
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}
return
}
group.go
package geecache
import (
"fmt"
"log"
"sync"
)
type Group struct {
name string
getter Getter
mainCache cache
}
type Getter interface {
Get(key string) ([]byte, error)
}
type GetterFunc func(key string) ([]byte, error)
func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}
var (
mu sync.RWMutex
groups = make(map[string]*Group)
)
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()
g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g
return g
}
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}
if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}
return g.load(key)
}
func (g *Group) load(key string) (value ByteView, err error) {
return g.getLocally(key)
}
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}
group_test.go
package geecache
import (
"fmt"
"log"
"reflect"
"testing"
)
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
func TestGetter(t *testing.T) {
var f Getter = GetterFunc(func(key string) ([]byte, error) {
return []byte(key), nil
})
expect := []byte("key")
if v, _ := f.Get("key"); !reflect.DeepEqual(v, expect) {
t.Fatal("callback failed")
}
}
func TestGet(t *testing.T) {
loadCounts := make(map[string]int, len(db))
gee := NewGroup("scores", 2<<10, GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
if _, ok := loadCounts[key]; !ok {
loadCounts[key] = 0
}
loadCounts[key]++
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
for k, v := range db {
if view, err := gee.Get(k); err != nil || view.String() != v {
t.Fatal("failed to get value of Tom")
}
if _, err := gee.Get(k); err != nil || loadCounts[k] > 1 {
t.Fatalf("cache %s miss", k)
}
}
if view, err := gee.Get("unknown"); err == nil {
t.Fatalf("the value of unknow should be empty, but %s got", view)
}
}
func TestGetGroup(t *testing.T) {
groupName := "scores"
NewGroup(groupName, 2<<10, GetterFunc(
func(key string) (bytes []byte, err error) { return }))
if group := GetGroup(groupName); group == nil || group.name != groupName {
t.Fatalf("group %s not exist", groupName)
}
if group := GetGroup(groupName + "111"); group != nil {
t.Fatalf("expect nil, but %s got", group.name)
}
}
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。