C++20: Thread Pools with cppcoro

This post is the third and final post in my miniseries on cppcoro. cppcoro is a library of coroutine abstractions from Lewis Baker. Today, I introduce thread pools.

To get the most out of this post, you should know my two previous posts on cppcoro.

In addition to the cppcoro::sync_wait function, which waits until the specified Awaitable completes, cppcoro offers the quite interesting cppcoro::when_all function.

when_all

  • when_all: creates an Awaitable that waits for all its input Awaitables and returns an aggregate of their individual results.

I simplified the definition of the function cppcoro::when_all. The following example should give you a first impression.

 


     

    // cppcoroWhenAll.cpp
    
    #include <chrono>
    #include <iostream>
    #include <string>
    #include <thread>
    
    #include <cppcoro/sync_wait.hpp>
    #include <cppcoro/task.hpp>
    #include <cppcoro/when_all.hpp>
    
    using namespace std::chrono_literals;
    
    cppcoro::task<std::string> getFirst() {
        std::this_thread::sleep_for(1s);                       // (3)
        co_return "First";
    }
    
    cppcoro::task<std::string> getSecond() {
        std::this_thread::sleep_for(1s);                       // (3)
        co_return "Second";
    }
    
    cppcoro::task<std::string> getThird() {
        std::this_thread::sleep_for(1s);                       // (3)
        co_return "Third";
    }
    
    
    cppcoro::task<> runAll() {
                                                              // (2)
        auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());
        
        std::cout << fir << " " << sec << " " << thi << std::endl;
        
    }
    
    int main() {
        
        std::cout << std::endl;
        
        auto start = std::chrono::steady_clock::now();
        
        cppcoro::sync_wait(runAll());                          // (1)
        
        std::cout << std::endl;
        
        auto end = std::chrono::steady_clock::now();           // same clock as start
        std::chrono::duration<double> elapsed = end - start;   // (4)
        std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
        
        std::cout << std::endl;
    
    }
    

The call cppcoro::sync_wait(runAll()) (line 1) starts the top-level task runAll and blocks until it completes. runAll awaits the Awaitables getFirst, getSecond, and getThird (line 2). runAll, getFirst, getSecond, and getThird are all coroutines. Each of the get functions sleeps for one second (line 3). Because std::this_thread::sleep_for blocks the calling thread instead of suspending the coroutine, the three coroutines run one after another on the same thread: three times one second makes three seconds. This is the time the call cppcoro::sync_wait(runAll()) waits for the coroutines. Line 4 computes the elapsed time, which the program then displays.

[Screenshot: output of cppcoroWhenAll]
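
The three-second total comes from the calls running back to back on one thread. As a rough illustration that uses only the standard library and no cppcoro, the following sketch times three blocking calls executed sequentially. The names TimedResult, blockingGet, and runSequentially are hypothetical helpers for this illustration and are not part of cppcoro; the delay is a parameter so that you can try it with a shorter sleep.

```cpp
#include <chrono>
#include <string>
#include <thread>

// Hypothetical helper type: the joined results and the elapsed time.
struct TimedResult {
    std::string text;
    double seconds;
};

// Stand-in for getFirst, getSecond, and getThird: blocks the calling
// thread for `delay` and then returns `name`.
std::string blockingGet(std::string name, std::chrono::milliseconds delay) {
    std::this_thread::sleep_for(delay);
    return name;
}

// Runs the three calls one after another on the current thread.
TimedResult runSequentially(std::chrono::milliseconds delay) {
    const auto start = std::chrono::steady_clock::now();

    const std::string fir = blockingGet("First", delay);
    const std::string sec = blockingGet("Second", delay);
    const std::string thi = blockingGet("Third", delay);

    const auto end = std::chrono::steady_clock::now();   // same clock as start
    const std::chrono::duration<double> elapsed = end - start;

    return {fir + " " + sec + " " + thi, elapsed.count()};
}
```

Called with a delay of one second, runSequentially needs at least three seconds, because each call has to finish before the next one starts.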

Now that you have the basics of the function cppcoro::when_all, let me add a thread pool.

static_thread_pool

  • static_thread_pool: schedules work on a fixed-size pool of threads

cppcoro::static_thread_pool can be constructed with or without a number. The number stands for the number of threads that are created. If you don’t specify a number, the C++11 function std::thread::hardware_concurrency() is used. std::thread::hardware_concurrency hints at the number of hardware threads supported by your system. This may be the number of processors or cores you have.
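
To make the idea of a fixed-size pool more concrete, here is a minimal sketch built only from the standard library. It is not how cppcoro::static_thread_pool is implemented, and it schedules plain callables rather than coroutines. The names defaultThreadCount, FixedThreadPool, submit, and countOnPool are hypothetical. The fallback to one thread is my assumption for the case that std::thread::hardware_concurrency() returns 0, which the standard allows when the value is not computable.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// hardware_concurrency() is only a hint and may return 0.
inline std::size_t defaultThreadCount() {
    const unsigned n = std::thread::hardware_concurrency();
    return n == 0 ? 1 : n;
}

// Hypothetical fixed-size pool: a job queue served by a fixed set of workers.
class FixedThreadPool {
public:
    explicit FixedThreadPool(std::size_t threadCount = defaultThreadCount()) {
        assert(threadCount > 0);
        for (std::size_t i = 0; i < threadCount; ++i) {
            workers_.emplace_back([this] { workerLoop(); });
        }
    }

    // Lets the workers finish all queued jobs, then joins them.
    ~FixedThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wakeUp_.notify_all();
        for (auto& worker : workers_) {
            worker.join();
        }
    }

    FixedThreadPool(const FixedThreadPool&) = delete;
    FixedThreadPool& operator=(const FixedThreadPool&) = delete;

    // Enqueues a job; one of the workers picks it up.
    void submit(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(job));
        }
        wakeUp_.notify_one();
    }

private:
    void workerLoop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wakeUp_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) {
                    return;   // stopping and nothing left to do
                }
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();   // run the job outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable wakeUp_;
    std::queue<std::function<void()>> jobs_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};

// Usage sketch: runs jobCount increments on the pool and returns the total.
int countOnPool(std::size_t threadCount, int jobCount) {
    std::atomic<int> counter{0};
    {
        FixedThreadPool pool(threadCount);
        for (int i = 0; i < jobCount; ++i) {
            pool.submit([&counter] { ++counter; });
        }
    }   // the destructor drains the queue and joins the workers
    return counter.load();
}
```

The number of threads is fixed at construction and never changes afterward; only the queue of pending jobs grows and shrinks.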

Let me try it out. Based on cppcoroWhenAll.cpp, the following example executes the coroutines getFirst, getSecond, and getThird concurrently.

    // cppcoroWhenAllOnThreadPool.cpp
    
    #include <chrono>
    #include <iostream>
    #include <string>
    #include <thread>
    
    #include <cppcoro/sync_wait.hpp>
    #include <cppcoro/task.hpp>
    #include <cppcoro/static_thread_pool.hpp>
    #include <cppcoro/when_all.hpp>
    
    
    using namespace std::chrono_literals;
    
    cppcoro::task<std::string> getFirst() {
        std::this_thread::sleep_for(1s);
        co_return "First";
    }
    
    cppcoro::task<std::string> getSecond() {
        std::this_thread::sleep_for(1s);
        co_return "Second";
    }
    
    cppcoro::task<std::string> getThird() {
        std::this_thread::sleep_for(1s);
        co_return "Third";
    }
    
    template <typename Func>
    cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {
        co_await tp.schedule();
        auto res = co_await func();
        co_return res;
    }
    
    cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {
        
        auto[fir, sec, thi] = co_await cppcoro::when_all(    // (3)
            runOnThreadPool(tp, getFirst),
            runOnThreadPool(tp, getSecond), 
            runOnThreadPool(tp, getThird));
        
        std::cout << fir << " " << sec << " " << thi << std::endl;
        
    }
        
    int main() {
        
        std::cout << std::endl;
        
        auto start = std::chrono::steady_clock::now();
    
        cppcoro::static_thread_pool tp;                         // (1)
        cppcoro::sync_wait(runAll(tp));                         // (2)
        
        std::cout << std::endl;
        
        auto end = std::chrono::steady_clock::now();            // same clock as start
        std::chrono::duration<double> elapsed = end - start;    // (4)
        std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;
        
        std::cout << std::endl;
    
    }
    

Here are the crucial differences from the previous program cppcoroWhenAll.cpp. In line 1, I create a thread pool tp and pass it to the function runAll(tp) (line 2). The function runAll uses the thread pool to start the coroutines concurrently: each runOnThreadPool call first awaits tp.schedule(), which resumes the coroutine on a thread of the pool. Thanks to structured binding (line 3), the result of each coroutine can be easily assigned to its own variable. In the end, the main function takes one second instead of three, provided the pool has at least three threads.

[Screenshot: output of cppcoroWhenAllOnThreadPool]
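
For comparison, the same speed-up can be sketched with the standard library alone. The following example is an analogue and not cppcoro: it uses std::async with std::launch::async, which runs each call on its own thread instead of a shared pool. TimedResult, blockingGet, and runConcurrently are hypothetical names for this illustration.

```cpp
#include <chrono>
#include <future>
#include <string>
#include <thread>

// Hypothetical helper type: the joined results and the elapsed time.
struct TimedResult {
    std::string text;
    double seconds;
};

// Stand-in for getFirst, getSecond, and getThird: blocks the calling
// thread for `delay` and then returns `name`.
std::string blockingGet(std::string name, std::chrono::milliseconds delay) {
    std::this_thread::sleep_for(delay);
    return name;
}

// Starts the three calls on separate threads and collects their results.
TimedResult runConcurrently(std::chrono::milliseconds delay) {
    const auto start = std::chrono::steady_clock::now();

    // std::launch::async requests a new thread for each call.
    auto fir = std::async(std::launch::async, blockingGet, std::string("First"), delay);
    auto sec = std::async(std::launch::async, blockingGet, std::string("Second"), delay);
    auto thi = std::async(std::launch::async, blockingGet, std::string("Third"), delay);

    // get() blocks until the corresponding result is available.
    const std::string first = fir.get();
    const std::string second = sec.get();
    const std::string third = thi.get();

    const auto end = std::chrono::steady_clock::now();   // same clock as start
    const std::chrono::duration<double> elapsed = end - start;

    return {first + " " + second + " " + third, elapsed.count()};
}
```

Since the three sleeps overlap, the elapsed time is typically close to one delay rather than three. The exact value depends on the scheduler and on how many hardware threads are available.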

With C++20, we get latches and barriers. Latches and barriers are thread synchronization mechanisms that let threads block until a counter becomes zero. cppcoro also supports latches and barriers.

async_latch

  • async_latch: allows coroutines to wait asynchronously until a counter becomes zero

The following program cppcoroLatch.cpp shows thread synchronization with a cppcoro::async_latch.

    // cppcoroLatch.cpp
    
    #include <chrono>
    #include <iostream>
    #include <future>
    #include <thread>
    
    #include <cppcoro/sync_wait.hpp>
    #include <cppcoro/async_latch.hpp>
    #include <cppcoro/task.hpp>
    
    using namespace std::chrono_literals; 
    
    cppcoro::task<> waitFor(cppcoro::async_latch& latch) {
        std::cout << "Before co_await" << std::endl;
        co_await latch;                              // (3)
        std::cout << "After co_await" << std::endl;
    }
    
    int main() {
        
        std::cout << std::endl;
    
        cppcoro::async_latch latch(3);              // (1)
    
                                                    // (2)
        // std::launch::async forces a new thread; the default policy may defer execution.
        auto waiter = std::async(std::launch::async,
                                 [&latch]{ cppcoro::sync_wait(waitFor(latch)); });
    
        auto counter1 = std::async(std::launch::async, [&latch] {       // (2)
            std::this_thread::sleep_for(2s);
            std::cout << "counter1: latch.count_down() " << std::endl;
            latch.count_down();
        });
            
        auto counter2 = std::async(std::launch::async, [&latch] {       // (2)
            std::this_thread::sleep_for(1s);
            std::cout << "counter2: latch.count_down(2) " << std::endl;
            latch.count_down(2);
        });
    
        // Wait for all three tasks to finish.
        waiter.get();
        counter1.get();
        counter2.get();
        
        std::cout << std::endl;
    
    }
    

I create the cppcoro::async_latch in line 1 and initialize the counter to 3. This time, I use std::async (line 2) to run the coroutine and the two counting tasks concurrently. Each std::async call captures the latch by reference. The waitFor coroutine waits in line 3 until the counter becomes zero. The task counter1 sleeps for 2 seconds before it counts down by 1. In contrast, counter2 sleeps for 1 second and counts down by 2. The screenshot shows the interleaving of the threads.

[Screenshot: output of cppcoroLatch]
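
The counting behavior of a latch can also be sketched without cppcoro. The following class is a hypothetical, thread-blocking latch built from a mutex and a condition variable. Unlike cppcoro::async_latch, it blocks the waiting thread instead of suspending a coroutine, so it only illustrates the counter semantics. SimpleLatch, remaining, and releaseWaiter are names I made up for this example.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <future>
#include <mutex>

// Hypothetical blocking latch: waiters are released once the counter is zero.
class SimpleLatch {
public:
    explicit SimpleLatch(std::ptrdiff_t count) : count_(count) {
        assert(count >= 0);
    }

    // Decrements the counter by n and wakes all waiters when it reaches zero.
    void count_down(std::ptrdiff_t n = 1) {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(n >= 0 && n <= count_);
        count_ -= n;
        if (count_ == 0) {
            zero_.notify_all();
        }
    }

    // Blocks the calling thread until the counter is zero.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        zero_.wait(lock, [this] { return count_ == 0; });
    }

    std::ptrdiff_t remaining() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable zero_;
    std::ptrdiff_t count_;
};

// Usage sketch: one waiter and two counters (2 + 1), as in cppcoroLatch.cpp.
// Returns true if the waiter was released.
bool releaseWaiter() {
    SimpleLatch latch(3);
    bool released = false;

    auto waiter = std::async(std::launch::async, [&latch, &released] {
        latch.wait();
        released = true;
    });
    auto counter2 = std::async(std::launch::async, [&latch] { latch.count_down(2); });
    auto counter1 = std::async(std::launch::async, [&latch] { latch.count_down(); });

    waiter.get();
    counter1.get();
    counter2.get();
    return released;
}
```

The order in which the two counters run does not matter: the waiter continues only after the counter went from 3 to 0.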

What’s next?

So far, I have written about three of the big four of C++20: concepts, ranges, and coroutines. Modules are still missing in my tour through the big four and are the topic of my next posts.

By the way, if anyone wants to write a post about a C++20 feature, please get in touch with me. I’m happy to publish it and translate it into English/German if necessary.

