Yangming's Blog

beware the barrenness of a busy life

C++11——Concurrency

11 Feb 2020 » Linux, C++

一直就是野路子c++,主要就是以c++98的方式写代码;前段时间看了MySQL8中的redolog无锁优化,对其中c++11的并发框架的运用印象深刻,决定要了解一下C++11的新东西了,跟上潮流😂。这里就从并发这块开始看吧,先补补基础知识!

关于锁

并发编程中最常见的概念就是同步,而实现同步的方式有很多种,有互斥锁(mutex),原子变量(atomic),自旋锁(spinlock),信号量(singal),读写锁(share/exclusive lock)等等。本文主要讨论前三种:

  • 互斥锁:mutex需要陷入内核态,存在上下文切换的代价,主要代价在cpu-sys。
  • 自旋锁:spinlock通常基于CAS等原子操作实现,如果当下没有取到锁,那么会在用户态自旋等待一段时间,避免频繁陷入内核态,代价在cpu-usr。
  • 原子变量:封装了原子操作的变量,不需要锁,也能保证并发更新的正确性;前两个可用在某段临界区的互斥上,原子变量的互斥临界区可认为缩小到一个变量的更新。

下面对这三类进行简单展开。

spinlock

自旋锁的概念是在PostgreSQL中了解到的,自旋锁(SpinLock)作为PostgreSQL最底层的锁(关于PostgreSQL的锁,可见另一篇——Lock in PG),它的实现是和操作系统和硬件环境相关的。在PostgreSQL中,实现了两种SpinLock:

  • Hardware-dependent,利用TAS指令集实现(s_lock);

    static __inline__ int
    tas(volatile slock_t *lock)
    {
    	register slock_t _res = 1;
      
    	__asm__ __volatile__(
    		"	lock			\n"
    		"	xchgb	%0,%1	\n"
    :		"+q"(_res), "+m"(*lock)
    :		/* no inputs */
    :		"memory", "cc");
    	return (int) _res;
    }
    

    在编程语言中,CPU有时不会从内存读取,会直接从寄存器中读取变量值;那么,定义变量为volatile,表示这个变量是易失的,所以会编译器会强制每次都去内存中取原始值,而不是直接拿寄存器中的值,常用在锁变量等对象上。

  • Hardware-independent,利用PostgreSQL定义的信号量PGSemaphore实现(spin)。

    int
    tas_sema(volatile slock_t *lock)
    {
    	int			lockndx = *lock;
      
    	if (lockndx <= 0 || lockndx > NUM_SPINLOCK_SEMAPHORES)
    		elog(ERROR, "invalid spinlock number: %d", lockndx);
    	/* Note that TAS macros return 0 if *success* */
    	return !PGSemaphoreTryLock(SpinlockSemaArray[lockndx - 1]);
    }
    /*
     * PGSemaphoreTryLock
     *
     * Lock a semaphore only if able to do so without blocking
     */
    bool
    PGSemaphoreTryLock(PGSemaphore sema)
    {
    	int			errStatus;
      
    	/*
    	 * Note: if errStatus is -1 and errno == EINTR then it means we returned
    	 * from the operation prematurely because we were sent a signal.  So we
    	 * try and lock the semaphore again.
    	 */
    	do
    	{
    		errStatus = sem_trywait(PG_SEM_REF(sema));
    	} while (errStatus < 0 && errno == EINTR);
      
    	if (errStatus < 0)
    	{
    		if (errno == EAGAIN || errno == EDEADLK)
    			return false;		/* failed to lock it */
    		/* Otherwise we got trouble */
    		elog(FATAL, "sem_trywait failed: %m");
    	}
      
    	return true;
    }
    

一般来说,自旋锁需要占用CPU资源进行不断的检查,对于比较耗时的操作【较大的临界区】,通常不建议使用,会浪费太多的CPU资源。

