7.2 无锁数据结构的例子

为了展示一些在设计无锁数据结构中使用到的技术,我们将看到一些无锁实现的简单数据结构。不仅要在每个例子中描述一个数据结构的实现,还将使用这些例子的某些特别之处来阐述对于无锁数据结构的设计。

无锁结构依赖与原子操作和内存序,以确保多线程以正确的顺序访问数据结构。所有原子操作默认使用的是memory_order_seq_cst内存序。不过,后面的例子中将会降低内存序的要求,使用memory_order_acquire,memory_order_release,甚至memory_order_relaxed。虽然例子中没有直接使用锁,但需要注意的是对std::atomic_flag的使用。一些平台上无锁结构的实现(实际上在C++的标准库的实现中),使用了内部锁(详见第5章)。另一些平台上,基于锁的简单数据结构可能会更加合适;当然,还有很多平台不明确实现细节;选择一种实现前,需要明确需求,并且配置各种选项以满足需求。

回到数据结构上来吧,最简单的数据结构——栈。

7.2.1 实现一个无锁的线程安全栈

栈的要求很简单:查询顺序是添加顺序的逆序——先入后出(LIFO)。所以,确保值能安全的入栈就十分重要,因为很可能在入栈后,马上被其他线程索引,同时确保只有一个线程能索引到给定。最简单的栈就是链表,head指针指向第一个节点(可能是下一个被索引到的节点),并且每个节点依次指向下一个节点。

这样的情况下,添加一个节点相对来说很简单:

  1. 创建一个新节点node。
  2. 让node->next指向head->next。
  3. head->next指向node。

单线程的上下文中,这种方式没有问题,不过当多线程对栈进行修改时,这几步就不够用了。当有两个线程同时添加节点的时候,第2步和第3步的时候会产生条件竞争:一个线程可能在修改head值时,另一个线程正在执行第2步,并且在第3步中对head进行更新。就会使之前那个线程的结果被丢弃,亦或是造成更加糟糕的后果。了解了如何解决这个条件竞争之前,还要注意一个事:当head更新并指向了新节点时,另一个线程就能读取到这个节点了。因此,head设置为指向新节点前,新节点完全准备就绪就变很重要;因为,在这之后就不能对节点进行修改了。

OK,那如何应对条件竞争呢?答案就是:第3步的时候使用原子“比较/交换”操作,来保证步骤2对head进行读取时,不会对head进行修改;有修改时可以循环“比较/交换”操作。下面的代码就展示了,不用锁来实现线程安全的push()函数。

清单7.2 不用锁实现push()

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    T data;
    node* next;

    node(T const& data_):  // 1
     data(data_)
    {}
  };

  std::atomic<node*> head;
public:
  void push(T const& data)
  {
    node* const new_node=new node(data); // 2
    new_node->next=head.load();  // 3
    while(!head.compare_exchange_weak(new_node->next,new_node));  // 4
  }
};

上面代码几乎能匹配之前所说的三个步骤:创建一个新节点②,设置新节点的next指针指向当前head③,并设置head指针指向新节点④。node结构用其自身的构造函数来进行数据填充①,必须保证节点在构造完成后随时能被弹出。之后需要使用compare_exchange_weak()来保证在被存储到new_node->next的head指针和之前的一样③。代码的亮点是使用“比较/交换”操作:返回false时,因为比较失败(例如,head被其他线程锁修改),会使用head中的内容更新new_node->next(第一个参数)的内容。循环中不需要每次都重新加载head指针,因为编译器会完成这件事。同样,因为循环可能直接就失败了,所以使用compare_exchange_weak要好于使用compare_exchange_strong(详见第5章)。

所以,暂时不需要pop()操作,可以先快速检查一下push()的实现是否有违指导意见。这里唯一一个能抛出异常的就构造新node时①,不过其会自行处理且链表中的内容没有被修改,所以是安全的。因为构建数据时,将其作为node的一部分进行存储,并且使用compare_exchange_weak()来更新head指针,所以没有恶性条件竞争。“比较/交换”成功时,节点已经准备就绪且随时可以提取。因为没有锁,所以不存在死锁,这里的push()函数实现的很成功。

那么,现在已经有往栈中添加数据的方法了,现在需要删除数据的方法。也很简单,其步骤如下:

  1. 获取head。
  2. 读取head->next指向的结点node。
  3. 设置head->next指向node->next。
  4. 通过node返回携带的数据data。
  5. 删除node节点。

在多线程环境下,就不那么简单了。有两个线程要从栈中移除数据时,两个线程可能在步骤1中读取到同一个head(值相同)。其中一个线程处理到步骤5时,另一个线程还在处理步骤2,这个还在处理步骤2的线程将会解引用一个悬空指针。这是书写无锁代码所遇到的问题之一,所以现在只能跳过步骤5,让节点泄露。

另一个问题:两个线程读取到同一个head值时,它们将返回同一个节点。这就违反了栈结构的意图,所以需要避免这样的情况发生。可以像在push()函数中解决条件竞争那样来解决这个问题:使用“比较/交换”操作更新head。当“比较/交换”操作失败时,不是一个新节点已被推入,就是其他线程已经弹出了节点。无论是哪种情况,都得返回步骤1(“比较/交换”操作将会重新读取head)。

当“比较/交换”成功,就可以确定当前线程是弹出指定节点的唯一线程,之后就可以放心的执行步骤4了。这里先看一下pop()的雏形:

template<typename T>
class lock_free_stack
{
public:
  void pop(T& result)
  {
    node* old_head=head.load();
    while(!head.compare_exchange_weak(old_head,old_head->next));
    result=old_head->data;
  }
};

这段代码很优雅,但有两个节点泄露的问题。首先,这段代码在空链表时不工作:当head指针是空指针时,要访问next指针时,将引起未定义行为。很容易通过对nullptr的检查进行修复(在while循环中),要不对空栈抛出一个异常,要不返回一个bool值来表明成功与否。

第二个问题就是异常安全。第3章中介绍栈结构时,了解了在返回值的时候会出现异常安全问题:有异常被抛出时,复制的值将丢失。这种情况下,传入引用是一种可以接受的解决方案;这样就能保证当有异常抛出时,栈上的数据不会丢失。不幸的是,不能这样做;只能在单一线程对值进行返回时,才进行拷贝以确保拷贝操作的安全性,这就意味着在拷贝结束后这个节点就会被删除。因此,通过引用获取返回值的方式没有任何优点:直接返回也是可以的。若想要安全的返回,必须使用第3章中的其他方法:返回指向数据值的(智能)指针。

当返回的是智能指针时,返回nullptr以表明没有值可返回,但是要求在堆上对智能指针进行内存分配。将分配过程做为pop()的一部分时(也没有更好的选择了),堆分配内存时可能会抛出一个异常。与此相反,在push()操作中对内存进行分配——无论怎样,都需要对node进行内存分配。返回一个std::shared_ptr<>不会抛出异常,所以在pop()中进行内存分配是安全的。将上面的点放在一起,就能看到如下的代码。

清单7.3 带有节点泄露的无锁栈

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    std::shared_ptr<T> data;  // 1 指针获取数据
    node* next;

    node(T const& data_):
      data(std::make_shared<T>(data_))  // 2 让std::shared_ptr指向新分配出来的T
    {}
  };

  std::atomic<node*> head;
public:
  void push(T const& data)
  {
    node* const new_node=new node(data);
    new_node->next=head.load();
    while(!head.compare_exchange_weak(new_node->next,new_node));
  }
  std::shared_ptr<T> pop()
  {
    node* old_head=head.load();
    while(old_head && // 3 在解引用前检查old_head是否为空指针
      !head.compare_exchange_weak(old_head,old_head->next));
    return old_head ? old_head->data : std::shared_ptr<T>();  // 4
  }
};

智能指针指向当前数据①,必须在堆上为数据分配内存(在node结构体中)②。而后,compare_exchage_weak()循环中③,需要在old_head指针前,检查指针是否为空。如果存在相关节点,将会返回相关节点的值;不存在时,将返回一个空指针④。注意,结构是无锁的,但并不是无等待的,因为在push()和pop()函数中都有while循环,当compare_exchange_weak()总是失败的时候,循环将会持续下去。

