C++20: Thread Synchronization with Coroutines


Synchronizing threads is a typical requirement in thread management. One thread prepares a work package that another thread is waiting for.

 


I assume most of you use condition variables for this sender/receiver or producer/consumer workflow. Condition variables have many inherent risks, such as spurious wakeups and lost wakeups. Before I implement thread synchronization with coroutines, let me recap from a previous post the inherent challenges of condition variables.

The Challenges of Condition Variables 

Here is the pattern for the correct usage of condition variables.

 

// conditionVariables.cpp

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mutex_;
std::condition_variable condVar; 

bool dataReady{false};

void waitingForWork(){
    std::cout << "Waiting " << std::endl;
    std::unique_lock<std::mutex> lck(mutex_);
    condVar.wait(lck, []{ return dataReady; });   // (4)
    std::cout << "Running " << std::endl;
}

void setDataReady(){
    {
        std::lock_guard<std::mutex> lck(mutex_);
        dataReady = true;
    }
    std::cout << "Data prepared" << std::endl;
    condVar.notify_one();                        // (3)
}

int main(){
    
  std::cout << std::endl;

  std::thread t1(waitingForWork);               // (1)
  std::thread t2(setDataReady);                 // (2)

  t1.join();
  t2.join();
  
  std::cout << std::endl;
  
}

 

How does the synchronization work? The program has two child threads: t1 and t2. They get their work packages waitingForWork and setDataReady in lines (1) and (2). setDataReady uses the condition variable condVar to notify that it is done preparing the work: condVar.notify_one() (line 3). While holding the lock, thread t1 waits for its notification: condVar.wait(lck, []{ return dataReady; }) (line 4). The sender and the receiver both need a lock. For the sender, a std::lock_guard is sufficient because it calls lock and unlock only once. For the receiver, a std::unique_lock is necessary because wait typically unlocks and relocks the mutex several times.

Finally, the output of the program.

[Screenshot: output of conditionVariables.cpp]

Maybe you are wondering why you need a predicate for the wait call, since you can also invoke wait without one. The workflow seems far too complicated for such a simple synchronization of threads.

Now we are back to the missing memory of condition variables and the two phenomena called lost wakeup and spurious wakeup.

Lost Wakeup and Spurious Wakeup

  • Lost wakeup: The sender sends its notification before the receiver is in the wait state. The consequence is that the notification is lost. 
  • Spurious wakeup: The receiver may wake up although no notification has happened.

To avoid becoming a victim of these two issues, you have to use an additional predicate as memory. Without it, this example suffers a lost wakeup whenever the sender happens to notify before the receiver starts waiting. A lost wakeup is essentially a deadlock because a thread waits for something that never happens.
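The role of the predicate can be made concrete with a small sketch. It is not part of conditionVariables.cpp; the helper name waitAfterNotify and the 100 ms timeout are assumptions chosen for illustration. The helper sends the notification first and waits afterwards, which is exactly the lost-wakeup scenario. A timed wait is used so that the variant without a predicate gives up instead of blocking forever.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper, not part of conditionVariables.cpp.
// The notification is sent BEFORE the wait starts (the lost-wakeup scenario).
// Returns true if the waiter got through, false if it gave up after the timeout.
bool waitAfterNotify(bool usePredicate) {
    using namespace std::chrono_literals;

    std::mutex mut;
    std::condition_variable cv;
    bool ready = false;

    {
        std::lock_guard<std::mutex> lck(mut);
        ready = true;                 // the "memory" of the notification
    }
    cv.notify_one();                  // nobody waits yet, so this call has no effect

    std::unique_lock<std::mutex> lck(mut);
    bool woken = false;
    if (usePredicate) {
        // Behaves like: while (!ready) { wait }, so the flag is checked before blocking.
        woken = cv.wait_for(lck, 100ms, [&ready]{ return ready; });
    } else {
        // No predicate: blocks until a notification, a spurious wakeup, or the timeout.
        woken = cv.wait_for(lck, 100ms) == std::cv_status::no_timeout;
    }
    assert(lck.owns_lock());          // wait_for re-acquires the mutex before returning
    return woken;
}
```

Calling waitAfterNotify(true) returns immediately because the predicate sees the flag. Calling waitAfterNotify(false) normally runs into the timeout, unless a spurious wakeup happens to end the wait early.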

This is not the end of the traps you can fall into with condition variables. Read the details in the previous post: C++ Core Guidelines: Be Aware of the Traps of Condition Variables.

Thanks to coroutines, thread synchronization is relatively easy without the inherent risks of condition variables such as spurious wakeups and lost wakeups.

 


Thread Synchronization with co_await

 

// senderReceiver.cpp

#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>

class Event {
 public:

    Event() = default;

    Event(const Event&) = delete;
    Event(Event&&) = delete;
    Event& operator=(const Event&) = delete;
    Event& operator=(Event&&) = delete;

    class Awaiter;
    Awaiter operator co_await() const noexcept;

    void notify() noexcept;

 private:

    friend class Awaiter;
  
    mutable std::atomic<void*> suspendedWaiter{nullptr};
    mutable std::atomic<bool> notified{false};

};

class Event::Awaiter {
 public:
  Awaiter(const Event& eve): event(eve) {}

  bool await_ready() const;
  bool await_suspend(std::coroutine_handle<> corHandle) noexcept;
  void await_resume() noexcept {}

 private:
    friend class Event;

    const Event& event;
    std::coroutine_handle<> coroutineHandle;
};

bool Event::Awaiter::await_ready() const {                            // (7)
  
    // allow at most one waiter
    if (event.suspendedWaiter.load() != nullptr){
        throw std::runtime_error("More than one waiter is not valid");
    }
  
    // event.notified == false; suspends the coroutine
    // event.notified == true; the coroutine is executed such as a usual function
    return event.notified;
}
                                                                     // (8)
bool Event::Awaiter::await_suspend(std::coroutine_handle<> corHandle) noexcept {

    coroutineHandle = corHandle;                                    
  
    if (event.notified) return false;
  
    // store the waiter for later notification
    event.suspendedWaiter.store(this);

    return true;
}

void Event::notify() noexcept {                                        // (6)
    notified = true;
  
    // try to load the waiter
    auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load());
 
    // check if a waiter is available
    if (waiter != nullptr) {
        // resume the coroutine => await_resume
        waiter->coroutineHandle.resume();
    }
}

Event::Awaiter Event::operator co_await() const noexcept {
    return Awaiter{ *this };
}

struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }  // C++20 requires final_suspend to be non-throwing
        void return_void() {}
        void unhandled_exception() {}
    };
};

Task receiver(Event& event) {                                        // (3)
    auto start = std::chrono::high_resolution_clock::now();
    co_await event;                                                 
    std::cout << "Got the notification! " << std::endl;
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}

using namespace std::chrono_literals;

int main(){
    
    std::cout << std::endl;
    
    std::cout << "Notification before waiting" << std::endl;
    Event event1{};
    auto senderThread1 = std::thread([&event1]{ event1.notify(); });  // (1)
    auto receiverThread1 = std::thread(receiver, std::ref(event1));   // (4)
    
    receiverThread1.join();
    senderThread1.join();
    
    std::cout << std::endl;
    
    std::cout << "Notification after 2 seconds waiting" << std::endl;
    Event event2{};
    auto receiverThread2 = std::thread(receiver, std::ref(event2));  // (5)
    auto senderThread2 = std::thread([&event2]{
      std::this_thread::sleep_for(2s);
      event2.notify();                                               // (2)
    });
    
    receiverThread2.join();
    senderThread2.join();
     
    std::cout << std::endl;
    
}

 

Thread synchronization with coroutines is straightforward from the user's perspective. Let’s have a look at the program senderReceiver.cpp. The threads senderThread1 (line 1) and senderThread2 (line 2) each use an event to send their notification. The function receiver in line (3) is the coroutine, which is started in the threads receiverThread1 (line 4) and receiverThread2 (line 5). I measured the time between the coroutine's beginning and end and displayed it. This number shows how long the coroutine waits. The following screenshot displays the output of the program on Wandbox. At the time of writing, the Compiler Explorer does not support thread creation, but Matt is "on it".

[Screenshot: output of senderReceiver.cpp]


The output displays that the execution of the second coroutine takes about two seconds. The reason is that event1 sends its notification (line 1) before the coroutine is suspended, but event2 sends its notification after a time duration of 2 seconds (line 2).

Now, I put the implementer's hat on and give you a rough idea of the workflow of the coroutine framework. 

The simplified workflow

There is a subtle difference if you compare the class Generator in the last post (C++20: An Infinite Data Stream with Coroutines) with the class Event in this example. In the first case, the Generator is both the awaitable and the awaiter. In the second case, the Event uses the operator co_await to return the awaiter. This separation of concerns into the awaitable and the awaiter improves the structure of the code.

In my explanation of both workflows, I assume that in the first case the event notification happens before the coroutine awaits the event. For the second case, I assume it is the other way around.

Let’s first look at event1 and the first workflow. event1 sends its notification before receiverThread1 is started. The call event1.notify() (line 1) triggers the member function notify (line 6). First, the notification flag is set, and then the call auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load()); loads the potential waiter. In this case, the waiter is a nullptr because it was not set before. This means the following resume call on the waiter is not executed. The subsequently executed function await_ready (line 7) first checks whether there is more than one waiter. In that case, I throw, for simplicity, a std::runtime_error. The crucial part of this member function is the return value. event.notified was already set to true in the notify method. true means that the coroutine is not suspended and executes like a normal function.

In the second workflow, the co_await event2 call happens before event2 sends its notification. co_await event2 triggers the call await_ready (line 7). The big difference to the first workflow is that event.notified is now false. This false value causes the suspension of the coroutine. Technically, the member function await_suspend (line 8) is executed. await_suspend gets the coroutine handle corHandle and stores it for later invocation in the variable coroutineHandle. Of course, later invocation means resumption. Secondly, the waiter is stored in the variable suspendedWaiter. When event2.notify later triggers its notification, the method notify (line 6) is executed. The difference to the first workflow is that the condition waiter != nullptr evaluates to true. The consequence is that the waiter uses the coroutineHandle to resume the coroutine. Because resume is called inside notify, the rest of the coroutine runs on the sender's thread.

What's next?

If I have one conclusion to draw from this and my last post (C++20: An Infinite Data Stream with Coroutines), it is this one: Don't implement your own coroutines. Use existing coroutines such as the ones available with cppcoro from Lewis Baker. I strictly follow this advice in my next post.


 

 


 

Comments   

+3 #1 Marius 2020-04-13 11:49
One page program just became 3+ pages, really?
Also, cond.notify_one should be protected with the corresponding mutex lock (otherwise the notification will be lost).
Moreover, there is no reason to risk with notify_one, always use notify_all.
0 #2 Arne Kreutzmann 2020-04-14 04:55
I really like your blog posts and concise examples.
I have one question, why are you using

mutable std::atomic suspendedWaiter{nullptr};

instead of

mutable std::atomic suspendedWaiter{nullptr};

?
+1 #3 Vaughn Cato 2020-04-14 15:59
Thanks for this article. In the senderReceiver.cpp example, if the receiver suspends before the sender notifies the event, isn't it true that the second part of the receiver will be executed in the sender's thread?
+1 #4 Jacek 2020-04-15 06:09
Great post, thanks.

I have 2 questions, or remarks, though.

1. When you start two threads, one by one, as you did in your example, you actually don't have the guarantee that they will be really started in this order. It is up to the operating system, isn't it?

2. Could you please also explain what happens on the thread level? When the thread which runs the coroutine (receiver) executes the co_await instruction, the coroutine is suspended, but what happens to the thread itself? When the notification comes, is the resume method executed in the same thread in which it is called (thus executing the remaining part of a coroutine in different thread than it originally started in), or is the execution brought back to the same thread where the coroutine started?
+4 #5 Tarun Elankath 2020-10-18 06:17
The lock-wait program is far smaller and easier to reason and analyze than the co-routine version.
0 #6 Rainer Grimm 2020-10-21 20:24
Quoting Tarun Elankath:
The lock-wait program is far smaller and easier to reason and analyze than the co-routine version.

Your comparison is unfair. From a user perspective, you only care about the main function and the coroutine (receiver). The other functionality should be part of a library.
+4 #7 John Pursey 2020-10-27 23:59
This still needs a mutex to be thread safe. For example, it could still be a lost notification as follows:

Receiver Thread:
in Event::Awaiter::await_suspend, context switch after notified is checked and before sender is set.

SenderThread:
Calls notify, seeing no waiter and so not resuming it.

ReceiverThread:
Stores the waiter, but it is too late, notify was already called.
+1 #8 Venelin Efremov 2020-12-29 01:17
Thank you for the concise example. One thing that is not obvious is that this code does not actually synchronize the threads.

In the second example, the receiver thread finishes very quickly after the coroutine frame is constructed. The code that prints "Got the notification!" is executed in the thread that sends the notification because that is the thread that resumes the coroutine.

This can easily be demonstrated by adding:
a few lines in the coroutine:

std::cout
+1 #9 Michel Lesoinne 2022-01-19 04:53
Restarting the coroutine in the Event::notify() (case 2) is a terrible idea as it is done. The caller of notify has to wait for the end of the coroutine. If that's a complex coroutine, it could be a long time before the notifier resumes. Plus the coroutine has been moved to the notifier's thread.

I understand this is an example, but in general, do not do this unless it is a toy code!
In general you want to switch between coroutine by passing back the handle of the awaiter from a routine of the coroutine system like `final_suspend`.
