并行数组求和

使用cppthreadpromisefuturepackaged_taskasync等并发处理机制实现数组求和.

并发编程实现思路

将数组进行平均划分, 每个线程处理一个子数组段, 然后主线程收集子线程的结果, 实现并行数组加法.

Thread

我们将任务划分后, 对于线程求和函数传入存储子数组和的引用, 这样主线程直接能够拿到该子数组和.

子线程求和函数

1
2
3
4
void multiAdd(vector<int>& nums, int l, int r, LL& sum) {
for (int i = l; i <= r; i ++ )
sum += nums[i];
}

主线程分配任务和汇集总和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
LL multiThread(vector<int>& nums) {
vector<thread> threads(M);
vector<LL> sum(M, 0LL);
int n = nums.size(), step = n / M + 1;

for (int i = 0; i < M; i ++ ) {
// 划分任务段
int l = (i == 0) ? 0 : i * step;
int r = min(n - 1, (i + 1) * step - 1);
// 使用 ref进行包装, 无法直接传递
threads[i] = thread(multiAdd, ref(nums), l, r, ref(sum[i]));
}

// join子线程
for (auto& t : threads)
t.join();

// 主线程统计总和
LL ans = 0;
for (auto& c : sum)
ans += c;
return ans;
}

Promise和Future

Promise可以存储一个值, 这个值可以在将来被异步的获取. 获取方式是通过promise.get_future()获取与该Promise共享状态的Future对象, 然后使用future.get()异步的阻塞等待获取其值.

子线程求和函数

1
2
3
4
5
6
void promiseAdd(vector<int>& nums, int l, int r, promise<LL>& promise) {
LL cnt = 0LL;
for (int i = l; i <= r; i ++ )
cnt += nums[i];
promise.set_value(cnt);
}

主线程分配任务和汇集总和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
LL promiseThread(vector<int>& nums) {
vector<thread> threads(M);
vector<future<LL>> sum(M);
vector<promise<LL>> prom(M);

int n = nums.size(), step = n / M + 1;

for (int i = 0; i < M; i ++ ) {
int l = (i == 0) ? 0 : i * step;
int r = min(n - 1, (i + 1) * step - 1);
// 绑定future和promise
sum[i] = prom[i].get_future();
threads[i] = thread(promiseAdd, ref(nums), l, r, ref(prom[i]));
}

LL ans = 0;

// 异步获取future值
for (auto& f : sum)
ans += f.get();

for (auto& t : threads)
t.join();

return ans;
}

Packaged_task

packaged_task封装一个函数, 如同function<>模板类一样. 不过packaged_task可以异步的获取封装函数的返回值, 该返回值存储在future对象中.

子线程求和函数

1
2
3
4
5
6
7
LL packageTaskAdd(vector<int>& nums, int l, int r) {
LL sum = 0LL;
for (int i = l; i <= r; i ++ )
sum += nums[i];
// 直接返回
return sum;
}

主线程分配任务和汇集总和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
LL packageTaskThread(vector<int>& nums) {
using task = packaged_task<LL(vector<int>&, int, int)>;

vector<future<LL>> sum(M);
int n = nums.size(), step = n / M + 1;

for (int i = 0; i < M; i ++ ) {
int l = (i == 0) ? 0 : i * step;
int r = min(n - 1, (i + 1) * step - 1);
// 封装任务
task curTask(packageTaskAdd);
// 执行任务
curTask(ref(nums), l, r);
// 绑定到future对象上
sum[i] = curTask.get_future();
}

LL ans = 0;
for (auto& f : sum)
ans += f.get();
return ans;
}

Async

async可以同步也可以异步的执行一个任务. 异步执行的话使用多线程进行计算. 任务的返回值存储于future对象中, 可以使用future.get()获取异步获取其值.

子线程求和函数

1
2
3
4
5
6
LL asyncAdd(vector<int>& nums, int l, int r) {
LL sum = 0;
for (int i = l; i <= r; i ++ )
sum += nums[i];
return sum;
}

主线程分配任务和汇集总和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
LL asyncThread(vector<int>& nums) {
vector<future<LL>> sum(M);
int n = nums.size(), step = n / M + 1;

for (int i = 0; i < M; i ++ ) {
int l = (i == 0) ? 0 : i * step;
int r = min(n - 1, (i + 1) * step - 1);
// launch::async 开始线程异步执行
sum[i] = async(launch::async, asyncAdd, ref(nums), l, r);
}

LL ans = 0;
for (auto& f : sum)
ans += f.get();

return ans;
}

代码和结果

Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#include <iostream>
#include <thread>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <chrono>
#include <future>
#include <assert.h>

using namespace std;
using LL = long long;
const int N = 1e8;
const int M = 16;

LL nativeAdd(vector<int>& nums) {
LL ans = 0;
for (auto& c : nums)
ans += c;
return ans;
}

void multiAdd(vector<int>& nums, int l, int r, LL& sum) {
for (int i = l; i <= r; i ++ )
sum += nums[i];
}

LL multiThread(vector<int>& nums) {
vector<thread> threads(M);
vector<LL> sum(M, 0LL);
int n = nums.size(), step = n / M + 1;

for (int i = 0; i < M; i ++ ) {
int l = (i == 0) ? 0 : i * step;
int r = min(n - 1, (i + 1) * step - 1);
threads[i] = thread(multiAdd, ref(nums), l, r, ref(sum[i]));
}

for (auto& t : threads)
t.join();

LL ans = 0;
for (auto& c : sum)
ans += c;
return ans;
}

