点击上方“C语言与CPP编程”,选择“关注/置顶/星标公众号”
干货/资讯/热点/福利,每天准时送达!
最近有小伙伴说没有收到当天的文章推送,这是因为微信改了推送机制,确实会一部分有小伙伴刷不到当天的文章,一些比较实用的知识和信息可能会错过。所以建议大家加个星标⭐️,就能第一时间收到推送。
小伙伴们大家好,我是飞宇。
今天继续更新《Effective C++》和《C++并发编程实战》的读书笔记,下面是已经更新过的内容:
《C++并发编程实战》读书笔记(1):并发、线程管控
《C++并发编程实战》读书笔记(2):并发操作的同步
《C++并发编程实战》读书笔记(3):内存模型和原子操作
《Effective C++》读书笔记(1):让自己习惯C++
《Effective C++》读书笔记(2):构造/析构/赋值运算
《Effective C++》读书笔记(3):资源管理
本文包括第6章设计基于锁的并发数据结构与第7章设计无锁数据结构,后者实在有些烧脑了。此外,发现吴天明版的中译本有太多太离谱的翻译错误了,还得是中英对照才行:)
第6章 设计基于锁的并发数据结构
设计支持并发访问的数据结构时,一方面需要确保访问安全,通常需要限定其提供的接口,另一方面需要实现真正的并发操作,因为仅利用互斥保护的并发实际上是串行化的。
设计基于锁的并发数据结构的奥义就是确保先锁定合适的互斥,再访问数据,并尽可能缩短持锁时间。
可以采用锁实现线程安全的栈容器。
// Exception thrown when pop() is called on an empty stack.
struct empty_stack : std::exception {
  // noexcept replaces the deprecated (removed-in-C++20) throw() spec.
  const char* what() const noexcept override { return "empty stack"; }
};

// Thread-safe stack: every public operation takes the single mutex, so the
// container is safe to share between threads. pop() returns the value via
// shared_ptr so that "top + pop" is one atomic operation and no invalid
// value needs to be returned on emptiness.
template <typename T>
class threadsafe_stack {
 private:
  std::stack<T> data;
  mutable std::mutex m;  // mutable: locked in const and copy contexts too

 public:
  threadsafe_stack() {}
  // Copy-construct under the source's lock so a concurrent writer cannot
  // mutate other.data mid-copy.
  threadsafe_stack(const threadsafe_stack& other) {
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data;
  }
  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  void push(T new_value) {
    std::lock_guard<std::mutex> lock(m);
    data.push(std::move(new_value));
  }

  // Removes and returns the top element; throws empty_stack when empty.
  // The emptiness check and the pop happen under one lock acquisition.
  std::shared_ptr<T> pop() {
    std::lock_guard<std::mutex> lock(m);
    if (data.empty()) throw empty_stack();
    std::shared_ptr<T> const res(std::make_shared<T>(std::move(data.top())));
    data.pop();
    return res;
  }
};
可以采用锁和条件变量实现线程安全的队列容器。data_queue中存储shared_ptr而非原始值,是为了把shared_ptr的初始化从wait_and_pop移动到push处,使得wait_and_pop中不会抛出异常。否则假如push操作通知条件变量时有多个消费者线程在等待,被notify_one通知到的消费者线程初始化shared_ptr时抛出异常,那么其他消费者线程不会被唤醒。
// Thread-safe queue built on one mutex and a condition variable.
// Elements are stored as shared_ptr so the allocation happens in push();
// wait_and_pop() then cannot throw while extracting the value, which would
// otherwise leave other waiting consumers unnotified after a notify_one.
template <typename T>
class threadsafe_queue {
 private:
  mutable std::mutex mut;
  std::queue<std::shared_ptr<T>> data_queue;
  std::condition_variable data_cond;

 public:
  threadsafe_queue() {}