如果一个垃圾收集机制(就像在管理语言中,比如C#语言或Java语言),那到这就结束了;节点一旦它不再被任何线程访问,将被收集和循环利用。然而,并不是所有C++编译器都有垃圾收集机制,所以垃圾通常需要自己收拾。

7.2.2 终止内存泄露:使用无锁数据结构管理内存

第一次了解pop()时,为了避免条件竞争(当有线程删除一个节点的同时,其他线程还持有指向该节点的指针,并且要解引用)选择了带有内存泄露的节点。因为内存泄露都不可接受,所以现在来解决这个问题!

基本问题在于,当要释放一个节点时,需要确认其他线程没有持有这个节点。只有一个线程调用pop()时,可以放心的释放。当节点添加入栈后,push()就不会与节点有任何的关系了,所以调用pop()函数的线程只和已加入节点有关,并且能够安全的将节点删除。

另一方面,栈同时处理多线程对pop()的调用时,就知道节点什么时候被删除。实际上就需要写一个专用的垃圾收集器。这听起来相当棘手,不过也没多糟糕:需要检查节点,并且检查哪些节点可被pop()访问。不需担心push()中的节点,因为这些节点推到栈上以后才能被访问到,而多线程只能通过pop()访问同一节点。

没有线程调用pop()时,就可以删除栈上的任意节点。添加节点到“可删除”列表中时,就能从结点中提取数据了。没有线程通过pop()访问节点时,可以安全的删除这些节点了。怎么知道没有线程调用pop()了呢?很简单——计数即可。当计数器数值增加时,就有节点推入;当减少时,就有节点删除。从“可删除”列表中删除节点就很安全,直到计数器的值为0为止。这个计数器必须是原子的,这样才能在多线程的情况下正确的进行计数。下面的代码清单中,展示了修改后的pop()函数,有些功能的实现将在清单7.5中给出。

清单7.4 没有线程通过pop()访问节点时,就对节点进行回收

template<typename T>
class lock_free_stack
{
private:
  std::atomic<unsigned> threads_in_pop;  // 1 原子变量
  void try_reclaim(node* old_head);
public:
  std::shared_ptr<T> pop()
  {
    ++threads_in_pop;  // 2 在做事之前,计数值加1
    node* old_head=head.load();
    while(old_head &&
      !head.compare_exchange_weak(old_head,old_head->next));
    std::shared_ptr<T> res;
    if(old_head)
    { 
      res.swap(old_head->data);  // 3 回收删除的节点
    }
    try_reclaim(old_head);  // 4 从节点中直接提取数据,而非拷贝指针
    return res;
  }
};

threads_in_pop①原子变量用来记录有多少线程试图弹出栈中的元素。调用pop()②函数时,计数器加一;调用try_reclaim()时,计数器减一;这个函数被节点调用时,说明节点已被删除④。因为暂时不需要将节点删除,可以通过swap()函数来删除节点上的数据③(而非只是拷贝指针),当不再需要这些数据的时候,这些数据会自动删除,而不是持续存在着(因为还有对未删除节点的引用)。接下来看一下try_reclaim()是如何实现的。

清单7.5 采用引用计数的回收机制

template<typename T>
class lock_free_stack
{
private:
  std::atomic<node*> to_be_deleted;

  static void delete_nodes(node* nodes)
  {
    while(nodes)
    {
      node* next=nodes->next;
      delete nodes;
      nodes=next;
    }
  }
  void try_reclaim(node* old_head)
  {
    if(threads_in_pop==1)  // 1
    {
      node* nodes_to_delete=to_be_deleted.exchange(nullptr);  // 2 声明“可删除”列表
      if(!--threads_in_pop)  // 3 是否只有一个线程调用pop()?
      {
        delete_nodes(nodes_to_delete);  // 4
      }
      else if(nodes_to_delete)  // 5
      {
         chain_pending_nodes(nodes_to_delete);  // 6
      }
      delete old_head;  // 7
    }
    else
    {
      chain_pending_node(old_head);  // 8
      --threads_in_pop;
    }
  }
  void chain_pending_nodes(node* nodes)
  {
    node* last=nodes;
    while(node* const next=last->next)  // 9 让next指针指向链表的末尾
    {
      last=next;
    }
    chain_pending_nodes(nodes,last);
  }

  void chain_pending_nodes(node* first,node* last)
  {
    last->next=to_be_deleted;  // 10
    while(!to_be_deleted.compare_exchange_weak(  // 11 用循环来保证last->next的正确性
      last->next,first));
    }
    void chain_pending_node(node* n)
    {
      chain_pending_nodes(n,n);  // 12
    }
};

回收节点时①,threads_in_pop的数值是1,当前线程正在对pop()进行访问,这时就可以安全的将节点删除了⑦(将等待节点删除也是安全的)。当数值不是1时,删除任何节点都不安全,所以需要向等待列表中继续添加节点⑧。

假设某一时刻,threads_in_pop的值为1。就可以尝试回收等待列表,如果不回收,节点就会继续等待,直到整个栈被销毁。要做到回收,首先要通过原子exchange操作声明②删除列表,并将计数器减一③。如果之后计数的值为0,意味着没有其他线程访问等待节点链表。不必为出现新的等待节点而烦恼,因为它们会被安全的回收。而后,可以使用delete_nodes对链表进行迭代,并将其删除④。

计数值在减后不为0时,回收节点就不安全;如果存在⑤,就需要将其挂在等待删除链表之后⑥,这种情况会发生在多个线程同时访问数据结构的时候。一些线程在第一次测试threads_in_pop①和对“回收”链表的声明②操作间调用pop(),这可能会将一个已经被线程访问的节点新填入到链表中。图7.1中,线程C添加节点Y到to_be_deleted链表中,即使线程B仍将其引用作为old_head,之后会尝试访问其next指针。线程A删除节点时,会造成线程B发生未定义行为。

7.2 无锁数据结构的例子 - 图1

图7.1 三个线程同时调用pop(),要在try_reclaim()对声明节点进行删除前对threads_in_pop进行检查。

为了将等待删除的节点添加入等待删除链表,需要复用节点的next指针将等待删除节点链接在一起。将已存在的链表链接到删除链表后面,通过遍历的方式找到链表的末尾⑨,将最后一个节点的next指针替换为当前to_be_deleted指针⑩,并且将链表的第一个节点作为新的to_be_deleted指针进行存储⑪。这需要在循环中使用compare_exchange_weak来保证通过其他线程添加进来的节点,不会发生内存泄露。链表发生改变时,更新next指针会很方便。添加单个节点是一种特殊情况,因为需要将这个节点作为第一个节点,同时也是最后一个节点进行添加⑫。

低负荷的情况下,这种方式没有问题,因为在没有线程访问pop()。不过,这只是瞬时的状态,也就是在回收前,需要检查threads_in_pop计数为0③的原因,也是删除节点⑦前进行对计数器检查的原因。删除节点是一项非常耗时的工作,且希望其他线程能对链表做的修改越少越好。从第一次发现threads_in_pop是1到尝试删除节点,会用耗费很长的时间,就会让线程有机会调用pop(),让threads_in_pop不为0,阻止节点的删除操作。

高负荷的情况,因为其他线程在初始化之后都能进入pop(),所以to_ne_deleted链表将会无界的增加,并且会再次泄露。不存在任何静态的情况时,就得为回收节点寻找替代机制。关键是要确定无线程访问给定节点,这样给定节点就能被回收。最简单的替换机制就是使用风险指针(hazard pointer)。

7.2.3 使用风险指针检测不可回收的节点

“风险指针”这个术语引用于Maged Michael的技术发现[1]。之所以这样叫是因为,删除一个节点可能会让其他引用其的线程处于危险之中。其他线程持有已删除节点的指针时,对其进行解引用操作时,会出现未定义行为。基本观点就是,当有线程去访问要被(其他线程)删除的对象时,会先设置对这个对象设置风险指针,而后通知其他线程——使用这个指针是危险的行为。当这个对象不再被需要,那么就可以清除风险指针了。如果了解牛津/剑桥的龙舟比赛,其实这里使用的机制和龙舟比赛开赛时差不多:每个船上的舵手都举起手来,以表示他们还没有准备好。只要有舵手举着手,裁判就不能让比赛开始。当所有舵手的手都放下后,比赛才能开始;比赛还未开始或感觉自己船队的情况有变时,舵手可以再次举手。

当线程想要删除一个对象,就必须检查系统中其他线程是否持有风险指针。当没有风险指针时,就可以安全删除对象。否则,就必须等待风险指针消失。这样,线程就需要周期性的检查要删除的对象是否能安全删除。

看起来很简单,在C++中应该怎么做呢?

首先,需要能存储指向访问对象的指针,也就是风险指针。指针必须能让所有线程看到,需要线程能够对数据结构进行访问。正确且高效的分配这些线程的确是一个挑战,所以这个问题放在后面解决。假设有一个get_hazard_pointer_for_current_thread()函数,可以返回风险指针的引用。当读取一个指针,并且想要解引用它的时候,就需要这个函数——这种情况下head数值源于下面的列表:

std::shared_ptr<T> pop()
{
  std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
  node* old_head=head.load();  // 1
  node* temp;
  do
  {
    temp=old_head;
    hp.store(old_head);  // 2
    old_head=head.load();
  } while(old_head!=temp); // 3
  // ...
}

while循环能保证node不会在读取旧head指针①时,以及在设置风险指针的时被删除。这种模式下,其他线程不知道有线程对这个节点进行了访问。幸运的是,旧head节点要被删除时,head本身会发生变化,所以需要对head进行检查并持续循环,直到head指针中的值与风险指针中的值相同③。使用风险指针,如同依赖对已删除对象的引用。使用默认的new和delete操作对风险指针进行操作时,会出现未定义行为,所以需要确定实现是否支持这样的操作,或使用自定义内存分配器来保证用法的正确性。

现在已经设置了风险指针,就可以对pop()进行处理了,基于了解到的安全知识,不会有其他线程来删除节点。每一次重新加载old_head后,解引用读取到的指针时,就需要更新风险指针。从链表中提取一个节点时,就可以清除风险指针了。如果没有其他风险指针引用节点,就可以安全的删除节点了;否则,就需要将其添加到链表中,之后再进行删除。下面的代码就是对该方案的完整实现。

清单7.6 使用风险指针实现的pop()

std::shared_ptr<T> pop()
{
  std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
  node* old_head=head.load();
  do
  {
    node* temp;
    do  // 1 直到将风险指针设为head指针
    {
      temp=old_head;
      hp.store(old_head);
      old_head=head.load();
    } while(old_head!=temp);
  }
  while(old_head &&
    !head.compare_exchange_strong(old_head,old_head->next));
  hp.store(nullptr);  // 2 当声明完成,清除风险指针
  std::shared_ptr<T> res;
  if(old_head)
  {
    res.swap(old_head->data);
    if(outstanding_hazard_pointers_for(old_head))  // 3 在删除之前对风险指针引用的节点进行检查
    {
      reclaim_later(old_head);  // 4
    }
    else
    {
      delete old_head;  // 5
    }
    delete_nodes_with_no_hazards();  // 6
  }
  return res;
}

首先,循环内部会对风险指针进行设置。“比较/交换”操作失败时会重载old_head,再次进行设置①。因为需要在循环内部做一些实际的工作,所以要使用compare_exchange_strong():当compare_exchange_weak()伪失败后,风险指针将被重置(没有必要)。过程能保证风险指针在解引用(old_head)之前被正确的设置。已声明了一个风险指针时,就可以将其清除了②。如果想要获取一个节点,就需要检查其他线程上的风险指针,检查是否有其他指针引用该节点③。如果有,就不能删除节点,只能将其放回链表中,之后再进行回收④;如果没有,就能直接将这个节点删除⑤。最后需要对任意节点进行检查,可以调用reclaim_later()。如果链表上没有任何风险指针引用节点,就可以安全的删除这些节点⑥。当有节点持有风险指针,就只能等待下一个调用pop()的线程退出。

当然,这些函数——get_hazard_pointer_for_current_thread(), reclaim_later(), outstanding_hazard_pointers_for(), 和delete_nodes_with_no_hazards()——的实现细节我们还没有看到,先来看看它们是如何工作的。

为线程分配风险指针实例的具体方案:使用get_hazard_pointer_for_current_thread()与程序逻辑没什么关系(不过会影响效率,接下会看到具体的情况)。可以使用一个简单的结构体:固定长度的“线程ID-指针”数组。get_hazard_pointer_for_curent_thread()可以通过这个数据来找到第一个释放槽,并将当前线程的ID放入到这个槽中。线程退出时,槽就再次置空,可以通过默认构造std::thread::id()将线程ID放入槽中。实现如下:

清单7.7 get_hazard_pointer_for_current_thread()函数的简单实现

unsigned const max_hazard_pointers=100;
struct hazard_pointer
{
  std::atomic<std::thread::id> id;
  std::atomic<void*> pointer;
};
hazard_pointer hazard_pointers[max_hazard_pointers];

class hp_owner
{
  hazard_pointer* hp;

public:
  hp_owner(hp_owner const&)=delete;
  hp_owner operator=(hp_owner const&)=delete;
  hp_owner():
    hp(nullptr)
  {
    for(unsigned i=0;i<max_hazard_pointers;++i)
    {
      std::thread::id old_id;
      if(hazard_pointers[i].id.compare_exchange_strong(  // 6 尝试声明风险指针的所有权
         old_id,std::this_thread::get_id()))
      {
        hp=&hazard_pointers[i];
        break;  // 7
      }
    }
    if(!hp)  // 1
    {
      throw std::runtime_error("No hazard pointers available");
    }
  }

  std::atomic<void*>& get_pointer()
  {
    return hp->pointer;
  }

  ~hp_owner()  // 2
  {
    hp->pointer.store(nullptr);  // 8
    hp->id.store(std::thread::id());  // 9
  }
};

std::atomic<void*>& get_hazard_pointer_for_current_thread()  // 3
{
  thread_local static hp_owner hazard;  // 4 每个线程都有自己的风险指针
  return hazard.get_pointer();  // 5
}

get_hazard_pointer_for_current_thread()的实现看起来很简单③:一个hp_owner④类型的thread_local(本线程所有)变量,用来存储当前线程的风险指针,返回这个变量所持有的指针⑤。之后的工作:有线程第一次调用这个函数时,新hp_owner实例就被创建。这个实例的构造函数⑥会通过查询“所有者/指针”表,寻找没有所有者的记录。用compare_exchange_strong()来检查某个记录是否有所有者,并进行析构②。compare_exchange_strong()失败时,其他线程也可拥有这个记录,所以可以继续执行下去。当交换成功,当前线程就拥有了这些记录,而后进行存储并停止搜索⑦。遍历了列表也没有找到物所有权的记录①时,就说明有很多线程在使用风险指针,所以会抛出一个异常。

当hp_owner实例被一个给定的线程所创建,之后的访问会很快,因为指针在缓存中,所以表不需要再次遍历。

当线程退出时,hp_owner的实例将会被销毁。析构函数会在std::thread::id()设置拥有者ID前,将指针重置为nullptr,这样就允许其他线程对这条记录进行复用⑧⑨。

实现get_hazard_pointer_for_current_thread()后,outstanding_hazard_pointer_for()实现就简单了:只需要对风险指针表进行搜索,就可以找到对应记录。

bool outstanding_hazard_pointers_for(void* p)
{
  for(unsigned i=0;i<max_hazard_pointers;++i)
  {
    if(hazard_pointers[i].pointer.load()==p)
    {
      return true;
    }
  }
  return false;
}

实现都不需要对记录的所有者进行验证:没有所有者的记录会是一个空指针,所以比较代码将总返回false,通过这种方式简化代码。

reclaim_later()和delete_nodes_with_no_hazards()可以对简单的链表进行操作;reclaim_later()只是将节点添加到列表中,delete_nodes_with_no_hazards()就是搜索整个列表,并将无风险指针的记录进行删除。下面将展示它们的具体实现。

清单7.8 回收函数的简单实现

template<typename T>
void do_delete(void* p)
{
  delete static_cast<T*>(p);
}

struct data_to_reclaim
{
  void* data;
  std::function<void(void*)> deleter;
  data_to_reclaim* next;

  template<typename T>
  data_to_reclaim(T* p):  // 1
    data(p),
    deleter(&do_delete<T>),
    next(0)
  {}

  ~data_to_reclaim()
  {
    deleter(data);  // 2
  }
};

std::atomic<data_to_reclaim*> nodes_to_reclaim;

void add_to_reclaim_list(data_to_reclaim* node)  // 3
{
  node->next=nodes_to_reclaim.load();
  while(!nodes_to_reclaim.compare_exchange_weak(node->next,node));
}

template<typename T>
void reclaim_later(T* data)  // 4
{
  add_to_reclaim_list(new data_to_reclaim(data));  // 5
}

void delete_nodes_with_no_hazards()
{
  data_to_reclaim* current=nodes_to_reclaim.exchange(nullptr);  // 6
  while(current)
  {
    data_to_reclaim* const next=current->next;
    if(!outstanding_hazard_pointers_for(current->data))  // 7
    {
      delete current;  // 8
    }
    else
    {
      add_to_reclaim_list(current);  // 9
    }
    current=next;
  }
}

首先,reclaim_later()是一个函数模板④。风险指针是一个通用解决方案,不能将栈节点的类型写死。使用std::atomic<void*>对风险指针进行存储,需要对任意类型的指针进行处理。不过不能使用void*形式,因为当要删除数据项时,delete操作只能对实际类型指针进行操作。data_to_reclaim的构造函数就很优雅:reclaim_later()只创建一个data_to_reclaim的实例,并且将实例添加到回收链表中⑤。add_to_reclaim_list()③就是使用compare_exchange_weak()循环来访问链表头(就如你之前看到的那样)。

回到data_to_reclaim的构造函数①:构造函数也是模板函数。会删除的成员数据类型为void*,并为do_deltete()函数提供一个合适的指针实例——将void*类型转换成要删除的类型,然后删除指针所指向的对象。std::function<>可以安全的产生一个函数指针,所以data_to_reclaim的析构函数可以通过调用存储的函数对数据进行删除②。

将节点添加入链表时,data_to_reclaim的析构函数不会被调用;析构函数会在没有风险指针指向节点的时候调用,这也就是delete_nodes_with_no_hazards()的作用。

delete_nodes_with_no_hazards()将已声明的链表节点进行回收,使用的是exchange()函数⑥(这个步骤简单且关键,是为了保证只有一个线程回收这些节点)。这样,其他线程就能自由将节点添加到链表中,或在不影响回收指定节点线程的情况下对节点进行回收。

只要有节点存在于链表中,就需要检查每个节点,查看节点是否被风险指针所指向⑦。如果没有风险指针,就可以安全的将记录删除(并且清除存储的数据)⑧。否则,就只能将这个节点添加到链表的后面,再进行回收⑨。

虽然实现很简单,也的确安全的回收了被删除的节点,不过开销增加了很多。遍历风险指针数组需要检查max_hazard_pointers原子变量,并且每次pop()调用时,都需要再检查一遍。原子操作很耗时——台式CPU上,100次原子操作要比100次非原子操作慢——所以,pop()成为了性能瓶颈。这种方式很糟糕,不仅需要遍历节点的风险指针链表,还要遍历等待链表上的每一个节点。有max_hazard_pointers在链表中时,就需要检查max_hazard_pointers多个已存储的风险指针。还有更好一点的方法吗?

对风险指针(较好)的回收策略

当然有更好的办法。这里只展示一个风险指针的简单实现,来解释技术问题。首先,考虑的是内存性能。比起对回收链表上的每个节点进行检查都要调用pop(),除非有超过max_hazard_pointer数量的节点存在于链表之上,否则就不需要尝试回收任何节点。这样就能保证至少有一个节点能够回收,如果等待链表中的节点数量达到max_hazard_pointers+1,那这比之前的方案也没好到哪里去。当获取了max_hazard_pointers数量的节点时,可以调用pop()对节点进行回收,所以这样也不是很好。不过,当有2×max_hazard_pointers个节点在列表中时,就能保证至少有max_hazard_pointers可以被回收,再次尝试回收任意节点前,至少会对pop()有max_hazard_pointers次调用。这就很不错了。比起检查max_hazard_pointers个节点就调用max_hazard_pointers次pop()(而且还不一定能回收节点),检查2×max_hazard_pointers个节点时,每max_hazard_pointers次对pop()的调用,就会有max_hazard_pointers个节点可回收。这就意味着,对两个节点检查调用pop(),就有一个节点能被回收。

这个方法的缺点(有增加内存使用的情况):需要对回收链表上的节点进行原子计数,并且还有很多线程争相对回收的链表进行访问。如果内存盈余,可以使用更多内存的来实现更好的回收策略:作为线程的本地变量,每个线程中的都拥有其自己的回收链表,这样就不需要原子计数了。这样的话,就需要分配max_hazard_pointers x max_hazard_pointers个节点。所有节点被回收完毕前时有线程退出,其本地链表可以像之前一样保存在全局中,并且添加到下一个线程的回收链表中,让下一个线程对这些节点进行回收。

风险指针另一个缺点:与IBM申请的专利所冲突[2]。要让写软件能够被他人使用,就必须拥有合法的知识产权,所以需要拥有合适的许可证。这对所有无锁内存回收技术都适用(这是一个活跃的研究领域),很多大公司都会有自己的专利。你可能会问,“为什么用了这么大的篇幅来介绍一个大多数人都没办法的技术呢?”。首先,使用这种技术可能不需要买一个许可证。比如,当使用GPL下的免费软件许可来进行软件开发,软件将不会包含到IBM的专利。其次,设计无锁代码时,还需要从使用者的角度进行思考,比如:高消耗的原子操作。最后,有一个建议将风险指针纳入到C++标准的未来修订中;因此,所以这种指针的确很好用,希望将来能够使用到编译器供应商的实现。

所以,是否有非专利的内存回收技术,且能被大多数人所使用呢?很幸运,的确有。引用计数就是这样的机制。

7.2.4 使用引用计数的节点

回到7.2.2节的问题,“想要删除还能被其他读者线程访问的节点,该怎么办?”。当能安全并精确的识别节点是否还被引用,以及没有线程访问这些节点的具体时间,即可判断是够能将对应节点进行删除。风险指针是通过将使用中的节点存放到链表中,而引用计数是通过对每个节点上访问的线程数量进行统计。

看起来简单粗暴……不,优雅;实际上管理起来却是很困难:首先,想到的就是由std::shared_ptr<>来完成这个任务,其有内置引用计数的指针。不幸的是,虽然std::shared_ptr<>上的一些操作是原子的,不过其也不能保证是无锁的。智能指针上的原子操作和对其他原子类型的操作并没有什么不同,但是std::shared_ptr<>旨在用于有多个上下文的情况下,并且在无锁结构中使用原子操作,无异于对该类增加了很多性能开销。如果平台支持std::atomic_is_lock_free(&some_shared_ptr)实现返回true,那么所有内存回收问题就都迎刃而解了。使用std::shared_ptr<node>构成的链表实现,如清单7.9所示。需要注意的是,next指针是从已弹出的结构中获取,为了避免指针让所有node陷入深度自旋中,所以std::shared_ptr引用的最后一个node会被销毁。

清单7.9 无锁栈——使用无锁std::shared_ptr<>的实现

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    std::shared_ptr<node> next;
    node(T const& data_):
      data(std::make_shared<T>(data_))
    {}
  };

  std::shared_ptr<node> head;
