《C++并发编程实战》读书笔记(6):高级线程管理、并行算法函数、测试与除错

C语言与CPP编程 2023-09-13 08:30

击上方“C语言与CPP编程”,选择“关注/置顶/星标公众号

干货福利,第一时间送达!

最近有小伙伴说没有收到当天的文章推送,这是因为微信改了推送机制,有一部分小伙伴刷不到当天的文章,一些比较实用的知识和信息,错过了就是错过了,建议大家加个星标⭐️,就能第一时间收到推送。

小伙伴们大家好,我是飞宇。


最近李伟老师讲了《C++ Primer 5th》这本书的视频,他是美国微软高级工程师,清华大学博士,帮忙推广一下,感兴趣的可以看看。


今天继续更新《Effective C++》和《C++并发编程实战的读书笔记,下面是已经更新过的内容:

《C++并发编程实战》读书笔记(1):并发、线程管控

《C++并发编程实战》读书笔记(2):并发操作的同步

《C++并发编程实战》读书笔记(3):内存模型和原子操作

《C++并发编程实战》读书笔记(4):设计并发数据结构

《Effective C++》读书笔记(1):让自己习惯C++

《Effective C++》读书笔记(2):构造/析构/赋值运算

《Effective C++》读书笔记(3):资源管理

《Effective C++》读书笔记(4):设计与声明

《Effective C++》读书笔记(5):实现

第9章 高级线程管理

9.1 线程池

    大多数程序中并不方便给每个任务分配单独的线程,但仍可通过线程池来充分利用可调配的并发算力:将可同时执行的任务提交到线程池,放入任务队列中等待,工作线程循环地领取并执行任务。

    以下是一种实现,提交任务后返回future,提交者可通过future获取任务结果,任务先被包装成packaged_task再被包装成function,由工作线程来处理。

class ThreadPool {private:    std::vector<std::thread> threads;    ThreadsafeQueue<std::function<void()>> taskQueue;    std::atomic<bool> stop;    join_threads joiner;public:    ThreadPool(size_t numThreads = std::thread::hardware_concurrency())         : stop(false),joiner(threads) {        for (size_t i = 0; i < numThreads; ++i) {            threads.emplace_back([this]() {                while (!stop) {                    run_pending_task();                }            });        }    }        // 避免所有线程都在等待其他线程完成任务    void run_pending_task(){        std::function<void()> task;                       if (taskQueue.try_pop(task))            task();        else            std::this_thread::yield();    }        ~ThreadPool() {        stop = true;    }
template<class F, class... Args>    auto submit(F&& fArgs&&... args) -> std::future<decltype(f(args...))> { using ReturnType = decltype(f(args...)); auto task = std::make_shared<std::packaged_task>( std::bind(std::forward(f), std::forward(args)...) ); std::future result = task->get_future(); taskQueue.push([task]() { (*task)(); }); return result;    }};


    例如可以实现基于线程池的快排:

template <typename T>struct sorter {    ThreadPool pool;
std::list do_sort(std::list& chunk_data) {        if (chunk_data.empty()) return chunk_data;        // 将原list分为大小两段 std::list result; result.splice(result.begin(), chunk_data, chunk_data.begin());        T const& partition_val = *result.begin();        auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(),[&](T const& val) { return val < partition_val; });        // 两段分别处理 std::list new_lower_chunk;        new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,chunk_data.begin(), divide_point);        auto new_lower = pool.submit(std::bind(&sorter::do_sort, thisstd::move(new_lower_chunk)));        std::list new_higher(do_sort(chunk_data)); result.splice(result.end(), new_higher); // 避免所有线程彼此等待 while (!new_lower.is_ready()) { pool.run_pending_task();        } result.splice(result.begin(), new_lower.get()); return result; }};
template <typename T>std::list parallel_quick_sort(std::list input) {    if (input.empty()) return input;    sorter s; return s.do_sort(input);}


    上述线程池仅具备一个全局的任务队列,即使使用无锁队列来优化仍然会有严重的缓存乒乓,导致性能浪费。可以为每个线程配备thread_local任务队列,仅当线程自身线程没有任务时才从全局队列领取任务。

    此外,倘若某线程自身队列为空,而另一线程的队列为满,需支持窃取任务。首先实现支持这样操作的队列,仅用锁简单实现,一端用于push/pop,另一端用于steal。

class work_stealing_queue {private:    typedef std::function<void()> data_type;    std::deque the_queue;    mutable std::mutex the_mutex;
public: work_stealing_queue() {}
work_stealing_queue(const work_stealing_queue& other) = delete; work_stealing_queue& operator=(const work_stealing_queue& other) = delete;
void push(data_type data) { std::lock_guard<std::mutex> lock(the_mutex); the_queue.push_front(std::move(data)); }
bool try_pop(data_type& res) { std::lock_guard<std::mutex> lock(the_mutex); if (the_queue.empty()) return false; res = std::move(the_queue.front()); the_queue.pop_front(); return true; }
bool try_steal(data_type& res) { std::lock_guard<std::mutex> lock(the_mutex); if (the_queue.empty()) return false; res = std::move(the_queue.back()); the_queue.pop_back(); return true; }};


    基于上面的结构,可以实现支持任务窃取的线程池:

class thread_pool {private:    typedef std::function<void()> task_type;    std::vector<std::thread> threads;    join_threads joiner;    std::atomic_bool done;    // 全局任务队列    thread_safe_queue pool_work_queue;    std::vector<std::unique_ptr> queues;    // 指向线程独有的任务队列    static thread_local work_stealing_queue* local_work_queue;    // 线程编号    static thread_local unsigned my_index;
void worker_thread(unsigned my_index_) { my_index = my_index_; local_work_queue = queues[my_index].get(); while (!done) { run_pending_task(); } }
bool pop_task_from_local_queue(task_type& task) { return local_work_queue && local_work_queue->try_pop(task); }
bool pop_task_from_pool_queue(task_type& task) { return pool_work_queue.try_pop(task); } // 遍历,偷取任务 bool pop_task_from_other_thread_queue(task_type& task) { for (unsigned i = 0; i < queues.size(); ++i) { unsigned const index = (my_index + i + 1) % queues.size(); if (queues[index]->try_steal(task)) { return true; }        } return false; }
public: thread_pool() : joiner(threads), done(false) { unsigned const thread_count = std::thread::hardware_concurrency(); try { for (unsigned i = 0; i < thread_count; ++i) { queues.push_back(std::unique_ptr( new work_stealing_queue)); threads.push_back( std::thread(&thread_pool::worker_thread, this, i)); } } catch (...) { done = true; throw; } }
~thread_pool() { done = true; }
template <class F, class... Args> auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> { using ReturnType = decltype(f(args...)); auto task = std::make_shared<std::packaged_task>( std::bind(std::forward(f), std::forward(args)...)); std::future result = task->get_future(); if (local_work_queue) { local_work_queue->push([task]() { (*task)(); }); } else { pool_work_queue.push([task]() { (*task)(); }); } return result; }
void run_pending_task() { task_type task; if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) || pop_task_from_other_thread_queue(task)) { task(); } else { std::this_thread::yield(); } }};


9.2 中断线程 

    C++20中引入了能接收中断、自动join的jthread。但自己实现也不复杂。借助thread_local的interrupt_flag来辅助实现,通过interrupt成员函数来设置中断,并借此实现可中断的条件变量/future上的等待。

thread_local interrupt_flag this_thread_interrupt_flag;
class interruptible_thread { std::thread internal_thread; interrupt_flag* flag;
public: template <typename FunctionType> interruptible_thread(FunctionType f) { std::promise p; internal_thread = std::thread([f, &p] { p.set_value(&this_thread_interrupt_flag);            try{             f();            }catch(...){} }); flag = p.get_future().get(); }    // 设置中断 void interrupt() { if (flag) { flag->set(); } }};// 如果已设置中断则抛出异常void interruption_point() { if (this_thread_interrupt_flag.is_set()) { throw std::exception(); }}// 可中断的条件变量等待template <typename Lockable>void interruptible_wait(std::condition_variable_any& cv, Lockable& lk) { this_thread_interrupt_flag.wait(cv, lk);}// 可中断的future等待template <typename T, typename Lockable>void interruptible_wait(std::future& uf, Lockable& lk) { while (!this_thread_interrupt_flag.is_set()) { if (uf.wait_for(lk, 1ms) == std::future_status::ready) break; }}