自旋锁通常是第一种实现,会使用一个volatile变量:lockvar,表示该变量极易改变,这样编译器就不会对该变量进行任何优化,比如乱序, 避免影响原有逻辑。在执行中,lockvar是由load加载到cpu core的寄存器中,然后通过store指令写到磁盘中,尽管编译器不会乱序优化,但是流水线执行的时候还是可能会乱序重排。

因为cpu的流水线会乱序执行,但是为了保证memory order正确性,需要保证store之后的load,不会提前执行;

memory order:指的是load和store指令的相对顺序。Memory Order Violation: A load is executed before a prior store, reads the wrong value

因此,在实际执行中,如果某个core load了某个地址的内容后,并且按照乱序重排了指令;那么,在这之后,另一个core对同一地址执行了store;那么执行load指令的原core的流水线顺序可能发生memory order violate,需要重排(这是很费时的)。

在spin lock场景中,这经常发生;等锁的core不断执行load,执行了几次都失败后,系统按照常理推测出后续应该都失败,就按照失败的逻辑排列了load指令;但是可能刚排列好,另一个core就解锁了,即,执行了store命令;那么spink lock 对应的load流水线,就得重排;为了避免这个代价,intel的cpu引入了pause指令,这样load先不着急预测排列。

atomic

PostgreSQL在9.6版本中,将bufferpool中的spinlock替换为atomic operations(commitlog),例如下面的commitlog:

-		LockBufHdr(buf);
-		buf->refcount++;
+		pg_atomic_add_fetch_u32(&buf->refcount, 1);
+

其将refcount设计成一个原子变量,借助底层的add_fetch原子指令进行操作;相比于lock,类似cas的原子操作有是乐观的。很多用lock实现的地方,可以变成原子操作,这样能够减少锁的额外代价,实现lock-free;但是不等于wait-free。在gcc中,提供了类似的操作,比如__sync_lock_test_and_set。而在在C++11中有原子变量类型,提供了各种原子操作;原子操作是操作系统提供最小的且不可并行化的指令。

PostgreSQL类似的原子操作有:

  • Test and set:pg_atomic_test_set_flag_impl;
  • Compare and exchange:pg_atomic_compare_exchange_u32_impl;
  • Fetch and add:pg_atomic_fetch_add_u32_impl

在InnoDB5.7的os0sync中同样封装了原子操作,而在8中,直接使用了C++11的原子类型。·

Mutex

关于Mutex,主要了解一下futex调用,futexFast Userspace muTEX的简称。是在2003由IBM的工程师引入到Kernel中的,主要用于减小高并发下锁同步是system call的代价。

由于在大部分情况下,锁上并没有竞争。应用可基于硬件提供的原子操作就可以进行锁资源的占用了。

Most modern processors have built-in atomic instructions implemented in HW. For example on Intel architectures cmpxhg is an instruction. While it’s not as cheap as non-atomic instructions (especially in multi-core systems), it’s significantly cheaper than system calls.

对于少部分情况下,获取锁时确实存在竞争,此时通过原子操作获取锁可能会失败。如果锁获取失败,有两种处理方案:

  • 方案一是不断自旋检查锁资源是否可用,但是这样会浪费CPU资源;
  • 方案二就可用先将当前线程(进程)状态变为sleep,等到锁资源释放了,再唤醒处理,那么如何进行sleep/wakeup呢?这里就需要futex了。
#include <linux/futex.h>
#include <sys/time.h>

int futex(int *uaddr, int futex_op, int val,
          const struct timespec *timeout,   /* or: uint32_t val2 */
          int *uaddr2, int val3);

futex调用接口如上,futex_op参数定义了多种不同的操作,比如FUTEX_WAITFUTEX_WAKE,一个是等待uaddr处的数据等于val,一个是唤醒等待的线程(进程)。

image-20200216101837409