public:
  void push(T const& data)
  {
    std::shared_ptr<node> const new_node=std::make_shared<node>(data);
    new_node->next=head.load();
    while(!std::atomic_compare_exchange_weak(&head,
        &new_node->next,new_node));
  }
  std::shared_ptr<T> pop()
  {
    std::shared_ptr<node> old_head=std::atomic_load(&head);
    while(old_head && !std::atomic_compare_exchange_weak(&head,
        &old_head,old_head->next));
    if (old_head){
      std::atomic_store(&old_head->next, std::shared_ptr<node>());
      return old_head->data;
    }
    return std::shared_ptr<T>();
  }
  ~lock_free_stack(){
    while(pop());
  }
};

std::shared_ptr<>使用无锁原子操作的实现不仅很少见,而且能想起为其使用一致性的原子操作也很难。并发技术规范扩展提供了工具,可以来解决这个问题,扩展中在头文件<experimental/atomic>中提供了std::experimental::atomic_shared_ptr<T>。多数情况下这与std::atomic<std::shared_ptr<T>>等价(除非是std::atomic<>不使用std::shared_ptr<T>),因为其具有特殊的复制语义,可以正确的处理引用计数。也就是std::experimental::atomic_shared_ptr<T>能在确保原子操作的同事,正确的处理引用计数。与第5章中的其他原子类型一样,其实现也不确定是否无锁。清单7.10是对清单7.9的重写。不需要记住atomic_load和atomic_store调用顺序,从而让事情变得更简单。

