C++20: An Infinite Data Stream with Coroutines


My story about coroutines in C++20 continues. Today I dive deep into the coroutines framework to create an infinite data stream. To be prepared, you should read the two previous posts, "C++20: Coroutines - A First Overview" and "C++20: More Details to Coroutines".

 


The framework for writing coroutines consists of more than 20 functions, some of which you must implement and some of which you can override. Therefore, you can tailor the coroutine to your needs. In the end, you can, for example, create a generator Generator<int> for an infinite data stream such as the following one:

Generator<int> getNext(int start = 0, int step = 1) {
    auto value = start;
    for (int i = 0;; ++i) {
        co_yield value;
        value += step;
    }
}

 

Now we know our goal. Let's start.

The Framework

A coroutine consists of three parts: the promise object, the coroutine handle, and the coroutine frame.

  • Promise object: The promise object is manipulated from inside the coroutine, and the coroutine delivers its result via the promise object.
  • Coroutine handle: The coroutine handle is a non-owning handle to resume or to destroy the coroutine frame from outside.
  • Coroutine frame: The coroutine frame is an internal, typically heap-allocated state. It consists of the already mentioned promise object, the copied parameters of the coroutine, the representation of the suspension points, local variables whose lifetime ends before the current suspension point, and local variables whose lifetime exceeds the current suspension point.

The Simplified Workflow

When you use co_yield, co_await, or co_return in a function, the function becomes a coroutine, and the compiler transforms its body to something equivalent to the following lines. 

 

{
  Promise promise;
  co_await promise.initial_suspend();
  try
  {
    <function body>
  }
  catch (...)
  {
    promise.unhandled_exception();
  }
  FinalSuspend:
    co_await promise.final_suspend();
}

<function body> stands for the original function body. The simplified workflow of the coroutine consists of the following phases.

The coroutine begins execution

  • Allocates the coroutine frame
  • Copies all function parameters to the coroutine frame
  • Creates the promise object promise
  • Calls promise.get_return_object() to create the return object, which typically wraps the coroutine handle, and keeps it in a local variable. The result of the call will be returned to the caller when the coroutine first suspends.
  • Calls promise.initial_suspend() and co_awaits its result. The promise type typically returns std::suspend_never for eagerly-started coroutines or std::suspend_always for lazily-started coroutines.
  • The body of the coroutine is executed, when co_await promise.initial_suspend() resumes

The coroutine reaches a suspension point

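  • The return object (the result of promise.get_return_object()) is returned to the caller when the coroutine suspends for the first time; on later suspensions, control goes back to whoever resumed the coroutine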

The coroutine reaches co_return

  • Calls promise.return_void() for co_return or co_return expression, where expression has type void
  • Calls promise.return_value(expression) for co_return expression, where expression has a non-void type
  • Destroys all variables with automatic storage duration in reverse order of their creation
  • Calls promise.final_suspend() and co_awaits its result

The coroutine is destroyed (by terminating via co_return, an uncaught exception, or via the coroutine handle)

  • Calls the destructor of the promise object
  • Calls the destructors of the function parameter copies
  • Frees the memory used by the coroutine frame
  • Transfers control back to the caller

Let's put the theory into practice.

An Infinite Data Stream with co_yield

The following program produces an infinite data stream. The coroutine getNext uses co_yield to create a data stream that starts at start and returns the next value, incremented by step, on request.

 

// infiniteDataStream.cpp

#include <coroutine>
#include <cstdlib>
#include <iostream>

template<typename T>
struct Generator {
    
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;
    
    Generator(handle_type h): coro(h) {}                         // (3)
    handle_type coro;
    
    ~Generator() {
        if ( coro ) coro.destroy();
    }
    Generator(const Generator&) = delete;
    Generator& operator = (const Generator&) = delete;
    Generator(Generator&& oth) noexcept : coro(oth.coro) {
        oth.coro = nullptr;
    }
    Generator& operator = (Generator&& oth) noexcept {
        if (this != &oth) {
            if ( coro ) coro.destroy();                          // release the frame owned so far
            coro = oth.coro;
            oth.coro = nullptr;
        }
        return *this;
    }
    T getValue() {
        return coro.promise().current_value;
    }
    bool next() {                                                // (5)
        coro.resume();
        return not coro.done();
    }
    struct promise_type {
        promise_type()  = default;                               // (1)
          
        ~promise_type() = default;
        
        auto initial_suspend() {                                 // (4)
            return std::suspend_always{};
        }
        auto final_suspend() noexcept {                          // must not throw in C++20
            return std::suspend_always{};
        }
        auto get_return_object() {                               // (2)
            return Generator{handle_type::from_promise(*this)};
        }
        void return_void() {}
      
        auto yield_value(const T value) {                        // (6) 
            current_value = value;
            return std::suspend_always{};
        }
        void unhandled_exception() {
            std::exit(1);
        }
        T current_value;
    };

};

Generator<int> getNext(int start = 0, int step = 1) noexcept {
    auto value = start;
    for (int i = 0;; ++i){
        co_yield value;
        value += step;
    }
}

int main() {
    
    std::cout << std::endl;
  
    std::cout << "getNext():";
    auto gen = getNext();
    for (int i = 0; i <= 10; ++i) {
        gen.next();
        std::cout << " " << gen.getValue();                      // (7)
    }
    
    std::cout << "\n\n";
    
    std::cout << "getNext(100, -10):";
    auto gen2 = getNext(100, -10);
    for (int i = 0; i <= 20; ++i) {
        gen2.next();
        std::cout << " " << gen2.getValue();
    }
    
    std::cout << std::endl;
    
}

 

The main function creates two coroutines. The first one, gen, returns the values from 0 to 10, and the second one, gen2, the values from 100 to -100 in steps of -10. Before I dive into the workflow, here is the output of the program, thanks to the Compiler Explorer and GCC 10.

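getNext(): 0 1 2 3 4 5 6 7 8 9 10

getNext(100, -10): 100 90 80 70 60 50 40 30 20 10 0 -10 -20 -30 -40 -50 -60 -70 -80 -90 -100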

The numbers in the program infiniteDataStream.cpp stand for the steps in the first iteration of the workflow.

  1. Creates the promise
  2. Calls promise.get_return_object() and keeps the result in a local variable
  3. Creates the generator
  4. Calls promise.initial_suspend(). The generator is lazy and, therefore, suspends always.
  5. Asks for the next value and returns whether the generator can still deliver values (false once it is consumed)
  6. Triggered by the co_yield call. The next value is available afterward.
  7. Gets the next value

In additional iterations, only steps 5 to 7 are performed.

It is quite challenging to understand the underlying framework of coroutines. Playing with existing coroutines and observing the changed behavior may be the easiest way to grasp them. The presented coroutine that creates an infinite data stream is a good starting point for your first experiments: just use the link to the executable program on Compiler Explorer.

What's next?

In today's post, I used co_yield to create an infinite data stream. My next post is about thread synchronization with co_await.

 

 