    其中,interrupt_flag的实现如下,基于condition_variable_any而非普通条件变量,set时(即设置中断时)唤醒条件变量,wait时多次检查是否设置中断。

class interrupt_flag {    std::atomic<bool> flag;    std::condition_variable_any* thread_cond_any;    std::mutex set_clear_mutex;
public: interrupt_flag() : thread_cond_any(nullptr) {} void set() { flag.store(true, std::memory_order_relaxed); std::lock_guard<std::mutex> lk(set_clear_mutex); if (thread_cond_any) { thread_cond_any->notify_all(); } }
bool is_set() const { return flag.load(std::memory_order_relaxed); }
template <typename Lockable> void wait(std::condition_variable_any& cv, Lockable& lk) { struct custom_lock { interrupt_flag* self; Lockable& lk; custom_lock(interrupt_flag* self_, std::condition_variable_any& cond, Lockable& lk_) : self(self_), lk(lk_) { self->set_clear_mutex.lock(); self->thread_cond_any = &cond; } void unlock() { lk.unlock(); self->set_clear_mutex.unlock(); } void lock() { std::lock(self->set_clear_mutex, lk); } ~custom_lock() { self->thread_cond_any = nullptr; } }; custom_lock cl(this, cv, lk); interruption_point(); cv.wait(cl); interruption_point(); }};


    可以用try/catch来捕获中断,按某种方式处理然后继续执行。中断线程在实际应用中的常见场景是运行程序前开启后台任务,程序运行完退出时中断后台任务。



第10章 并行算法函数

    C++17向标准库加入了并行算法函数,在原有函数的参数列表前新增了执行策略参数。中定义了三种执行策略sequenced_policy、parallel_policy、parallel_unsequenced_policy,以及对应的传给并行算法函数的对象seq、par、par_unseq。

    不同策略会影响算法函数的复杂度、抛出异常时的行为、何时何地何种方式执行。其中seq代表顺序策略,令算法函数在发起调用的线程上执行全部操作,没有内存次序限制;par代表并行策略,内部操作可能在发起调用的线程上也可能另外创建线程执行,涉及的变量绝不能引发数据竞争;par_unseq代表非顺序并行策略,并行化最高,涉及的变量不得以任何形式同步。

    例如某网站有庞大的日志,需要逐行处理日志提炼各项信息,最后聚合结果,类似mapreduce。由于每行日志的处理都独立,只需最后总数正确,所以可以用transfrom_reduce来处理:

struct log_info {    std::string page;    time_t visit_time;    std::string browser;};
extern log_info parse_log_line(std::string const &line);
using visit_map_type = std::unordered_map<std::string, unsigned long long>;
visit_map_type count_visits_per_page( std::vector<std::string> const &log_lines) { struct combine_visits { visit_map_type operator()(visit_map_type lhs, visit_map_type rhs) const { if (lhs.size() < rhs.size()) std::swap(lhs, rhs); for (auto const &entry : rhs) { lhs[entry.first] += entry.second; } return lhs; }
visit_map_type operator()(log_info log, visit_map_type map) const { ++map[log.page]; return map; } visit_map_type operator()(visit_map_type map, log_info log) const { ++map[log.page]; return map; } visit_map_type operator()(log_info log1, log_info log2) const { visit_map_type map; ++map[log1.page]; ++map[log2.page]; return map; } };
return std::transform_reduce(std::execution::par, log_lines.begin(), log_lines.end(), visit_map_type(), combine_visits(), parse_log_line);}




第11章 多线程应用的测试和除错

    跟并发相关的错误主要分为多余的阻塞和条件竞争。多余的阻塞包括死锁、活锁(例如两自旋锁互相等待)、IO等外部阻塞。条件竞争包括数据竞争(对共享内存区域的并发访问未采取同步)、受到破坏的不变量、生存期问题。