清单7.10 使用std::experimental::atomic_shared_ptr<>实现的栈结构

template<typename T>
class lock_free_stack
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    std::experimental::atomic_shared_ptr<node> next;
    node(T const& data_):
      data(std::make_shared<T>(data_))
    {}
  };
  std::experimental::atomic_shared_ptr<node> head;
public:
  void push(T const& data)
  {
    std::shared_ptr<node> const new_node=std::make_shared<node>(data);
    new_node->next=head.load();
    while(!head.compare_exchange_weak(new_node->next,new_node));
  }
  std::shared_ptr<T> pop()
  {
    std::shared_ptr<node> old_head=head.load();
    while(old_head && !head.compare_exchange_weak(
      old_head,old_head->next.load()));
    if(old_head) {
      old_head->next=std::shared_ptr<node>();
      return old_head->data;
    }
    return std::shared_ptr<T>();
  }
  ~lock_free_stack(){
    while(pop());
  }
};

一些情况下,使用std::shared_ptr<>实现的结构并非无锁,这就需要手动管理引用计数。

一种方式是对每个节点使用两个引用计数:内部计数和外部计数。两个值的总和就是对这个节点的引用数。外部计数记录有多少指针指向节点,即在指针每次进行读取的时候,外部计数加一。当线程结束对节点的访问时,内部计数减一。指针在读取时,外部计数加一;读取结束时,内部计数减一。

不需要“外部计数-指针”对时(该节点就不能被多线程所访问了),外部计数减一和在被弃用的时候,内部计数将会增加。当内部计数等于0,就没有指针对该节点进行引用,就可以将该节点安全的删除。使用原子操作来更新共享数据也很重要。就让我们来看一下使用这种技术实现的无锁栈,只有确定节点能安全删除的情况下才能进行节点回收。

下面程序清单中就展示了内部数据结构,以及对push()简单优雅的实现。

