《C++并发编程实战》读书笔记(4):设计并发数据结构

C语言与CPP编程 2023-07-21 08:30

击上方“C语言与CPP编程”,选择“关注/置顶/星标公众号

干货/资讯/热点/福利,每天准时送达!

最近有小伙伴说没有收到当天的文章推送,这是因为微信改了推送机制,确实会一部分有小伙伴刷不到当天的文章,一些比较实用的知识和信息可能会错过。所以建议大家加个星标⭐️,就能第一时间收到推送。

小伙伴们大家好,我是飞宇

今天继续更新《Effective C++》和《C++并发编程实战的读书笔记,下面是已经更新过的内容:

《C++并发编程实战》读书笔记(1):并发、线程管控

《C++并发编程实战》读书笔记(2):并发操作的同步

《C++并发编程实战》读书笔记(3):内存模型和原子操作

《Effective C++》读书笔记(1):让自己习惯C++

《Effective C++》读书笔记(2):构造/析构/赋值运算

《Effective C++》读书笔记(3):资源管理

本文包括第6章设计基于锁的并发数据结构与第7章设计无锁数据结构,后者实在有些烧脑了。此外,发现吴天明版的中译本有太多太离谱的翻译错误了,还得是中英对照才行:

第6章 设计基于锁的并发数据结构

    设计支持并发访问的数据结构时,一方面需要确保访问安全,通常需要限定其提供的接口,另一方面需要是按真正的并发操作,仅利用互斥保护并发实际上是串行化。

    设计基于锁的并发数据结构的奥义就是确保先锁定合适的互斥,再访问数据,并尽可能缩短持锁时间。

    可以采用锁实现线程安全的栈容器。

struct empty_stack : std::exception {    const char* what() const throw() { return "empty stack"; }};
template <typename T>class threadsafe_stack {private: std::stack data; mutable std::mutex m;
public: threadsafe_stack() {} threadsafe_stack(const threadsafe_stack& other) { std::lock_guard<std::mutex> lock(other.m); data = other.data; } threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value) { std::lock_guard<std::mutex> lock(m); data.push(std::move(new_value)); } std::shared_ptr pop() { std::lock_guard<std::mutex> lock(m); if (data.empty()) throw empty_stack(); std::shared_ptr const res( std::make_shared(std::move(data.top()))); data.pop(); return res;    }};


    可以采用锁和条件变量实现线程安全的队列容器。data_queue中存储shared_ptr而非原始值,是为了把shared_ptr的初始化从wait_and_pop移动到push处,使得wait_and_pop中不会抛出异常。否则假如push操作通知条件变量时有多个消费者线程在等待,被notify_one通知到的消费者线程初始化shared_ptr时抛出异常,那么其他消费者线程不会被唤醒。

#include #include #include #include 
template <typename T>class threadsafe_queue {private: mutable std::mutex mut; std::queue<std::shared_ptr > data_queue; std::condition_variable data_cond;
public:    threadsafe_queue() {}
std::shared_ptr wait_and_pop() { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this] { return !data_queue.empty(); }); std::shared_ptr res = data_queue.front(); data_queue.pop(); return res;    }
void push(T new_value) { std::shared_ptr data(std::make_shared(std::move(new_value))); std::lock_guard<std::mutex> lk(mut); data_queue.push(data); data_cond.notify_one(); }};


    上面的容器直接保护整个data_queue,只用了一个互斥,可以采取更精细粒度的锁操作。为此使用基于单向链表实现的队列,单向链表包含一个不含数据的头节点,后续的每个节点存储指向数据的指针与指向下一个节点的指针。这样的话,就可以对头尾节点分别加锁,减小锁的粒度。