这样实际上,内核帮用户态线程维护了一个锁队列,如上图,thread通过FUTEX_WAIT通知内核将自己挂起并排队,然后由另外一个进程通过FUTEX_WAKE释放该锁,并唤醒队列中的线程(唤醒时,可通过参数指定唤醒数量);这就是上面所述的方案二的简单原理,避免了队列中每个线程都不断的检查原子变量,而消耗CPU资源;并且Futex大部分是在用户态完成的,减少了陷入内核态的代价。

实际上Futex是通过一个hash table维护各个key,如下图,详细的实现逻辑,有兴趣可以继续了解:

Futex implementation diagram from LWN

而且futex可作用的共享资源不仅仅是多线程并行中的共享变量,还有多进程中的资源,比如mmap(2) 或 shmat(2)创建的共享内存,

futex一般用在低冲突的锁同步场景中。在实际代码中,我们经常会用到mutex进行线程互斥,mutex底层就是基于futex和atomics实现的,即,通过atomics存储锁字节,然后通过futex进行atomics的检测,这里有个简单实现样例。实际的例子,我们可以参见glibc中的pthread_mutex_t,翻看NPTL(Native_POSIX_Thread_Library)的代码可以从注释中了解一二(sysdeps/nptl/lowlevellock.h):

/* Low-level locks use a combination of atomic operations (to acquire and
   release lock ownership) and futex operations (to block until the state
   of a lock changes).  A lock can be in one of three states:
   0:  not acquired,
   1:  acquired with no waiters; no other threads are blocked or about to block
       for changes to the lock state,
   >1: acquired, possibly with waiters; there may be other threads blocked or
       about to block for changes to the lock state.

   We expect that the common case is an uncontended lock, so we just need
   to transition the lock between states 0 and 1; releasing the lock does
   not need to wake any other blocked threads.  If the lock is contended
   and a thread decides to block using a futex operation, then this thread
   needs to first change the state to >1; if this state is observed during
   lock release, the releasing thread will wake one of the potentially
   blocked threads.
 ..
 */

基于以上的了解,基本对多线程中的资源同步有了简单的了解,而在实际编程中,可能会涉及更多的问题;

2000年以后,随着处理器架构的变化,很多编程语言都内建了并行编程的支持;而在C++中,一般还是通过pthread和OpenMP进行多线程编程;在C++11中,对并行编程模型进行了系统的封装,下面就了解一下C++11中的并行编程模型。

C++11并发编程框架

Concurrency means that two or more calculations happen within the same time frame, and there is usually some sort of dependency between them. Parallelism means that two or more calculations happen simultaneously. Parallel is subset of Concurrency;

在C++11中,std::thread进行了跨平台的线程封装;原来通过pthread的多线程编程,就可变为这样了:

int main()
{
    int n = 0;
    std::thread my_thread(f1, n + 1); // pass by value
    my_thread.join();
}

下面,从三个方面,对C++的并发编程框架进行简单介绍:

  1. 数据的原子操作,atomic/memory_order
  2. 数据的共享与互斥,thread_local/mutex/condition_variable
  3. 线程的执行与退出,CPU affinity/quick_exit

数据的原子操作

原子类型

之前对变量的互斥访问是通过lock/unlock构建一个临界区来互斥的,这样编程特别麻烦。在C++11中,在标准库内引入了多线程的支持,最重要的就是原子类型的引入。

  • atomic_*和atomic<T>

C11的新关键字_Atomic是同样的道理

在c++11之前的原子类型是通过内联汇编代码实现。对于每种原子类型,都提供了基本的原子操作,比如test_and_setfetch_and_addcompare_and_exchange等等。

相比于其他原子类型,automic_flag的访问是lock-free的,其只支持两种原子操作:test_and_setclear,一般基于atomic_flag实现一个spinlock。

std::atomic_flag lock = ATOMIC_FLAG_INIT;
void lock(int n) {
  	while(lock.test_and_set());
   }
void unlock(int n) {
	lock.clear();
  }

内存顺序模型

内存顺序模型分两类:软件层面和硬件层面。