清单7.11 使用分离引用计数的方式推送一个节点到无锁栈中

template<typename T>
class lock_free_stack
{
private:
  struct node;

  struct counted_node_ptr  // 1
  {
    int external_count;
    node* ptr;
  };

  struct node
  {
    std::shared_ptr<T> data;
    std::atomic<int> internal_count;  // 2
    counted_node_ptr next;  // 3

    node(T const& data_):
      data(std::make_shared<T>(data_)),
      internal_count(0)
    {}
  };

  std::atomic<counted_node_ptr> head;  // 4

public:
  ~lock_free_stack()
  {
    while(pop());
  }

  void push(T const& data)  // 5
  {
    counted_node_ptr new_node;
    new_node.ptr=new node(data);
    new_node.external_count=1;
    new_node.ptr->next=head.load();
    while(!head.compare_exchange_weak(new_node.ptr->next,new_node));
  }
};

外部计数包含在counted_node_ptr的指针中①,这个结构体会被node中的next指针③和内部计数②用到。counted_node_ptr是一个简单的结构体,可以使用特化std::atomic<>模板来对链表的头指针进行声明④。

counted_node_ptr体积够小,能够让std::atomic<counted_node_ptr>无锁。一些平台上支持双字比较和交换操作,可以直接对结构体进行操作。平台不支持这样的操作时,最好使用std::shared_ptr<>变量(如清单7.9那样);当类型的体积过大,超出了平台支持指令,那么原子std::atomic<>将使用锁来保证其操作的原子性(从而会让你的“无锁”算法“基于锁”来完成)。另外,如果想要限制计数器的大小,需要已知平台上指针所占的空间(比如,地址空间只剩下48位,而一个指针就要占64位),可以将计数存在一个指针空间内,为了适应平台也可以存在一个机器字当中。这样的技巧需要对特定系统有足够的了解,当然超出本书讨论的范围。

push()相对简单⑤,可构造一个counted_node_ptr实例,去引用新分配出来的(带有相关数据的)node,并且将node中的next指针设置为当前head。之后使用compare_exchange_weak()对head的值进行设置,就像之前代码清单中所示。因为internal_count刚被设置其值为0,并且external_count是1。因为这是一个新节点只有一个外部引用(head指针)。

通常,pop()都有一个从繁到简的过程,实现代码如下。

清单7.12 使用分离引用计数从无锁栈中弹出一个节点

template<typename T>
class lock_free_stack
{
private:
  void increase_head_count(counted_node_ptr& old_counter)
  {
    counted_node_ptr new_counter;

    do
    {
      new_counter=old_counter;
      ++new_counter.external_count;
    }
    while(!head.compare_exchange_strong(old_counter,new_counter));  // 1

    old_counter.external_count=new_counter.external_count;
  }
public:
  std::shared_ptr<T> pop()
  {
    counted_node_ptr old_head=head.load();
    for(;;)
    {
      increase_head_count(old_head);
      node* const ptr=old_head.ptr;  // 2
      if(!ptr)
      {
        return std::shared_ptr<T>();
      }
      if(head.compare_exchange_strong(old_head,ptr->next))  // 3
      {
        std::shared_ptr<T> res;
        res.swap(ptr->data);  // 4

        int const count_increase=old_head.external_count-2;  // 5

        if(ptr->internal_count.fetch_add(count_increase)==  // 6
           -count_increase)
        {
          delete ptr;
        }

        return res;  // 7
      }
      else if(ptr->internal_count.fetch_sub(1)==1)
      {
        delete ptr;  // 8
      }
    }
  }
};

当加载head值后就必须将外部引用加一,表明这个节点正在被引用,可保证解引用时的安全性。在引用计数增加前解引用指针,就会有线程能够访问这个节点,从而当前引用指针就成为了一个悬空指针。这就是将引用计数分离的主要原因:通过增加外部引用计数,保证指针在访问期间的合法性。compare_exchange_strong()的循环中①完成增加,通过比较和设置整个结构体来保证,指针不会在同一时间内被其他线程修改。

当计数增加就能安全的解引用ptr,并读取head指针的值,访问指向的节点②。当访问到链表的末尾,指针就是空指针。当指针不为空时,就能尝试对head调用compare_exchange_strong()来删除这个节点③。

compare_exchange_strong()成功时就拥有对应节点的所有权,并且可以和data进行交换④后返回。这样数据就不会持续保存,因为其他线程也会对栈进行访问,所以会有其他指针指向这个节点。而后,可以使用原子操作fetch_add⑥,将外部计数加到内部计数中去。如果引用计数为0,那么之前的值(fetch_add返回的值),在相加之前肯定是一个负数,这种情况下就可以将节点删除。这里需要注意的是,相加的值要比外部引用计数少2⑤;当节点已经从链表中删除,就要减少一次计数,并且这个线程无法再次访问指定节点,所以还要再减一。无论节点是否被删除,都能完成操作,所以可以将获取的数据进行返回⑦。

“比较/交换”③失败时,就说明其他线程已经把对应节点删除了,或者其他线程添加了一个新的节点到栈中。无论是哪种原因,需要通过“比较/交换”的调用,对具有新值的head重新进行操作。不过,首先需要减少节点(要删除的节点)上的引用计数。这个线程将再也没有办法访问这个节点了。如果当前线程是最后一个持有引用(因为其他线程已经将这个节点从栈上删除了)的线程,那么内部引用计数将会为1,所以减一的操作将会让计数器为0。这样,就能在循环⑧之前将对应节点删除。

目前,使用默认std::memory_order_seq_cst内存序来规定原子操作的执行顺序。大多数系统中,这种操作方式很耗时,且同步操作的开销要高于内存序。现在,可以考虑对数据结构的逻辑进行修改,对数据结构的部分放宽内存序要求;就没有必要在栈上增加过度的开销。现在让我们来检查一下栈的操作,并且扪心自问,能对一些操作使用更加宽松的内存序么?如果使用了,能确保同级安全吗?

7.2.5 无锁栈上的内存模型

修改内存序之前,需要检查一下操作之间的依赖关系。而后,再去确定适合这种需求关系的最小内存序。为了保证这种方式能够工作,需要在从线程的视角进行观察。其中最简单的视角就是,向栈中推入一个数据项,之后让其他线程从栈中弹出这个数据。

即使在简单的例子中,都需要三个重要的数据参与。

  1. counted_node_ptr转移的数据head。
  2. head引用的node。
  3. 节点所指向的数据项。

做push()的线程,会先构造数据项和节点,再设置head。做pop()的线程,会先加载head的值,再做在循环中对head做“比较/交换”操作,并增加引用计数,再读取对应的node节点,获取next的指向值,现在可以看到一组需求关系。next的值是普通的非原子对象,所以为了保证读取安全,必须确定存储(推送线程)和加载(弹出线程)的先行关系。因为原子操作就是push()函数中的compare_exchange_weak(),需要释放操作来获取两个线程间的先行关系,compare_exchange_weak()必须是std::memory_order_release或更严格的内存序。当compare_exchange_weak()调用失败,什么都不会改变,并且可以持续循环下去,所以使用std::memory_order_relaxed就足够了。

void push(T const& data)
{
  counted_node_ptr new_node;
  new_node.ptr=new node(data);
  new_node.external_count=1;
  new_node.ptr->next=head.load(std::memory_order_relaxed)
  while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
    std::memory_order_release,std::memory_order_relaxed));
}

pop()的实现呢?为了确定先行关系,必须在访问next值之前使用std::memory_order_acquire或更严格的内存序操作。因为,increase_head_count()中使用compare_exchange_strong()就获取next指针指向的旧值,所以要其获取成功就需要确定内存序。如同调用push()那样,当交换失败,循环会继续,所以在失败时使用松散的内存序:

void increase_head_count(counted_node_ptr& old_counter)
{
  counted_node_ptr new_counter;

  do
  {
    new_counter=old_counter;
    ++new_counter.external_count;
  }
  while(!head.compare_exchange_strong(old_counter,new_counter,
        std::memory_order_acquire,std::memory_order_relaxed));

  old_counter.external_count=new_counter.external_count;
}

compare_exchange_strong()调用成功时,ptr中的值就被存到old_counter中。存储操作是push()中的一个释放操作,compare_exchange_strong()操作是一个获取操作,现在存储同步于加载,并且能够获取先行关系。因此,push()中存储ptr的值要先行于在pop()中对ptr->next的访问,现在的操作就安全了。

内存序对head.load()的初始化并不妨碍分析,现在就可以使用std::memory_order_relaxed

