最近在看现代C++白皮书,看到C++17引入了并行STL。
先说自己探索的结论,目前的gcc/g++只需要在编译是添加参数 -D_GLIBCXX_PARALLEL -fopenmp 即可达到并行化的效果,不需要对源码进行修改。
网上已有的方法,需要添加头文件#include <execution>
,还需要形如 sort(execution::par, begin(d), end(d));
一样设置并行化算法。
#include <algorithm>
#include <execution>
#include <iostream>
#include <random>
#include <chrono>
using namespace std;
using namespace chrono;
int main() {
vector<long long> d1(30000000);
vector<long long> d2(30000000);
mt19937 gen;
uniform_int_distribution<long long> dis(0, 100000000);
auto rand_num([=]() mutable { return dis(gen); });
generate(execution::par, begin(d1), end(d1), rand_num);
d2 = d1;
auto start_t = high_resolution_clock::now();
sort(begin(d1), end(d1));
auto end_t = high_resolution_clock::now();
auto duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "The run time is: " << double(duration.count()) * nanoseconds::period::num / nanoseconds::period::den << "s" << endl;
start_t = high_resolution_clock::now();
sort(execution::par, begin(d2), end(d2));
end_t = high_resolution_clock::now();
duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "The run time is: " << double(duration.count()) * nanoseconds::period::num / nanoseconds::period::den << "s" << endl;
return 0;
}
我在windows的环境中找不到execution头文件,因此我先在linux(gcc 10.2.0)上运行了下该代码
┌──(kali㉿kali)-[~/testCpp]
└─$ g++ par.cpp -std=c++17
┌──(kali㉿kali)-[~/testCpp]
└─$ ./a.out
The run time is: 10.6875s
The run time is: 11.6801s
在这个代码的基础上,我想加上-D_GLIBCXX_PARALLEL -fopenmp
看看有啥效果。
┌──(kali㉿kali)-[~/testCpp]
└─$ g++ par.cpp -std=c++17 -D_GLIBCXX_PARALLEL -fopenmp
┌──(kali㉿kali)-[~/testCpp]
└─$ ./a.out
The run time is: 2.51153s
The run time is: 2.63921s
结果两个排序的效果都得到了明显提高。
不过,可以看到两种sort的速度没啥区别,然后我把使用到execution的部分给注释掉了,只对d1
进行排序,然后发现仅添加-D_GLIBCXX_PARALLEL -fopenmp
参数,linux和windows(gcc 8.1.0)上都能跑,并且都能达到加速的效果。
为了验证并行化STL算法的正确性,我使用了算法库中的多种函数来进行测试,测试代码如下:
#include <algorithm>
//#include <execution>
#include <iostream>
#include <random>
#include <chrono>
#include <vector>
#include <numeric> // iota
using namespace std;
using namespace chrono;
int main() {
vector<long long> d1(300000000);
iota(d1.rbegin(), d1.rend(), 0);
auto start_t = high_resolution_clock::now();
sort(begin(d1), end(d1));
auto end_t = high_resolution_clock::now();
auto duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "Sort costs: " << double(duration.count()) / nanoseconds::period::den << "s" << endl;
start_t = high_resolution_clock::now();
auto res1 = binary_search(begin(d1), end(d1), 0x123456);
end_t = high_resolution_clock::now();
duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "Binary_search costs: " << double(duration.count()) << "ns, " << "result is " << res1 << endl;
start_t = high_resolution_clock::now();
auto res2 = lower_bound(begin(d1), end(d1), 0x123456);
end_t = high_resolution_clock::now();
duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "Lower_bound costs: " << double(duration.count()) << "ns, " << "result is " << *res2 << endl;
start_t = high_resolution_clock::now();
auto res3 = find(begin(d1), end(d1), 0x123456);
end_t = high_resolution_clock::now();
duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "Find costs: " << double(duration.count()) / nanoseconds::period::den << "s, " << "result is " << *res3 << endl;
start_t = high_resolution_clock::now();
auto res4 = count(begin(d1), end(d1), 0x123456);
end_t = high_resolution_clock::now();
duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "Count costs: " << double(duration.count()) / nanoseconds::period::den << "s, " << "result is " << res4 << endl;
start_t = high_resolution_clock::now();
nth_element(begin(d1), d1.begin() + 0xff, end(d1));
end_t = high_resolution_clock::now();
duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "Nth_element costs: " << double(duration.count()) / nanoseconds::period::den << "s, " << "nth_element is " << d1[0xff - 1] << endl;
start_t = high_resolution_clock::now();
auto res5 = max_element(begin(d1), end(d1));
end_t = high_resolution_clock::now();
duration = duration_cast<nanoseconds>(end_t - start_t);
cout << "Max_element costs: " << double(duration.count()) / nanoseconds::period::den << "s, " << "result is " << *res5 << endl;
return 0;
}
上述代码在linux和windows环境都能正常运行,命令行参数也相同。
下面是在windows平台(gcc 8.1.0, AMD4600U 6cores 12threads)进行测试的结果。
不加参数-D_GLIBCXX_PARALLEL -fopenmp
输出如下:
PS D:\VSworkspace\a_code> cd "d:\VSworkspace\a_code\" ; if ($?) { g++ par_stl.cpp -o par_stl } ; if ($?) { .\par_stl }
Sort costs: 45.8743s
Binary_search costs: 0ns, result is 1
Lower_bound costs: 0ns, result is 1193046
Find costs: 0.0045747s, result is 1193046
Count costs: 2.125s, result is 1
Nth_element costs: 4.10857s, nth_element is 254
Max_element costs: 2.47174s, result is 299999999
加上之后输出如下:
PS D:\VSworkspace\a_code> g++ .\par_stl.cpp -D_GLIBCXX_PARALLEL -fopenmp
PS D:\VSworkspace\a_code> .\a.exe
Sort costs: 8.54622s
Binary_search costs: 0ns, result is 1
Lower_bound costs: 0ns, result is 1193046
Find costs: 0.0030668s, result is 1193046
Count costs: 1.10674s, result is 1
Nth_element costs: 1.27597s, nth_element is 254
Max_element costs: 0.874685s, result is 299999999
可以发现以上的算法,执行效率都有几倍的提高,输出的结果都相同。
总结
目前execution头文件好像用处不大,也不用指定-std=c++17
,仅添加参数-D_GLIBCXX_PARALLEL -fopenmp
,即可达到并行化STL加速的效果,加速效果还挺不错的。