硬件层面上,CPU在超标量流水线(Superscalar)执行中,一个时钟周期会执行多条指令。对于一些硬件上没有相关性的指令,这会造成代码执行顺序与原来不一样;软件层面上,在代码编译中,编译器也会会执行重新排列指令,进行乱序执行的优化;(多喝)

这都会造成实际执行顺序与原来编写的顺序不一致的问题。而如果我们要求某些语句的执行顺序需要和看到的一样,这就叫顺序一致的内存模型(Sequential consistent)。不同的处理器架构的内存模型不一样。如果指令严格按照顺序执行,这叫强顺序的架构(比如x86),否则叫弱顺序(比如PowerPC,ArmV7等)。

那么对于弱顺序的机器,为了保障执行的顺序,在汇编指令中有内存栅栏(memory barrier)的指令,比如PowerPC的sync,在执行到内存栅栏时,会等待之前的指令都执行完成后再执行后面的指令。memory barrier可认为有三种:

  1. full memory barrier:该指令前的acquire(read)/release(write)指令全部完成后,才能执行后面的指令。
  2. acquire(read, load) memory barrier:该指令前的acquire指令全部完成后,才能执行后面的指令。
  3. release(write, store) memory barrier:该指令前的release指令全部完成后,才能执行后面的指令。

对于原子类型默认是full memory barrier,这可能会限制并发性能的提升。而如果我们可以对于某些场景并不需要原子类型保证顺序一致性,在C++11中,通过memory_order来让程序员为原子操作指定内存顺序,这里的顺序对应的操作都是原子类型的存取操作,有如下6种:

  • memory_order_relaxed:不对执行顺序做任何保证;
  • memory_order_acquire:本线程中,后续的所有读操作需要在当前指令结束后执行;
  • memory_order_release:本线程中,之前的所有写操作都完成后,才能执行当前指令;
  • memory_order_acq_rel:结合上两条;
  • memory_order_consume:本线程中,后续与本原子变量相关的读写操作,必须在本指令完成后执行;
  • memory_order_seq_cst:全部读写都按顺序执行,默认值;

具体使用的时候,是通过多者的语义结合,完成多线程间的内存存取的语义,常见的就四种:memory_order_acquirememory_order_release经常结合使用,这种内存顺序成为release-acquire内存模型;而memory_order_consumememory_order_acquire的更加弱化版本,memory_order_releasememory_order_consume可以建立关于某原子变量的生产者-消费者顺序,这称为release-consume内存模型。加上memory_order_relaxed松散内存模型和默认的顺序一致型内存模型,是c++11中典型的四种内存模型。

另外,对于memory_order_acq_rel常用在实现CAS(compare and swap)同步原语,也称之为acquire-release内存顺序模型

在MySQL8中,对log模块进行了大量的重构,其中就包括异步无锁刷盘的逻辑,其中使用了大量的c++11的并行编程的特性,可作为学习C++11很好参考,比如这里就通过memory_order_release进行内存顺序的保证:

 using atomic_lsn_t = std::atomic<lsn_t>;
...
/* Do not reorder writes above, below this line. For x86 this
 * protects only from unlikely compile-time reordering. */
 std::atomic_thread_fence(std::memory_order_release);

数据的共享与互斥

线程间的数据共享需要保证并发执行的正确性,需要同步原语进行同步,而对于一些共享数据当不存在并发冲突的可以基于线程的本地存储。

线程本地存储

对于栈内变量,肯定就是线程本地的;但是对于全局变量,一般就是全局共享的,而如果我们想将一些全局变量变成线程本地存储的,在一个编译器中可以加__thread前缀,而在c++11中,对其进行了统一,使用thread_local关键字:

int thread_local errCode;

thread_local的具体实现方式不同的编译器不一样,有的是在一开始就分配好,有的是动态分配的,但一般来说thread_local变量的性能一般不高于全局变量。

mutex