接下来compare_exchange_strong()将old_head.ptr->next设置为head。是否需要做什么来保证操作线程中的数据完整性呢?当交换成功就能访问ptr->data,所以需要保证在push()线程中已经对ptr->data进行了存储(在加载之前)。increase_head_count()中的获取操作时,保证与push()线程中的存储和“比较/交换”同步。这里的先行关系:在push()线程中存储数据,先行于存储head指针;调用increase_head_count()先行于对ptr->data的加载。即使,pop()中的“比较/交换”操作使用std::memory_order_relaxed,这些操作还是能正常运行。唯一不同的地方就是,调用swap()让ptr->data有所变化,且没有其他线程可以对同一节点进行操作(这就是“比较/交换”操作的作用)。

compare_exchange_strong()失败时,新值就不会去更新old_head,并继续循环。已确定在increase_head_count()中使用std::memory_order_acquire内存序的可行性,所以使用std::memory_order_relaxed也可以。

其他线程呢?是否需要设置一些更为严格的内存序来保证其他线程的安全呢?回答是“不用”。因为,head只会因“比较/交换”操作有所改变;对于“读-改-写”操作来说,push()中的“比较/交换”操作是构成释放序列的一部分。因此,即使有很多线程在同一时间对head进行修改,push()中的compare_exchange_weak()与increase_head_count()(读取已存储的值)中的compare_exchange_strong()也是同步的。

剩余操作就可以用来处理fetch_add()操作(用来改变引用计数的操作),因为已知其他线程不可能对该节点的数据进行修改,所以从节点中返回数据的线程可以继续执行。不过,当线程获取其他线程修改后的值时,就代表操作失败(swap()是用来提取数据项的引用)。为了避免数据竞争,需要保证swap()先行于delete操作。一种简单的解决办法:在“成功返回”分支中对fetch_add()使用std::memory_order_release内存序,在“再次循环”分支中对fetch_add()使用std::memory_order_qcquire内存序。不过,这就有点矫枉过正:只有一个线程做delete操作(将引用计数设置为0的线程),所以只有这个线程需要获取操作。因为fetch_add()是一个“读-改-写”操作,是释放序列的一部分,所以可以使用一个额外的load()做获取。当“再次循环”分支将引用计数减为0时,fetch_add()可以重载引用计数,使用std::memory_order_acquire为了保持需求的同步关系;并且,fetch_add()本身可以使用std::memory_order_relaxed。使用新pop()的栈实现如下。

清单7.13 基于引用计数和松散原子操作的无锁栈

template<typename T>
class lock_free_stack
{
private:
  struct node;
  struct counted_node_ptr
  {
    int external_count;
    node* ptr;
  };

  struct node
  {
    std::shared_ptr<T> data;
    std::atomic<int> internal_count;
    counted_node_ptr next;

    node(T const& data_):
      data(std::make_shared<T>(data_)),
      internal_count(0)
    {}
  };

  std::atomic<counted_node_ptr> head;

  void increase_head_count(counted_node_ptr& old_counter)
  {
    counted_node_ptr new_counter;

    do
    {
      new_counter=old_counter;
      ++new_counter.external_count;
    }
    while(!head.compare_exchange_strong(old_counter,new_counter,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed));
    old_counter.external_count=new_counter.external_count;
  }
public:
  ~lock_free_stack()
  {
    while(pop());
  }

  void push(T const& data)
  {
    counted_node_ptr new_node;
    new_node.ptr=new node(data);
    new_node.external_count=1;
    new_node.ptr->next=head.load(std::memory_order_relaxed)
    while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
                                      std::memory_order_release,
                                      std::memory_order_relaxed));
  }
  std::shared_ptr<T> pop()
  {
    counted_node_ptr old_head=
       head.load(std::memory_order_relaxed);
    for(;;)
    {
      increase_head_count(old_head);
      node* const ptr=old_head.ptr;
      if(!ptr)
      {
        return std::shared_ptr<T>();
      }
      if(head.compare_exchange_strong(old_head,ptr->next,
                                      std::memory_order_relaxed))
      {
        std::shared_ptr<T> res;
        res.swap(ptr->data);

        int const count_increase=old_head.external_count-2;

        if(ptr->internal_count.fetch_add(count_increase,
              std::memory_order_release)==-count_increase)
        {
          delete ptr;
        }

        return res;
      }
      else if(ptr->internal_count.fetch_add(-1,
                   std::memory_order_relaxed)==1)
      {
        ptr->internal_count.load(std::memory_order_acquire);
        delete ptr;
      }
    }
  }
};

这是一种历练,不过要告一段落了,我们已经获得比之前好很多的栈实现。深思熟虑后,通过使用更多的松散操作,不影响并发性的同时提高性能。实现中的pop()有37行,而功能等同于清单6.1中的那7行的基于锁的栈实现和清单7.2中无内存管理的无锁栈实现。接下来要设计的无锁队列,将看到类似的情况:无锁结构的复杂性,主要在于内存的管理。

7.2.6 实现一个无锁的线程安全队列

队列的挑战与栈的有些不同,因为push()和pop()在队列中,操作的不是同一个地方。因此,同步需求就不一样。需要保证对一端的修改是正确的,且对另一端是可见的。不过,清单6.6中队列有一个try_pop()成员函数,其作用和清单7.2中简单的无锁栈的pop()功能差不多,就可以合理的假设无锁代码都很相似。这是为什么呢?

如果将清单6.6中的代码作为基础,就需要两个node指针:head和tail。可以让多线程对它们进行访问,所以这两个节点最好是原子的,这样就不用考虑互斥问题了。让我们对清单6.6中的代码做一些修改,并且看一下应该从哪里开始设计。先来看一下下面的代码。

清单7.14 单生产者/单消费者模型下的无锁队列

template<typename T>
class lock_free_queue
{
private:
  struct node
  {
    std::shared_ptr<T> data;
    node* next;

    node():
       next(nullptr)
    {}
  };

  std::atomic<node*> head;
  std::atomic<node*> tail;

  node* pop_head()
  {
    node* const old_head=head.load();
    if(old_head==tail.load())  // 1
    {
      return nullptr;
    }
    head.store(old_head->next);
    return old_head;
  }
public:
  lock_free_queue():
      head(new node),tail(head.load())
  {}

  lock_free_queue(const lock_free_queue& other)=delete;
  lock_free_queue& operator=(const lock_free_queue& other)=delete;

  ~lock_free_queue()
  {
    while(node* const old_head=head.load())
    {
      head.store(old_head->next);
      delete old_head;
    }
  }
  std::shared_ptr<T> pop()
  {
    node* old_head=pop_head();
    if(!old_head)
    {
      return std::shared_ptr<T>();
    }

    std::shared_ptr<T> const res(old_head->data);  // 2
    delete old_head;
    return res;
  }

  void push(T new_value)
  {
    std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
    node* p=new node;  // 3
    node* const old_tail=tail.load();  // 4
    old_tail->data.swap(new_data);  // 5
    old_tail->next=p;  // 6
    tail.store(p);  // 7
  }
};

一眼望去,这个实现并没什么不好,当只有一个线程调用一次push(),且只有一个线程调用pop(),这种情况下队列完美工作。push()和pop()之间的先行关系就很重要了,这直接关系到获取到的data。对tail的存储⑦同步于对tail的加载①,存储之前节点的data指针⑤先行于存储tail;并且,加载tail先行于加载data指针②,所以对data的存储要先行于加载,一切都没问题。因此,这是一个完美的单生产者,单消费者(SPSC, single-producer, single-consume)队列。

问题在于当多线程对push()或pop()并发调用。先看一下push():如果有两个线程并发调用push(),它们会新分配两个节点作为虚拟节点③,也会读取到相同的tail值④,因此也会同时修改同一个节点,同时设置data和next指针⑤⑥。明显的数据竞争!

pop_head()函数也有类似的问题。当有两个线程并发的调用这个函数时,这两个线程就会读取到同一个head中同样的值,并且会同时通过next指针去复写旧值。两个线程现在都能索引到同一个节点——真是一场灾难!不仅要保证只有一个pop()线程可以访问给定项,还要保证其他线程在读取head指针时,可以安全的访问节点中的next。这就和无锁栈中pop()的问题一样了,也就有很多解决方案可以使用。

pop()的问题解决了,那么push()呢?问题在于为了获取push()和pop()间的先行关系,就需要在为虚拟节点设置数据项前,更新tail指针。并发访问push()时,因为每个线程所读取到的是同一个tail指针,所以线程会为同一个数据项进行竞争。

多线程下的push()

第一个选择是在两个真实节点中添加一个虚拟节点。这种方法,需要当前tail节点更新next指针,这样让节点看起来像一个原子变量。当一个线程成功将next指针指向一个新节点,就说明其成功的添加了一个指针;否则,就需要再次读取tail,并重新对指针进行添加。这里就需要对pop()进行简单的修改,为了消除持有空指针的节点再次进行循环。这个方法的缺点:每次pop()函数的调用,通常都要删除两个节点,每次添加一个节点,都需要分配双份内存。