template <typename T>class threadsafe_queue {private:    struct node {        std::shared_ptr data;        std::unique_ptr next;    };
std::mutex head_mutex;    std::mutex tail_mutex;    std::unique_ptr head; node* tail; std::condition_variable data_cond;     std::unique_ptr wait_pop_head() { std::unique_lock<std::mutex> head_lock(wait_for_data()); return pop_head();    }        std::unique_lock<std::mutex> wait_for_data() { std::unique_lock<std::mutex> head_lock(head_mutex); data_cond.wait(head_lock, [&] { return head != get_tail(); }); return std::move(head_lock); }        node* get_tail() { std::lock_guard<std::mutex> tail_lock(tail_mutex); return tail; }     std::unique_ptr pop_head() { std::unique_ptr const old_head = std::move(head); head = std::move(old_head->next); return old_head; }
public: threadsafe_queue() : head(new node), tail(head.get()) {} threadsafe_queue(const threadsafe_queue& other) = delete; threadsafe_queue& operator=(const threadsafe_queue& other) = delete;
    std::shared_ptr wait_and_pop() { std::unique_ptr const old_head = wait_pop_head(); return old_head->data;    }     void push(T new_value) { std::shared_ptr new_data(std::make_shared(std::move(new_value))); std::unique_ptr p(new node); { std::lock_guard<std::mutex> tail_lock(tail_mutex); tail->data = new_data; node* const new_tail = p.get(); tail->next = std::move(p); tail = new_tail; } data_cond.notify_one();    }};


    与栈和队列相比,大多数数据结构支持多种多样的操作,要考虑更多访问模式。例如对于字典来说,首先只考虑基本操作增删改查,其次实现上来说散列表比二叉树和有序数组更支持细粒度的锁,因此必须放弃std::map支持的多余接口、迭代器操作、默认的底层实现。

template <typename Key, typename Value, typename Hash = std::hash >class threadsafe_lookup_table {private:    class bucket_type {    private:        typedef std::pair bucket_value;        typedef std::list bucket_data;        typedef typename bucket_data::iterator bucket_iterator;
bucket_data data; mutable std::shared_mutex mutex;
bucket_iterator find_entry_for(Key const& key) const { return std::find_if( data.begin(), data.end(), [&](bucket_value const& item) { return item.first == key; }); }
public: Value value_for(Key const& key, Value const& default_value) const { std::shared_lock<std::shared_mutex> lock(mutex); bucket_iterator const found_entry = find_entry_for(key); return (found_entry == data.end()) ? default_value : found_entry->second; }
void add_or_update_mapping(Key const& key, Value const& value) { std::unique_lock<std::shared_mutex> lock(mutex); bucket_iterator const found_entry = find_entry_for(key); if (found_entry == data.end()) { data.push_back(bucket_value(key, value)); } else { found_entry->second = value; } }
void remove_mapping(Key const& key) { std::unique_lock<std::shared_mutex> lock(mutex); bucket_iterator const found_entry = find_entry_for(key); if (found_entry != data.end()) { data.erase(found_entry); } } };
std::vector<std::unique_ptr > buckets; Hash hasher;
bucket_type& get_bucket(Key const& key) const { std::size_t const bucket_index = hasher(key) % buckets.size(); return *buckets[bucket_index]; }
public: typedef Key key_type; typedef Value mapped_type; typedef Hash hash_type;
threadsafe_lookup_table(unsigned num_buckets = 19, Hash const& hasher_ = Hash()) : buckets(num_buckets), hasher(hasher_) { for (unsigned i = 0; i < num_buckets; ++i) { buckets[i].reset(new bucket_type); } }
threadsafe_lookup_table(threadsafe_lookup_table const& other) = delete; threadsafe_lookup_table& operator=(threadsafe_lookup_table const& other) = delete;
Value value_for(Key const& key, Value const& default_value = Value()) const { return get_bucket(key).value_for(key, default_value); }
void add_or_update_mapping(Key const& key, Value const& value) { get_bucket(key).add_or_update_mapping(key, value); }
void remove_mapping(Key const& key) { get_bucket(key).remove_mapping(key); }};

  

  再例如对于链表来说,倘若想让链表支持迭代,而STL风格的迭代器的生命周期完全不受容器控制,可以以成员函数的形式提供迭代功能。为了减小锁的粒度,干脆让每个节点都有自己的互斥。

