etcd源码阅读与分析(四):lease

lease是租约,类似于Redis中的TTL(Time To Live)。可以看一下怎么使用lease:

cli, err := clientv3.New(clientv3.Config{
    Endpoints:   endpoints,
    DialTimeout: dialTimeout,
})
if err != nil {
    log.Fatal(err)
}
defer cli.Close()

// minimum lease TTL is 5-second
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
    log.Fatal(err)
}

// after 5 seconds, the key 'foo' will be removed
_, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID))
if err != nil {
    log.Fatal(err)
}

可以看出来,我们就是拿一个lease的ID作为凭证。那么,lease是怎么实现的呢?

type Lease struct {
	ID           LeaseID
	ttl          int64 // time to live of the lease in seconds
	remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
	// expiryMu protects concurrent accesses to expiry
	expiryMu sync.RWMutex
	// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
	expiry time.Time

	// mu protects concurrent accesses to itemSet
	mu      sync.RWMutex
	itemSet map[LeaseItem]struct{}
	revokec chan struct{}
}

可以看出来,Lease在创建的时候,就会分配一个ID和设定好TTL。

接下来看看lessor这个接口的定义:

// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
type Lessor interface {
	// SetRangeDeleter lets the lessor create TxnDeletes to the store.
	// Lessor deletes the items in the revoked or expired lease by creating
	// new TxnDeletes.
	SetRangeDeleter(rd RangeDeleter)

	SetCheckpointer(cp Checkpointer)

	// Grant grants a lease that expires at least after TTL seconds.
	Grant(id LeaseID, ttl int64) (*Lease, error)
	// Revoke revokes a lease with given ID. The item attached to the
	// given lease will be removed. If the ID does not exist, an error
	// will be returned.
	Revoke(id LeaseID) error

	// Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
	// the expiry of leases to less than the full TTL when possible.
	Checkpoint(id LeaseID, remainingTTL int64) error

	// Attach attaches given leaseItem to the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Attach(id LeaseID, items []LeaseItem) error

	// GetLease returns LeaseID for given item.
	// If no lease found, NoLease value will be returned.
	GetLease(item LeaseItem) LeaseID

	// Detach detaches given leaseItem from the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Detach(id LeaseID, items []LeaseItem) error

	// Promote promotes the lessor to be the primary lessor. Primary lessor manages
	// the expiration and renew of leases.
	// Newly promoted lessor renew the TTL of all lease to extend + previous TTL.
	Promote(extend time.Duration)

	// Demote demotes the lessor from being the primary lessor.
	Demote()

	// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
	// an error will be returned.
	Renew(id LeaseID) (int64, error)

	// Lookup gives the lease at a given lease id, if any
	Lookup(id LeaseID) *Lease

	// Leases lists all leases.
	Leases() []*Lease

	// ExpiredLeasesC returns a chan that is used to receive expired leases.
	ExpiredLeasesC() <-chan []*Lease

	// Recover recovers the lessor state from the given backend and RangeDeleter.
	Recover(b backend.Backend, rd RangeDeleter)

	// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
	// times is undefined.
	Stop()
}

可以看出来作为一个lessor,也就是管理lease的东东,需要实现这些接口。我们具体关注怎么完成Grant,此外,expiration是怎么做的。 所以我们找到具体的实现来看看:

// lessor implements Lessor interface.
// TODO: use clockwork for testability.
type lessor struct {
	mu sync.RWMutex

	// demotec is set when the lessor is the primary.
	// demotec will be closed if the lessor is demoted.
	demotec chan struct{}

	leaseMap            map[LeaseID]*Lease
	leaseHeap           LeaseQueue
	leaseCheckpointHeap LeaseQueue
	itemMap             map[LeaseItem]LeaseID

	// When a lease expires, the lessor will delete the
	// leased range (or key) by the RangeDeleter.
	rd RangeDeleter

	// When a lease's deadline should be persisted to preserve the remaining TTL across leader
	// elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
	cp Checkpointer

	// backend to persist leases. We only persist lease ID and expiry for now.
	// The leased items can be recovered by iterating all the keys in kv.
	b backend.Backend

	// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
	// requests for shorter TTLs are extended to the minimum TTL.
	minLeaseTTL int64

	expiredC chan []*Lease
	// stopC is a channel whose closure indicates that the lessor should be stopped.
	stopC chan struct{}
	// doneC is a channel whose closure indicates that the lessor is stopped.
	doneC chan struct{}

	lg *zap.Logger

	// Wait duration between lease checkpoints.
	checkpointInterval time.Duration
}