  // Blocks until an element is available, then removes and returns it.
  std::shared_ptr<T> wait_and_pop() {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] { return !data_queue.empty(); });
    std::shared_ptr<T> res = data_queue.front();
    data_queue.pop();
    return res;
  }

  void push(T new_value) {
    // Allocate outside the lock: keeps the critical section short and
    // moves the only throwing operation out of the consumer path.
    std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(data);
    data_cond.notify_one();
  }
};
上面的容器直接保护整个data_queue,只用了一个互斥,可以采取更精细粒度的锁操作。为此使用基于单向链表实现的队列,单向链表包含一个不含数据的头节点,后续的每个节点存储指向数据的指针与指向下一个节点的指针。这样的话,就可以对头尾节点分别加锁,减小锁的粒度。
// Fine-grained thread-safe queue: a singly linked list with a dummy tail
// node, so push touches only the tail and pop touches only the head. Two
// mutexes (head/tail) let a producer and a consumer proceed concurrently.
template <typename T>
class threadsafe_queue {
 private:
  struct node {
    std::shared_ptr<T> data;  // null in the dummy tail node
    std::unique_ptr<node> next;
  };
  std::mutex head_mutex;
  std::mutex tail_mutex;
  std::unique_ptr<node> head;  // owns the whole chain
  node* tail;                  // always points at the dummy node
  std::condition_variable data_cond;

  std::unique_ptr<node> wait_pop_head() {
    std::unique_lock<std::mutex> head_lock(wait_for_data());
    return pop_head();
  }

  // Returns with head_mutex held and at least one element present.
  std::unique_lock<std::mutex> wait_for_data() {
    std::unique_lock<std::mutex> head_lock(head_mutex);
    // FIX: compare raw pointers — unique_ptr has no operator== against
    // node*, so the original `head != get_tail()` does not compile.
    data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
    return std::move(head_lock);
  }

  // Snapshot tail under its own mutex; the dummy-node scheme makes the
  // snapshot safe to compare against head.
  node* get_tail() {
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    return tail;
  }

  // Unlinks and returns the old head. Caller must hold head_mutex.
  std::unique_ptr<node> pop_head() {
    // FIX: not const — a const unique_ptr local cannot be moved from in
    // the return statement, so the original did not compile.
    std::unique_ptr<node> old_head = std::move(head);
    head = std::move(old_head->next);
    return old_head;
  }

 public:
  threadsafe_queue() : head(new node), tail(head.get()) {}
  threadsafe_queue(const threadsafe_queue& other) = delete;
  threadsafe_queue& operator=(const threadsafe_queue& other) = delete;

  // Blocks until an element is available, then removes and returns it.
  std::shared_ptr<T> wait_and_pop() {
    std::unique_ptr<node> const old_head = wait_pop_head();
    return old_head->data;
  }

  void push(T new_value) {
    // Allocate the value and the new dummy node before taking any lock.
    std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);
    {
      std::lock_guard<std::mutex> tail_lock(tail_mutex);
      tail->data = new_data;      // fill the current dummy node
      node* const new_tail = p.get();
      tail->next = std::move(p);  // append the fresh dummy
      tail = new_tail;
    }
    data_cond.notify_one();
  }
};
与栈和队列相比,大多数数据结构支持多种多样的操作,要考虑更多访问模式。例如对于字典来说,首先只考虑基本操作增删改查,其次实现上来说散列表比二叉树和有序数组更支持细粒度的锁,因此必须放弃std::map支持的多余接口、迭代器操作、默认的底层实现。
// Thread-safe hash map ("lookup table"). Each bucket has its own
// shared_mutex: reads within a bucket proceed concurrently, writes are
// exclusive per bucket, and different buckets never contend. The bucket
// count is fixed after construction, so get_bucket itself needs no lock.
template <typename Key, typename Value, typename Hash = std::hash<Key>>
class threadsafe_lookup_table {
 private:
  class bucket_type {
   private:
    typedef std::pair<Key, Value> bucket_value;
    typedef std::list<bucket_value> bucket_data;
    typedef typename bucket_data::iterator bucket_iterator;
    typedef typename bucket_data::const_iterator bucket_const_iterator;
    bucket_data data;
    mutable std::shared_mutex mutex;  // lockable from const readers

