首页

/

归档

/

友链

/

Github

/

模拟面试

/

独立黑客

/

资料

/

订阅

/

RSS

/

关于我


etcd源码阅读(五):mvcc

MVCC 是数据库中常见的一种并发控制的方式,即保存数据的多个版本,在同一个事务里, 应用所见的版本是一致的。

但是,我还是很想吐槽etcd的mvcc实现,有点乱,在我看来,是过度抽象了。为了理解mvcc,我们需要预先了解这些东西(下文,mvcc如无特别说明,都是指代mvcc文件夹下,etcd的mvcc实现):

此处,我们预先了解bolt的一些东西,但是暂时不去探究bolt的实现,bolt的实现粗略的瞄了一眼,如果要写的话,有的写了:

https://github.com/etcd-io/bbolt

store是上面所说的 ConsistentWatchableKV 的底层实现:

type store struct {
    ReadView
    WriteView

    // consistentIndex caches the "consistent_index" key's value. Accessed
    // through atomics so must be 64-bit aligned.
    consistentIndex uint64

    // mu read locks for txns and write locks for non-txn store changes.
    mu sync.RWMutex

    ig ConsistentIndexGetter

    b       backend.Backend
    kvindex index

    le lease.Lessor

    // revMuLock protects currentRev and compactMainRev.
    // Locked at end of write txn and released after write txn unlock lock.
    // Locked before locking read txn and released after locking.
    revMu sync.RWMutex
    // currentRev is the revision of the last completed transaction.
    currentRev int64
    // compactMainRev is the main revision of the last compaction.
    compactMainRev int64

    // bytesBuf8 is a byte slice of length 8
    // to avoid a repetitive allocation in saveIndex.
    bytesBuf8 []byte

    fifoSched schedule.Scheduler

    stopc chan struct{}

    lg *zap.Logger
}

可以看到其中有几个很重要的东西:

接下来探究一下 Put 是怎么工作的,这样我们就可以粗略的了解 mvcc 是怎么工作的。

store 本身并没有实现 Put 方法, 但是却可以调用 Put 方法,因为在最上边,它嵌套了一个匿名的 WriteView,从而获得了 这个方法:

type store struct {
    ReadView
    WriteView

而具体的实现则在 NewStore 这个函数里可以找到:

    s.ReadView = &readView{s}
    s.WriteView = &writeView{s}

那我们就去看 writeView 怎么实现的:

func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
    tw := wv.kv.Write()
    defer tw.End()
    return tw.Put(key, value, lease)
}

看看 wv.kv.Write 返回的是个啥嘎达(注意,wv.kv是一个符合KV这个interface的东东):

type KV interface {
    ReadView
    WriteView

    // Read creates a read transaction.
    Read() TxnRead

    // Write creates a write transaction.
    Write() TxnWrite

那我们就要去看 TxnWrite.Put 是怎么实现的:

// TxnWrite represents a transaction that can modify the store.
type TxnWrite interface {
    TxnRead
    WriteView
    // Changes gets the changes made since opening the write txn.
    Changes() []mvccpb.KeyValue
}

原来又是接口,Put 是在 WriteView 里定义的。所以呢,绕了一圈,我们又绕回来了,所以,etcd搞得这么复杂干啥呢。。。为了找出具体实现,我们得去 NewStore 里翻翻,具体传进去的是什么。原来传进去的就是自己啊,那就说明,type store struct 这玩意儿,肯定实现了 Write 这个方法。但是呢, 我找来找去,就是没有发现。最后我只能开启搜索大法,然后在 kvstore_txn.go 这个文件里找到了:

// 哇好绕啊,又绕到这里来了,我是佩服的
func (s *store) Write() TxnWrite {
    s.mu.RLock()
    tx := s.b.BatchTx()
    tx.Lock()
    tw := &storeTxnWrite{
        storeTxnRead: storeTxnRead{s, tx, 0, 0},
        tx:           tx,
        beginRev:     s.currentRev,
        changes:      make([]mvccpb.KeyValue, 0, 4),
    }
    return newMetricsTxnWrite(tw)
}

我是服气的。

接下来就要去翻 TxnWrite 的实现,也就是 storeTxnWritePut 的实现了,然后你会发现,Put 调用了 put

// etcdctl put foo bar最后到了这里
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
    rev := tw.beginRev + 1
    c := rev
    oldLease := lease.NoLease

    // if the key exists before, use its previous created and
    // get its previous leaseID
    _, created, ver, err := tw.s.kvindex.Get(key, rev)
    if err == nil {
        c = created.main
        oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
    }

    ibytes := newRevBytes() // revision的bytes
    idxRev := revision{main: rev, sub: int64(len(tw.changes))}
    revToBytes(idxRev, ibytes)

    ver = ver + 1
    kv := mvccpb.KeyValue{
        Key:            key,
        Value:          value,
        CreateRevision: c,
        ModRevision:    rev,
        Version:        ver,
        Lease:          int64(leaseID),
    }

    d, err := kv.Marshal() // kv的bytes
    if err != nil {
        if tw.storeTxnRead.s.lg != nil {
            tw.storeTxnRead.s.lg.Fatal(
                "failed to marshal mvccpb.KeyValue",
                zap.Error(err),
            )
        } else {
            plog.Fatalf("cannot marshal event: %v", err)
        }
    }

    tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) // 所以最后存储的,以revision为key,kv为value存储下来了
    tw.s.kvindex.Put(key, idxRev)
    tw.changes = append(tw.changes, kv)

    if oldLease != lease.NoLease {
        if tw.s.le == nil {
            panic("no lessor to detach lease")
        }
        err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
        if err != nil {
            if tw.storeTxnRead.s.lg != nil {
                tw.storeTxnRead.s.lg.Fatal(
                    "failed to detach old lease from a key",
                    zap.Error(err),
                )
            } else {
                plog.Errorf("unexpected error from lease detach: %v", err)
            }
        }
    }
    if leaseID != lease.NoLease {
        if tw.s.le == nil {
            panic("no lessor to attach lease")
        }
        err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
        if err != nil {
            panic("unexpected error from lease Attach")
        }
    }
}

这里可以看出来,bolt里存储的KV,实际上并不是用户给出的KV。Key是revision,而Value是用户给出的KV。所以,才需要那个b树做索引, 把用户的key换成revision,然后再到bolt里,把revision换成真正的KV。

之后我们再看mvcc实现里的其他特性,例如watch是怎么实现的,实际上这玩意儿存储的时候,是有序存储的。

好了。这一节就看到这里了。