Automatically Resuming a Job with Coroutines on a Separate Thread

In my last post “Starting Jobs with Coroutines“, I applied co_await to start a job. In this post, I improve the workflow and automatically resume a job if necessary. In my final step, I resume the job on a separate thread.

This is the 7th post in my mini-series about the new keywords co_return, co_yield, and co_await. To follow this practical introduction to coroutines, you should be familiar with the previous posts of the series, which cover co_return, co_yield, and co_await in turn.


    Automatically Resuming the Awaiter

    In the previous post (see Starting Jobs with Coroutines), I presented the awaiter workflow in detail and started the job explicitly.

    int main() {
    
        std::cout <<  "Before job" << '\n';
    
        auto job = prepareJob();
        job.start();
    
        std::cout <<  "After job" <<  '\n';
    
    }
    

    This explicit invocation of job.start() was necessary because await_ready in the awaitable MySuspendAlways always returned false. Now let’s assume that await_ready can return true or false and that the job is not started explicitly. A short reminder: when await_ready returns true, the function await_resume is invoked directly, and await_suspend is skipped.

    // startJobWithAutomaticResumption.cpp
    
    #include <coroutine>
    #include <functional>
    #include <iostream>
    #include <random>
    
    std::random_device seed;
    auto gen = std::bind_front(std::uniform_int_distribution<>(0,1),         // (1)
                               std::default_random_engine(seed()));
    
    struct MySuspendAlways {                                                 // (3)
        bool await_ready() const noexcept { 
            std::cout << "        MySuspendAlways::await_ready"  << '\n';
            return gen();
        }
        bool await_suspend(std::coroutine_handle<> handle) const noexcept {  // (5)
            std::cout << "        MySuspendAlways::await_suspend"  << '\n';
            handle.resume();                                                 // (6)
            return true;
    
        }
        void await_resume() const noexcept {                                 // (4)
            std::cout << "        MySuspendAlways::await_resume"  << '\n';
        }
    };
     
    struct Job { 
        struct promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
        handle_type coro;
        Job(handle_type h): coro(h){}
        ~Job() {
            if ( coro ) coro.destroy();
        }
    
        struct promise_type {
            auto get_return_object() { 
                return Job{handle_type::from_promise(*this)};
            }
            MySuspendAlways initial_suspend() {                     // (2)
                std::cout << "    Job prepared" << '\n';
                return {}; 
            }
            std::suspend_always final_suspend() noexcept {
                std::cout << "    Job finished" << '\n'; 
                return {}; 
            }
            void return_void() {}
            void unhandled_exception() {}
        
        };
    };
     
    Job performJob() {
        co_await std::suspend_never();
    }
     
    int main() {
    
        std::cout <<  "Before jobs" << '\n';
    
        performJob();
        performJob();
        performJob();
        performJob();
    
        std::cout <<  "After jobs" <<  '\n';
    
    }
    

    First, the coroutine is now called performJob and runs automatically. gen (line 1) is a random number generator for the numbers 0 or 1. It uses the default random engine for its job, initialized with the seed. Thanks to std::bind_front, I can bind it together with the std::uniform_int_distribution to get a callable which, when used, gives me a random number 0 or 1.

    A callable is something that behaves like a function. Callables include not only named functions but also function objects and lambda expressions. Read more about the new function std::bind_front in the post “More and More Utilities in C++20“.

    In this example, I replaced my own awaitables with the predefined awaitables from the C++ standard, except for the awaitable MySuspendAlways, which is the return type of the member function initial_suspend (line 2). await_ready (line 3) returns a boolean. When the boolean is true, the control flow jumps directly to the member function await_resume (line 4). When it is false, the coroutine is suspended immediately and, therefore, the function await_suspend runs (line 5). The function await_suspend gets the handle to the coroutine and uses it to resume the coroutine (line 6). Instead of returning the value true, await_suspend can also return void.

    The output of the program shows: when await_ready returns true, only the function await_resume is called; when await_ready returns false, the function await_suspend is called as well.

    You can try out the program on Compiler Explorer.
    Let me take the final step and automatically resume the awaiter on a separate thread.

    Automatically Resuming the Awaiter on a Separate Thread

    The following program is based on the previous program.

    // startJobWithAutomaticResumptionOnThread.cpp
    
    #include <coroutine>
    #include <functional>
    #include <iostream>
    #include <random>
    #include <thread>
    #include <vector>
    
    std::random_device seed;
    auto gen = std::bind_front(std::uniform_int_distribution<>(0,1), 
                               std::default_random_engine(seed()));
     
    struct MyAwaitable {
        std::jthread& outerThread;
        bool await_ready() const noexcept {                    
            auto res = gen();
            if (res) std::cout << " (executed)" << '\n';
            else std::cout << " (suspended)" << '\n';
            return res;                                        // (6)   
        }
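        void await_suspend(std::coroutine_handle<> h) {        // (7)
            // Copy the reference first: once h is resumed on the new thread,
            // the coroutine may finish and destroy this awaiter.
            std::jthread& out = outerThread;
            out = std::jthread([h] { h.resume(); });           // (8)
        }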
        void await_resume() {}
    };
    
     
    struct Job{
        static inline int JobCounter{1};
        Job() {
            ++JobCounter;
        }
        
        struct promise_type {
            int JobNumber{JobCounter};
            Job get_return_object() { return {}; }
            std::suspend_never initial_suspend() {         // (2)
                std::cout << "    Job " << JobNumber << " prepared on thread " 
                          << std::this_thread::get_id();
                return {}; 
            }
            std::suspend_never final_suspend() noexcept {  // (3)
                std::cout << "    Job " << JobNumber << " finished on thread " 
                          << std::this_thread::get_id() << '\n';
                return {}; 
            }
            void return_void() {}
            void unhandled_exception() { }
        };
    };
     
    Job performJob(std::jthread& out) {
        co_await MyAwaitable{out};                        // (1)
    }
     
    int main() {
    
        std::vector<std::jthread> threads(8);             // (4)
        for (auto& thr: threads) performJob(thr);         // (5)
    
    }
    

    The main difference from the previous program is the new awaitable MyAwaitable, used in the coroutine performJob (line 1). In contrast, the coroutine object returned from the coroutine performJob is straightforward. Essentially, the member functions initial_suspend (line 2) and final_suspend (line 3) of its promise_type return the predefined awaitable std::suspend_never. Additionally, both functions show the JobNumber of the executed job and the ID of the thread on which it runs. The output shows which coroutines run immediately and which ones are suspended. Thanks to the thread ID, you can observe that suspended coroutines are resumed on a different thread.

    You can try out the program on the Wandbox.
    Let me discuss the interesting control flow of the program. Line 4 creates eight default-constructed threads, which the coroutine performJob (line 5) takes by reference. This reference then becomes the argument for creating MyAwaitable{out} (line 1). Depending on the value of res (line 6) and, therefore, the return value of the function await_ready, the coroutine either continues to run (res is true) or is suspended (res is false). If the coroutine is suspended, the function await_suspend (line 7) is executed. Thanks to the assignment to outerThread (line 8), it becomes a running thread. The running threads must outlive the coroutine. For this reason, the threads have the scope of the main function.

    What’s next?

    I have written almost 100 posts about C++20. In my next post, I want to say a few concluding words about C++20 and answer the question “What’s next” regarding C++.