void promiseAdd(vector<int>& nums, int l, int r, promise<LL>& promise) {
LL cnt = 0LL;
for (int i = l; i <= r; i ++ )
cnt += nums[i];
promise.set_value(cnt);
}

LL promiseThread(vector<int>& nums) {
vector<thread> threads(M);
vector<future<LL>> sum(M);
vector<promise<LL>> prom(M);

int n = nums.size(), step = n / M + 1;

for (int i = 0; i < M; i ++ ) {
int l = (i == 0) ? 0 : i * step;
int r = min(n - 1, (i + 1) * step - 1);
// 绑定future和promise
sum[i] = prom[i].get_future();
threads[i] = thread(promiseAdd, ref(nums), l, r, ref(prom[i]));
}

LL ans = 0;

// 异步获取future值
for (auto& f : sum)
ans += f.get();

for (auto& t : threads)
t.join();

return ans;
}

LL packageTaskAdd(vector<int>& nums, int l, int r) {
LL sum = 0LL;
for (int i = l; i <= r; i ++ )
sum += nums[i];
return sum;
}

LL packageTaskThread(vector<int>& nums) {
using task = packaged_task<LL(vector<int>&, int, int)>;

vector<future<LL>> sum(M);
int n = nums.size(), step = n / M + 1;

for (int i = 0; i < M; i ++ ) {
int l = (i == 0) ? 0 : i * step;
int r = min(n - 1, (i + 1) * step - 1);
// 封装任务
task curTask(packageTaskAdd);
// 执行任务
curTask(ref(nums), l, r);
// 绑定到future对象上
sum[i] = curTask.get_future();
}

LL ans = 0;
for (auto& f : sum)
ans += f.get();
return ans;
}


LL asyncAdd(vector<int>& nums, int l, int r) {
LL sum = 0;
for (int i = l; i <= r; i ++ )
sum += nums[i];
return sum;
}

LL asyncThread(vector<int>& nums) {
vector<future<LL>> sum(M);
int n = nums.size(), step = n / M + 1;

for (int i = 0; i < M; i ++ ) {
int l = (i == 0) ? 0 : i * step;
int r = min(n - 1, (i + 1) * step - 1);
sum[i] = async(launch::async, asyncAdd, ref(nums), l, r);
}

LL ans = 0;
for (auto& f : sum)
ans += f.get();

return ans;
}

void naive(vector<int>& nums) {
auto start_time = chrono::system_clock::now();
LL ans1 = nativeAdd(nums);
auto end_time = chrono::system_clock::now();
chrono::milliseconds time1 = chrono::duration_cast<chrono::milliseconds>(end_time - start_time);
cout << "nativeAdd cost time = " << time1.count() << " ms, total sum = " << ans1 << '\n';
}


void threads(vector<int>& nums) {
auto start_time = chrono::system_clock::now();
LL ans2 = multiThread(nums);
auto end_time = chrono::system_clock::now();
chrono::milliseconds time2 = chrono::duration_cast<chrono::milliseconds>(end_time - start_time);
cout << "multiThread cost time = " << time2.count() << " ms, total sum = " << ans2 << '\n';

}

void promisefuture(vector<int>& nums) {
auto start_time = chrono::system_clock::now();
LL ans2 = promiseThread(nums);
auto end_time = chrono::system_clock::now();
chrono::milliseconds time2 = chrono::duration_cast<chrono::milliseconds>(end_time - start_time);
cout << "promiseThread cost time = " << time2.count() << " ms, total sum = " << ans2 << '\n';
}

void packagedtask(vector<int>& nums) {
auto start_time = chrono::system_clock::now();
LL ans2 = packageTaskThread(nums);
auto end_time = chrono::system_clock::now();
chrono::milliseconds time2 = chrono::duration_cast<chrono::milliseconds>(end_time - start_time);
cout << "packageTaskThread cost time = " << time2.count() << " ms, total sum = " << ans2 << '\n';
}


void async(vector<int>& nums) {
auto start_time = chrono::system_clock::now();
LL ans2 = asyncThread(nums);
auto end_time = chrono::system_clock::now();
chrono::milliseconds time2 = chrono::duration_cast<chrono::milliseconds>(end_time - start_time);
cout << "asyncThread cost time = " << time2.count() << " ms, total sum = " << ans2 << '\n';
}

int main() {
vector<int> nums;

srand((unsigned int)time(NULL));
for (int i = 1; i < N; i ++ )
nums.push_back(rand() % N);

naive(nums);
threads(nums);
promisefuture(nums);
packagedtask(nums);
async(nums);

return 0;
}

Result

1
2
3
4
5
nativeAdd cost time = 615 ms, total sum = 1638266934225
multiThread cost time = 372 ms, total sum = 1638266934225
promiseThread cost time = 34 ms, total sum = 1638266934225
packageTaskThread cost time = 208 ms, total sum = 1638266934225
asyncThread cost time = 39 ms, total sum = 1638266934225
作者

Jsss

发布于

2022-03-24

更新于

2022-03-27

许可协议


评论