C++20: Thread Pools with cppcoro
This post is the third and final post in my miniseries on cppcoro. cppcoro is a library of coroutine abstractions from Lewis Baker. Today, I introduce thread pools.
To get the most out of this post, you should know my two previous posts on cppcoro.
- C++20: Coroutines with cppcoro: the introduction to cppcoro and the elementary coroutines task and generator
- C++20: Powerful Coroutines with cppcoro: more powerful coroutines with threads
In addition to the cppcoro::sync_wait function, which waits until the specified Awaitable completes, cppcoro offers the quite interesting cppcoro::when_all function.
when_all
- when_all: creates an Awaitable that waits for all its input Awaitables and returns an aggregate of their individual results.
I simplified the definition of the function cppcoro::when_all. The following example should give you a first impression.
// cppcoroWhenAll.cpp

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/when_all.hpp>

using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
    std::this_thread::sleep_for(1s);                       // (3)
    co_return "First";
}

cppcoro::task<std::string> getSecond() {
    std::this_thread::sleep_for(1s);                       // (3)
    co_return "Second";
}

cppcoro::task<std::string> getThird() {
    std::this_thread::sleep_for(1s);                       // (3)
    co_return "Third";
}

cppcoro::task<> runAll() {
                                                           // (2)
    auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());

    std::cout << fir << " " << sec << " " << thi << std::endl;
}

int main() {
    std::cout << std::endl;

    // Use the same clock for both time points; mixing clocks does not compile portably.
    auto start = std::chrono::steady_clock::now();

    cppcoro::sync_wait(runAll());                          // (1)

    std::cout << std::endl;

    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double> elapsed = end - start;   // (4)
    std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;

    std::cout << std::endl;
}
The top-level call cppcoro::sync_wait(runAll()) in line (1) awaits the Awaitable runAll. runAll awaits the Awaitables getFirst, getSecond, and getThird in line (2). The Awaitables runAll, getFirst, getSecond, and getThird are coroutines. Each of the get functions sleeps for one second in line (3). Because the three coroutines run one after another on the calling thread, three times one second makes three seconds. This is how long the call cppcoro::sync_wait(runAll()) waits for the coroutines. Line (4) displays the elapsed time.
Now that you get the basics of the function cppcoro::when_all, let me add a thread pool.
static_thread_pool
- static_thread_pool: schedule work on a fixed-size pool of threads
cppcoro::static_thread_pool can be invoked with and without a number. The number stands for the number of threads that are created. If you don’t specify a number, the C++11 function std::thread::hardware_concurrency() is used. std::thread::hardware_concurrency hints at the number of hardware threads supported by your system. This may be the number of processors or cores you have.
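Because std::thread::hardware_concurrency() is only a hint, the C++ standard allows it to return 0 when the value cannot be determined. The following minimal sketch shows one way to turn the hint into a usable thread count. The helper defaultPoolSize is hypothetical and not part of cppcoro, and the fallback to one thread is my assumption for illustration.

```cpp
#include <cstdint>
#include <thread>

// Hypothetical helper (not part of cppcoro): pick a thread count for a pool.
// std::thread::hardware_concurrency() is only a hint and may return 0 when
// the number of hardware threads cannot be determined, so fall back to 1.
std::uint32_t defaultPoolSize() {
    const unsigned int hint = std::thread::hardware_concurrency();
    return hint > 0 ? static_cast<std::uint32_t>(hint) : 1u;
}
```

Assuming the constructor takes the number of threads as described above, the result could be passed explicitly, for example cppcoro::static_thread_pool tp(defaultPoolSize());.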
Let me try it out. Based on the previous one, the following example executes the coroutines getFirst, getSecond, and getThird concurrently.
// cppcoroWhenAllOnThreadPool.cpp

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/static_thread_pool.hpp>
#include <cppcoro/when_all.hpp>

using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
    std::this_thread::sleep_for(1s);
    co_return "First";
}

cppcoro::task<std::string> getSecond() {
    std::this_thread::sleep_for(1s);
    co_return "Second";
}

cppcoro::task<std::string> getThird() {
    std::this_thread::sleep_for(1s);
    co_return "Third";
}

// Moves the calling coroutine onto a pool thread, then runs func there.
template <typename Func>
cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {
    co_await tp.schedule();
    auto res = co_await func();
    co_return res;
}

cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {

    auto[fir, sec, thi] = co_await cppcoro::when_all(      // (3)
        runOnThreadPool(tp, getFirst),
        runOnThreadPool(tp, getSecond),
        runOnThreadPool(tp, getThird));

    std::cout << fir << " " << sec << " " << thi << std::endl;
}

int main() {
    std::cout << std::endl;

    // Use the same clock for both time points; mixing clocks does not compile portably.
    auto start = std::chrono::steady_clock::now();

    cppcoro::static_thread_pool tp;                        // (1)
    cppcoro::sync_wait(runAll(tp));                        // (2)

    std::cout << std::endl;

    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double> elapsed = end - start;   // (4)
    std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;

    std::cout << std::endl;
}
Here are the crucial differences from the previous program cppcoroWhenAll.cpp. In line (1), I create a thread pool tp and pass it to the function runAll(tp) in line (2). The function runAll uses the thread pool to start the coroutines concurrently: runOnThreadPool first awaits tp.schedule(), which resumes the coroutine on a thread of the pool, and only then awaits the given get function. Thanks to structured binding in line (3), the values of each coroutine can be easily aggregated and assigned to a variable. In the end, provided the pool has at least three threads, the main function takes one second instead of three.
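The timing difference between the two programs does not depend on coroutines. As a rough illustration, here is a minimal sketch that uses plain std::async instead of cppcoro, so it is an analogy and not what cppcoro does internally. The names blockingJob, runSequentially, and runConcurrently are hypothetical stand-ins for the get functions and the two variants of runAll.

```cpp
#include <chrono>
#include <future>
#include <string>
#include <thread>

// Hypothetical stand-in for getFirst/getSecond/getThird: blocks, then returns its name.
std::string blockingJob(std::string name, std::chrono::milliseconds delay) {
    std::this_thread::sleep_for(delay);
    return name;
}

// Runs the three jobs one after another on the calling thread.
// The elapsed time is at least three times the delay.
std::chrono::duration<double> runSequentially(std::chrono::milliseconds delay) {
    const auto start = std::chrono::steady_clock::now();
    blockingJob("First", delay);
    blockingJob("Second", delay);
    blockingJob("Third", delay);
    return std::chrono::steady_clock::now() - start;
}

// Starts each job on its own thread via std::launch::async and waits for all.
// The sleeps overlap, so the elapsed time is typically close to a single delay.
std::chrono::duration<double> runConcurrently(std::chrono::milliseconds delay) {
    const auto start = std::chrono::steady_clock::now();
    auto fir = std::async(std::launch::async, blockingJob, "First", delay);
    auto sec = std::async(std::launch::async, blockingJob, "Second", delay);
    auto thi = std::async(std::launch::async, blockingJob, "Third", delay);
    fir.get();
    sec.get();
    thi.get();
    return std::chrono::steady_clock::now() - start;
}
```

Calling both functions with a delay of one second should reproduce the pattern of the two cppcoro programs: about three seconds for the sequential variant and about one second for the concurrent one, depending on the machine.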
Maybe you know that C++20 brings latches and barriers. Latches and barriers are thread synchronization mechanisms that let threads block until a counter becomes zero. cppcoro also supports latches and barriers.
async_latch
- async_latch: allows coroutines to wait until a counter becomes zero asynchronously
The following program cppcoroLatch.cpp shows thread synchronization with a cppcoro::async_latch.
// cppcoroLatch.cpp

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/async_latch.hpp>
#include <cppcoro/task.hpp>

using namespace std::chrono_literals;

cppcoro::task<> waitFor(cppcoro::async_latch& latch) {
    std::cout << "Before co_await" << std::endl;
    co_await latch;                                        // (3)
    std::cout << "After co_await" << std::endl;
}

int main() {
    std::cout << std::endl;

    cppcoro::async_latch latch(3);                         // (1)

                                                           // (2)
    auto waiter = std::async([&latch]{ cppcoro::sync_wait(waitFor(latch)); });

    auto counter1 = std::async([&latch] {                  // (2)
        std::this_thread::sleep_for(2s);
        std::cout << "counter1: latch.count_down() " << std::endl;
        latch.count_down();
    });

    auto counter2 = std::async([&latch] {                  // (2)
        std::this_thread::sleep_for(1s);
        std::cout << "counter2: latch.count_down(2) " << std::endl;
        latch.count_down(2);
    });

    waiter.get(), counter1.get(), counter2.get();

    std::cout << std::endl;
}
I create the cppcoro::async_latch in line (1) and initialize the counter to 3. This time, I use std::async (line 2) to run the three tasks concurrently. Each std::async call gets the latch by reference. The waitFor coroutine waits in line (3) until the counter becomes zero. The task counter1 sleeps for 2 seconds before it counts down by 1. In contrast, counter2 sleeps for 1 second and counts down by 2. The screenshot shows the interleaving of the threads.
What’s next?
So far, I have written about three of the big four of C++20: concepts, ranges, and coroutines. Modules are still missing in my tour through the big four and are the topic of my next posts.
By the way, if anyone wants to write a post on a C++20 feature I will write about, please get in touch with me. I’m happy to publish and translate it into English/German if necessary.
Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Stephen Kelley, Kyle Dean, Tusar Palauri, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, Rob North, Bhavith C Achar, Marco Parri Empoli, Philipp Lenk, Charles-Jianye Chen, Keith Jeffery, and Matt Godbolt.
Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, Slavko Radman, and David Poole.
Rainer Grimm