    // FIX: the original had one const member returning a mutable
    // bucket_iterator, but data.begin() on const data yields a
    // const_iterator — that does not compile. Provide both overloads.
    bucket_iterator find_entry_for(Key const& key) {
      return std::find_if(
          data.begin(), data.end(),
          [&](bucket_value const& item) { return item.first == key; });
    }
    bucket_const_iterator find_entry_for(Key const& key) const {
      return std::find_if(
          data.begin(), data.end(),
          [&](bucket_value const& item) { return item.first == key; });
    }

   public:
    // Shared (reader) lock: concurrent value_for calls may run in parallel.
    Value value_for(Key const& key, Value const& default_value) const {
      std::shared_lock<std::shared_mutex> lock(mutex);
      bucket_const_iterator const found_entry = find_entry_for(key);
      return (found_entry == data.end()) ? default_value
                                         : found_entry->second;
    }
    // Exclusive (writer) lock: insert or overwrite.
    void add_or_update_mapping(Key const& key, Value const& value) {
      std::unique_lock<std::shared_mutex> lock(mutex);
      bucket_iterator const found_entry = find_entry_for(key);
      if (found_entry == data.end()) {
        data.push_back(bucket_value(key, value));
      } else {
        found_entry->second = value;
      }
    }
    // Exclusive (writer) lock: erase if present, otherwise no-op.
    void remove_mapping(Key const& key) {
      std::unique_lock<std::shared_mutex> lock(mutex);
      bucket_iterator const found_entry = find_entry_for(key);
      if (found_entry != data.end()) {
        data.erase(found_entry);
      }
    }
  };

  std::vector<std::unique_ptr<bucket_type>> buckets;
  Hash hasher;

  bucket_type& get_bucket(Key const& key) const {
    std::size_t const bucket_index = hasher(key) % buckets.size();
    return *buckets[bucket_index];
  }

 public:
  typedef Key key_type;
  typedef Value mapped_type;
  typedef Hash hash_type;

  // 19 buckets by default: a prime bucket count spreads hash values better.
  threadsafe_lookup_table(unsigned num_buckets = 19,
                          Hash const& hasher_ = Hash())
      : buckets(num_buckets), hasher(hasher_) {
    for (unsigned i = 0; i < num_buckets; ++i) {
      buckets[i].reset(new bucket_type);
    }
  }
  threadsafe_lookup_table(threadsafe_lookup_table const& other) = delete;
  threadsafe_lookup_table& operator=(threadsafe_lookup_table const& other) =
      delete;

  // Returns the mapped value, or default_value if key is absent.
  Value value_for(Key const& key, Value const& default_value = Value()) const {
    return get_bucket(key).value_for(key, default_value);
  }
  void add_or_update_mapping(Key const& key, Value const& value) {
    get_bucket(key).add_or_update_mapping(key, value);
  }
  void remove_mapping(Key const& key) { get_bucket(key).remove_mapping(key); }
};
再例如对于链表来说,倘若想让链表支持迭代,而STL风格的迭代器的生命周期完全不受容器控制,可以以成员函数的形式提供迭代功能。为了减小锁的粒度,干脆让每个节点都有自己的互斥。
// Thread-safe singly linked list with one mutex per node: traversal holds
// at most two locks at a time (hand-over-hand locking), so threads working
// on different parts of the list can proceed concurrently. Iteration is
// exposed through member functions rather than STL iterators, whose
// lifetime the container could not control.
template <typename T>
class threadsafe_list {
  struct node {
    std::mutex m;
    std::shared_ptr<T> data;  // null in the sentinel head node
    std::unique_ptr<node> next;
    node() : next() {}
    node(T const& value) : data(std::make_shared<T>(value)) {}
  };
  node head;  // sentinel: carries no data

 public:
  threadsafe_list() {}
  ~threadsafe_list() {
    remove_if([](T const&) { return true; });
  }
  threadsafe_list(threadsafe_list const& other) = delete;
  threadsafe_list& operator=(threadsafe_list const& other) = delete;

