etcd源码阅读与分析(五):mvcc
MVCC 是数据库中常见的一种并发控制的方式,即保存数据的多个版本,在同一个事务里, 应用所见的版本是一致的。
但是,我还是很想吐槽etcd的mvcc实现,有点乱,在我看来,是过度抽象了。为了理解mvcc,我们需要预先了解这些东西(下文,mvcc如无特别说明,都是指代mvcc文件夹下,etcd的mvcc实现):
- mvcc底层使用 bolt 实现,bolt是一个基于B+树的KV存储。
kv.go
这个文件定义了大量的接口,然后接口之间又各种组合,但是其实最后 etcdserver 使用的就是ConsistentWatchableKV
这个接口。
此处,我们预先了解bolt的一些东西,但是暂时不去探究bolt的实现,bolt的实现粗略的瞄了一眼,如果要写的话,有的写了:
- bolt的顶级是一个DB,DB里有多个bucket。在物理上,bolt使用单个文件存储。
- bolt在某一刻只允许一个 read-write 事务,但是可以同时允许多个 read-only 事务。其实就是读写锁,写只能顺序来,读可以并发读。
DB.Update()
是用来开启 read-write 事务的,DB.View()
则是用来开启 read-only 事务的。由于每次执行DB.Update()
都会写入一次磁盘,可以使用DB.Batch()
来进行批量操作。
store是上面所说的 ConsistentWatchableKV
的底层实现:
type store struct {
ReadView
WriteView
// consistentIndex caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex
ig ConsistentIndexGetter
b backend.Backend
kvindex index
le lease.Lessor
// revMuLock protects currentRev and compactMainRev.
// Locked at end of write txn and released after write txn unlock lock.
// Locked before locking read txn and released after locking.
revMu sync.RWMutex
// currentRev is the revision of the last completed transaction.
currentRev int64
// compactMainRev is the main revision of the last compaction.
compactMainRev int64
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
fifoSched schedule.Scheduler
stopc chan struct{}
lg *zap.Logger
}
可以看到其中有几个很重要的东西:
mu sync.RWMutex
读写锁用于并发控制b backend.Backend
则是boltkvindex index
是用 Google 的B树实现做索引,至于为什么,后面会讲到
接下来探究一下 Put
是怎么工作的,这样我们就可以粗略的了解 mvcc 是怎么工作的。
store 本身并没有实现 Put
方法, 但是却可以调用 Put
方法,因为在最上边,它嵌套了一个匿名的 WriteView
,从而获得了
这个方法:
type store struct {
ReadView
WriteView
而具体的实现则在 NewStore
这个函数里可以找到:
s.ReadView = &readView{s}
s.WriteView = &writeView{s}
那我们就去看 writeView
怎么实现的:
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw := wv.kv.Write()
defer tw.End()
return tw.Put(key, value, lease)
}
看看 wv.kv.Write
返回的是个啥嘎达(注意,wv.kv是一个符合KV这个interface的东东):
type KV interface {
ReadView
WriteView
// Read creates a read transaction.
Read() TxnRead
// Write creates a write transaction.
Write() TxnWrite
那我们就要去看 TxnWrite.Put
是怎么实现的:
// TxnWrite represents a transaction that can modify the store.
type TxnWrite interface {
TxnRead
WriteView
// Changes gets the changes made since opening the write txn.
Changes() []mvccpb.KeyValue
}
原来又是接口,Put
是在 WriteView
里定义的。所以呢,绕了一圈,我们又绕回来了,所以,etcd搞得这么复杂干啥呢。。。为了找出具体实现,我们得去
NewStore
里翻翻,具体传进去的是什么。原来传进去的就是自己啊,那就说明,type store struct
这玩意儿,肯定实现了 Write
这个方法。但是呢,
我找来找去,就是没有发现。最后我只能开启搜索大法,然后在 kvstore_txn.go
这个文件里找到了:
// 哇好绕啊,又绕到这里来了,我是佩服的
func (s *store) Write() TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
我是服气的。
接下来就要去翻 TxnWrite
的实现,也就是 storeTxnWrite
的 Put
的实现了,然后你会发现,Put
调用了 put
:
// etcdctl put foo bar最后到了这里
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes() // revision的bytes
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal() // kv的bytes
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) // 所以最后存储的,以revision为key,kv为value存储下来了
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
if oldLease != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to detach lease")
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("unexpected error from lease detach: %v", err)
}
}
}
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
}
这里可以看出来,bolt里存储的KV,实际上并不是用户给出的KV。Key是revision,而Value是用户给出的KV。所以,才需要那个b树做索引, 把用户的key换成revision,然后再到bolt里,把revision换成真正的KV。
之后我们再看mvcc实现里的其他特性,例如watch是怎么实现的,实际上这玩意儿存储的时候,是有序存储的。
好了。这一节就看到这里了。
etcd源码阅读与分析系列文章
- etcd源码阅读与分析(一):raftexample
- etcd源码阅读与分析(二):raft
- etcd源码阅读与分析(三):wal
- etcd源码阅读与分析(四):lease
- etcd源码阅读与分析(五):mvcc
更多文章
- socks5 协议详解
- zerotier简明教程
- 搞定面试中的系统设计题
- frp 源码阅读与分析(一):流程和概念
- 用peewee代替SQLAlchemy
- Golang(Go语言)中实现典型的fork调用
- DNSCrypt简明教程
- 一个Gunicorn worker数量引发的血案
- Golang validator使用教程
- Docker组件介绍(一):runc和containerd
- Docker组件介绍(二):shim, docker-init和docker-proxy
- 使用Go语言实现一个异步任务框架
- 协程(coroutine)简介 - 什么是协程?
- SQLAlchemy简明教程
- Go Module 简明教程