C++20: Thread Pools with cppcoro

Contents[Show]

This post is the third and final post in my miniseries to cppcoro. cppcoro is a library of coroutine abstractions from Lewis Baker. Today, I introduce thread pools.

 TimelineCpp20

To get the most out of this post, you should know my two previous posts to cppcoro.

Additionally to the cppcoro::sync_wait function which can be used to wait until the specified Awaitable completes, cppcoro offers the quite interesting cppcoro::when_all function.

when_all

  • when_all: creates an Awaitable, that waits for all its Input-Awaitables, and returns an aggregate of their individual results.

I simplified the definition of the function cpporo::when_all. The following example should help to get the first impression.

 

// cppcoroWhenAll.cpp

#include <chrono>
#include <iostream>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/when_all.hpp>

using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
    std::this_thread::sleep_for(1s);                       // (3)
    co_return "First";
}

cppcoro::task<std::string> getSecond() {
     std::this_thread::sleep_for(1s);                      // (3)
    co_return "Second";
}

cppcoro::task<std::string> getThird() {
     std::this_thread::sleep_for(1s);                     // (3)
    co_return "Third";
}


cppcoro::task<> runAll() {
                                                          // (2)
    auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());
    
    std::cout << fir << " " << sec << " " << thi << std::endl;
    
}

int main() {
    
    std::cout << std::endl;
    
    auto start = std::chrono::steady_clock::now();
    
    cppcoro::sync_wait(runAll());                          // (1)
    
    std::cout << std::endl;
    
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;   // (4)
    std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
    
    std::cout << std::endl;

}

The top-level task cppcoro::sync_wait(runAll()) (line 1) awaits the Awaitable runAll. runAll awaits the Awaitables getFirst, getSecond, and getThird (line 2). The Awaitables runAll, getFirst, getSecond, and getThird are coroutines. Each of the get functions sleeps for one second (line 3). Three times one second makes three seconds. This is the time the call cppcoro::sync_wait(runAll()) waits for the coroutines. Line 4 displays the time duration.

cppcoroWhenAll

Now, that you get the basics of the function cppcoro::when_all, let me add threads pools to it.

static_thread_pool

  • static_thead_pool: schedule work on a fixed-size pool of threads

cppcoro::static_thread_pool can be invoked with and without a number. The number stands for the number of threads that are created. If you don't specify a number, the C++11 function std::thread::hardware_concurrency() is used. std::thread::hardware_concurrency gives you a hint for the number of hardware threads supported by your system. This may be the number of processors or cores you have.

Let me try it out. The following example based on the previous one, executes the coroutines getFirst, getSecond, and getThird concurrently.

 

// cppcoroWhenAllOnThreadPool.cpp

#include <chrono>
#include <iostream>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/static_thread_pool.hpp>
#include <cppcoro/when_all.hpp>


using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
    std::this_thread::sleep_for(1s);
    co_return "First";
}

cppcoro::task<std::string> getSecond() {
    std::this_thread::sleep_for(1s);
    co_return "Second";
}

cppcoro::task<std::string> getThird() {
    std::this_thread::sleep_for(1s);
    co_return "Third";
}

template <typename Func>
cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {
    co_await tp.schedule();
    auto res = co_await func();
    co_return res;
}

cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {
    
    auto[fir, sec, thi] = co_await cppcoro::when_all(    // (3)
        runOnThreadPool(tp, getFirst),
        runOnThreadPool(tp, getSecond), 
        runOnThreadPool(tp, getThird));
    
    std::cout << fir << " " << sec << " " << thi << std::endl;
    
}
    
int main() {
    
    std::cout << std::endl;
    
    auto start = std::chrono::steady_clock::now();

    cppcoro::static_thread_pool tp;                         // (1)
    cppcoro::sync_wait(runAll(tp));                         // (2)
    
    std::cout << std::endl;
    
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;    // (4)
    std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
    
    std::cout << std::endl;

}