template <typename T>class threadsafe_list {    struct node {        std::mutex m;        std::shared_ptr data;        std::unique_ptr next;
        node() : next() {} node(T const& value) : data(std::make_shared(value)) {} };
node head;
public: threadsafe_list() {}
~threadsafe_list() { remove_if([](T const&) { return true; }); }
threadsafe_list(threadsafe_list const& other) = delete; threadsafe_list& operator=(threadsafe_list const& other) = delete;
void push_front(T const& value) { std::unique_ptr new_node(new node(value)); std::lock_guard<std::mutex> lk(head.m); new_node->next = std::move(head.next); head.next = std::move(new_node); }
template <typename Function> void for_each(Function f) { node* current = &head; std::unique_lock<std::mutex> lk(head.m); while (node* const next = current->next.get()) { std::unique_lock<std::mutex> next_lk(next->m); lk.unlock(); f(*next->data); current = next; lk = std::move(next_lk); }    }};

第7章 设计无锁数据结构


    非阻塞是指没有使用互斥、条件变量、future进行同步。仅仅非阻塞往往并不足够,例如第5章使用atomic_flag实现自旋锁,非阻塞但效率并不高。

    无锁代表如果多个线程共同操作同一份数据,那么有限步骤内其中某一线程能够完成自己的操作。采用无锁结构可以最大限度地实现并发并且提高代码健壮性,避免锁阻塞其他线程,或者持锁线程抛出异常时其他线程无法继续处理。

    以下是采用引用计数和宽松原子操作的无锁栈容器的实现。书中这段代码讲解了28页,有兴趣的可以仔细读读原文。简略地讲,引用计数的作用是避免pop时重复delete;由于缺乏原子的共享指针,所以将引用计数分为内部与外部两个,手动管理引用计数,每当指针被读取外部计数器自增,读取完成内部计数器自减。

