Redis源码阅读:set是怎么做交并集运算的?
今天我们来看看 Redis 中的set是如何存储数据,然后如何做交集、并集、差集运算的,所以我们一共会探索 SADD
, SISMEMBER
,
SINTER
, SUNION
, SDIFF
这五个命令的实现。
首先我们从 SADD
开始:
{"sadd",saddCommand,-3,
"write use-memory fast @set",
0,NULL,1,1,1,0,0,0},
void saddCommand(client *c) {
robj *set;
int j, added = 0;
set = lookupKeyWrite(c->db,c->argv[1]);
if (checkType(c,set,OBJ_SET)) return;
if (set == NULL) {
set = setTypeCreate(c->argv[2]->ptr);
dbAdd(c->db,c->argv[1],set);
}
for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}
if (added) {
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added;
addReplyLongLong(c,added);
}
/* Factory method to return a set that *can* hold "value". When the object has
* an integer-encodable value, an intset will be returned. Otherwise a regular
* hash table. */
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject();
return createSetObject();
}
/* Add the specified value into a set.
*
* If the value was already member of the set, nothing is done and 0 is
* returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, sds value) {
// ...
}
首先我们看到 setTypeCreate
,这是一个共长方法,如果 isSdsRepresentableAsLongLong
返回是 C_OK
,就会调用
createIntsetObject
并且返回,否则返回 createSetObject
。我们分别看看这两个方法:
robj *createIntsetObject(void) {
intset *is = intsetNew();
robj *o = createObject(OBJ_SET,is);
o->encoding = OBJ_ENCODING_INTSET;
return o;
}
robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL);
robj *o = createObject(OBJ_SET,d);
o->encoding = OBJ_ENCODING_HT;
return o;
}
可以看到,如果要存储的值,如果可以存成 long long,那么就会用 intset
来存,否则就用 dict
。我们先来看后者,
也就是如果放在 dict
里,是怎么实现的:
/* Add the specified value into a set.
*
* If the value was already member of the set, nothing is done and 0 is
* returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
可以看到,其实就是用一个 dict
来存储数据,key为我们要存的数据,value为NULL。接下来我们来看看如果是 intset
将会怎么处理:
/* Add the specified value into a set.
*
* If the value was already member of the set, nothing is done and 0 is
* returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, sds value) {
// ...
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { // 如果值可以表示成long long
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success); // 添加
if (success) {
/* Convert to regular set when the intset contains
* too many entries. */
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
// 如果保存的数据量超过 server.set_max_intset_entries,也转换成 dict 来保存
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
/* Failed to get integer from object, convert to regular set. */
setTypeConvert(subject,OBJ_ENCODING_HT);
/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
到底为止,我们了解到了,Redis保存set有两种形式,当数据都是整数,而且保存的数据量小于一定量时,用的是 intset
,否则
用 dict
来保存。接下来我们来看看 intset
是怎么实现的。
intset 实现
我们从 setTypeCreate
开始入手:
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject();
return createSetObject();
}
robj *createIntsetObject(void) {
intset *is = intsetNew();
robj *o = createObject(OBJ_SET,is);
o->encoding = OBJ_ENCODING_INTSET;
return o;
}
/* Create an empty intset. */
intset *intsetNew(void) {
intset *is = zmalloc(sizeof(intset));
is->encoding = intrev32ifbe(INTSET_ENC_INT16);
is->length = 0;
return is;
}
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
看样子,该不会又是 ziplist
那一套吧?
/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 1;
/* Upgrade encoding if necessary. If we need to upgrade, we know that
* this value should be either appended (if > 0) or prepended (if < 0),
* because it lies outside the range of existing values. */
if (valenc > intrev32ifbe(is->encoding)) {
/* This always succeeds, so we don't need to curry *success. */
return intsetUpgradeAndAdd(is,value);
} else {
/* Abort if the value is already present in the set.
* This call will populate "pos" with the right position to insert
* the value when it cannot be found. */
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}
is = intsetResize(is,intrev32ifbe(is->length)+1);
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
/* Note that these encodings are ordered, so:
* INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. */
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))
/* Return the required encoding for the provided value. */
static uint8_t _intsetValueEncoding(int64_t v) {
if (v < INT32_MIN || v > INT32_MAX)
return INTSET_ENC_INT64;
else if (v < INT16_MIN || v > INT16_MAX)
return INTSET_ENC_INT32;
else
return INTSET_ENC_INT16;
}
/* Search for the position of "value". Return 1 when the value was found and
* sets "pos" to the position of the value within the intset. Return 0 when
* the value is not present in the intset and sets "pos" to the position
* where "value" can be inserted. */
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;
/* The value can never be found when the set is empty */
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}
// 二分查找
while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}
if (value == cur) {
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min;
return 0;
}
}
/* Resize the intset */
static intset *intsetResize(intset *is, uint32_t len) {
uint32_t size = len*intrev32ifbe(is->encoding);
is = zrealloc(is,sizeof(intset)+size);
return is;
}
从上面可以看出来,intset又是 ziplist
那一套,is->encoding
记录当前保存的数据,都是什么类型,比如可以选择的
类型为 int16_t
, int32_t
,int64_t
,然后在 is->content
处保存数据,但是是以有序的形式保存。
所以我们现在了解到,intset
有两个特点:
intset
和ziplist
一样,也是以连续内存块的形式保存数据,然后保存了数据的编码(其实也就是大小)intset
的数据是有序保存的,因此查找时可以用二分查找,插入时也可以二分查找出要插入的位置
集合运算(交集并集差集)是怎么实现的
最后我们来看一眼,交集并集差集分别是怎么实现的。
交集:
{"sinter",sinterCommand,-2,
"read-only to-sort @set",
0,NULL,1,-1,1,0,0,0},
void sinterCommand(client *c) {
sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
}
void sinterGenericCommand(client *c, robj **setkeys,
unsigned long setnum, robj *dstkey) {
// ...
/* Sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
// ...
/* Iterate all the elements of the first (smallest) set, and test
* the element against all the other sets, if at least one set does
* not include the element it is discarded */
si = setTypeInitIterator(sets[0]);
while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
for (j = 1; j < setnum; j++) {
if (sets[j] == sets[0]) continue;
if (encoding == OBJ_ENCODING_INTSET) {
/* intset with intset is simple... and fast */
if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,intobj))
{
break;
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
} else if (sets[j]->encoding == OBJ_ENCODING_HT) {
elesds = sdsfromlonglong(intobj);
if (!setTypeIsMember(sets[j],elesds)) {
sdsfree(elesds);
break;
}
sdsfree(elesds);
}
} else if (encoding == OBJ_ENCODING_HT) {
if (!setTypeIsMember(sets[j],elesds)) {
break;
}
}
}
}
// ...
}
可以看到,做交集的逻辑,其实就是先根据集合大小把集合排序。然后以最小的集合为基准,遍历每一个元素,然后去遍历其它set, 只要有一个不是子元素,就可以跳过这个元素了。
并集:
{"sunion",sunionCommand,-2,
"read-only to-sort @set",
0,NULL,1,-1,1,0,0,0},
void sunionCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_UNION);
}
void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
robj *dstkey, int op) {
// ...
/* We need a temp set object to store our union. If the dstkey
* is not NULL (that is, we are inside an SUNIONSTORE operation) then
* this set object will be the resulting object to set into the target key*/
dstset = createIntsetObject();
if (op == SET_OP_UNION) {
/* Union is trivial, just add every element of every set to the
* temporary set. */
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (setTypeAdd(dstset,ele)) cardinality++;
sdsfree(ele);
}
setTypeReleaseIterator(si);
}
// ...
}
并集很简单,遍历所有集合,把元素全都加进去即可。
差集:
{"sdiff",sdiffCommand,-2,
"read-only to-sort @set",
0,NULL,1,-1,1,0,0,0},
void sdiffCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_DIFF);
}
void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
robj *dstkey, int op) {
// ...
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {
/* DIFF Algorithm 1:
*
* We perform the diff by iterating all the elements of the first set,
* and only adding it to the target set if the element does not exist
* into all the other sets.
*
* This way we perform at max N*M operations, where N is the size of
* the first set, and M the number of sets. */
si = setTypeInitIterator(sets[0]);
while((ele = setTypeNextObject(si)) != NULL) {
for (j = 1; j < setnum; j++) {
if (!sets[j]) continue; /* no key is an empty set. */
if (sets[j] == sets[0]) break; /* same set! */
if (setTypeIsMember(sets[j],ele)) break;
}
if (j == setnum) {
/* There is no other set with this element. Add it. */
setTypeAdd(dstset,ele);
cardinality++;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {
/* DIFF Algorithm 2:
*
* Add all the elements of the first set to the auxiliary set.
* Then remove all the elements of all the next sets from it.
*
* This is O(N) where N is the sum of all the elements in every
* set. */
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (j == 0) {
if (setTypeAdd(dstset,ele)) cardinality++;
} else {
if (setTypeRemove(dstset,ele)) cardinality--;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
/* Exit if result set is empty as any additional removal
* of elements will have no effect. */
if (cardinality == 0) break;
}
// ...
}
做差集有两种方式,一种是遍历第一个集合,如果元素不在后面所有集合内,就加入;第二种是把第一个集合所有元素加到结果集合里, 然后再把所有在后面的集合里的元素,从这个结果集合里移除。
总结
这篇文章里,我们看了一下Redis是如何实现set这个数据结构的,Redis会在内容为整数并且数量足够小时,使用一种类似 ziplist
的方式,也就是 intset
来保存数据,其它情况,则会使用一个 dict
来保存数据,此外我们分别看了一下三种常见的集合运算
是怎么实现的。
更多文章
- socks5 协议详解
- zerotier简明教程
- 搞定面试中的系统设计题
- frp 源码阅读与分析(一):流程和概念
- 用peewee代替SQLAlchemy
- Golang(Go语言)中实现典型的fork调用
- DNSCrypt简明教程
- 一个Gunicorn worker数量引发的血案
- Golang validator使用教程
- Docker组件介绍(一):runc和containerd
- Docker组件介绍(二):shim, docker-init和docker-proxy
- 使用Go语言实现一个异步任务框架
- 协程(coroutine)简介 - 什么是协程?
- SQLAlchemy简明教程
- Go Module 简明教程