1. Hazard Pointer要解决什么问题

Hazard Pointer主要解决了在一个共享数据结构上,读多写少情况下,如何实现无锁的读写操作。

考虑下面一个场景,有一个Map的数据结构,需要实现在多线程下无锁的操作。

// Write Rarely Read Mostly map
template<class K, class V>
class WRRMMap { 
	Map<K, V>* pMap_;
  ...
};

一个常用的实现方法是,多个线程共享指向Map的指针pMap_,当写的时候,构造一个pMap指向对象的拷贝pMapNew,对这个拷贝做更新操作,更新完成后使用一个原子操作替换pMap指针。

Map<K, V>* pNew = nullptr;
do {
  Map<K, V>* pOld = pMap_;
  delete pNew;
  pNew = new Map<K, V>(*pOld); // make a copy
  (*pNew)[k] = v; // do update
} while (!CAS(&pMap_, pOld, pNew)); // atomic update pMap_ pointer

但这样带来的问题是,pOld指针什么时候回收?因为此时pOld可能正在被其他读线程引用,如果在写线程里回收,会出现读线程访问一个不存在地址的错误。

这就是Hazard Pointer要解决的问题。整体解决思路是,对于共享的指针,维护一个全局的读列表(所有线程可见),记录读的指针。每个线程维护一个对于共享指针的写列表,记录pOld。当写线程更新完之后,先把pOld记录下来,然后定期去触发检查回收已经不被读的pOld

2. Hazard Pointer的设计

每次对共享数据结构读的时候,返回一个Hazard Pointer Record,其中包装了具体的指针。同时需要把这个pointer连接记录当前正在读的pointer列表中,具体实现是一个单向链表。

这个单向链表本质上是一个Hazard Pointer Record对象的池子,只有标记为activeHPRecType才表示当前正在被读。

  • Acquire():从池子中获取一个非activeHPRecType p,或创建一个新的HprecType同时连接到池子中。用户可以调用p->pHazard来设置具体指针内容。
  • Release(p): 当读操作结束后,调用Release来释放到池子中,同时标记为非active
// Hazard Pointer record
class HPRecType {
  HPRecType * pNext_; // 链表的下一个节点
  int active_; // 当前这个指针是否是正在被读。
  
  // Global header of the HP list
  static HPRecType* pHead_; // 链表的头
  // The length of the list
  static int listLen_; // 链表长度
 public:
  // Can be used by the thread that acquired it
  void * pHazard_; // 存储共享数据结构指针
  
  static HPRecType * Head() {
    return pHead_;
  }
  
  // Acquires one hazard pointer
  static HPRecType* Acquire() {
    // Try to reuse a retired HP Record
    HPRecType * p = pHead_;
    for (; p; p = p->pNext_) {
      if (p->active_ || ! CAS(&p->active_, 0, 1))
        continue;
      // Got one retired HP Record
      return p;
    }
    // If there's no HP Record available, 
    // allocate one and increment the list length
    int oldLen;
    do {
      oldLen = listLen_;
    } while (!CAS(&listLen_, oldLen, oldLen+1));
    
    // Allocate new one
    HPRecType* p = new HPRecType;
    p->active_ = 1;
    p->pHazard_ = nullptr;
    // Push it to the front
    do {
      old = pHead_;
      p->pNext_ = old;
    } while(!CAS(&pHead_, old, p));
    return p;
  }
  
  // Release a hazard pointer
  static void Release(HPRecType* p) {
    p->pHazard_ = nullptr;
    p->active_ = 0;
  }
};

同时,对于每个线程需要维护一个写过的pointer的列表,称为Retired List,列表中的元素表示当没有读线程引用这个指针的时候,这个指针可以被回收了。

// Per-thread private variable
thread_local std::vector<Map<K, V>*> rlist; // retired list

所以说,在WRRMMap中需要提供一个Retire函数。其

template<class K, class V>
 class WRRMMap {
  	Map<K, V> *pMap_;
   ...
 private:
   static void Retire(Map<K, V>* pOld) {
     // Put it in the retired list
     rlist.push_back(pOld);
     if (rlist.size() >= R) {
       Scan(HPRecType::Head()); // 对比读写列表的差异,回收没有被读的同时retired的指针
     }
   }
 }

Scan的过程就是找到当前Retired List中没有被正在读的指针,回收。具体实现有两点优化:

  1. 选择链表中非空的指针(也就是当前正在被读的),排序,二分查找retired的是否出现在其中;也可以用哈希表来维护hp,查找效率更高。
  2. rlist是一个vector,释放其中元素的时候,先将要删除的元素和最后一个元素交换,然后再pop_back()
void Scan(HPRecType* head) {
  // Stage 1: Scan hazard pointers list collecting all non-null ptrs
  std::vector<void*> hp;
  while (head) {
    void* p = head->pHazard_;
    if (p) {
      hp.push_back(p);
    }
    head = head->pNext_;
  }
  // Stage 2: sort the hazard pointers
  std::sort(hp.begin(), hp.end()), std::less<void*>());
  // Stage 3: set intersect
  std::vector<Map<K, V>*>::iterator i = rlist.begin();
  while (i != rlist.end()) {
    if (!std::binary_search(hp.begin(), hp.end(), *i) {
      // retired pointer not found in hazard pointer sets
      // safe to delete
      delete *i;
      // remove from rlist
      if (&*i != &rlist.back()) {
        *i = rlist.back();
      }
      rlist.pop_back();
    } else {
      ++i;
    }
  }
  
}

如果用哈希表来维护hp的话,Scan的复杂度就是O(R)R也就是触发Scan时,rlist的大小。那R的值一般怎么选择的?一个比较好的选择是R = (1+k)H其中H表示读线程数(也是harzard pointer的数目),k一般是一个比较小的值(如0.25)。

应用到WRRMMap中代码如下:

对于更新操作

void Update(const K& k, const V& v) {
  Map<K, V> * pNew = nullptr;
  do { 
  	Map<K, V>* pOld = pMap_;
    delete pNew;
    pNew = new Map<K, V>(*pOld);
    (*pNew)[k] = v;
  } while (!CAS(&pMap_, pOld, pNew));  // 为了应对多线程的写
  Retire(pOld); // 标记到rlist中
}

对于读操作:

V Lookup(const K& k) {
  HPRecType* pRec = HPRecType::Acquire();
  Map<K, V>* ptr;
  do {
    ptr = pMap_;
    pRec->pHazard_ = ptr;
  } while (pMap_ != ptr); //这里要check,因为可能在执行完ptr=pMap_之后,优于线程切换,导致另外一个线程修改了pMap_的值在例如Update的CAS中。
  V result = (*ptr)[k];
  // Release
 	HPRecType::Release(pRec);
  return result;
}

这里,Lookup的实现是LockFree,但不是WaitFree的,因为do {} while()的存在。对于这个问题在[2]中提出了trap的方法实现了WaitFree的读。

以上,利用了Hazard Pointer,实现了Lock-Free的读多写少场景下的Map。

3. References

[1] Lock-Free Data Structure with Hazard Pointer. 2004

[2] Maged M. Michael. Practical Lock-Free and Wait- Free LL/SC/VL Implementations Using 64-Bit CAS. In Proceedings of the 18th International Conference on Distributed Computing, LNCS vol- ume 3274, pages 144–158, October 2004.