点击上方“C语言与CPP编程”,选择“关注/置顶/星标公众号”
干货福利,第一时间送达!
最近有小伙伴说没有收到当天的文章推送,这是因为微信改了推送机制,有一部分小伙伴刷不到当天的文章,一些比较实用的知识和信息,错过了就是错过了,建议大家加个星标⭐️,就能第一时间收到推送。
小伙伴们大家好,我是飞宇。
最近李伟老师讲了《C++ Primer 5th》这本书的视频,他是美国微软高级工程师,清华大学博士,帮忙推广一下,感兴趣的可以看看。
今天继续更新《Effective C++》和《C++并发编程实战》的读书笔记,下面是已经更新过的内容:
《C++并发编程实战》读书笔记(1):并发、线程管控
《C++并发编程实战》读书笔记(2):并发操作的同步
《C++并发编程实战》读书笔记(3):内存模型和原子操作
《C++并发编程实战》读书笔记(4):设计并发数据结构
《Effective C++》读书笔记(1):让自己习惯C++
《Effective C++》读书笔记(2):构造/析构/赋值运算
《Effective C++》读书笔记(3):资源管理
《Effective C++》读书笔记(4):设计与声明
《Effective C++》读书笔记(5):实现
第8章 设计并发代码
在线程间切分任务的方法包括:1、先在线程间切分数据再处理。2、以递归的方式划分数据,例如快排中每次以某元素为界将数据集划分为大小两部分然后再递归处理。3、依据工作类别划分任务,每部分代码只承担单一的功能职责。
例如第四章中演示了使用std::async实现并行版快排,也可以自己实现:
template <typename T>
struct sorter {
struct chunk_to_sort {
std::list
data; std::promise<std::list
> promise; };
thread_safe_stack
chunks; std::vector<std::thread> threads;
unsigned const max_thread_count;
std::atomic<bool> end_of_data;
sorter()
: max_thread_count(std::thread::hardware_concurrency() - 1),
end_of_data(false) {}
~sorter() {
end_of_data = true;
for (unsigned i = 0; i < threads.size(); ++i) {
threads[i].join();
}
}
std::list
do_sort(std::list & chunk_data) { if (chunk_data.empty()) {
return chunk_data;
}
std::list
result; result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();
auto divide_point =
std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val) { return val < partition_val; });
chunk_to_sort new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data,
chunk_data.begin(), divide_point);
std::future<std::list
> new_lower = new_lower_chunk.promise.get_future();
chunks.push(std::move(new_lower_chunk));
if (threads.size() < max_thread_count) {
threads.push_back(std::thread(&sorter
::sort_thread, this)); }
std::list
new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
while (new_lower.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready) {
try_sort_chunk();
}
result.splice(result.begin(), new_lower.get());
return result;
}
void sort_thread() {
while (!end_of_data) {
try_sort_chunk();
std::this_thread::yield();
}
}
void try_sort_chunk() {
std::shared_ptr
chunk = chunks.pop(); if (chunk) {
chunk->promise.set_value(do_sort(chunk->data));
}
}
};
template <typename T>
std::list
parallel_quick_sort(std::list input) { if (input.empty()) {
return input;
}
sorter
s; return s.do_sort(input);
}
影响并发代码性能的因素包括:1、处理器的数量,C++11可以通过std::thread::hardware_concurrency获取硬件线程数量。2、数据竞争和缓存乒乓,例如循环处理原子变量或加锁时变量所含的数据在不同缓存之间多次来回传递,严重影响程序性能。3、不经意共享,多个线程访问同一个数据的不同元素时可能访问同一个缓存块,造成数据乒乓;C++17定义了std::hardware_destructive_interference_size用于表示一个字节数的限度,数据分布范围超出该值就不会有不经意共享。4、数据的紧凑程度,单个线程访问的数据可能属于不同内存块,C++17定义了std::hardward_constructive_interference_size表示同一缓存块保证容纳的最大连续字节数。5、过度任务切换和线程过饱和。
还有一些要额外考虑的因素:1、异常安全,某线程上有函数因异常而退出会导致整个程序被终结。2、可伸缩性,尽可能令并发程度达到最大并确保实际工作量足以让多个处理器有效运行。3、利用多线程掩藏等待行为,提高响应能力。
下面是标准库部分并行化实现的例子。
首先是for_each,使用RAII类join_threads,用future
class join_threads {
std::vector<std::thread>& threads_;
public:
explicit join_threads(std::vector<std::thread>& threads)
: threads_(threads) {}
~join_threads() {
for (std::size_t i = 0; i < threads_.size(); ++i) {
if (threads_[i].joinable()) {
threads_[i].join();
}
}
}
};
template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
unsigned long const length = std::distance(first, last);
if (!length) return;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<void> > futures(num_threads - 1);
std::vector<std::thread> threads(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<void(void)> task(
[=]() { std::for_each(block_start, block_end, f); });
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task));
block_start = block_end;
}
std::for_each(block_start, last, f);
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
futures[i].get();
}
}
// 也可以用std::async简化
template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
unsigned long const length = std::distance(first, last);
if (!length) return;
unsigned long const min_per_thread = 25;
if (length < (2 * min_per_thread)) {
std::for_each(first, last, f);
} else {
Iterator const mid_point = first + length / 2;
std::future<void> first_half =
std::async(¶llel_for_each
, first, mid_point, f); parallel_for_each(mid_point, last, f);
first_half.get();
}
}
接着是find,使用std::promise在工作线程中设定最终结果,查找到后设置原子变量done_flag终止其他线程。
template <typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match) {
struct find_element {
void operator()(Iterator begin, Iterator end, MatchType match,
std::promise
* result, std::atomic<bool>* done_flag) {
try {
for (; (begin != end) && !done_flag->load(); ++begin) {
if (*begin == match) {
result->set_value(begin);
done_flag->store(true);
return;
}
}
} catch (...) {
try {
result->set_exception(std::current_exception());
done_flag->store(true);
} catch (...) {
}
}
}
};
unsigned long const length = std::distance(first, last);
if (!length) return last;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::promise
result; std::atomic<bool> done_flag(false);
std::vector<std::thread> threads(num_threads - 1);
{
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);
block_start = block_end;
}
find_element()(block_start, last, match, &result, &done_flag);
}
if (!done_flag.load()) {
return last;
}
return result.get_future().get();
}
—— EOF —— 你好,我是飞宇,本硕均于某中流985 CS就读,先后于百度搜索以及字节跳动电商等部门担任Linux C/C++后端研发工程师。
同时,我也是知乎博主@韩飞宇,日常分享C/C++、计算机学习经验、工作体会,欢迎点击此处查看我以前的学习笔记&经验&分享的资源。
我组建了一些社群一起交流,群里有大牛也有小白,如果你有意可以一起进群交流。
欢迎你添加我的微信,我拉你进技术交流群。此外,我也会经常在微信上分享一些计算机学习经验以及工作体验,还有一些内推机会。
加个微信,打开另一扇窗