可以看到,里边有一个 leaseMap, 有 leaseHeap,为什么要有两个呢?堆的特性不知道大家还记得吗?此处的 leaseHeap 实现是 一个小堆,比较的关键是Lease失效的时间:

type LeaseQueue []*LeaseWithTime

func (pq LeaseQueue) Len() int { return len(pq) }

func (pq LeaseQueue) Less(i, j int) bool {
	return pq[i].time < pq[j].time
}

func (pq LeaseQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

func (pq *LeaseQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*LeaseWithTime)
	item.index = n
	*pq = append(*pq, item)
}

func (pq *LeaseQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	item.index = -1 // for safety
	*pq = old[0 : n-1]
	return item
}

所以,怎么保证lease失效呢?我们每次从小堆里判断堆顶元素是否失效,失效就 Pop 就可以了。那为什么又要有 leaseMap 呢?因为 这样可以加速查找,毕竟,哈希表的时间复杂度是 O(1)

那么,什么时候会进行lease的失效管理呢?我们看新建lessort的地方:

func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
	return newLessor(lg, b, cfg)
}

func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
	checkpointInterval := cfg.CheckpointInterval
	if checkpointInterval == 0 {
		checkpointInterval = 5 * time.Minute
	}
	l := &lessor{
		leaseMap:            make(map[LeaseID]*Lease),
		itemMap:             make(map[LeaseItem]LeaseID),
		leaseHeap:           make(LeaseQueue, 0),
		leaseCheckpointHeap: make(LeaseQueue, 0),
		b:                   b,
		minLeaseTTL:         cfg.MinLeaseTTL,
		checkpointInterval:  checkpointInterval,
		// expiredC is a small buffered chan to avoid unnecessary blocking.
		expiredC: make(chan []*Lease, 16),
		stopC:    make(chan struct{}),
		doneC:    make(chan struct{}),
		lg:       lg,
	}
	l.initAndRecover()

	go l.runLoop()

	return l
}

倒数第二行,go l.runLoop(),跟进去看:

func (le *lessor) runLoop() {
	defer close(le.doneC)

	for {
		le.revokeExpiredLeases()
		le.checkpointScheduledLeases()

		select {
		case <-time.After(500 * time.Millisecond):
		case <-le.stopC:
			return
		}
	}
}

每500毫秒会进行一次循环,检查失效的lease然后传递到 expiredC 这个channel里。但是 type lessor struct 这个结构体并没有 对其进行处理,估计是调用者负责处理,所以我搜索了一下是不是有地方处理:

$ ack -Q 'ExpiredLeasesC()'
lease/lessor.go
126:	ExpiredLeasesC() <-chan []*Lease
559:func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
906:func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }

lease/lessor_test.go
380:	case el := <-le.ExpiredLeasesC():
433:	case el := <-le.ExpiredLeasesC():

etcdserver/server.go
1013:		expiredLeaseC = s.lessor.ExpiredLeasesC()

果然,etcdserver/server.go 作为调用者对它进行处理。

最后,需要提到的是,lease也会进行持久化的,并且新建lessort的时候会优先看是否能从已有持久化的文件中恢复。


etcd源码阅读与分析系列文章


更多文章
  • Go里优雅的使用全局配置
  • Golang sync.Map源码分析
  • Android滑动时隐藏FAB
  • Python中用tuple作为key
  • 一些常用的算法思维
  • 编写可维护的函数
  • 为什么要把配置保存在仓库里?
  • Android自动展示和关闭进度条
  • Kotlin/Java 列表Protobuf序列化
  • deeplink结合路由处理扩展App的能力
  • 怎么使用ViewModel 和 RecyclerView
  • Android手动挡MVVM
  • 来电拦截方案
  • 你好,2021!
  • gRPC鉴权方案