    定位这些错误的技法包括审查代码并定位潜在错误、通过测试定位错误、设计可测试的代码、多线程测试技术(压力测试,组合模拟测试,特殊程序库)、测试多线程代码的性能。

    这里仅作简介,更详细的内容还需读原文。

EOF

你好,我是飞宇,本硕均于某中流985 CS就读,先后于百度搜索以及字节跳动电商等部门担任Linux C/C++后端研发工程师。

同时,我也是知乎博主@韩飞宇,日常分享C/C++、计算机学习经验、工作体会,欢迎点击此处查看我以前的学习笔记&经验&分享的资源。

我组建了一些社群一起交流,群里有大牛也有小白,如果你有意可以一起进群交流。

欢迎你添加我的微信,我拉你进技术交流群。此外,我也会经常在微信上分享一些计算机学习经验以及工作体验,还有一些内推机会

加个微信,打开另一扇窗

C语言与CPP编程 C语言/C++开发,C语言/C++基础知识,C语言/C++学习路线,C语言/C++进阶,数据结构;算法;python;计算机基础等
评论
  • 应用环境与极具挑战性的测试需求在服务器制造领域里,系统整合测试(System Integration Test;SIT)是确保产品质量和性能的关键步骤。随着服务器系统的复杂性不断提升,包括:多种硬件组件、操作系统、虚拟化平台以及各种应用程序和服务的整合,服务器制造商面临着更有挑战性的测试需求。这些挑战主要体现在以下五个方面:1. 硬件和软件的高度整合:现代服务器通常包括多个处理器、内存模块、储存设备和网络接口。这些硬件组件必须与操作系统及应用软件无缝整合。SIT测试可以帮助制造商确保这些不同组件
    百佳泰测试实验室 2024-12-12 17:45 106浏览
  • 铁氧体芯片是一种基于铁氧体磁性材料制成的芯片,在通信、传感器、储能等领域有着广泛的应用。铁氧体磁性材料能够通过外加磁场调控其导电性质和反射性质,因此在信号处理和传感器技术方面有着独特的优势。以下是对半导体划片机在铁氧体划切领域应用的详细阐述: 一、半导体划片机的工作原理与特点半导体划片机是一种使用刀片或通过激光等方式高精度切割被加工物的装置,是半导体后道封测中晶圆切割和WLP切割环节的关键设备。它结合了水气电、空气静压高速主轴、精密机械传动、传感器及自动化控制等先进技术,具有高精度、高
    博捷芯划片机 2024-12-12 09:16 101浏览
  • RK3506 是瑞芯微推出的MPU产品,芯片制程为22nm,定位于轻量级、低成本解决方案。该MPU具有低功耗、外设接口丰富、实时性高的特点,适合用多种工商业场景。本文将基于RK3506的设计特点,为大家分析其应用场景。RK3506核心板主要分为三个型号,各型号间的区别如下图:​图 1  RK3506核心板处理器型号场景1:显示HMIRK3506核心板显示接口支持RGB、MIPI、QSPI输出,且支持2D图形加速,轻松运行QT、LVGL等GUI,最快3S内开
    万象奥科 2024-12-11 15:42 109浏览
  • 全球智能电视时代来临这年头若是消费者想随意地从各个通路中选购电视时,不难发现目前市场上的产品都已是具有智能联网功能的智能电视了,可以宣告智能电视的普及时代已到临!Google从2021年开始大力推广Google TV(即原Android TV的升级版),其他各大品牌商也都跟进推出搭载Google TV操作系统的机种,除了Google TV外,LG、Samsung、Panasonic等大厂牌也开发出自家的智能电视平台,可以看出各家业者都一致地看好这块大饼。智能电视的Wi-Fi连线怎么消失了?智能电
    百佳泰测试实验室 2024-12-12 17:33 107浏览
  • 时源芯微——RE超标整机定位与解决详细流程一、 初步测量与问题确认使用专业的电磁辐射测量设备,对整机的辐射发射进行精确测量。确认是否存在RE超标问题,并记录超标频段和幅度。二、电缆检查与处理若存在信号电缆:步骤一:拔掉所有信号电缆,仅保留电源线,再次测量整机的辐射发射。若测量合格:判定问题出在信号电缆上,可能是电缆的共模电流导致。逐一连接信号电缆,每次连接后测量,定位具体哪根电缆或接口导致超标。对问题电缆进行处理,如加共模扼流圈、滤波器,或优化电缆布局和屏蔽。重新连接所有电缆,再次测量
    时源芯微 2024-12-11 17:11 129浏览
  • 一、SAE J1939协议概述SAE J1939协议是由美国汽车工程师协会(SAE,Society of Automotive Engineers)定义的一种用于重型车辆和工业设备中的通信协议,主要应用于车辆和设备之间的实时数据交换。J1939基于CAN(Controller Area Network)总线技术,使用29bit的扩展标识符和扩展数据帧,CAN通信速率为250Kbps,用于车载电子控制单元(ECU)之间的通信和控制。小北同学在之前也对J1939协议做过扫盲科普【科普系列】SAE J
    北汇信息 2024-12-11 15:45 128浏览
  • 首先在gitee上打个广告:ad5d2f3b647444a88b6f7f9555fd681f.mp4 · 丙丁先生/香河英茂工作室中国 - Gitee.com丙丁先生 (mr-bingding) - Gitee.com2024年对我来说是充满挑战和机遇的一年。在这一年里,我不仅进行了多个开发板的测评,还尝试了多种不同的项目和技术。今天,我想分享一下这一年的故事,希望能给大家带来一些启发和乐趣。 年初的时候,我开始对各种开发板进行测评。从STM32WBA55CG到瑞萨、平头哥和平海的开发板,我都
    丙丁先生 2024-12-11 20:14 92浏览
  • 习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习笔记&记录学习习笔记&记学习学习笔记&记录学习学习笔记&记录学习习笔记&记录学习学习笔记&记录学习学习笔记记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记
    youyeye 2024-12-11 17:58 105浏览
  • 本文介绍瑞芯微RK3588主板/开发板Android12系统下,APK签名文件生成方法。触觉智能EVB3588开发板演示,搭载了瑞芯微RK3588芯片,该开发板是核心板加底板设计,音视频接口、通信接口等各类接口一应俱全,可帮助企业提高产品开发效率,缩短上市时间,降低成本和设计风险。工具准备下载Keytool-ImportKeyPair工具在源码:build/target/product/security/系统初始签名文件目录中,将以下三个文件拷贝出来:platform.pem;platform.
    Industio_触觉智能 2024-12-12 10:27 104浏览
  • 天问Block和Mixly是两个不同的编程工具,分别在单片机开发和教育编程领域有各自的应用。以下是对它们的详细比较: 基本定义 天问Block:天问Block是一个基于区块链技术的数字身份验证和数据交换平台。它的目标是为用户提供一个安全、去中心化、可信任的数字身份验证和数据交换解决方案。 Mixly:Mixly是一款由北京师范大学教育学部创客教育实验室开发的图形化编程软件,旨在为初学者提供一个易于学习和使用的Arduino编程环境。 主要功能 天问Block:支持STC全系列8位单片机,32位
    丙丁先生 2024-12-11 13:15 76浏览
  • 在智能化技术快速发展当下,图像数据的采集与处理逐渐成为自动驾驶、工业等领域的一项关键技术。高质量的图像数据采集与算法集成测试都是确保系统性能和可靠性的关键。随着技术的不断进步,对于图像数据的采集、处理和分析的需求日益增长,这不仅要求我们拥有高性能的相机硬件,还要求我们能够高效地集成和测试各种算法。我们探索了一种多源相机数据采集与算法集成测试方案,能够满足不同应用场景下对图像采集和算法测试的多样化需求,确保数据的准确性和算法的有效性。一、相机组成相机一般由镜头(Lens),图像传感器(Image
    康谋 2024-12-12 09:45 113浏览
  • 习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习笔记&记录学习习笔记&记学习学习笔记&记录学习学习笔记&记录学习习笔记&记录学习学习笔记&记录学习学习笔记记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记录学习学习笔记&记
    youyeye 2024-12-12 10:13 70浏览
我要评论
0
点击右上角,分享到朋友圈 我知道啦
请使用浏览器分享功能 我知道啦