为什么要并发编程
进程(英语:process),是指计算机中已运行的程序。进程为曾经是分时系统的基本运作单位。在面向进程设计的系统(如早期的UNIX,Linux 2.4及更早的版本)中,进程是程序的基本执行实体; 线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。 -- 维基百科
相较而言,Java自JDK 1.0就包含了多线程模型。
g++ -std=c++17 your_file.cpp -o your_program
git clone https://github.com/paulQuei/cpp-concurrency.git
具体编译器对于C++特性支持的情况请参见这里:C++ compiler support。
./make_all.sh
rew install gcc
brew insbtall tbb
export tbb_path=/usr/local/Cellar/tbb/2019_U8/
./make_all.sh
brew upgrade gcc
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt install gcc-9 g++-9
sudo apt install ~/Downloads/libtbb2_2019~U8-1_amd64.deb
sudo apt install ~/Downloads/libtbb-dev_2019~U8-1_amd64.deb
// 01_hello_thread.cpp
#include <iostream>
#include <thread> // ①
using namespace std; // ②
void hello() { // ③
cout << "Hello World from new thread." << endl;
}
int main() {
thread t(hello); // ④
t.join(); // ⑤
return 0;
}
// 02_lambda_thread.cpp
#include <iostream>
#include <thread>
using namespace std;
int main() {
thread t([] {
cout << "Hello World from lambda thread." << endl;
});
t.join();
return 0;
}
为了减少不必要的重复,若无必要,下文中的代码将不贴出include指令以及using声明。
// 03_thread_argument.cpp
void hello(string name) {
cout << "Welcome to " << name << endl;
}
int main() {
thread t(hello, "https://paul.pub");
t.join();
return 0;
}
请思考在上面的代码示例中,thread对象在何时会销毁。
// 04_thread_self_manage.cpp
void print_time() {
auto now = chrono::system_clock::now();
auto in_time_t = chrono::system_clock::to_time_t(now);
std::stringstream ss;
ss << put_time(localtime(&in_time_t), "%Y-%m-%d %X");
cout << "now is: " << ss.str() << endl;
}
void sleep_thread() {
this_thread::sleep_for(chrono::seconds(3));
cout << "[thread-" << this_thread::get_id() << "] is waking up" << endl;
}
void loop_thread() {
for (int i = 0; i < 10; i++) {
cout << "[thread-" << this_thread::get_id() << "] print: " << i << endl;
}
}
int main() {
print_time();
thread t1(sleep_thread);
thread t2(loop_thread);
t1.join();
t2.detach();
print_time();
return 0;
}
now is: 2019-10-13 10:17:48
[thread-0x70000cdda000] print: 0
[thread-0x70000cdda000] print: 1
[thread-0x70000cdda000] print: 2
[thread-0x70000cdda000] print: 3
[thread-0x70000cdda000] print: 4
[thread-0x70000cdda000] print: 5
[thread-0x70000cdda000] print: 6
[thread-0x70000cdda000] print: 7
[thread-0x70000cdda000] print: 8
[thread-0x70000cdda000] print: 9
[thread-0x70000cd57000] is waking up
now is: 2019-10-13 10:17:51
// 05_call_once.cpp
void init() {
cout << "Initialing..." << endl;
// Do something...
}
void worker(once_flag* flag) {
call_once(*flag, init);
}
int main() {
once_flag flag;
thread t1(worker, &flag);
thread t2(worker, &flag);
thread t3(worker, &flag);
t1.join();
t2.join();
t3.join();
return 0;
}
请思考一下,为什么要在main函数中创建once_flag flag。如果是在worker函数中直接声明一个once_flag并使用行不行?为什么?
// 06_naive_multithread.cpp
static const int MAX = 10e8; // ①
static double sum = 0; // ②
void worker(int min, int max) { // ③
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
}
void serial_task(int min, int max) { // ④
auto start_time = chrono::steady_clock::now();
sum = 0;
worker(0, MAX);
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Serail task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
Serail task finish, 6406 ms consumed, Result: 2.10819e+13
// 06_naive_multithread.cpp
void concurrent_task(int min, int max) {
auto start_time = chrono::steady_clock::now();
unsigned concurrent_count = thread::hardware_concurrency(); // ①
cout << "hardware_concurrency: " << concurrent_count << endl;
vector<thread> threads;
min = 0;
sum = 0;
for (int t = 0; t < concurrent_count; t++) { // ②
int range = max / concurrent_count * (t + 1);
threads.push_back(thread(worker, min, range)); // ③
min = range + 1;
}
for (auto& t : threads) {
t.join(); // ④
}
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
hardware_concurrency: 16
Concurrent task finish, 6246 ms consumed, Result: 1.78162e+12
事实上,目前大部分CPU的缓存已经不只一层。
// 07_mutex_lock.cpp
static const int MAX = 10e8;
static double sum = 0;
static mutex exclusive;
void concurrent_worker(int min, int max) {
for (int i = min; i <= max; i++) {
exclusive.lock(); // ①
sum += sqrt(i);
exclusive.unlock(); // ②
}
}
void concurrent_task(int min, int max) {
auto start_time = chrono::steady_clock::now();
unsigned concurrent_count = thread::hardware_concurrency();
cout << "hardware_concurrency: " << concurrent_count << endl;
vector<thread> threads;
min = 0;
sum = 0;
for (int t = 0; t < concurrent_count; t++) {
int range = max / concurrent_count * (t + 1);
threads.push_back(thread(concurrent_worker, min, range)); // ③
min = range + 1;
}
for (int i = 0; i < threads.size(); i++) {
threads[i].join();
}
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;
}
hardware_concurrency: 16
Concurrent task finish, 74232 ms consumed, Result: 2.10819e+13
// 08_improved_mutex_lock.cpp
void concurrent_worker(int min, int max) {
double tmp_sum = 0;
for (int i = min; i <= max; i++) {
tmp_sum += sqrt(i); // ①
}
exclusive.lock(); // ②
sum += tmp_sum;
exclusive.unlock();
}
hardware_concurrency: 16
Concurrent task finish, 451 ms consumed, Result: 2.10819e+13
In general, a lock should be held for only the minimum possible time needed to perform the required operations. --《C++ Concurrency in Action》
// 09_deadlock_bank_transfer.cpp
class Account {
public:
Account(string name, double money): mName(name), mMoney(money) {};
public:
void changeMoney(double amount) {
mMoney += amount;
}
string getName() {
return mName;
}
double getMoney() {
return mMoney;
}
mutex* getLock() {
return &mMoneyLock;
}
private:
string mName;
double mMoney;
mutex mMoneyLock;
};
// 09_deadlock_bank_transfer.cpp
class Bank {
public:
void addAccount(Account* account) {
mAccounts.insert(account);
}
bool transferMoney(Account* accountA, Account* accountB, double amount) {
lock_guard guardA(*accountA->getLock()); // ①
lock_guard guardB(*accountB->getLock());
if (amount > accountA->getMoney()) { // ②
return false;
}
accountA->changeMoney(-amount); // ③
accountB->changeMoney(amount);
return true;
}
double totalMoney() const {
double sum = 0;
for (auto a : mAccounts) {
sum += a->getMoney();
}
return sum;
}
private:
set<Account*> mAccounts;
};
// 09_deadlock_bank_transfer.cpp
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
if (bank->transferMoney(accountA, accountB, randomMoney)) {
cout << "Transfer " << randomMoney << " from " << accountA->getName()
<< " to " << accountB->getName()
<< ", Bank totalMoney: " << bank->totalMoney() << endl;
} else {
cout << "Transfer failed, "
<< accountA->getName() << " has only $" << accountA->getMoney() << ", but "
<< randomMoney << " required" << endl;
}
}
}
// 09_deadlock_bank_transfer.cpp
int main() {
Account a("Paul", 100);
Account b("Moira", 100);
Bank aBank;
aBank.addAccount(&a);
aBank.addAccount(&b);
thread t1(randomTransfer, &aBank, &a, &b);
thread t2(randomTransfer, &aBank, &b, &a);
t1.join();
t2.join();
return 0;
}
...
Transfer 13.2901 from Paul to Moira, Bank totalMoney: 20042.6259 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $34.7581, but 66.3208 required
Transfer failed, Moira has only $34.7581, but
Transfer 93.191 from 53.9176 required
Transfer 60.6146 from Moira to Paul, Bank totalMoney: 200
Transfer 49.7304 from Moira to Paul, Bank totalMoney: 200Paul to Moira, Bank totalMoney:
Transfer failed, Moira has only $17.6041, but 18.1186 required
Transfer failed, Moira has only $17.6041, but 18.893 required
Transfer failed, Moira has only $17.6041, but 34.7078 required
Transfer failed, Moira has only $17.6041, but 33.9569 required
Transfer 12.7899 from 200
Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $63.9373, but 80.9038 required
Transfer 50.933 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $13.0043, but 30.2056 required
Transfer failed, Moira has only $Transfer 59.123 from Paul to Moira, Bank totalMoney: 200
Transfer 29.0486 from Paul to Moira, Bank totalMoney: 20013.0043, but 64.7307 required
// 10_improved_bank_transfer.cpp
mutex sCoutLock;
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
if (bank->transferMoney(accountA, accountB, randomMoney)) {
sCoutLock.lock();
cout << "Transfer " << randomMoney << " from " << accountA->getName()
<< " to " << accountB->getName()
<< ", Bank totalMoney: " << bank->totalMoney() << endl;
sCoutLock.unlock();
} else {
sCoutLock.lock();
cout << "Transfer failed, "
<< accountA->getName() << " has only " << accountA->getMoney() << ", but "
<< randomMoney << " required" << endl;
sCoutLock.unlock();
}
}
}
请思考一下两处lock和unlock调用,并考虑为什么不在while(true)下面写一次整体的加锁和解锁。
// 10_improved_bank_transfer.cpp
bool transferMoney(Account* accountA, Account* accountB, double amount) {
lock(*accountA->getLock(), *accountB->getLock()); // ①
lock_guard lockA(*accountA->getLock(), adopt_lock); // ②
lock_guard lockB(*accountB->getLock(), adopt_lock); // ③
if (amount > accountA->getMoney()) {
return false;
}
accountA->changeMoney(-amount);
accountB->changeMoney(amount);
return true;
}
...
Transfer failed, Paul has only $1.76243, but 17.5974 required
Transfer failed, Paul has only $1.76243, but 59.2104 required
Transfer failed, Paul has only $1.76243, but 49.6379 required
Transfer failed, Paul has only $1.76243, but 63.6373 required
Transfer failed, Paul has only $1.76243, but 51.8742 required
Transfer failed, Paul has only $1.76243, but 50.0081 required
Transfer failed, Paul has only $1.76243, but 86.1041 required
Transfer failed, Paul has only $1.76243, but 51.3278 required
Transfer failed, Paul has only $1.76243, but 66.5754 required
Transfer failed, Paul has only $1.76243, but 32.1867 required
Transfer failed, Paul has only $1.76243, but 62.0039 required
Transfer failed, Paul has only $1.76243, but 98.7819 required
Transfer failed, Paul has only $1.76243, but 27.046 required
Transfer failed, Paul has only $1.76243, but 62.9155 required
Transfer 98.8478 from Moira to Paul, Bank totalMoney: 200
Transfer 80.0722 from Moira to Paul, Bank totalMoney: 200
Transfer 73.7035 from Moira to Paul, Bank totalMoney: 200
Transfer 34.4476 from Moira to Paul, Bank totalMoney: 200
Transfer failed, Moira has only $10.0142, but 61.3033 required
Transfer failed, Moira has only $10.0142, but 24.5595 required
...
// https://en.cppreference.com/w/cpp/thread/lock_guard
int g_i = 0;
std::mutex g_i_mutex; // ①
void safe_increment()
{
std::lock_guard<std::mutex> lock(g_i_mutex); // ②
++g_i;
std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
// ③
}
int main()
{
std::cout << "main: " << g_i << '\n';
std::thread t1(safe_increment); // ④
std::thread t2(safe_increment);
t1.join();
t2.join();
std::cout << "main: " << g_i << '\n';
}
lock(*accountA->getLock(), *accountB->getLock());
lock_guard lockA(*accountA->getLock(), adopt_lock);
lock_guard lockB(*accountB->getLock(), adopt_lock);
unique_lock lockA(*accountA->getLock(), defer_lock);
unique_lock lockB(*accountB->getLock(), defer_lock);
lock(*accountA->getLock(), *accountB->getLock());
scoped_lock lockAll(*accountA->getLock(), *accountB->getLock());
// 11_bank_transfer_wait_notify.cpp
class Account {
public:
Account(string name, double money): mName(name), mMoney(money) {};
public:
void changeMoney(double amount) {
unique_lock lock(mMoneyLock); // ②
mConditionVar.wait(lock, [this, amount] { // ③
return mMoney + amount > 0; // ④
});
mMoney += amount;
mConditionVar.notify_all(); // ⑤
}
string getName() {
return mName;
}
double getMoney() {
return mMoney;
}
private:
string mName;
double mMoney;
mutex mMoneyLock;
condition_variable mConditionVar; // ①
};
// 11_bank_transfer_wait_notify.cpp
void Bank::transferMoney(Account* accountA, Account* accountB, double amount) {
accountA->changeMoney(-amount);
accountB->changeMoney(amount);
}
// 11_bank_transfer_wait_notify.cpp
mutex sCoutLock;
void randomTransfer(Bank* bank, Account* accountA, Account* accountB) {
while(true) {
double randomMoney = ((double)rand() / RAND_MAX) * 100;
{
lock_guard guard(sCoutLock);
cout << "Try to Transfer " << randomMoney
<< " from " << accountA->getName() << "(" << accountA->getMoney()
<< ") to " << accountB->getName() << "(" << accountB->getMoney()
<< "), Bank totalMoney: " << bank->totalMoney() << endl;
}
bank->transferMoney(accountA, accountB, randomMoney);
}
}
...
Try to Transfer 13.72 from Moira(10.9287) to Paul(189.071), Bank totalMoney: 200
Try to Transfer 28.6579 from Paul(189.071) to Moira(10.9287), Bank totalMoney: 200
Try to Transfer 91.8049 from Paul(160.413) to Moira(39.5866), Bank totalMoney: 200
Try to Transfer 5.56383 from Paul(82.3285) to Moira(117.672), Bank totalMoney: 200
Try to Transfer 11.3594 from Paul(76.7646) to Moira(123.235), Bank totalMoney: 200
Try to Transfer 16.9557 from Paul(65.4053) to Moira(134.595), Bank totalMoney: 200
Try to Transfer 74.998 from Paul(48.4495) to Moira(151.55), Bank totalMoney: 200
Try to Transfer 65.3005 from Moira(151.55) to Paul(48.4495), Bank totalMoney: 200
Try to Transfer 90.6084 from Moira(86.25) to Paul(113.75), Bank totalMoney: 125.002
Try to Transfer 99.6425 from Moira(70.6395) to Paul(129.36), Bank totalMoney: 200
Try to Transfer 55.2091 from Paul(129.36) to Moira(70.6395), Bank totalMoney: 200
Try to Transfer 92.259 from Paul(74.1513) to Moira(125.849), Bank totalMoney: 200
...
// 12_async_task.cpp
static const int MAX = 10e8;
static double sum = 0;
void worker(int min, int max) {
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
}
int main() {
sum = 0;
auto f1 = async(worker, 0, MAX);
cout << "Async task triggered" << endl;
f1.wait();
cout << "Async task finish, result: " << sum << endl << endl;
}
// 12_async_task.cpp
int main() {
double result = 0;
cout << "Async task with lambda triggered, thread: " << this_thread::get_id() << endl;
auto f2 = async(launch::async, [&result]() {
cout << "Lambda task in thread: " << this_thread::get_id() << endl;
for (int i = 0; i <= MAX; i++) {
result += sqrt(i);
}
});
f2.wait();
cout << "Async task with lambda finish, result: " << result << endl << endl;
return 0;
}
Async task with lambda triggered, thread: 0x11290d5c0
Lambda task in thread: 0x700007aa1000
Async task with lambda finish, result: 2.10819e+13
// 12_async_task.cpp
class Worker {
public:
Worker(int min, int max): mMin(min), mMax(max) {} // ①
double work() { // ②
mResult = 0;
for (int i = mMin; i <= mMax; i++) {
mResult += sqrt(i);
}
return mResult;
}
double getResult() {
return mResult;
}
private:
int mMin;
int mMax;
double mResult;
};
int main() {
Worker w(0, MAX);
cout << "Task in class triggered" << endl;
auto f3 = async(&Worker::work, &w); // ③
f3.wait();
cout << "Task in class finish, result: " << w.getResult() << endl << endl;
return 0;
}
如果你了解设计模式,你应该会知道命令模式。
// 13_packaged_task.cpp
double concurrent_worker(int min, int max) {
double sum = 0;
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
return sum;
}
double concurrent_task(int min, int max) {
vector<future<double>> results; // ①
unsigned concurrent_count = thread::hardware_concurrency();
min = 0;
for (int i = 0; i < concurrent_count; i++) { // ②
packaged_task<double(int, int)> task(concurrent_worker); // ③
results.push_back(task.get_future()); // ④
int range = max / concurrent_count * (i + 1);
thread t(std::move(task), min, range); // ⑤
t.detach();
min = range + 1;
}
cout << "threads create finish" << endl;
double sum = 0;
for (auto& r : results) {
sum += r.get(); ⑥
}
return sum;
}
int main() {
auto start_time = chrono::steady_clock::now();
double r = concurrent_task(0, MAX);
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << r << endl;
return 0;
}
// 14_promise_future.cpp
double concurrent_worker(int min, int max) {
double sum = 0;
for (int i = min; i <= max; i++) {
sum += sqrt(i);
}
return sum;
}
void concurrent_task(int min, int max, promise<double>* result) { // ①
vector<future<double>> results;
unsigned concurrent_count = thread::hardware_concurrency();
min = 0;
for (int i = 0; i < concurrent_count; i++) {
packaged_task<double(int, int)> task(concurrent_worker);
results.push_back(task.get_future());
int range = max / concurrent_count * (i + 1);
thread t(std::move(task), min, range);
t.detach();
min = range + 1;
}
cout << "threads create finish" << endl;
double sum = 0;
for (auto& r : results) {
sum += r.get();
}
result->set_value(sum); // ②
cout << "concurrent_task finish" << endl;
}
int main() {
auto start_time = chrono::steady_clock::now();
promise<double> sum; // ③
concurrent_task(0, MAX, &sum);
auto end_time = chrono::steady_clock::now();
auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
cout << "Concurrent task finish, " << ms << " ms consumed." << endl;
cout << "Result: " << sum.get_future().get() << endl; // ④
return 0;
}
// 15_parallel_algorithm.cpp
void generateRandomData(vector<double>& collection, int size) {
random_device rd;
mt19937 mt(rd());
uniform_real_distribution<double> dist(1.0, 100.0);
for (int i = 0; i < size; i++) {
collection.push_back(dist(mt));
}
}
int main() {
vector<double> collection;
generateRandomData(collection, 10e6); // ①
vector<double> copy1(collection); // ②
vector<double> copy2(collection);
vector<double> copy3(collection);
auto time1 = chrono::steady_clock::now(); // ③
sort(execution::seq, copy1.begin(), copy1.end()); // ④
auto time2 = chrono::steady_clock::now();
auto duration = chrono::duration_cast<chrono::milliseconds>(time2 - time1).count();
cout << "Sequenced sort consuming " << duration << "ms." << endl; // ⑤
auto time3 = chrono::steady_clock::now();
sort(execution::par, copy2.begin(),copy2.end()); // ⑥
auto time4 = chrono::steady_clock::now();
duration = chrono::duration_cast<chrono::milliseconds>(time4 - time3).count();
cout << "Parallel sort consuming " << duration << "ms." << endl;
auto time5 = chrono::steady_clock::now();
sort(execution::par_unseq, copy2.begin(),copy2.end()); // ⑦
auto time6 = chrono::steady_clock::now();
duration = chrono::duration_cast<chrono::milliseconds>(time6 - time5).count();
cout << "Parallel unsequenced sort consuming " << duration << "ms." << endl;
}
通过一个函数生成1000,000个随机数。
将数据拷贝3份,以备使用。
接下来将通过三个不同的parallel_policy参数来调用同样的sort算法。每次调用记录开始和结束的时间。
第一次调用使用std::execution::seq参数。
输出本次测试所使用的时间。
第二次调用使用std::execution::par参数。
第三次调用使用std::execution::par_unseq参数。
Sequenced sort consuming 4464ms.
Parallel sort consuming 459ms.
Parallel unsequenced sort consuming 168ms.
转自:https://paul.pub/cpp-concurrency/