Go DiskQueue源码阅读
如何使用Go来实现一个简单的基于磁盘的FIFO队列呢?我们来看看 go-diskqueue 的实现, 它是NSQ中用来持久化的一个库,我们借助它来了解一下,如何实现一个基于磁盘的队列。
这个库很简单,只有一个文件,我们来看看 diskqueue.go
:
// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
// run-time state (also persisted to disk)
readPos int64
writePos int64
readFileNum int64
writeFileNum int64
depth int64
sync.RWMutex
// instantiation time metadata
name string
dataPath string
maxBytesPerFile int64 // currently this cannot change once created
minMsgSize int32
maxMsgSize int32
syncEvery int64 // number of writes per fsync
syncTimeout time.Duration // duration of time per fsync
exitFlag int32
needSync bool
// keeps track of the position where we have read
// (but not yet sent over readChan)
nextReadPos int64
nextReadFileNum int64
readFile *os.File
writeFile *os.File
reader *bufio.Reader
writeBuf bytes.Buffer
// exposed via ReadChan()
readChan chan []byte
// internal channels
depthChan chan int64
writeChan chan []byte
writeResponseChan chan error
emptyChan chan int
emptyResponseChan chan error
exitChan chan int
exitSyncChan chan int
logf AppLogFunc
}
可以看到,这就是主要的结构体,上面记录了当前读取文件的编号、写入文件的编号、读取的位置(偏移量),写入的位置等。
那么,NSQ的消息是乱序来的,我们无法预知什么时候会有消息来到,它是怎么做到处理成FIFO的呢?很简单,其实就是通过Go
的channel来实现的,可以看到 // internal channels
下面这一堆的channel。
接下来我们来看看读取和写入是如何做到的:
// Put writes a []byte to the queue
func (d *diskQueue) Put(data []byte) error {
d.RLock()
defer d.RUnlock()
if d.exitFlag == 1 {
return errors.New("exiting")
}
d.writeChan <- data
return <-d.writeResponseChan
}
来看看 writeChan
在哪里被消费:
// ioLoop provides the backend for exposing a go channel (via ReadChan())
// in support of multiple concurrent queue consumers
//
// it works by looping and branching based on whether or not the queue has data
// to read and blocking until data is either read or written over the appropriate
// go channels
//
// conveniently this also means that we're asynchronously reading from the filesystem
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64
var r chan []byte
syncTicker := time.NewTicker(d.syncTimeout)
for {
// dont sync all the time :)
if count == d.syncEvery {
d.needSync = true
}
if d.needSync {
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
count = 0
}
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
d.name, d.readPos, d.fileName(d.readFileNum), err)
d.handleReadError()
continue
}
}
r = d.readChan
} else {
r = nil
}
select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
case d.depthChan <- d.depth:
case <-d.emptyChan:
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
case dataWrite := <-d.writeChan:
count++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
goto exit
}
}
exit:
d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
syncTicker.Stop()
d.exitSyncChan <- 1
}
ioLoop
是在 New
函数被调用时发起的。我们来看 case dataWrite := <-d.writeChan
分支,调用 d.writeOne
,将其返回
结果放到 d.writeResponseChan
里:
// writeOne performs a low level filesystem write for a single []byte
// while advancing write positions and rolling files, if necessary
func (d *diskQueue) writeOne(data []byte) error {
var err error
if d.writeFile == nil {
curFileName := d.fileName(d.writeFileNum)
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
if d.writePos > 0 {
_, err = d.writeFile.Seek(d.writePos, 0)
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
}
}
dataLen := int32(len(data))
if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
}
d.writeBuf.Reset()
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
return err
}
_, err = d.writeBuf.Write(data)
if err != nil {
return err
}
// only write to the file once
_, err = d.writeFile.Write(d.writeBuf.Bytes())
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
totalBytes := int64(4 + dataLen)
d.writePos += totalBytes
d.depth += 1
if d.writePos >= d.maxBytesPerFile {
d.writeFileNum++
d.writePos = 0
// sync every time we start writing to a new file
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}
return err
}
可以看到写入的细节,就是使用Go的 binary
库,用大端的方式,把数据写入,最后来一个判断,如果文件超长了,新开一个文件。
其中有一个细节,调用了 d.sync()
,我们来看看:
// sync fsyncs the current writeFile and persists metadata
func (d *diskQueue) sync() error {
if d.writeFile != nil {
err := d.writeFile.Sync()
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
}
err := d.persistMetaData()
if err != nil {
return err
}
d.needSync = false
return nil
}
// persistMetaData atomically writes state to the filesystem
func (d *diskQueue) persistMetaData() error {
var f *os.File
var err error
fileName := d.metaDataFileName()
tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
// write to tmp file
f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
d.depth,
d.readFileNum, d.readPos,
d.writeFileNum, d.writePos)
if err != nil {
f.Close()
return err
}
f.Sync()
f.Close()
// atomically rename
return os.Rename(tmpFileName, fileName)
}
可以看到调用了 persistMetaData
,什么是metadata呢?就是我们最开始说的,读取的位置、写入的位置、读取的文件号,
写入的文件号等等。可以看到它的实现,就是先打开一个临时文件,把数据写入,并且sync之后,替换原来的metadata文件。
这样就达到了持久化metadata的作用。这就是写入文件和metadata的逻辑。至于读取,则是在ioLoop
的for循环里:
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
然后
select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
放到 d.readChan
里,消费端则从这个channel里读取。
这就是 go-diskqueue
的实现。
Ref:
更多文章
- socks5 协议详解
- zerotier简明教程
- 搞定面试中的系统设计题
- frp 源码阅读与分析(一):流程和概念
- 用peewee代替SQLAlchemy
- Golang(Go语言)中实现典型的fork调用
- DNSCrypt简明教程
- 一个Gunicorn worker数量引发的血案
- Golang validator使用教程
- Docker组件介绍(一):runc和containerd
- Docker组件介绍(二):shim, docker-init和docker-proxy
- 使用Go语言实现一个异步任务框架
- 协程(coroutine)简介 - 什么是协程?
- SQLAlchemy简明教程
- Go Module 简明教程