Hazard Pointer
1. Hazard Pointer要解决什么问题
Hazard Pointer主要解决了在一个共享数据结构上,读多写少情况下,如何实现无锁的读写操作。
考虑下面一个场景,有一个Map的数据结构,需要实现在多线程下无锁的操作。
// Write Rarely Read Mostly map
template<class K, class V>
class WRRMMap {
Map<K, V>* pMap_;
...
};
一个常用的实现方法是,多个线程共享指向Map
的指针pMap_
,当写的时候,构造一个pMap
指向对象的拷贝pMapNew
,对这个拷贝做更新操作,更新完成后使用一个原子操作替换pMap
指针。
Map<K, V>* pNew = nullptr;
do {
Map<K, V>* pOld = pMap_;
delete pNew;
pNew = new Map<K, V>(*pOld); // make a copy
(*pNew)[k] = v; // do update
} while (!CAS(&pMap_, pOld, pNew)); // atomic update pMap_ pointer
但这样带来的问题是,pOld
指针什么时候回收?因为此时pOld
可能正在被其他读线程引用,如果在写线程里回收,会出现读线程访问一个不存在地址的错误。
这就是Hazard Pointer要解决的问题。整体解决思路是,对于共享的指针,维护一个全局的读列表(所有线程可见),记录读的指针。每个线程维护一个对于共享指针的写列表,记录pOld
。当写线程更新完之后,先把pOld
记录下来,然后定期去触发检查回收已经不被读的pOld
。
2. Hazard Pointer的设计
每次对共享数据结构读的时候,返回一个Hazard Pointer Record,其中包装了具体的指针。同时需要把这个pointer连接记录当前正在读的pointer列表中,具体实现是一个单向链表。
这个单向链表本质上是一个Hazard Pointer Record
对象的池子,只有标记为active
的HPRecType
才表示当前正在被读。
Acquire()
:从池子中获取一个非active
的HPRecType p
,或创建一个新的HprecType
同时连接到池子中。用户可以调用p->pHazard
来设置具体指针内容。Release(p)
: 当读操作结束后,调用Release
来释放到池子中,同时标记为非active
。
// Hazard Pointer record
class HPRecType {
HPRecType * pNext_; // 链表的下一个节点
int active_; // 当前这个指针是否是正在被读。
// Global header of the HP list
static HPRecType* pHead_; // 链表的头
// The length of the list
static int listLen_; // 链表长度
public:
// Can be used by the thread that acquired it
void * pHazard_; // 存储共享数据结构指针
static HPRecType * Head() {
return pHead_;
}
// Acquires one hazard pointer
static HPRecType* Acquire() {
// Try to reuse a retired HP Record
HPRecType * p = pHead_;
for (; p; p = p->pNext_) {
if (p->active_ || ! CAS(&p->active_, 0, 1))
continue;
// Got one retired HP Record
return p;
}
// If there's no HP Record available,
// allocate one and increment the list length
int oldLen;
do {
oldLen = listLen_;
} while (!CAS(&listLen_, oldLen, oldLen+1));
// Allocate new one
HPRecType* p = new HPRecType;
p->active_ = 1;
p->pHazard_ = nullptr;
// Push it to the front
do {
old = pHead_;
p->pNext_ = old;
} while(!CAS(&pHead_, old, p));
return p;
}
// Release a hazard pointer
static void Release(HPRecType* p) {
p->pHazard_ = nullptr;
p->active_ = 0;
}
};
同时,对于每个线程需要维护一个写过的pointer的列表,称为Retired List,列表中的元素表示当没有读线程引用这个指针的时候,这个指针可以被回收了。
// Per-thread private variable
thread_local std::vector<Map<K, V>*> rlist; // retired list
所以说,在WRRMMap中需要提供一个Retire
函数。其
template<class K, class V>
class WRRMMap {
Map<K, V> *pMap_;
...
private:
static void Retire(Map<K, V>* pOld) {
// Put it in the retired list
rlist.push_back(pOld);
if (rlist.size() >= R) {
Scan(HPRecType::Head()); // 对比读写列表的差异,回收没有被读的同时retired的指针
}
}
}
Scan的过程就是找到当前Retired List中没有被正在读的指针,回收。具体实现有两点优化:
- 选择链表中非空的指针(也就是当前正在被读的),排序,二分查找retired的是否出现在其中;也可以用哈希表来维护
hp
,查找效率更高。 - rlist是一个vector,释放其中元素的时候,先将要删除的元素和最后一个元素交换,然后再
pop_back()
void Scan(HPRecType* head) {
// Stage 1: Scan hazard pointers list collecting all non-null ptrs
std::vector<void*> hp;
while (head) {
void* p = head->pHazard_;
if (p) {
hp.push_back(p);
}
head = head->pNext_;
}
// Stage 2: sort the hazard pointers
std::sort(hp.begin(), hp.end()), std::less<void*>());
// Stage 3: set intersect
std::vector<Map<K, V>*>::iterator i = rlist.begin();
while (i != rlist.end()) {
if (!std::binary_search(hp.begin(), hp.end(), *i) {
// retired pointer not found in hazard pointer sets
// safe to delete
delete *i;
// remove from rlist
if (&*i != &rlist.back()) {
*i = rlist.back();
}
rlist.pop_back();
} else {
++i;
}
}
}
如果用哈希表来维护hp
的话,Scan的复杂度就是O(R)
,R
也就是触发Scan
时,rlist
的大小。那R
的值一般怎么选择的?一个比较好的选择是R = (1+k)H
其中H表示读线程数(也是harzard pointer的数目),k一般是一个比较小的值(如0.25)。
应用到WRRMMap中代码如下:
对于更新操作
void Update(const K& k, const V& v) {
Map<K, V> * pNew = nullptr;
do {
Map<K, V>* pOld = pMap_;
delete pNew;
pNew = new Map<K, V>(*pOld);
(*pNew)[k] = v;
} while (!CAS(&pMap_, pOld, pNew)); // 为了应对多线程的写
Retire(pOld); // 标记到rlist中
}
对于读操作:
V Lookup(const K& k) {
HPRecType* pRec = HPRecType::Acquire();
Map<K, V>* ptr;
do {
ptr = pMap_;
pRec->pHazard_ = ptr;
} while (pMap_ != ptr); //这里要check,因为可能在执行完ptr=pMap_之后,优于线程切换,导致另外一个线程修改了pMap_的值在例如Update的CAS中。
V result = (*ptr)[k];
// Release
HPRecType::Release(pRec);
return result;
}
这里,Lookup的实现是LockFree,但不是WaitFree的,因为do {} while()
的存在。对于这个问题在[2]中提出了trap的方法实现了WaitFree的读。
以上,利用了Hazard Pointer,实现了Lock-Free的读多写少场景下的Map。
3. References
[1] Lock-Free Data Structure with Hazard Pointer. 2004
[2] Maged M. Michael. Practical Lock-Free and Wait- Free LL/SC/VL Implementations Using 64-Bit CAS. In Proceedings of the 18th International Conference on Distributed Computing, LNCS vol- ume 3274, pages 144–158, October 2004.