template <typename T>class lock_free_stack {private:    struct node;    struct counted_node_ptr {        int external_count;        node* ptr;    };    struct node {        std::shared_ptr data;        std::atomic<int> internal_count;        counted_node_ptr next;        node(T const& data_)            : data(std::make_shared(data_)), internal_count(0) {}    };    // counted_node_ptr足够小,可以无锁实现原子变量    std::atomic head;    void increase_head_count(counted_node_ptr& old_counter) {        counted_node_ptr new_counter;        do {            new_counter = old_counter;            ++new_counter.external_count;        } while (!head.compare_exchange_strong(old_counter, new_counter,                                               std::memory_order_acquire,                                               std::memory_order_relaxed));        old_counter.external_count = new_counter.external_count;    }
public: ~lock_free_stack() { while (pop()) ; } void push(T const& data) { counted_node_ptr new_node; new_node.ptr = new node(data); new_node.external_count = 1; new_node.ptr->next = head.load(std::memory_order_relaxed);    // 若head==next则head=next返回true    // 否则代表head被其他线程修改,则next=head返回false while (!head.compare_exchange_weak(new_node.ptr->next, new_node, std::memory_order_release, std::memory_order_relaxed)) ; } std::shared_ptr pop() { counted_node_ptr old_head = head.load(std::memory_order_relaxed); for (;;) {    // 递增外部计数,表示正被指涉 increase_head_count(old_head); node* const ptr = old_head.ptr;    // 已经到栈底 if (!ptr) { return std::shared_ptr();            } if (head.compare_exchange_strong(old_head, ptr->next, std::memory_order_relaxed)) {    // 当前线程成功弹出并独占head std::shared_ptr res; res.swap(ptr->data);    // 头节点弹出栈,当前线程也不访问,总共减2 int const count_increase = old_head.external_count - 2; if (ptr->internal_count.fetch_add(count_increase, std::memory_order_release) == -count_increase) {    // 内部计数为0,删除节点 delete ptr; } return res;    // 如果当前线程最后一个持有指针 } else if (ptr->internal_count.fetch_add( -1, std::memory_order_relaxed) == 1) { ptr->internal_count.load(std::memory_order_acquire); delete ptr; } } }};


    与栈不同的是,对于队列结构,push与pop访问不同部分


template <typename T>class lock_free_queue {private:    struct node;    struct counted_node_ptr {        int external_count;        node* ptr;    };    std::atomic head;    std::atomic tail;
struct node_counter { unsigned internal_count : 30; unsigned external_counters : 2; };
struct node { std::atomic data; std::atomic count; std::atomic next;
node() { node_counter new_count; new_count.internal_count = 0; new_count.external_counters = 2; count.store(new_count); next.ptr = nullptr; next.external_count = 0; }     // 针对某节点释放引用 void release_ref() { node_counter old_counter = count.load(std::memory_order_relaxed); node_counter new_counter; do { new_counter = old_counter; --new_counter.internal_count; } while (!count.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed)); if (!new_counter.internal_count && !new_counter.external_counters) { delete this; } } }; // 针对节点释放其外部计数器 static void free_external_counter(counted_node_ptr& old_node_ptr) { node* const ptr = old_node_ptr.ptr; int const count_increase = old_node_ptr.external_count - 2; node_counter old_counter = ptr->count.load(std::memory_order_relaxed); node_counter new_counter; do { new_counter = old_counter; --new_counter.external_counters; new_counter.internal_count += count_increase; } while (!ptr->count.compare_exchange_strong( old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed)); if (!new_counter.internal_count && !new_counter.external_counters) { delete ptr; } }    // 针对某节点获取新的引用 static void increase_external_count(std::atomic& counter, counted_node_ptr& old_counter) { counted_node_ptr new_counter; do { new_counter = old_counter; ++new_counter.external_count; } while (!counter.compare_exchange_strong(old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed)); old_counter.external_count = new_counter.external_count; }
void set_new_tail(counted_node_ptr& old_tail, counted_node_ptr const& new_tail) { node* const current_tail_ptr = old_tail.ptr; while (!tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr) ; if (old_tail.ptr == current_tail_ptr) free_external_counter(old_tail); else current_tail_ptr->release_ref(); }
public: std::unique_ptr pop() { counted_node_ptr old_head = head.load(std::memory_order_relaxed); for (;;) { increase_external_count(head, old_head); node* const ptr = old_head.ptr; if (ptr == tail.load().ptr) { return std::unique_ptr(); } counted_node_ptr next = ptr->next.load(); if (head.compare_exchange_strong(old_head, next)) { T* const res = ptr->data.exchange(nullptr); free_external_counter(old_head); return std::unique_ptr(res); } ptr->release_ref(); } }
void push(T new_value) { std::unique_ptr new_data(new T(new_value)); counted_node_ptr new_next; new_next.ptr = new node; new_next.external_count = 1; counted_node_ptr old_tail = tail.load(); for (;;) { increase_external_count(tail, old_tail); T* old_data = nullptr; if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { counted_node_ptr old_next = {0}; if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) { delete new_next.ptr; new_next = old_next; } set_new_tail(old_tail, new_next); new_data.release(); break; } else {         // 更新失败,转而协助成功的线程 counted_node_ptr old_next = {0};         // 将next指向本线程分配的节点充当尾节点 if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) { old_next = new_next;         // 成功的话分配新节点为下次压入做准备 new_next.ptr = new node; }         // 设置尾节点,重新循环 set_new_tail(old_tail, old_next); } } }};


    相信读者已经了解了正确写出无锁代码的困难与繁复。如果想自行设计,请注意以下原则:

    1、在原型设计中使用std::memory_order_seq_cst次序,便于分析和推理;

    2、使用无锁的内存回收方案,例如上面的引用计数;

    3、防范ABA问题,即两次读取变量的值都相同,但其实变量已经被修改过多次,解决办法是将变量与其计数器绑定;

    4、找出忙等循环,协助其他线程,例如两线程同时压入队列的话某一线程就会忙等循环,可以像上面队列中的实现一样,多个线程同时push只有一个能成功,失败的线程转而协助成功线程。

EOF

你好,我是飞宇,本硕均于某中流985 CS就读,先后于百度搜索以及字节跳动电商等部门担任Linux C/C++后端研发工程师。

同时,我也是知乎博主@韩飞宇,日常分享C/C++、计算机学习经验、工作体会,欢迎点击此处查看我以前的学习笔记&经验&分享的资源。

我组建了一些社群一起交流,群里有大牛也有小白,如果你有意可以一起进群交流。

欢迎你添加我的微信,我拉你进技术交流群。此外,我也会经常在微信上分享一些计算机学习经验以及工作体验,还有一些内推机会

加个微信,打开另一扇窗

C语言与CPP编程 C语言/C++开发,C语言/C++基础知识,C语言/C++学习路线,C语言/C++进阶,数据结构;算法;python;计算机基础等
评论
  • 更多生命体征指标风靡的背后都只有一个原因:更多人将健康排在人生第一顺位!“AGEs,也就是晚期糖基化终末产物,英文名Advanced Glycation End-products,是存在于我们体内的一种代谢产物” 艾迈斯欧司朗亚太区健康监测高级市场经理王亚琴说道,“相信业内的朋友都会有关注,最近该指标的热度很高,它可以用来评估人的生活方式是否健康。”据悉,AGEs是可穿戴健康监测领域的一个“萌新”指标,近来备受关注。如果站在学术角度来理解它,那么AGEs是在非酶促条件下,蛋白质、氨基酸
    艾迈斯欧司朗 2025-02-27 14:50 400浏览
  • 振动样品磁强计是一种用于测量材料磁性的精密仪器,广泛应用于科研、工业检测等领域。然而,其测量准确度会受到多种因素的影响,下面我们将逐一分析这些因素。一、温度因素温度是影响振动样品磁强计测量准确度的重要因素之一。随着温度的变化,材料的磁性也会发生变化,从而影响测量结果的准确性。因此,在进行磁性测量时,应确保恒温环境,以减少温度波动对测量结果的影响。二、样品制备样品的制备过程同样会影响振动样品磁强计的测量准确度。样品的形状、尺寸和表面处理等因素都会对测量结果产生影响。为了确保测量准确度,应严格按照规
    锦正茂科技 2025-02-28 14:05 134浏览
  • 在2024年的科技征程中,具身智能的发展已成为全球关注的焦点。从实验室到现实应用,这一领域正以前所未有的速度推进,改写着人类与机器的互动边界。这一年,我们见证了具身智能技术的突破与变革,它不仅落地各行各业,带来新的机遇,更在深刻影响着我们的生活方式和思维方式。随着相关技术的飞速发展,具身智能不再仅仅是一个技术概念,更像是一把神奇的钥匙。身后的众多行业,无论愿意与否,都像是被卷入一场伟大变革浪潮中的船只,注定要被这股汹涌的力量重塑航向。01为什么是具身智能?为什么在中国?最近,中国具身智能行业的进
    艾迈斯欧司朗 2025-02-28 15:45 221浏览
  • RGB灯光无法同步?细致的动态光效设定反而成为产品客诉来源!随着科技的进步和消费者需求变化,电脑接口设备单一功能性已无法满足市场需求,因此在产品上增加「动态光效」的形式便应运而生,藉此吸引消费者目光。这种RGB灯光效果,不仅能增强电脑周边产品的视觉吸引力,还能为用户提供个性化的体验,展现独特自我风格。如今,笔记本电脑、键盘、鼠标、鼠标垫、耳机、显示器等多种电脑接口设备多数已配备动态光效。这些设备的灯光效果会随着音乐节奏、游戏情节或使用者的设置而变化。想象一个画面,当一名游戏玩家,按下电源开关,整
    百佳泰测试实验室 2025-02-27 14:15 137浏览
  •           近日受某专业机构邀请,参加了官方举办的《广东省科技创新条例》宣讲会。在与会之前,作为一名技术工作者一直认为技术的法例都是保密和侵权方面的,而潜意识中感觉法律有束缚创新工作的进行可能。通过一个上午学习新法,对广东省的科技创新有了新的认识。广东是改革的前沿阵地,是科技创新的沃土,企业是创新的主要个体。《广东省科技创新条例》是广东省为促进科技创新、推动高质量发展而制定的地方性法规,主要内容包括: 总则:明确立法目
    广州铁金刚 2025-02-28 10:14 103浏览
  • 应用趋势与客户需求,AI PC的未来展望随着人工智能(AI)技术的日益成熟,AI PC(人工智能个人电脑)逐渐成为消费者和企业工作中的重要工具。这类产品集成了最新的AI处理器,如NPU、CPU和GPU,并具备许多智能化功能,为用户带来更高效且直观的操作体验。AI PC的目标是提升工作和日常生活的效率,通过深度学习与自然语言处理等技术,实现更流畅的多任务处理、实时翻译、语音助手、图像生成等功能,满足现代用户对生产力和娱乐的双重需求。随着各行各业对数字转型需求的增长,AI PC也开始在各个领域中显示
    百佳泰测试实验室 2025-02-27 14:08 255浏览
  • 1,微软下载免费Visual Studio Code2,安装C/C++插件,如果无法直接点击下载, 可以选择手动install from VSIX:ms-vscode.cpptools-1.23.6@win32-x64.vsix3,安装C/C++编译器MniGW (MinGW在 Windows 环境下提供类似于 Unix/Linux 环境下的开发工具,使开发者能够轻松地在 Windows 上编写和编译 C、C++ 等程序.)4,C/C++插件扩展设置中添加Include Path 5,
    黎查 2025-02-28 14:39 140浏览
  •         近日,广电计量在聚焦离子束(FIB)领域编写的专业著作《聚焦离子束:失效分析》正式出版,填补了国内聚焦离子束领域实践性专业书籍的空白,为该领域的技术发展与知识传播提供了重要助力。         随着芯片技术不断发展,芯片的集成度越来越高,结构也日益复杂。这使得传统的失效分析方法面临巨大挑战。FIB技术的出现,为芯片失效分析带来了新的解决方案。它能够在纳米尺度上对芯片进行精确加工和分析。当芯
    广电计量 2025-02-28 09:15 116浏览
  • 一、VSM的基本原理震动样品磁强计(Vibrating Sample Magnetometer,简称VSM)是一种灵敏且高效的磁性测量仪器。其基本工作原理是利用震动样品在探测线圈中引起的变化磁场来产生感应电压,这个感应电压与样品的磁矩成正比。因此,通过测量这个感应电压,我们就能够精确地确定样品的磁矩。在VSM中,被测量的样品通常被固定在一个震动头上,并以一定的频率和振幅震动。这种震动在探测线圈中引起了变化的磁通量,从而产生了一个交流电信号。这个信号的幅度和样品的磁矩有着直接的关系。因此,通过仔细
    锦正茂科技 2025-02-28 13:30 100浏览
  • 在物联网领域中,无线射频技术作为设备间通信的核心手段,已深度渗透工业自动化、智慧城市及智能家居等多元场景。然而,随着物联网设备接入规模的不断扩大,如何降低运维成本,提升通信数据的传输速度和响应时间,实现更广泛、更稳定的覆盖已成为当前亟待解决的系统性难题。SoC无线收发模块-RFM25A12在此背景下,华普微创新推出了一款高性能、远距离与高性价比的Sub-GHz无线SoC收发模块RFM25A12,旨在提升射频性能以满足行业中日益增长与复杂的设备互联需求。值得一提的是,RFM25A12还支持Wi-S
    华普微HOPERF 2025-02-28 09:06 143浏览
  • 美国加州CEC能效跟DOE能效有什么区别?CEC/DOE是什么关系?美国加州CEC能效跟DOE能效有什么区别?CEC/DOE是什么关系?‌美国加州CEC能效认证与美国DOE能效认证在多个方面存在显著差异‌。认证范围和适用地区‌CEC能效认证‌:仅适用于在加利福尼亚州销售的电器产品。CEC认证的范围包括制冷设备、房间空调、中央空调、便携式空调、加热器、热水器、游泳池加热器、卫浴配件、光源、应急灯具、交通信号模块、灯具、洗碗机、洗衣机、干衣机、烹饪器具、电机和压缩机、变压器、外置电源、消费类电子设备
    张工nx808593 2025-02-27 18:04 120浏览
我要评论
0
点击右上角,分享到朋友圈 我知道啦
请使用浏览器分享功能 我知道啦