  void push_front(T const& value) {
    // Allocate before locking; only the head link update is guarded.
    std::unique_ptr<node> new_node(new node(value));
    std::lock_guard<std::mutex> lk(head.m);
    new_node->next = std::move(head.next);
    head.next = std::move(new_node);
  }

  // Applies f to every element front-to-back, hand-over-hand: lock the
  // next node, then release the current node's lock.
  template <typename Function>
  void for_each(Function f) {
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    while (node* const next = current->next.get()) {
      std::unique_lock<std::mutex> next_lk(next->m);
      lk.unlock();
      f(*next->data);
      current = next;
      lk = std::move(next_lk);
    }
  }

  // FIX: the destructor calls remove_if, but the original omitted its
  // definition entirely, so the class did not compile. This is the book's
  // implementation: unlink every node matching p, again hand-over-hand.
  template <typename Predicate>
  void remove_if(Predicate p) {
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);
    while (node* const next = current->next.get()) {
      std::unique_lock<std::mutex> next_lk(next->m);
      if (p(*next->data)) {
        // Keep the doomed node alive until its lock is released.
        std::unique_ptr<node> old_next = std::move(current->next);
        current->next = std::move(next->next);
        next_lk.unlock();
      } else {
        lk.unlock();
        current = next;
        lk = std::move(next_lk);
      }
    }
  }
};
第7章 设计无锁数据结构
非阻塞是指没有使用互斥、条件变量、future进行同步。仅仅非阻塞往往并不足够,例如第5章使用atomic_flag实现自旋锁,非阻塞但效率并不高。
无锁代表如果多个线程共同操作同一份数据,那么有限步骤内其中某一线程能够完成自己的操作。采用无锁结构可以最大限度地实现并发并且提高代码健壮性,避免锁阻塞其他线程,或者持锁线程抛出异常时其他线程无法继续处理。
以下是采用引用计数和宽松原子操作的无锁栈容器的实现。书中这段代码讲解了28页,有兴趣的可以仔细读读原文。简略地讲,引用计数的作用是避免pop时重复delete;由于缺乏原子的共享指针,所以将引用计数分为内部与外部两个,手动管理引用计数,每当指针被读取外部计数器自增,读取完成内部计数器自减。
template <typename T>
class lock_free_stack {
private:
struct node;
struct counted_node_ptr {
int external_count;
node* ptr;
};
struct node {
std::shared_ptr
data; std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_)
: data(std::make_shared
(data_)), internal_count(0) {} };
// counted_node_ptr足够小,可以无锁实现原子变量
std::atomic
head; void increase_head_count(counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
} while (!head.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
public:
~lock_free_stack() {
while (pop())
;
}
void push(T const& data) {
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load(std::memory_order_relaxed);
// 若head==next则head=next返回true
// 否则代表head被其他线程修改,则next=head返回false
while (!head.compare_exchange_weak(new_node.ptr->next, new_node,
std::memory_order_release,
std::memory_order_relaxed))
;
}
std::shared_ptr
pop() { counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for (;;) {
// 递增外部计数,表示正被指涉
increase_head_count(old_head);
node* const ptr = old_head.ptr;
// 已经到栈底
if (!ptr) {
return std::shared_ptr
(); }
if (head.compare_exchange_strong(old_head, ptr->next,
std::memory_order_relaxed)) {
// 当前线程成功弹出并独占head
std::shared_ptr
res; res.swap(ptr->data);
// 头节点弹出栈,当前线程也不访问,总共减2
int const count_increase = old_head.external_count - 2;
if (ptr->internal_count.fetch_add(count_increase,
std::memory_order_release) ==
-count_increase) {
// 内部计数为0,删除节点
delete ptr;
}
return res;
// 如果当前线程最后一个持有指针
} else if (ptr->internal_count.fetch_add(
-1, std::memory_order_relaxed) == 1) {
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}
};
与栈不同的是,对于队列结构,push与pop分别访问队列的尾部与头部,二者操作不同的部分,天然允许生产者与消费者并发进行。
template <typename T>
class lock_free_queue {
private:
struct node;
struct counted_node_ptr {
int external_count;
node* ptr;
};
std::atomic
head; std::atomic
tail;
struct node_counter {
unsigned internal_count : 30;
unsigned external_counters : 2;
};
struct node {
std::atomic
data; std::atomic
count; std::atomic
next;
node() {
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2;
count.store(new_count);
next.ptr = nullptr;
next.external_count = 0;
}
// 针对某节点释放引用
void release_ref() {
node_counter old_counter = count.load(std::memory_order_relaxed);
node_counter new_counter;
do {
new_counter = old_counter;
--new_counter.internal_count;
} while (!count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if (!new_counter.internal_count && !new_counter.external_counters) {
delete this;
}
}
};
// 针对节点释放其外部计数器
static void free_external_counter(counted_node_ptr& old_node_ptr) {
node* const ptr = old_node_ptr.ptr;
int const count_increase = old_node_ptr.external_count - 2;
node_counter old_counter = ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do {
new_counter = old_counter;
--new_counter.external_counters;
new_counter.internal_count += count_increase;
} while (!ptr->count.compare_exchange_strong(
old_counter, new_counter, std::memory_order_acquire,
std::memory_order_relaxed));
if (!new_counter.internal_count && !new_counter.external_counters) {
delete ptr;
}
}
// 针对某节点获取新的引用
static void increase_external_count(std::atomic
& counter, counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
} while (!counter.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
void set_new_tail(counted_node_ptr& old_tail,
counted_node_ptr const& new_tail) {
node* const current_tail_ptr = old_tail.ptr;
while (!tail.compare_exchange_weak(old_tail, new_tail) &&
old_tail.ptr == current_tail_ptr)
;
if (old_tail.ptr == current_tail_ptr)
free_external_counter(old_tail);
else
current_tail_ptr->release_ref();
}
public:
std::unique_ptr
pop() { counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for (;;) {
increase_external_count(head, old_head);
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) {
return std::unique_ptr
(); }
counted_node_ptr next = ptr->next.load();
if (head.compare_exchange_strong(old_head, next)) {
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr
(res); }
ptr->release_ref();
}
}
void push(T new_value) {
std::unique_ptr
new_data(new T(new_value)); counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data,
new_data.get())) {
counted_node_ptr old_next = {0};
if (!old_tail.ptr->next.compare_exchange_strong(old_next,
new_next)) {
delete new_next.ptr;
new_next = old_next;
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
} else {
// 更新失败,转而协助成功的线程
counted_node_ptr old_next = {0};
// 将next指向本线程分配的节点充当尾节点
if (old_tail.ptr->next.compare_exchange_strong(old_next,
new_next)) {
old_next = new_next;
// 成功的话分配新节点为下次压入做准备
new_next.ptr = new node;
}
// 设置尾节点,重新循环
set_new_tail(old_tail, old_next);
}
}
}
};
相信读者已经了解了正确写出无锁代码的困难与繁复。如果想自行设计,请注意以下原则:
1、在原型设计中使用std::memory_order_seq_cst次序,便于分析和推理;
2、使用无锁的内存回收方案,例如上面的引用计数;
3、防范ABA问题,即两次读取变量的值都相同,但其实变量已经被修改过多次,解决办法是将变量与其计数器绑定;
4、找出忙等循环,协助其他线程,例如两线程同时压入队列的话某一线程就会忙等循环,可以像上面队列中的实现一样,多个线程同时push只有一个能成功,失败的线程转而协助成功线程。
—— EOF —— 你好,我是飞宇,本硕均于某中流985 CS就读,先后于百度搜索以及字节跳动电商等部门担任Linux C/C++后端研发工程师。
同时,我也是知乎博主@韩飞宇,日常分享C/C++、计算机学习经验、工作体会,欢迎点击此处查看我以前的学习笔记&经验&分享的资源。
我组建了一些社群一起交流,群里有大牛也有小白,如果你有意可以一起进群交流。
欢迎你添加我的微信,我拉你进技术交流群。此外,我也会经常在微信上分享一些计算机学习经验以及工作体验,还有一些内推机会。
加个微信,打开另一扇窗