通过mutex,对某段临界区进行互斥访问,可以通过lock_guard进行简化,lock_guard是按照RAII的方式进行mutex管理。

std::mutex my_mutex;
int main(int argc, char *argv[])
{
	{
		std::cout << "block in" << std::endl;
		std::lock_guard<std::mutex> lock(my_mutex);
		test t;
		std::cout << "block out" << std::endl;
	}
	std::cout << "out" << std::endl;
	return 0;
}

condition_variable

如果对某个数据的处理,线程间存在偏序关系,那么通过信号量进行同步;c++11中的信号量是condition_variable:

void waitingForWork(){
    std::cout << "Waiting " << std::endl;
    std::unique_lock<std::mutex> lck(mutex_);
    condVar.wait(lck);                       // (1)
    std::cout << "Running " << std::endl;
}

void setDataReady(){
    std::cout << "Data prepared" << std::endl;
    condVar.notify_one();                   // (2)
}

线程的执行与退出

CPU affinity

在Linux下,通过命令taskset或系统调用(sched_getaffinity)可以设定process与某个CPU核的绑定关系;如果通过pthread进行多线程编程,可以通过pthread_setaffinity_nppthread_attr_setaffinity_np进行线程与CPU的绑定。那么,在C++11中,可通过threads[i].native_handle()获得thread在当前平台的ID,然后通过相应的方式进行CPU绑定:

// Create a cpu_set_t object representing a set of CPUs. Clear it and mark
// only CPU i as set.
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(1, &cpuset);
int rc = pthread_setaffinity_np(my_threads.native_handle(),
                                sizeof(cpu_set_t), &cpuset);
if (rc != 0) {
  std::cerr << "Error calling pthread_setaffinity_np: " << rc << "\n";
}

快速退出

进程退出时,如果是异常退出,一般走abort,这时不会进行析构函数的调用,有可能会产生coredump;如果是正常退出,一般走exit,这时会进行析构函数的调用,也会调用atexit注册的清理函数。有时为了快速正常退出,不想走大量的析构函数,在C++11中,提供了quick_exit以及对应的at_quick_exit接口。

可重入(reentrant)函数、线程安全(thread-safe)函数、幂等(idempotence)函数:

首先这两个概念没有任何包含关系,一个函数可以满足两者也可以两者都不满足,或只满足其中一个。

可重入(reentrance func)概念的提出是在单进程单线程的背景下,意思是函数在执行的过程中,可以中断当前执行,再次调用该函数,而不会出错,当前的执行函数也不一定会等待。其常见的场景是发生了硬件中断或函数递归调用,

Boost:Fiber

关于并发,还有个值得一提的概念——协程;在go和python等语言中都提出了协程的概念,比如goroutine;Boost:Fiber可认为是C++中的协程,但是提供的功能更多

当然还有很多别的C++协程实现,这里就只了解下Boost中的,毕竟Boost是C++中很强大的库,甚至可以约等于基础库了😂

Fiber相比于thread的好处在于可以主动让出CPU(yield),这样在同一个thread中,可以有多条执行任务互相协作完成整体工作,因此Fiber的调度方式是主动的,并且是用户态进行context switch,更加轻量级(需要更少的CPU指令);

从另外一个角度也可以认为协程的concurrency问题少一些,毕竟是自己定义协作方式,主动让出资源。

fiber的调度通过fiber_manager管理,当fiber执行yield或者suspend等让出控制权的操作时,manager就会调度下一个fiber执行。具体的调度算法可以有多重,继承自algorithm类:

struct algorithm {
    virtual ~algorithm();
    virtual void awakened( context *) noexcept = 0;
    virtual context * pick_next() noexcept = 0;
    virtual bool has_ready_fibers() const noexcept = 0;
    virtual void suspend_until(        std::chrono::steady_clock::time_point const&) noexcept = 0;
    virtual void notify() noexcept = 0;
};

Boost:Fiber需要C++11的支持