A Coroutines-Based Single Consumer – Single Producer Workflow by Ljubic Damir
I’m happy to present in this guest post by Ljubic Damir a typical use-case for coroutines: a producer – consumer workflow.
Although this producer-consumer workflow is challenging, it is a nice starting point for your coroutine experiments.
Intro
Coroutines provide a more intuitive and structured way of writing asynchronous code by allowing you to write asynchronous operations in a procedural style. They are a feature introduced in C++20 to simplify asynchronous programming.
Pre-existing mechanisms like std::async tasks, std::packaged_task, or events (std::condition_variable & std::mutex) synchronize two or more threads on the result of a task by establishing a communication channel. This communication channel has two ends:
– std::promise, which writes either the result or the exception into the shared state, and
– std::future (std::shared_future), the receiving end that waits on the result of the task (or the exception).
Unlike these pre-existing mechanisms, coroutines don't directly involve threads or other OS synchronization primitives. They are a pure software abstraction based on the coroutine's control object and the state-machine logic built around it.
Coroutines are stackless, which means that the coroutine's state (the coroutine frame, including the promise object) lives outside the call stack, typically on the heap. The control object that refers to it is a library type parameterized on the promise_type (std::coroutine_handle<promise_type>). Despite the similar name, promise_type has nothing in common with std::promise.
The promise_type is an interface (a customization point) that describes the predefined transition states in a coroutine's state machine.
Coroutines are highly versatile and can be used in various scenarios where you must manage asynchronous message flow. One common example is socket-based communication.
Today, I'll try to illustrate coroutines through another example: a single producer – single consumer workflow.
Implementation
First, we need to define the result type for the coroutine

    class [[nodiscard]] AudioDataResult final {
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;

        class promise_type {
            ...
        };
    };

as a wrapper around the inner promise_type type. We decorate the enclosing class with the [[nodiscard]] attribute, since the result type is the control object of the coroutine that we return to the client code to manage its suspension/resumption.
@note The destructor of the class cleans up the resources (dynamic memory) in RAII fashion, so strictly speaking, the return value could be ignored if there is no need to manage the coroutine state.

    ~AudioDataResult() { if (handle_) { handle_.destroy(); } }
The result type is move-only: the copy operations are forbidden to prevent the control object from being duplicated.

    // Make the result type move-only, due to exclusive ownership over the handle
    AudioDataResult(const AudioDataResult&) = delete;
    AudioDataResult& operator=(const AudioDataResult&) = delete;

    AudioDataResult(AudioDataResult&& other) noexcept
        : handle_(std::exchange(other.handle_, nullptr)) {}

    AudioDataResult& operator=(AudioDataResult&& other) noexcept {
        AudioDataResult tmp = std::move(other);
        // Swap the handles only: std::swap(*this, tmp) would call this operator again
        std::swap(handle_, tmp.handle_);
        return *this;
    }

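The ownership rules can be tried out in isolation. The sketch below uses a hypothetical class Owner that owns a plain int* as a stand-in for the coroutine handle; it is an assumption-laden illustration, not the type from the example. Note that the move assignment swaps the member only: the generic std::swap on the whole object is itself implemented in terms of move construction and move assignment, so calling it from the move assignment operator would recurse.

```cpp
#include <type_traits>
#include <utility>

// Hypothetical stand-in for AudioDataResult: exclusively owns a heap-allocated int.
class Owner {
public:
    explicit Owner(int* resource) noexcept : resource_(resource) {}

    Owner(const Owner&) = delete;
    Owner& operator=(const Owner&) = delete;

    // Steal the resource and leave the source empty.
    Owner(Owner&& other) noexcept : resource_(std::exchange(other.resource_, nullptr)) {}

    Owner& operator=(Owner&& other) noexcept {
        Owner tmp = std::move(other);          // tmp now owns other's resource
        std::swap(resource_, tmp.resource_);   // member swap: no recursion
        return *this;                          // tmp releases our old resource
    }

    ~Owner() { delete resource_; }

    bool owns() const noexcept { return resource_ != nullptr; }
    int value() const { return *resource_; }

private:
    int* resource_;
};

static_assert(!std::is_copy_constructible_v<Owner>);
static_assert(!std::is_copy_assignable_v<Owner>);
static_assert(std::is_nothrow_move_constructible_v<Owner>);
static_assert(std::is_nothrow_move_assignable_v<Owner>);
```

After a move, the source no longer owns anything, so its destructor has nothing to release. For the coroutine result type, this corresponds to the null check before handle_.destroy().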
Let's define the promise_type interface itself:

    // Predefined interface that has to be specified in order to implement
    // the coroutine's state-machine transitions
    class promise_type {
    public:
        using value_type = std::vector<int>;

        AudioDataResult get_return_object() {
            return AudioDataResult{ handle_type::from_promise(*this) };
        }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {
            std::rethrow_exception(std::current_exception());
        }

        // Generates the value and suspends the "producer"
        template <typename Data>
        requires std::convertible_to<std::decay_t<Data>, value_type>
        std::suspend_always yield_value(Data&& value) {
            data_ = std::forward<Data>(value);
            data_ready_.store(true, std::memory_order::relaxed);
            return {};
        }

    private:
        value_type data_;
        std::atomic<bool> data_ready_{false};
    };//promise_type interface

The promise_type defines the necessary infrastructure of the coroutine. Additionally, any coroutine that wants to act as a generator (the "producer") and yield values needs a promise_type enhanced with the yield_value method (co_yield value ≡ co_await promise.yield_value(value)). Also, to resume the producer at the point when the provided data are consumed, we need to expose the appropriate wrapper method resume() on the result type:

    void resume() { if (not handle_.done()) { handle_.resume(); } }

Now – we need to extend the coroutine to support the consumer requirements: to be synchronized on the data readiness. In other words, the consumer will be suspended until the data are signaled as available by the producer. For that, we need to implement the Awaiter interface:

    class promise_type {
    public:
        // Awaiter interface: for the consumer waiting on data being ready
        struct AudioDataAwaiter {
            explicit AudioDataAwaiter(promise_type& promise) noexcept
                : promise_(promise) {}

            bool await_ready() const {
                return promise_.data_ready_.load(std::memory_order::relaxed);
            }
            void await_suspend(handle_type) const {
                while (not promise_.data_ready_.exchange(false)) {
                    std::this_thread::yield();
                }
            }
            // The value is moved out at the client invocation side:
            //   const auto data = co_await audioDataResult;
            // This requires that the coroutine's result type provides
            // the co_await unary operator
            value_type&& await_resume() const {
                return std::move(promise_.data_);
            }

        private:
            promise_type& promise_;
        };//Awaiter interface
    };//promise_type

In the coroutine's state machine, await_ready() is the first transition: it checks whether the data are ready. If they are not, await_suspend() is called next. Here we wait until the matching flag is set. Finally, when the consumer continues, await_resume() is called: we "move" the value out of the promise_type by unconditionally casting it to an rvalue reference. On the client invocation side, this causes the returned value, data, to be move-constructed from it.
const auto data = co_await audioDataResult;
For that to work, the result type must provide the co_await unary operator, which returns our Awaiter interface.

    class AudioDataResult {
    public:
        auto operator co_await() noexcept {
            return promise_type::AudioDataAwaiter{handle_.promise()};
        }
    };

<Example 1>: https://godbolt.org/z/MvYfbEP8r
The following program producerConsumer.cpp shows a simplified version of example 1:

    // producerConsumer.cpp

    #include <algorithm>
    #include <atomic>
    #include <chrono>
    #include <concepts>
    #include <coroutine>
    #include <exception>
    #include <functional>
    #include <iostream>
    #include <iterator>
    #include <memory>
    #include <source_location>
    #include <thread>
    #include <utility>
    #include <vector>

    void funcName(const std::source_location location = std::source_location::current()) {
        std::cout << location.function_name() << '\n';
    }

    // Prints the elements as " [a, b, c]"; expects a non-empty container
    template <typename Container>
    void printContainer(const Container& container) {
        typedef typename Container::value_type value_type;
        auto first = std::cbegin(container);
        auto last = std::cend(container);

        std::cout << " [";
        std::copy(first, std::prev(last), std::ostream_iterator<value_type>(std::cout, ", "));
        std::cout << *std::prev(last) << "]\n";
    }

    class [[nodiscard]] AudioDataResult final {
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;

        // Predefined interface that has to be specified in order to implement
        // the coroutine's state-machine transitions
        class promise_type {
        public:
            using value_type = std::vector<int>;

            AudioDataResult get_return_object() {
                return AudioDataResult{handle_type::from_promise(*this)};
            }
            std::suspend_never initial_suspend() noexcept { return {}; }
            std::suspend_always final_suspend() noexcept { return {}; }
            void return_void() {}
            void unhandled_exception() {
                std::rethrow_exception(std::current_exception());
            }

            // Generates the value and suspends the "producer"
            template <typename Data>
            requires std::convertible_to<std::decay_t<Data>, value_type>
            std::suspend_always yield_value(Data&& value) {
                data_ = std::forward<Data>(value);
                data_ready_.store(true);
                return {};
            }

            // Awaiter interface: for the consumer waiting on data being ready
            struct AudioDataAwaiter {
                explicit AudioDataAwaiter(promise_type& promise) noexcept
                    : promise_(promise) {}

                bool await_ready() const { return promise_.data_ready_.load(); }

                void await_suspend(handle_type) const {
                    while (not promise_.data_ready_.exchange(false)) {
                        std::this_thread::yield();
                    }
                }
                // The value is moved out at the client invocation side:
                //   const auto data = co_await audioDataResult;
                // This requires that the coroutine's result type provides
                // the co_await unary operator
                value_type&& await_resume() const {
                    return std::move(promise_.data_);
                }

            private:
                promise_type& promise_;
            };//Awaiter interface

        private:
            value_type data_;
            std::atomic<bool> data_ready_{false};
        };//promise_type interface

        auto operator co_await() noexcept {
            return promise_type::AudioDataAwaiter{handle_.promise()};
        }

        // Make the result type move-only, due to ownership over the handle
        AudioDataResult(const AudioDataResult&) = delete;
        AudioDataResult& operator=(const AudioDataResult&) = delete;

        AudioDataResult(AudioDataResult&& other) noexcept
            : handle_(std::exchange(other.handle_, nullptr)) {}

        AudioDataResult& operator=(AudioDataResult&& other) noexcept {
            AudioDataResult tmp = std::move(other);
            // Swap the handles only: std::swap(*this, tmp) would call this operator again
            std::swap(handle_, tmp.handle_);
            return *this;
        }

        // d-tor: RAII
        ~AudioDataResult() { if (handle_) { funcName(); handle_.destroy(); } }

        // For resuming the producer at the point when the data are consumed
        void resume() { if (not handle_.done()) { funcName(); handle_.resume(); } }

    private:
        AudioDataResult(handle_type handle) noexcept : handle_(handle) {}

    private:
        handle_type handle_;
    };

    using data_type = std::vector<int>;

    AudioDataResult producer(const data_type& data) {
        for (std::size_t i = 0; i < 5; ++i) {
            funcName();
            co_yield data;
        }
        co_yield data_type{}; // exit criteria
    }

    AudioDataResult consumer(AudioDataResult& audioDataResult) {
        while (true) {
            funcName();
            const auto data = co_await audioDataResult;
            if (data.empty()) { std::cout << "No data - exit!\n"; break; }
            std::cout << "Data received:";
            printContainer(data);
            audioDataResult.resume(); // resume producer
        }
    }

    int main() {
        {
            const data_type data = {1, 2, 3, 4};
            auto audioDataProducer = producer(data);
            std::thread t([&] { auto audioRecorded = consumer(audioDataProducer); });
            t.join();
        }

        std::cout << "bye-bye!\n";
    }

Finally, here is the output of the program:
An alternative is to use promise_type::await_transform() to wait on the value stored in the promise_type instance used by the producer.

    class promise_type {
    public:
        auto await_transform(handle_type other) {
            // Awaiter interface: remains the same
            struct AudioDataAwaiter {
                explicit AudioDataAwaiter(promise_type& promise) noexcept
                    : promise_(promise) {}
                ...
            };
            return AudioDataAwaiter{other.promise()};
        }
    };

This way, we no longer need to specify the co_await unary operator of the result type, but rather an (explicit) conversion operator

    explicit operator handle_type() const { return handle_; }

so that we can pass the handle at the point when the consumer calls co_await, which is internally translated to the await_transform() call.
const auto data = co_await static_cast<AudioDataResult::handle_type>(audioDataResult);
We can illustrate this as: me.handle_.promise().await_transform(other.handle_)
<Example 2>: https://godbolt.org/z/57zsK9rEn
Conclusion
In this simple example, the producer can be suspended without any penalty since, after being resumed, it provides the very same sequence of data, known upfront. In a real scenario, that is likely not the case: the producer will probably be some kind of mediator, a receiver of asynchronously emitted data that are retransmitted to the consumer. For that, some queuing logic needs to be implemented on the producer side to avoid data loss while it is suspended, waiting for the consumer to resume it, and to compensate for the difference between the producer's data arrival rate and the consumer's consumption rate.
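One possible shape of such queuing logic is a small thread-safe buffer between the receiving side and the consumer. The following is only a sketch under assumptions: the class name AudioQueue and its push/close/pop interface are hypothetical, it uses a mutex and a condition variable rather than coroutines, and it is unbounded, so it does not limit memory if the consumer falls behind.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical buffer for data blocks that arrive while the consumer is busy.
class AudioQueue {
public:
    using value_type = std::vector<int>;

    // Called by the receiving side for every block that arrives.
    void push(value_type block) {
        {
            std::lock_guard<std::mutex> lock{mutex_};
            blocks_.push_back(std::move(block));
        }
        ready_.notify_one();
    }

    // Signals that no further blocks will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lock{mutex_};
            closed_ = true;
        }
        ready_.notify_all();
    }

    // Blocks until a block is available; returns std::nullopt once the queue
    // has been closed and drained.
    std::optional<value_type> pop() {
        std::unique_lock<std::mutex> lock{mutex_};
        ready_.wait(lock, [this] { return closed_ || !blocks_.empty(); });
        if (blocks_.empty()) { return std::nullopt; }
        value_type block = std::move(blocks_.front());
        blocks_.pop_front();
        return block;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<value_type> blocks_;
    bool closed_ = false;
};
```

Blocks are handed out in arrival order, so nothing is lost while the consumer is still busy with an earlier block.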
What’s Next?
In C++20, you can define or default the three-way comparison operator. This gives you all six comparison operators: ==, !=, <, <=, >, and >=. Did you know that you can also define or default the equality operator (==)?
A Short Christmas Break
My blog takes a short Christmas break. The next post will be published on the 8th of January 2024. Have a good time.
Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Stephen Kelley, Kyle Dean, Tusar Palauri, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, Rob North, Bhavith C Achar, Marco Parri Empoli, Philipp Lenk, Charles-Jianye Chen, Keith Jeffery, and Matt Godbolt.
Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, Slavko Radman, and David Poole.