std::execution: Inclusive Scan
Inclusive scan solves problems related to range queries, such as calculating the sum of a range of elements in an array. It is also used in range minimum queries and various other algorithms.
Before I present the asynchronous inclusive scan, I introduce the inclusive scan, aka prefix sum.
Prefix Sum
In computer science, the prefix sum, cumulative sum, inclusive scan, or simply scan of a sequence of numbers x0, x1, x2, … is a second sequence of numbers y0, y1, y2, …, the sums of prefixes (running totals) of the input sequence:
y0 = x0 y1 = x0 + x1 y2 = x0 + x1+ x2 ...
(https://en.wikipedia.org/wiki/Prefix_sum)
Proposal P2300R10 has an asynchronous implementation of inclusive scan.
Asynchronous Inclusive Scan
using namespace std::execution; sender auto async_inclusive_scan(scheduler auto sch, // 2 std::span<const double> input, // 1 std::span<double> output, // 1 double init, // 1 std::size_t tile_count) // 3 { std::size_t const tile_size = (input.size() + tile_count - 1) / tile_count; std::vector<double> partials(tile_count + 1); // 4 partials[0] = init; // 4 return just(std::move(partials)) // 5 | continues_on(sch) | bulk(tile_count, // 6 [ = ](std::size_t i, std::vector<double>& partials) { // 7 auto start = i * tile_size; // 8 auto end = std::min(input.size(), (i + 1) * tile_size); // 8 partials[i + 1] = *--std::inclusive_scan(begin(input) + start, // 9 begin(input) + end, // 9 begin(output) + start); // 9 }) // 10 | then( // 11 [](std::vector<double>&& partials) { std::inclusive_scan(begin(partials), end(partials), // 12 begin(partials)); // 12 return std::move(partials); // 13 }) | bulk(tile_count, // 14 [ = ](std::size_t i, std::vector<double>& partials) { // 14 auto start = i * tile_size; // 14 auto end = std::min(input.size(), (i + 1) * tile_size); // 14 std::for_each(begin(output) + start, begin(output) + end, // 14 [&] (double& e) { e = partials[i] + e; } // 14 ); }) | then( // 15 [ = ](std::vector<double>&& partials) { // 15 return output; // 15 }); // 15 }
Here’s the explanation from the proposal P2300R10:
- It scans a sequence of
double
s (represented as thestd::span<const double>
input
) and stores the result in another sequence ofdouble
s (represented asstd::span<double>
output
). - It takes a scheduler, which specifies what execution resource the scan should be launched on.
- It also takes a
tile_count
parameter that controls the number of execution agents that will be spawned. - First we need to allocate temporary storage needed for the algorithm, which we’ll do with a
std::vector
,partials
. We need onedouble
of temporary storage for each execution agent we create. - Next we’ll create our initial sender with
execution::just
andexecution::continues_on
. These senders will send the temporary storage, which we’ve moved into the sender. The sender has a completion scheduler ofsch
, which means the next item in the chain will usesch
. - Senders and sender adaptors support composition via
operator|
, similar to C++ ranges. We’ll useoperator|
to attach the next piece of work, which will spawntile_count
execution agents usingexecution::bulk
s. - Each agent will call a
std::invocable
, passing it two arguments. The first is the agent’s index (i
) in the execution::bulk operation, in this case a unique integer in[0, tile_count)
. The second argument is what the input sender sent – the temporary storage. - We start by computing the start and end of the range of input and output elements that this agent is responsible for, based on our agent index.
- Then we do a sequential
std::inclusive_scan
over our elements. We store the scan result for our last element, which is the sum of all of our elements, in our temporary storagepartials
. - After all computation in that initial
bulk
pass has completed, every one of the spawned execution agents will have written the sum of its elements into its slot inpartials
. - Now we need to scan all of the values in
partials
. We’ll do that with a single execution agent which will execute after theexecution::bulk
completes. We create that execution agent withexecution::then
. execution::then
takes an input sender and anstd::invocable
and calls thestd::invocable
with the value sent by the input sender. Inside ourstd::invocable
, we callstd::inclusive_scan
onpartials
, which the input senders will send to us.- Then we return
partials
, which the next phase will need. - Finally we do another
execution::bulk
of the same shape as before. In thisexecution::bulk,
we will use the scanned values inpartials
to integrate the sums from other tiles into our elements, completing the inclusive scan. async_inclusive_scan
returns a sender that sends the outputstd::span<double>
. A consumer of the algorithm can chain additional work that uses the scan result. At the point at whichasync_inclusive_scan
returns, the computation may not have completed. In fact, it may not have even started.
Sender
bulk(input, shape, call)
: Returns a sender which describes the callablecall
invoked oninput
according toshape.
continues_on(input, scheduler
): Returns a sender describing the transition from the execution agent of theinput
sender to the execution agent of the targetscheduler
.just(values)
: Returns a sender with no completion schedulers, which sends the providedvalues
.just
is a sender factory.then(input, call)
:then
returns a sender describing the continuation of the task of theinput
sender on an added node of invoking the provided functioncall
.
Would it not be nice to see this program in action? Currently (December 2024), no compiler supports std::execution
or the concepts sender
and scheduler
.
Here comes the reference implementation stdexec to our rescue:
I changed the data type of the processed elements from double
to int
.
Modernes C++ Mentoring
Do you want to stay informed: Subscribe.
// inclusiveScanExecution.cpp #include <algorithm> #include <exec/static_thread_pool.hpp> #include <iostream> #include <numeric> #include <span> #include <stdexec/execution.hpp> #include <vector> auto async_inclusive_scan(auto sch, // 2 std::span<const int> input, // 1 std::span<int> output, // 1 int init, // 1 std::size_t tile_count) // 3 { std::size_t const tile_size = (input.size() + tile_count - 1) / tile_count; std::vector<int> partials(tile_count + 1); // 4 partials[0] = init; // 4 return stdexec::just(std::move(partials)) // 5 | stdexec::continues_on(sch) | stdexec::bulk(tile_count, // 6 [=](std::size_t i, std::vector<int> &partials) { // 7 auto start = i * tile_size; // 8 auto end = std::min(input.size(), (i + 1) * tile_size); // 8 partials[i + 1] = *--std::inclusive_scan(begin(input) + start, // 9 begin(input) + end, // 9 begin(output) + start); // 9 }) // 10 | stdexec::then( // 11 [](std::vector<int> &&partials) { std::inclusive_scan(begin(partials), end(partials), // 12 begin(partials)); // 12 return std::move(partials); // 13 }) | stdexec::bulk( tile_count, // 14 [=](std::size_t i, std::vector<int> &partials) { // 14 auto start = i * tile_size; // 14 auto end = std::min(input.size(), (i + 1) * tile_size); // 14 std::for_each(begin(output) + start, begin(output) + end, // 14 [&](int &e) { e = partials[i] + e; } // 14 ); }) | stdexec::then( // 15 [=](std::vector<int> &&partials) { // 15 return output; // 15 }); // 15 } int main() { std::cout << '\n'; std::vector<int> input(30); std::iota(begin(input), end(input), 0); for (auto e : input) { std::cout << e << ' '; } std::cout << '\n'; std::vector<int> output(input.size()); exec::static_thread_pool pool(8); auto sch = pool.get_scheduler(); auto [out] = stdexec::sync_wait(async_inclusive_scan(sch, input, output, 0, 4)) .value(); for (auto e : out) { std::cout << e << ' '; } std::cout << '\n'; }
Here is an explanation of the main program:
In the main
a std::vector<int>
input
is then created with 30 elements. The std::iota
function fills the input
vector with sequential integers starting from 0. The program then prints the contents of the input
vector to the console.
Next, a std::vector<int>
output
is created with the same size as the input
vector to store the results of the inclusive scan operation. The exec::static_thread_pool
pool
has 8 threads, which will be used to execute tasks concurrently. The get_scheduler
member function of the thread pool creates a scheduler object sch
.
The async_inclusive_scan
function takes the scheduler sch
, the input
vector, the output
vector, an initial value of 0, and a tile count of 4. This function performs the inclusive scan operation asynchronously using the specified scheduler and returns a future-like object. The stdexec::sync_wait
function synchronously wait for the completion of the async_inclusive_scan
operation, and the result is unpacked into the variable out
.
Finally, the program prints the contents of the out
vector to the console:
What’s Next?
In my next blog post, I will step one step back and explain the composition of senders via operator |
.
Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Stephen Kelley, Kyle Dean, Tusar Palauri, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, Rob North, Bhavith C Achar, Marco Parri Empoli, Philipp Lenk, Charles-Jianye Chen, Keith Jeffery, Matt Godbolt, and Honey Sukesan.
Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, Slavko Radman, and David Poole.
My special thanks to Embarcadero | |
My special thanks to PVS-Studio | |
My special thanks to Tipi.build | |
My special thanks to Take Up Code | |
My special thanks to SHAVEDYAKS |
Modernes C++ GmbH
Modernes C++ Mentoring (English)
Rainer Grimm
Yalovastraße 20
72108 Rottenburg
Mail: schulung@ModernesCpp.de
Mentoring: www.ModernesCpp.org
Modernes C++ Mentoring,