Here are the crucial differences to the previous program cppcoroWhenAll.cpp. I created in line (1) a thread pool tp and used it as an argument for the function runAll(tp) (line 2). The function runAll uses the thread pool to start the coroutines concurrently. Thanks to structured binding (line 3), the values of each coroutine can be easily aggregated and assigned to a variable. On the end, the main function takes one instead of three seconds.

 cppcoroWhenAllOnThreadPool

Maybe you know, that we get with C++20 latches and barriers. Latches and barriers are thread synchronization mechanisms that enable some threads to block until a counter becomes zero. cppcoro also supports latches and barriers.

async_latch

  • async_latch: allows coroutines to asynchronously wait until a counter becomes zero

The following program cppcoroLatch.cpp shows thread synchronization with a cppcoro::async_latch.

 

// cppcoroLatch.cpp

#include <chrono>
#include <iostream>
#include <future>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/async_latch.hpp>
#include <cppcoro/task.hpp>

using namespace std::chrono_literals; 

cppcoro::task<> waitFor(cppcoro::async_latch& latch) {
    std::cout << "Before co_await" << std::endl;
    co_await latch;                              // (3)
    std::cout << "After co_await" << std::endl;
}

int main() {
    
    std::cout << std::endl;

    cppcoro::async_latch latch(3);              // (1)

// (2)
auto waiter = std::async([&latch]{ cppcoro::sync_wait(waitFor(latch)); }); auto counter1 = std::async([&latch] { // (2) std::this_thread::sleep_for(2s); std::cout << "counter1: latch.count_down() " << std::endl; latch.count_down(); }); auto counter2 = std::async([&latch] { // (2) std::this_thread::sleep_for(1s); std::cout << "counter2: latch.count_down(2) " << std::endl; latch.count_down(2); }); waiter.get(), counter1.get(), counter2.get(); std::cout << std::endl; }

 

I create the cppcoro::asynch_latch in line (1) and initialize the counter to 3. This time, I use std::async (line 2) to run the three coroutines concurrently. Each std::async call gets the latch per reference. The waitFor coroutine waits in line 3 until the counter becomes zero. The coroutine counter1 sleeps for 2 seconds before it counts down by 1. In contrast, the counter2 sleeps for 1 second and counts down by 2. The screenshot shows the interleaving of the threads.

cppcoroLatch

What's next?

So far, I have written about three of the big four of C++20: concepts, ranges, and coroutines. Modules are still missing in my tour through the big four and are the topic of my next posts.

By the way, if anyone wants to write a post to a C++20 feature I'm going to write about, please contact me. I'm happy to publish it and translate it into English/German if necessary.

 

 

Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, Marko, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Darshan Mody, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Animus24, Jozo Leko, John Breland, espkk, Wolfgang Gärtner,  Louis St-Amour, Stephan Roslen, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Avi Kohn, Robert Blanch, Truels Wissneth, Kris Kafka, Mario Luoni, Neil Wang, Friedrich Huber, Sudhakar Balagurusamy, lennonli, and Pramod Tikare Muralidhara.

 

Thanks in particular to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, and Dendi Suhubdy

 

Seminars

I'm happy to give online-seminars or face-to-face seminars world-wide. Please call me if you have any questions.

Bookable (Online)

Deutsch

English

Standard Seminars 

Here is a compilation of my standard seminars. These seminars are only meant to give you a first orientation.

New

Contact Me

Modernes C++,

RainerGrimmSmall

My Newest E-Books

Course: Modern C++ Concurrency in Practice

Course: C++ Standard Library including C++14 & C++17

Course: Embedded Programming with Modern C++

Course: Generic Programming (Templates)

Course: C++ Fundamentals for Professionals

Subscribe to the newsletter (+ pdf bundle)

Blog archive

Source Code

Visitors

Today 2099

Yesterday 7515

Week 33710

Month 192141

All 5061455

Currently are 193 guests and no members online

Kubik-Rubik Joomla! Extensions

Latest comments