GCC Code Coverage Report


Directory: ./
File: libs/capy/include/boost/capy/when_all.hpp
Date: 2026-01-23 03:27:33
Exec Total Coverage
Lines: 82 100 82.0%
Functions: 24 319 7.5%
Branches: 18 80 22.5%

Line Branch Exec Source
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ALL_HPP
11 #define BOOST_CAPY_WHEN_ALL_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/io_awaitable.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/task.hpp>
20
21 #include <array>
22 #include <atomic>
23 #include <exception>
24 #include <optional>
25 #include <stop_token>
26 #include <tuple>
27 #include <type_traits>
28 #include <utility>
29
30 namespace boost {
31 namespace capy {
32
33 namespace detail {
34
35 /** Type trait to filter void types from a tuple.
36
37 Void-returning tasks do not contribute a value to the result tuple.
38 filter_void_tuple_t computes the filtered result type by concatenating wrap_non_void_t<T> (tuple<> for void, tuple<T> otherwise) for each T.
39
40 Example: filter_void_tuple_t<int, void, string> = tuple<int, string>
41 */
42 template<typename T>
43 using wrap_non_void_t = std::conditional_t<std::is_void_v<T>, std::tuple<>, std::tuple<T>>;
44
45 template<typename... Ts>
46 using filter_void_tuple_t = decltype(std::tuple_cat(std::declval<wrap_non_void_t<Ts>>()...));
47
48 /** Holds the result of a single task within when_all.
49 */
50 template<typename T>
51 struct result_holder
52 {
53 std::optional<T> value_;
54
55 2 void set(T v)
56 {
57 2 value_ = std::move(v);
58 2 }
59
60 2 T get() &&
61 {
62 2 return std::move(*value_);
63 }
64 };
65
66 /** Specialization for void tasks - no value storage needed.
67 */
68 template<>
69 struct result_holder<void>
70 {
71 };
72
73 /** Shared state for when_all operation.
74
75 @tparam Ts The result types of the tasks.
76 */
77 template<typename... Ts>
78 struct when_all_state
79 {
80 static constexpr std::size_t task_count = sizeof...(Ts);
81
82 // Completion tracking - when_all waits for all children
83 std::atomic<std::size_t> remaining_count_;
84
85 // Result storage in input order
86 std::tuple<result_holder<Ts>...> results_;
87
88 // Runner handles - destroyed in ~when_all_state() while allocator is valid
89 std::array<coro, task_count> runner_handles_{};
90
91 // Exception storage - first error wins, others discarded
92 std::atomic<bool> has_exception_{false};
93 std::exception_ptr first_exception_;
94
95 // Stop propagation - on error, request stop for siblings
96 std::stop_source stop_source_;
97
98 // Connects parent's stop_token to our stop_source
99 struct stop_callback_fn
100 {
101 std::stop_source* source_;
102 void operator()() const { source_->request_stop(); }
103 };
104 using stop_callback_t = std::stop_callback<stop_callback_fn>;
105 std::optional<stop_callback_t> parent_stop_callback_;
106
107 // Parent resumption
108 coro continuation_;
109 executor_ref caller_ex_;
110
111 2 when_all_state()
112
1/1
✓ Branch 5 taken 1 times.
2 : remaining_count_(task_count)
113 {
114 2 }
115
116 2 ~when_all_state()
117 {
118
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
4 for(auto h : runner_handles_)
119
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
2 if(h)
120 2 h.destroy();
121 2 }
122
123 /** Capture an exception (first one wins).
124 */
125 void capture_exception(std::exception_ptr ep)
126 {
127 bool expected = false;
128 if(has_exception_.compare_exchange_strong(
129 expected, true, std::memory_order_relaxed))
130 first_exception_ = ep;
131 }
132
133 /** Signal that a task has completed.
134
135 The last child to complete triggers resumption of the parent.
136 */
137 2 coro signal_completion()
138 {
139 2 auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
140
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
2 if(remaining == 1)
141 2 return caller_ex_.dispatch(continuation_);
142 return std::noop_coroutine();
143 }
144
145 };
146
147 /** Wrapper coroutine that intercepts task completion.
148
149 This runner awaits its assigned task and stores the result in
150 the shared state, or captures the exception and requests stop.
151 */
152 template<typename T, typename... Ts>
153 struct when_all_runner
154 {
155 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
156 {
157 when_all_state<Ts...>* state_ = nullptr;
158 executor_ref ex_;
159 std::stop_token stop_token_;
160
161 2 when_all_runner get_return_object()
162 {
163 2 return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
164 }
165
166 2 std::suspend_always initial_suspend() noexcept
167 {
168 2 return {};
169 }
170
171 2 auto final_suspend() noexcept
172 {
173 struct awaiter
174 {
175 promise_type* p_;
176
177 1 bool await_ready() const noexcept
178 {
179 1 return false;
180 }
181
182 1 coro await_suspend(coro) noexcept
183 {
184 // Signal completion; last task resumes parent
185 1 return p_->state_->signal_completion();
186 }
187
188 void await_resume() const noexcept
189 {
190 }
191 };
192 2 return awaiter{this};
193 }
194
195 2 void return_void()
196 {
197 2 }
198
199 void unhandled_exception()
200 {
201 state_->capture_exception(std::current_exception());
202 // Request stop for sibling tasks
203 state_->stop_source_.request_stop();
204 }
205
206 template<class Awaitable>
207 struct transform_awaiter
208 {
209 std::decay_t<Awaitable> a_;
210 promise_type* p_;
211
212 2 bool await_ready()
213 {
214 2 return a_.await_ready();
215 }
216
217 2 auto await_resume()
218 {
219 2 return a_.await_resume();
220 }
221
222 template<class Promise>
223 2 auto await_suspend(std::coroutine_handle<Promise> h)
224 {
225
1/1
✓ Branch 3 taken 1 times.
2 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
226 }
227 };
228
229 template<class Awaitable>
230 2 auto await_transform(Awaitable&& a)
231 {
232 using A = std::decay_t<Awaitable>;
233 if constexpr (IoAwaitable<A>)
234 {
235 return transform_awaiter<Awaitable>{
236 4 std::forward<Awaitable>(a), this};
237 }
238 else
239 {
240 static_assert(sizeof(A) == 0, "requires IoAwaitable");
241 }
242 2 }
243 };
244
245 std::coroutine_handle<promise_type> h_;
246
247 2 explicit when_all_runner(std::coroutine_handle<promise_type> h)
248 2 : h_(h)
249 {
250 2 }
251
252 // Enable move for all clang versions - some versions need it
253 when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
254
255 // Non-copyable
256 when_all_runner(when_all_runner const&) = delete;
257 when_all_runner& operator=(when_all_runner const&) = delete;
258 when_all_runner& operator=(when_all_runner&&) = delete;
259
260 2 auto release() noexcept
261 {
262 2 return std::exchange(h_, nullptr);
263 }
264 };
265
266 /** Create a runner coroutine for a single task.
267
268 Task is passed directly to ensure proper coroutine frame storage.
269 */
270 template<std::size_t Index, typename T, typename... Ts>
271 when_all_runner<T, Ts...>
272
1/1
✓ Branch 1 taken 1 times.
2 make_when_all_runner(task<T> inner, when_all_state<Ts...>* state)
273 {
274 if constexpr (std::is_void_v<T>)
275 {
276 co_await std::move(inner);
277 }
278 else
279 {
280 std::get<Index>(state->results_).set(co_await std::move(inner));
281 }
282 4 }
283
284 /** Internal awaitable that launches all runner coroutines and waits.
285
286 This awaitable is used inside the when_all coroutine to handle
287 the concurrent execution of child tasks.
288 */
289 template<typename... Ts>
290 class when_all_launcher
291 {
292 std::tuple<task<Ts>...>* tasks_;
293 when_all_state<Ts...>* state_;
294
295 public:
296 2 when_all_launcher(
297 std::tuple<task<Ts>...>* tasks,
298 when_all_state<Ts...>* state)
299 2 : tasks_(tasks)
300 2 , state_(state)
301 {
302 2 }
303
304 2 bool await_ready() const noexcept
305 {
306 2 return sizeof...(Ts) == 0;
307 }
308
309 2 coro await_suspend(coro continuation, executor_ref caller_ex, std::stop_token parent_token = {})
310 {
311 2 state_->continuation_ = continuation;
312 2 state_->caller_ex_ = caller_ex;
313
314 // Forward parent's stop requests to children
315
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
2 if(parent_token.stop_possible())
316 {
317 state_->parent_stop_callback_.emplace(
318 parent_token,
319 typename when_all_state<Ts...>::stop_callback_fn{&state_->stop_source_});
320
321 if(parent_token.stop_requested())
322 state_->stop_source_.request_stop();
323 }
324
325 // Launch all tasks concurrently
326 2 auto token = state_->stop_source_.get_token();
327 2 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
328
1/28
✗ Branch 2 not taken.
✗ Branch 6 not taken.
✗ Branch 10 not taken.
✗ Branch 20 not taken.
✗ Branch 24 not taken.
✗ Branch 28 not taken.
✗ Branch 38 not taken.
✗ Branch 42 not taken.
✗ Branch 46 not taken.
✗ Branch 50 not taken.
✗ Branch 54 not taken.
✗ Branch 58 not taken.
✗ Branch 62 not taken.
✗ Branch 66 not taken.
✗ Branch 86 not taken.
✗ Branch 90 not taken.
✗ Branch 94 not taken.
✗ Branch 104 not taken.
✗ Branch 108 not taken.
✗ Branch 116 not taken.
✗ Branch 120 not taken.
✗ Branch 128 not taken.
✗ Branch 132 not taken.
✗ Branch 136 not taken.
✗ Branch 146 not taken.
✗ Branch 150 not taken.
✗ Branch 154 not taken.
✓ Branch 164 taken 1 times.
1 (..., launch_one<Is>(caller_ex, token));
329
1/1
✓ Branch 1 taken 1 times.
2 }(std::index_sequence_for<Ts...>{});
330
331 // Let signal_completion() handle resumption
332 4 return std::noop_coroutine();
333 2 }
334
335 2 void await_resume() const noexcept
336 {
337 // Results are extracted by the when_all coroutine from state
338 2 }
339
340 private:
341 template<std::size_t I>
342 2 void launch_one(executor_ref caller_ex, std::stop_token token)
343 {
344
1/1
✓ Branch 2 taken 1 times.
2 auto runner = make_when_all_runner<I>(
345 2 std::move(std::get<I>(*tasks_)), state_);
346
347 2 auto h = runner.release();
348 2 h.promise().state_ = state_;
349 2 h.promise().ex_ = caller_ex;
350 2 h.promise().stop_token_ = token;
351
352 2 coro ch{h};
353 2 state_->runner_handles_[I] = ch;
354
2/2
✓ Branch 1 taken 1 times.
✓ Branch 4 taken 1 times.
2 state_->caller_ex_.dispatch(ch).resume();
355 2 }
356 };
357
358 /** Compute the result type for when_all.
359
360 Returns void when all tasks are void (P2300 aligned),
361 otherwise returns a tuple with void types filtered out.
362 */
363 template<typename... Ts>
364 using when_all_result_t = std::conditional_t<
365 std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>,
366 void,
367 filter_void_tuple_t<Ts...>>;
368
369 /** Helper to extract a single result, returning empty tuple for void.
370 This is a separate function to work around a GCC-11 ICE that occurs
371 when using nested immediately-invoked lambdas with pack expansion.
372 */
373 template<std::size_t I, typename... Ts>
374 2 auto extract_single_result(when_all_state<Ts...>& state)
375 {
376 using T = std::tuple_element_t<I, std::tuple<Ts...>>;
377 if constexpr (std::is_void_v<T>)
378 return std::tuple<>();
379 else
380
1/1
✓ Branch 4 taken 1 times.
2 return std::make_tuple(std::move(std::get<I>(state.results_)).get());
381 }
382
383 /** Extract results from state, filtering void types.
384 */
385 template<typename... Ts>
386 2 auto extract_results(when_all_state<Ts...>& state)
387 {
388 3 return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
389
2/30
✗ Branch 1 not taken.
✗ Branch 5 not taken.
✗ Branch 8 not taken.
✗ Branch 11 not taken.
✗ Branch 14 not taken.
✗ Branch 17 not taken.
✗ Branch 20 not taken.
✗ Branch 29 not taken.
✗ Branch 32 not taken.
✗ Branch 35 not taken.
✗ Branch 38 not taken.
✗ Branch 41 not taken.
✗ Branch 44 not taken.
✗ Branch 47 not taken.
✗ Branch 50 not taken.
✗ Branch 53 not taken.
✗ Branch 57 not taken.
✗ Branch 60 not taken.
✗ Branch 63 not taken.
✗ Branch 66 not taken.
✗ Branch 69 not taken.
✗ Branch 73 not taken.
✗ Branch 76 not taken.
✗ Branch 79 not taken.
✗ Branch 84 not taken.
✗ Branch 87 not taken.
✗ Branch 90 not taken.
✗ Branch 93 not taken.
✓ Branch 96 taken 1 times.
✓ Branch 99 taken 1 times.
1 return std::tuple_cat(extract_single_result<Is>(state)...);
390
1/1
✓ Branch 1 taken 1 times.
4 }(std::index_sequence_for<Ts...>{});
391 }
392
393 } // namespace detail
394
395 /** Wait for all tasks to complete concurrently.
396
397 @par Example
398 @code
399 task<void> example() {
400 auto [a, b] = co_await when_all(
401 fetch_int(), // task<int>
402 fetch_string() // task<std::string>
403 );
404 }
405 @endcode
406
407 @param tasks The tasks to execute concurrently.
408 @return A task yielding a tuple of results (void types filtered out), or task<void> when every input task is void.
409
410 Key features:
411 @li All child tasks are launched concurrently
412 @li Results are collected in input order
413 @li First error is captured; subsequent errors are discarded
414 @li On error, stop is requested for all siblings
415 @li Completes only after all children have completed
416 @li Void tasks do not contribute to the result tuple
417 @li Properly propagates frame allocators to all child coroutines
418 */
419 template<typename... Ts>
420 [[nodiscard]] task<detail::when_all_result_t<Ts...>>
421
1/1
✓ Branch 1 taken 1 times.
2 when_all(task<Ts>... tasks)
422 {
423 using result_type = detail::when_all_result_t<Ts...>;
424
425 // State is stored in the coroutine frame, using the frame allocator
426 detail::when_all_state<Ts...> state;
427
428 // Store tasks in the frame
429 std::tuple<task<Ts>...> task_tuple(std::move(tasks)...);
430
431 // Launch all tasks and wait for completion
432 co_await detail::when_all_launcher<Ts...>(&task_tuple, &state);
433
434 // Propagate first exception if any.
435 // Safe without explicit acquire: capture_exception() is sequenced-before
436 // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
437 // last task's decrement that resumes this coroutine.
438 if(state.first_exception_)
439 std::rethrow_exception(state.first_exception_);
440
441 // Extract and return results
442 if constexpr (std::is_void_v<result_type>)
443 co_return;
444 else
445 co_return detail::extract_results(state);
446 4 }
447
448 // For backwards compatibility and type queries, expose result type computation
449 template<typename... Ts>
450 using when_all_result_type = detail::when_all_result_t<Ts...>;
451
452 } // namespace capy
453 } // namespace boost
454
455 #endif
456