第二个选择是让data指针原子化,并通过“比较/交换”操作对其进行设置。如果“比较/交换”成功,就说明能获取tail指针,并能够安全的对其next指针进行设置,也就是更新tail。因为有其他线程对数据进行了存储,所以会导致“比较/交换”操作的失败,这时就要重新读取tail,重新循环。当原子操作对于std::shared_ptr<>是无锁的,就可以轻松一下了。如果不是,就需要一个替代方案;一种可能是让pop()函数返回一个std::unique_ptr<>(毕竟,这个指针只能引用指定对象),并且将数据作为普通指针存储在队列中的方案。这就需要队列支持存储std::atomic<T*>类型,对于compare_exchange_strong()的调用就很有必要了。使用类似于清单7.11中的引用计数模式,来解决多线程对pop()和push()的访问。

清单7.15 push()的第一次修订(不正确的)

void push(T new_value)
{
  std::unique_ptr<T> new_data(new T(new_value));
  counted_node_ptr new_next;
  new_next.ptr=new node;
  new_next.external_count=1;
  for(;;)
  {
    node* const old_tail=tail.load();  // 1
    T* old_data=nullptr;
    if(old_tail->data.compare_exchange_strong(
      old_data,new_data.get()))  // 2
    {
      old_tail->next=new_next;
      tail.store(new_next.ptr);  // 3
      new_data.release();
      break;
    }
  }
}

使用引用计数方案可以避免竞争,不过竞争不只在push()中。可以再看一下7.14中的修订版push(),与栈中模式相同:加载一个原子指针①,并且对该指针解引用②。同时,另一个线程可以对指针进行更新③,最终回收该节点(在pop()中)。当节点回收后,再对指针进行解引用,就会导致未定义行为。有个诱人的方案,就是给tail也添加计数器,就像给head做的那样,不过队列中的节点的next指针中都已经拥有了一个外部计数。同一个节点上有两个外部计数,为了避免过早的删除节点,需要对之前引用计数方案进行修改。通过对node结构中外部计数器数量的统计,解决这个问题。外部计数器销毁时,统计值减一(将对应的外部计数添加到内部)。当内部计数是0,且没有外部计数器时,对应节点就可以被安全删除了。这个技术是我查阅Joe Seigh的原子指针+项目[5]的时候看到的。下面push()的实现,使用的就是这种方案。

清单7.16 使用带有引用计数tail,实现的无锁队列中的push()

template<typename T>
class lock_free_queue
{
private:
  struct node;
  struct counted_node_ptr
  {
    int external_count;
    node* ptr;
  };

  std::atomic<counted_node_ptr> head;
  std::atomic<counted_node_ptr> tail;  // 1

  struct node_counter
  {
    unsigned internal_count:30;
    unsigned external_counters:2;  // 2
  };

  struct node
  {
    std::atomic<T*> data;
    std::atomic<node_counter> count;  // 3
    counted_node_ptr next;

    node()
    {
      node_counter new_count;
      new_count.internal_count=0;
      new_count.external_counters=2;  // 4
      count.store(new_count);

      next.ptr=nullptr;
      next.external_count=0;
     }
  };
public:
  void push(T new_value)
  {
    std::unique_ptr<T> new_data(new T(new_value));
    counted_node_ptr new_next;
    new_next.ptr=new node;
    new_next.external_count=1;
    counted_node_ptr old_tail=tail.load();

    for(;;)
    {
      increase_external_count(tail,old_tail);  // 5

      T* old_data=nullptr;
      if(old_tail.ptr->data.compare_exchange_strong(  // 6
           old_data,new_data.get()))
      {
        old_tail.ptr->next=new_next;
        old_tail=tail.exchange(new_next);
        free_external_counter(old_tail);  // 7
        new_data.release();
        break;
      }
      old_tail.ptr->release_ref();
    }
  }
};

清单7.16中,tail和head一样都是atomic类型①,并且node结构体中用count成员变量替换了之前的internal_count③。count成员变量包括了internal_count和外部external_counters成员②。注意,这里需要2bit的external_counters,因为最多就有两个计数器。因为使用了位域,所以就将internal_count指定为30bit的值,就能保证计数器的总体大小是32bit。内部计数值就有充足的空间来保证这个结构体能放在一个机器字中(包括32位和64位平台)。为的就是避免条件竞争,将结构体作为单独的实体来更新。让结构体的大小保持在一个机器字内,对其的操作就如同原子操作一样,还可以在多个平台上使用。

node初始化时,internal_count设置为0,external_counter设置为2④,因为当新节点加入队列中时,都会被tail和上一个节点的next指针所指向。push()与清单7.14中的实现很相似,除了为了对tail中的值进行解引用,需要调用节点data成员变量的compare_exchange_strong()成员函数⑥保证值的正确性;在这之前还要调用increase_external_count()增加计数器的计数⑤,而后在对尾部的旧值调用free_external_counter()⑦。

push()处理完毕再来看一下pop()。下面的实现,是将清单7.12中的引用计数pop()与7.14中队列pop()混合的版本。

清单7.17 使用尾部引用计数,将节点从无锁队列中弹出

template<typename T>
class lock_free_queue
{
private:
  struct node
  {
    void release_ref();
  };
public:
  std::unique_ptr<T> pop()
  {
    counted_node_ptr old_head=head.load(std::memory_order_relaxed);  // 1
    for(;;)
    {
      increase_external_count(head,old_head);  // 2
      node* const ptr=old_head.ptr;
      if(ptr==tail.load().ptr)
      {
        ptr->release_ref();  // 3
        return std::unique_ptr<T>();
      }
      if(head.compare_exchange_strong(old_head,ptr->next))  // 4
      {
        T* const res=ptr->data.exchange(nullptr);
        free_external_counter(old_head);  // 5
        return std::unique_ptr<T>(res);
      }
      ptr->release_ref();  // 6
    }
  }
};

进入循环时,在将加载值的外部计数增加②之前,需要加载old_head值作为启动①。当head与tail节点相同的时候,就能对引用进行释放③,因为队列中已经没有数据,所以返回的是空指针。如果队列中还有数据,可以尝试使用compare_exchange_strong()来做声明④。与7.11中的栈一样,将外部计数和指针做为一个整体进行比较;当外部计数或指针有所变化时,需要将引用释放后,再次进行循环⑥。当交换成功时,已声明的数据就归线程所有。为已弹出节点释放外部计数后⑤,就能把对应的指针返回给调用函数了。当两个外部引用计数都被释放,且内部计数降为0时,节点就可以被删除。对应的引用计数函数将会在清单7.18,7.19和7.20中展示。

清单7.18 在无锁队列中释放一个节点引用

template<typename T>
class lock_free_queue
{
private:
  struct node
  {
    void release_ref()
    {
      node_counter old_counter=
        count.load(std::memory_order_relaxed);
      node_counter new_counter;
      do
      {
        new_counter=old_counter;
        --new_counter.internal_count;  // 1
      }
      while(!count.compare_exchange_strong(  // 2
            old_counter,new_counter,
            std::memory_order_acquire,std::memory_order_relaxed));
      if(!new_counter.internal_count &&
         !new_counter.external_counters)
      {
        delete this;  // 3
      }
    }
  };
};

node::release_ref()的实现,只是对7.12中lock_free_stack::pop()进行小幅度修改得到。不过,清单7.12中的代码仅是处理单个外部计数的情况,所以要修改internal_count①,只需要使用fetch_sub就能让count结构体自动更新。因此,需要一个“比较/交换”循环②。降低internal_count时,内外部计数都为0时,就代表这是最后一次引用,之后就可以将这个节点删除③。

清单7.19 从无锁队列中获取一个节点的引用

template<typename T>
class lock_free_queue
{
private:
  static void increase_external_count(
    std::atomic<counted_node_ptr>& counter,
    counted_node_ptr& old_counter)
  {
    counted_node_ptr new_counter;
    do
    {
      new_counter=old_counter;
      ++new_counter.external_count;
    }
    while(!counter.compare_exchange_strong(
      old_counter,new_counter,
      std::memory_order_acquire,std::memory_order_relaxed));

    old_counter.external_count=new_counter.external_count;
  }
};

清单7.19展示的是另一方面。并不是对引用的释放,都会得到一个新引用,并增加外部计数的值。increase_external_count()和清单7.13中的increase_head_count()很相似,不同的是increase_external_count()这里作为静态成员函数,通过将外部计数器作为第一个参数传入函数,对其进行更新,而非只操作一个固定的计数器。

清单7.19 无锁队列中释放节点外部计数器

template<typename T>
class lock_free_queue
{
private:
  static void free_external_counter(counted_node_ptr &old_node_ptr)
  {
    node* const ptr=old_node_ptr.ptr;
    int const count_increase=old_node_ptr.external_count-2;

    node_counter old_counter=
      ptr->count.load(std::memory_order_relaxed);
    node_counter new_counter;
    do
    {
      new_counter=old_counter;
      --new_counter.external_counters;  // 1
      new_counter.internal_count+=count_increase;  // 2
    }
    while(!ptr->count.compare_exchange_strong(  // 3
           old_counter,new_counter,
           std::memory_order_acquire,std::memory_order_relaxed));

    if(!new_counter.internal_count &&
       !new_counter.external_counters)
    {
      delete ptr;  // 4
    }
  }
};

与increase_external_count()对应的是free_external_counter()。这里的代码和清单7.11中的lock_free_stack::pop()类似,不过做了一些修改用来处理external_counters计数。使用单个compare_exchange_strong()对计数结构体中的两个计数器进行更新③,就像之前release_ref()降低internal_count一样。和清单7.11一样,internal_count会进行更新②,并且external_counters将会减一①。当内外计数值都为0,就没有更多的节点可以被引用,所以节点可以安全的删除④。这个操作需要作为独立的操作来完成(因此需要“比较/交换”循环),来避免条件竞争。如果将两个计数器分开来更新,两个线程的情况下,可能都会认为自己是最后一个引用者,从而将节点删除,最后导致未定义行为。

现在的队列工作正常且无竞争,但是还是有一个性能问题。当一个线程对old_tail.ptr->data成功的完成compare_exchange_strong()(7.15中的⑥),就可以执行push()操作;并且,能确定没有其他线程在同时执行push()操作。让其他线程看到有新值的加入,要比只看到空指针的好,因此在compare_exchange_strong()调用失败的时候,线程就会继续循环。这就是忙等待,这种方式会消耗CPU的运算周期。因此,忙等待这就是一个锁。push()的首次调用,是要在其他线程完成后,将阻塞去除后才能完成,所以实现只是半无锁(no longer lock-free)结构。不仅如此,当线程被阻塞的时候,操作系统会给不同的线程以不同优先级,用于获取互斥锁。当前情况下,不可能出现不同优先级的情况,所以阻塞线程将会浪费CPU的运算周期,直到第一个线程完成其操作。处理的技巧出自于“无锁技巧包”:等待线程可以帮助push()线程完成操作。

无锁队列中的线程间互助

为了恢复代码无锁的属性,就需要让等待线程在push()线程没什么进展时,做一些事情,就是帮进展缓慢的线程完成其工作。

这种情况下,可以知道线程应该去做什么:尾节点的next指针需要指向一个新的虚拟节点,且tail指针之后也要更新。因为虚拟节点都是一样的,所以是谁创建的都不重要。将next指针放入一个原子节点中时,就可以使用compare_exchange_strong()来设置next指针。next指针已经被设置,就可以使用compare_exchange_weak()循环对tail进行设置,能保证next指针始终引用的是同一个原始节点。如果引用的不是同一个原始节点,其他部分就已经更新,可以停止尝试再次循环。这个需求只需要对pop()进行微小的改动,其目的就是为了加载next指针;实现将在下面展示。

清单7.21 修改pop()用来帮助push()完成工作

template<typename T>
class lock_free_queue
{
private:
  struct node
  {
    std::atomic<T*> data;
    std::atomic<node_counter> count;
    std::atomic<counted_node_ptr> next;  // 1
  };
public:
  std::unique_ptr<T> pop()
  {
    counted_node_ptr old_head=head.load(std::memory_order_relaxed);
    for(;;)
    {
      increase_external_count(head,old_head);
      node* const ptr=old_head.ptr;
      if(ptr==tail.load().ptr)
      {
        return std::unique_ptr<T>();
      }
      counted_node_ptr next=ptr->next.load();  // 2
      if(head.compare_exchange_strong(old_head,next))
      {
        T* const res=ptr->data.exchange(nullptr);
        free_external_counter(old_head);
        return std::unique_ptr<T>(res);
      } 
      ptr->release_ref();
    }
  }
};

改变很简单:next指针线程就是原子的①,所以load②也是原子的。在这个例子中,可以使用默认memory_order_seq_cst内存序,所以这里可以忽略对load()的显式调用,并且依赖于加载对象隐式转换成counted_node_ptr,不过这里的显式调用就可以用来提醒:哪里需要显式添加内存序。

以下代码对push()有更多的展示。

清单7.22 无锁队列中简单的帮助性push()的实现

template<typename T>
class lock_free_queue
{
private:
  void set_new_tail(counted_node_ptr &old_tail,  // 1
                    counted_node_ptr const &new_tail)
  {
    node* const current_tail_ptr=old_tail.ptr;
    while(!tail.compare_exchange_weak(old_tail,new_tail) &&  // 2
          old_tail.ptr==current_tail_ptr);
    if(old_tail.ptr==current_tail_ptr)  // 3
      free_external_counter(old_tail);  // 4
    else
      current_tail_ptr->release_ref();  // 5
  }
public:
  void push(T new_value)
  {
    std::unique_ptr<T> new_data(new T(new_value));
    counted_node_ptr new_next;
    new_next.ptr=new node;
    new_next.external_count=1;
    counted_node_ptr old_tail=tail.load();

    for(;;)
    {
      increase_external_count(tail,old_tail);

      T* old_data=nullptr;
      if(old_tail.ptr->data.compare_exchange_strong(  // 6
         old_data,new_data.get()))
      {
        counted_node_ptr old_next={0};
        if(!old_tail.ptr->next.compare_exchange_strong(  // 7
           old_next,new_next))
        {
          delete new_next.ptr;  // 8
          new_next=old_next;  // 9
        }
        set_new_tail(old_tail, new_next);
        new_data.release();
        break;
      }
      else  // 10
      {
        counted_node_ptr old_next={0};
        if(old_tail.ptr->next.compare_exchange_strong(  // 11
           old_next,new_next))
        {
          old_next=new_next;  // 12
          new_next.ptr=new node;  // 13
        }
        set_new_tail(old_tail, old_next);  // 14
      }
    }
  }
};

与清单7.15中的原始push()相似,不过还是有些不同。当对data进行设置⑥,就需要对另一线程帮忙的情况进行处理,在else分支就是具体的帮助内容⑩。

对节点中的data指针进行设置⑥时,新版push()对next指针的更新使用的是compare_exchange_strong()⑦(使用compare_exchange_strong()来避免循环),当交换失败就能知道另有线程对next指针进行设置,所以就可以删除一开始分配的那个新节点⑧。还需要获取next指向的值——其他线程对tail指针设置的值。

对tail指针的更新,实际在set_new_tail()中完成①。使用一个compare_exchange_weak()循环②来更新tail,如果其他线程尝试push()一个节点时,external_count部分将会改变。不过,当其他线程成功的修改了tail指针时,就不能对其值进行替换;否则,队列中的循环将会结束,这是一个相当糟糕的主意。因此,“比较/交换”操作失败时,就需要保证ptr加载值要与tail指向的值相同。当新旧ptr相同时,循环退出③就代表对tail的设置已经完成,所以需要释放旧外部计数器④。当ptr值不一样时另一线程可能已经将计数器释放了,所以只需要对该线程持有的单次引用进行释放即可⑤。

当线程调用push()时,未能在循环阶段对data指针进行设置,这个线程可以帮助成功的线程完成更新。首先,会尝试更新next指针,让其指向该线程分配出来的新节点⑪。指针更新成功时,就可以将这个新节点作为新的tail节点⑫,且需要分配另一个新节点,用来管理队列中新推送的数据项⑬。再进入循环之前,可以通过调用set_new_tail来设置tail节点⑭。

读者们可能已经意识到,比起大量的new和delete操作,这样的代码更加短小精悍,因为新节点实在push()中被分配,而在pop()中被销毁。因此,内存分配器的效率也需要考虑到;一个糟糕的分配器可能会让无锁容器的扩展特性消失的一干二净。选择和实现高效的分配器,已经超出了本书的范围,不过需要牢记的是:测试以及衡量分配器效率最好的办法,就是对使用前和使用后进行比较。为优化内存分配,包括每个线程有自己的分配器,以及使用回收列表对节点进行回收,而非将这些节点返回给分配器。

例子已经足够多了;那么,让我们从这些例子中提取出一些指导和建议吧。


[1] “Safe Memory Reclamation for Dynamic Lock-Free Objects Using Atomic Reads and Writes,” Maged M.Michael, in PODC ’02: Proceedings of the Twenty-first Annual Symposium on Principles of Distributed Computing (2002), ISBN 1-58113-485-1.

[2] Maged M. Michael, U.S. Patent and Trademark Office application number 20040107227, “Method for efficient implementation of dynamic lock-free data structures with safe memory reclamation.”

[3] GNU General Public License http://www.gnu.org/licenses/gpl.html.

[4] IBM Statement of Non-Assertion of Named Patents Against OSS, http://www.ibm.com/ibm/licensing/patents/pledgedpatents.pdf.

[5] Atomic Ptr Plus Project, http://atomic-ptr-plus.sourceforge.net/.