Asynchronous Programming: Back to the Future

C++

Asynchronous programming… Hearing these words, programmers’ eyes begin to shine, breathing becomes shallow, hands start shaking, and the brain draws multiple levels of abstraction… Managers’ eyes grow wide, sounds become inarticulate, fists clench, and the voice jumps an octave. The only thing that unites these two groups of people is a rapid pulse, though for different reasons. While programmers are eager for the fight, managers are gazing into the crystal ball, trying to grasp the risks and frantically inventing reasons to extend the deadlines as much as they can. Later, when most of the code is already written, programmers begin to experience the bitterness of asynchronous programming, spending endless nights in a debugger, desperately trying to understand what is actually happening…

That’s exactly what my inflamed imagination pictures when I hear “asynchronous programming”. Of course, all of this is too emotional and not always true. Right? Various options are possible. Some people might say that “everything will work well with the right approach”. But you can say that about anything, on every possible occasion, and it doesn’t make anything better. Bugs don’t get fixed, and insomnia won’t go away.

So, what is asynchronous programming? Why is it so attractive, and, most importantly, what’s wrong with it?

Introduction

Asynchronous programming is quite a popular subject nowadays; it’s enough to look around to see it. You will come across reviews of various libraries, the Go language, all sorts of asynchronous frameworks in JS, and many other things.

As a rule, asynchronous programming is used for network programming: various sockets-shmockets, readers-writers, and other acceptors. But interesting things happen elsewhere too, especially in UI. In this article, I am going to talk about network programming only. However, as we shall see in the next article, the approach can be expanded and deepened to an unknown extent.

To be more specific, we are going to write a simple HTTP server that sends a standard response to any standard request. We won’t write a parser, since parsing has about as much to do with asynchronous programming as the position of the stars has to do with a person’s character (see astrology).

Synchronous Single-Threaded Server

Hmm. Synchronous? Reading an article on asynchronous programming, a careful reader may object that «asynchronous» has nothing to do with this. Well, first of all, we have to begin with something simple. Secondly, I am the author here, so it’s going to be this way. Later, you’ll see what it’s for.

In order not to write low-level, platform-dependent code, I’m going to use the powerful asynchronous library boost.asio for all our purposes. Fortunately, lots of articles have been written about it, so we can be in the know.

For more clarity and «production-readiness» of the code, I am going to create wrappers around some of the boost.asio library’s functions. Some certainly like things such as boost::asio::ip::tcp::socket or boost::asio::ip::udp::resolver::iterator, but such names result in less clear and less readable code.

Here’s the definition of the socket and acceptor:

typedef std::string Buffer;

// forward declaration
struct Acceptor;
struct Socket
{
    friend struct Acceptor;

    Socket();
    Socket(Socket&& s);

    // reading fixed-size data
    void read(Buffer&);

    // reading data not greater than the buffer's size
    void readSome(Buffer&);

    // reading data until the given delimiter is found
    int readUntil(Buffer&, const Buffer& until);

    // writing fixed-size data
    void write(const Buffer&);

    // closing the socket
    void close();

private:
    boost::asio::ip::tcp::socket socket;
};

struct Acceptor
{
    // port to listen to accept connection
    explicit Acceptor(int port);

    // socket creation for new connections
    void accept(Socket& socket);

private:
    boost::asio::ip::tcp::acceptor acceptor;
};

Nothing extra, just the server essentials. Socket allows reading and writing, as well as reading until a certain marker appears (readUntil). Acceptor listens on the specified port and accepts connections.

The implementation is provided below:

boost::asio::io_service& service()
{
    return single<boost::asio::io_service>();
}

Socket::Socket() :
    socket(service())
{
}

Socket::Socket(Socket&& s) :
    socket(std::move(s.socket))
{
}

void Socket::read(Buffer& buffer)
{
    boost::asio::read(socket, boost::asio::buffer(&buffer[0], buffer.size()));
}

void Socket::readSome(Buffer& buffer)
{
    buffer.resize(socket.read_some(boost::asio::buffer(&buffer[0], buffer.size())));
}

bool hasEnd(size_t posEnd, const Buffer& b, const Buffer& end)
{
    return posEnd >= end.size() &&
        b.rfind(end, posEnd - end.size()) != std::string::npos;
}

int Socket::readUntil(Buffer& buffer, const Buffer& until)
{
    size_t offset = 0;
    while (true)
    {
        size_t bytes = socket.read_some(boost::asio::buffer(&buffer[offset], buffer.size() - offset));
        offset += bytes;
        if (hasEnd(offset, buffer, until))
        {
            buffer.resize(offset);
            return offset;
        }
        if (offset == buffer.size())
        {
            LOG("not enough size: " << buffer.size());
            buffer.resize(buffer.size() * 2);
        }
    }
}

void Socket::write(const Buffer& buffer)
{
    boost::asio::write(socket, boost::asio::buffer(&buffer[0], buffer.size()));
}

void Socket::close()
{
    socket.close();
}

Acceptor::Acceptor(int port) :
    acceptor(service(), boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
{
}

void Acceptor::accept(Socket& socket)
{
    acceptor.accept(socket.socket);
}

Here I’ve used a singleton for io_service, so as not to pass it around explicitly all the time. How would a user even know that there should be some io_service? That’s why I’ve hidden it away. I guess the rest is quite clear, except for the readUntil function. Its purpose is simple: read bytes until the desired terminator shows up. That’s exactly what we need for HTTP, as we can’t know the size beforehand; that’s why we have to resize the buffer.
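The single() helper itself is not shown here; a minimal sketch of what such a singleton could look like (this is an assumption about its shape, the actual implementation in the repository may differ):

// Sketch (assumed shape) of a singleton helper like the single() above:
// a Meyers singleton returning one shared instance per type.
template<typename T>
T& single()
{
    static T instance; // constructed on first use, reused afterwards
    return instance;
}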

Let’s finally write the server. Here it is:

#define HTTP_DELIM          "\r\n"
#define HTTP_DELIM_BODY     HTTP_DELIM HTTP_DELIM

// our response
Buffer httpContent(const Buffer& body)
{
    std::ostringstream o;
    o << "HTTP/1.1 200 Ok" HTTP_DELIM
        "Content-Type: text/html" HTTP_DELIM
        "Content-Length: " << body.size() << HTTP_DELIM_BODY
        << body;
    return o.str();
}

// we're listening on port 8800 (who knows, maybe 80 is not available?)
Acceptor acceptor(8800);
LOG("accepting");
while (true)
{
    Socket socket;
    acceptor.accept(socket);
    try
    {
        LOG("accepted");
        Buffer buffer(4000, 0);
        socket.readUntil(buffer, HTTP_DELIM_BODY);
        socket.write(httpContent("

Hello sync singlethread!

")); socket.close(); } catch (std::exception& e) { LOG("error: " << e.what()); } }

The server is ready!
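Before moving on, it’s worth seeing what httpContent actually puts on the wire. For the body used above, the response bytes are (each header line terminated by CRLF, with an empty line before the body; Content-Length matches the body’s size):

HTTP/1.1 200 Ok
Content-Type: text/html
Content-Length: 24

Hello sync singlethread!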

Synchronous Multi-Threaded Server

Downsides of the previous server are obvious:

  1. It cannot handle several connections simultaneously.
  2. The client could reuse the connection for more efficient interaction, but we always close it.

That’s why I’ve decided to process each connection in a separate thread while the main one keeps accepting further connections. For this purpose, we need a function that creates a new thread. I will name this function go:

typedef std::function<void()> Handler;

void go(Handler handler)
{
    LOG("sync::go");
    std::thread([handler] {
        try
        {
            LOG("new thread had been created");
            handler();
            LOG("thread was ended successfully");
        }
        catch (std::exception& e)
        {
            LOG("thread was ended with error: " << e.what());
        }
    }).detach();
}

It is necessary to note the following: guess what the program will do if we remove detach()?

It will stupidly terminate without ANY messages, because the std::thread destructor calls std::terminate whenever the thread is still joinable. Thanks to the developers of the standard, way to go!
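A minimal illustration of this trap, independent of our server:

#include <thread>

int main()
{
    std::thread t([] { /* some work */ });
    // t is still joinable when it goes out of scope, so its destructor
    // calls std::terminate() and the program dies with no message.
    // Calling t.join() or t.detach() before the end of scope fixes it.
}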

Now, it’s time to write the server:

Acceptor acceptor(8800);

LOG("accepting");

while (true)
{
    Socket* toAccept = new Socket;
    acceptor.accept(*toAccept);
    LOG("accepted");
    go([toAccept] {
        try
        {
            Socket socket = std::move(*toAccept);
            delete toAccept;
            Buffer buffer;
            while (true)
            {
                buffer.resize(4000);
                socket.readUntil(buffer, HTTP_DELIM_BODY);
                socket.write(httpContent("

Hello sync multithread!

")); } } catch (std::exception& e) { LOG("error: " << e.what()); } }); }

One would think that everything is all right now, but it’s not. Under high real-world load, such a server goes down quite fast: thousands of connections mean thousands of threads, eating memory and context switches. That’s why the smart guys thought for a while and decided to go asynchronous.

Asynchronous Server

What’s wrong with the previous approach? The thing is that most of the time threads are waiting on events from the network, gobbling resources, instead of performing actual work. We would like to make better use of threads for performing useful work.

Therefore, I am going to implement similar functions, but asynchronously, using the proactor pattern. What does that mean? It means that for every operation we call a function and pass it a callback that will be automagically invoked upon completion of the operation; in other words, they call us as soon as the operation completes. This differs from the reactor pattern, in which we have to invoke the necessary handlers ourselves, monitoring the state of operations. Typical examples of a reactor: epoll, kqueue, and the various selects. An example of a proactor: IOCP on Windows. I am going to use boost.asio, a cross-platform proactor.
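To make the distinction concrete, here is a toy sketch (plain callbacks, not real epoll or asio code) of the two shapes:

#include <cstddef>
#include <functional>
#include <iostream>

// Reactor shape: the framework reports that the socket is READY,
// and we perform the I/O ourselves.
void reactorStyle(std::function<void()> onReadable)
{
    // ... epoll_wait/select would block here until readiness ...
    onReadable(); // "fd is readable" -> our code calls read() itself
}

// Proactor shape: we start the operation up front, the framework does
// the I/O and calls us back when it has already COMPLETED.
void proactorStyle(std::function<void(std::size_t)> onComplete)
{
    // ... the framework performs the read on our behalf ...
    onComplete(42); // "read finished, 42 bytes already sit in your buffer"
}

int main()
{
    reactorStyle([] { std::cout << "ready: do the read now\n"; });
    proactorStyle([](std::size_t n) { std::cout << "done: " << n << " bytes\n"; });
}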

Asynchronous interfaces:

typedef boost::system::error_code Error;
typedef std::function<void(const Error&)> IoHandler;

struct Acceptor;
struct Socket
{
    friend struct Acceptor;

    Socket();
    Socket(Socket&&);

    void read(Buffer&, IoHandler);
    void readSome(Buffer&, IoHandler);
    void readUntil(Buffer&, Buffer until, IoHandler);
    void write(const Buffer&, IoHandler);
    void close();

private:
    boost::asio::ip::tcp::socket socket;
};

struct Acceptor
{
    explicit Acceptor(int port);
    void accept(Socket&, IoHandler);

private:
    boost::asio::ip::tcp::acceptor acceptor;
};

Let’s have a look at the following things:

  1. Error handling now differs significantly. In the synchronous approach, we have two options: returning an error code or throwing an exception (the latter has been used since the beginning of the article). With an asynchronous call, there is only one way: passing the error through the handler. Not even through a result value, but as an input parameter of the handler. Like it or not, you will have to process errors the way we did in the good old days before exceptions, checking every little thing by hand. And that’s not even the most interesting part. The interesting part is when an error occurs inside the handler and we have to process it there: restoring the context is a favorite pastime of asynchronous programming!
  2. I’ve used a single IoHandler type for a one-size-fits-all approach, which makes the code simpler and more general.

Looking closer, the only difference from the synchronous functions is that the asynchronous ones take an additional handler as an input parameter.

Well, seems like there’s nothing scary so far.
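To see what point 1 means at a call site, here is a sketch using the read method declared above:

Socket socket;
Buffer buffer(128, 0);
// The error arrives as the handler's input parameter, not as a return
// value and not as an exception:
socket.read(buffer, [](const Error& error) {
    if (!!error)
    {
        // handle the error right here, in the handler's context
        return;
    }
    // buffer has been filled; the continuation of our logic goes here
});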

The Implementation:

Socket::Socket() :
    socket(service())
{
}

Socket::Socket(Socket&& s) :
    socket(std::move(s.socket))
{
}

void Socket::read(Buffer& buffer, IoHandler handler)
{
    boost::asio::async_read(socket, boost::asio::buffer(&buffer[0], buffer.size()),
        [&buffer, handler](const Error& error, std::size_t) {
            handler(error);
    }); 
}

void Socket::readSome(Buffer& buffer, IoHandler handler)
{
    socket.async_read_some(boost::asio::buffer(&buffer[0], buffer.size()),
        [&buffer, handler](const Error& error, std::size_t bytes) {
            buffer.resize(bytes);
            handler(error);
    });
}

bool hasEnd(size_t posEnd, const Buffer& b, const Buffer& end)
{
    return posEnd >= end.size() &&
        b.rfind(end, posEnd - end.size()) != std::string::npos;
}

void Socket::readUntil(Buffer& buffer, Buffer until, IoHandler handler)
{
    VERIFY(buffer.size() >= until.size(), "Buffer size is smaller than expected");
    struct UntilHandler
    {
        UntilHandler(Socket& socket_, Buffer& buffer_, Buffer until_, IoHandler handler_) :
            offset(0),
            socket(socket_),
            buffer(buffer_),
            until(std::move(until_)),
            handler(std::move(handler_))
        {
        }

        void read()
        {
            LOG("read at offset: " << offset);
            socket.socket.async_read_some(boost::asio::buffer(&buffer[offset], buffer.size() - offset), *this);
        }

        void complete(const Error& error)
        {
            handler(error);
        }

        void operator()(const Error& error, std::size_t bytes)
        {
            if (!!error)
            {
                return complete(error);
            }
            offset += bytes;
            VERIFY(offset <= buffer.size(), "Offset outside buffer size");
            LOG("buffer: '" << buffer.substr(0, offset) << "'");
            if (hasEnd(offset, buffer, until))
            {
                // found end
                buffer.resize(offset);
                return complete(error);
            }
            if (offset == buffer.size())
            {
                LOG("not enough size: " << buffer.size());
                buffer.resize(buffer.size() * 2);
            }
            read();
        }

    private:
        size_t offset;
        Socket& socket;
        Buffer& buffer;
        Buffer until;
        IoHandler handler;
    };
    UntilHandler(*this, buffer, std::move(until), std::move(handler)).read();
}

void Socket::write(const Buffer& buffer, IoHandler handler)
{
    boost::asio::async_write(socket, boost::asio::buffer(&buffer[0], buffer.size()),
        [&buffer, handler](const Error& error, std::size_t) {
            handler(error);
    }); 
}

void Socket::close()
{
    socket.close();
}

Acceptor::Acceptor(int port) :
    acceptor(service(), boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
{
}

void Acceptor::accept(Socket& socket, IoHandler handler)
{
    acceptor.async_accept(socket.socket, handler);
}

Everything should be clear, except for the readUntil method. In order to issue several asynchronous reads on a socket one after another, we have to save the state between calls; the UntilHandler class exists for exactly this purpose and holds the current state of the composed asynchronous operation. A similar implementation can be found inside boost.asio for various functions (like boost::asio::read) that require multiple calls of simpler (but no less asynchronous) operations.

In addition, we need analogues of go and dispatch:

void go(Handler);
void dispatch(int threadCount = 0);

Here we have two things: (a) go, which posts a handler to run asynchronously in the thread pool, and (b) dispatch, which creates the pool of threads and starts dispatching.

Here’s what the implementation looks like:

void go(Handler handler)
{
    LOG("async::go");
    service().post(std::move(handler));
}

void run()
{
    service().run();
}

void dispatch(int threadCount)
{
    int threads = threadCount > 0 ? threadCount : int(std::thread::hardware_concurrency());
    RLOG("Threads: " << threads);
    for (int i = 1; i < threads; ++ i)
        sync::go(run);
    run();
}

Here we use sync::go from the synchronous approach to create the threads.

Server Implementation:

Acceptor acceptor(8800);
LOG("accepting");
Handler accepting = [&acceptor, &accepting] {
    struct Connection
    {
        Buffer buffer;
        Socket socket;

        void handling()
        {
            buffer.resize(4000);
            socket.readUntil(buffer, HTTP_DELIM_BODY, [this](const Error& error) {
                if (!!error)
                {
                    LOG("error on reading: " << error.message());
                    delete this;
                    return;
                }
                LOG("read");
                buffer = httpContent("

Hello async!

"); socket.write(buffer, [this](const Error& error) { if (!!error) { LOG("error on writing: " << error.message()); delete this; return; } LOG("written"); handling(); }); }); } }; Connection* conn = new Connection; acceptor.accept(conn->socket, [conn, &accepting](const Error& error) { if (!!error) { LOG("error on accepting: " << error.message()); delete conn; return; } LOG("accepted"); conn->handling(); accepting(); }); }; accepting(); dispatch();

Asynchrony: Back to the Future - Figure 3

Well, that’s it. The nesting of lambdas grows with each new call. Normally, no one writes such things with plain lambdas, because recursion is awkward: a lambda has to be given a name and passed to itself so that it can call itself. And even then, the readability stays pretty much the same, meaning equally bad compared to synchronous code.
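For reference, the usual workaround is to give the lambda a name through std::function and capture that name by reference, which is exactly what the accepting handler above does:

#include <functional>

// A lambda cannot refer to itself directly; naming it via std::function
// and capturing the name by reference makes recursion possible.
std::function<void(int)> countdown = [&countdown](int n) {
    if (n > 0)
        countdown(n - 1);
};
countdown(3); // safe: the call happens after 'countdown' is fully assigned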

Let’s discuss pros and cons of the asynchronous approach:

  1. The undoubted advantage (actually, the very reason we are doing this) is performance: threads no longer sit blocked waiting on I/O, so the same hardware can serve far more connections.
  2. Now for the disadvantages. Actually, there is just one: complex and confusing code that is also much harder to debug.

It’s fine if you’ve written everything correctly and it all works without bugs. But what if not? As they say, good luck with your debugging. Moreover, I have considered quite a simple example, in which the sequence of calls can still be traced. With even a slight complication of the processing logic (say, simultaneous reads and writes on the same socket), code complexity shoots up, and the number of bugs starts to grow almost exponentially.

So, is it really worth it? Should we deal with asynchronous things? Actually, there is a solution, and it is called coroutines.

Coroutines

Asynchrony: Back to the Future - Figure 4

What do we all want? Health, happiness, money? No, we want a simpler thing: to combine the advantages of the asynchronous and synchronous approaches, that is, to get the performance of the asynchronous approach with code as simple as the synchronous one.

Sounds great on paper, but is it possible? To answer this question, we need a brief introduction to coroutines.

What is a regular procedure? We are at some point of execution, and then, boom, some procedure gets called: the current return point is preserved and control transfers to the procedure. It runs, completes, and returns control to the place it was called from. A coroutine is the same thing, only different. It also returns control to its caller, but it does not complete: it stops at some point, from which it will continue on the next run. We get a sort of ping pong: the caller throws the ball, the coroutine catches it, runs to another spot, and throws it back; the caller does something of its own and throws the ball again, right back to where the coroutine stopped. This goes on until the coroutine completes. In general, we could say that a procedure is a special case of a coroutine.

How can we use this for our asynchronous tasks? The key observation is that a coroutine preserves its execution context, which is extremely important for asynchrony, and that is exactly what I am going to exploit. When a coroutine needs to perform an asynchronous operation, it will simply start the asynchronous call and exit the coroutine. Upon completion of the operation, the handler will resume the coroutine from the point where the asynchronous operation was started. Thus, all the dirty work of saving the context falls on the shoulders of the coroutine implementation.

That’s where the problems begin. The thing is that native support for coroutines in languages and processors is ancient history. Nowadays, switching execution contexts takes a whole series of operations: saving the register state, switching the stack, and filling in various runtime-specific data so that exceptions, TLS, and the like keep working. Moreover, the implementation depends not only on the processor architecture but also on the compiler and the operating system. Sounds like the final nail in the coffin…

Fortunately, there is boost.context, which implements everything necessary for each supported platform, written in assembler in the best tradition. We could certainly use boost.coroutine instead, but why, when there’s boost.context? We need more hard rock!

The Implementation of Coroutines

So, we are going to write coroutines for our purposes. The interface is going to be like this:

// leave the coroutine
void yield();

// check if we are inside the coroutine
bool isInsideCoro();

// coroutine
struct Coro
{
    // just in case
    friend void yield();

    Coro();

    // create and invoke the handler
    Coro(Handler);

    // no comments
    ~Coro();

    // run the handler
    void start(Handler);

    // resume the coroutine's work (only when it has been suspended by yield)
    void resume();

    // check if the coroutine can be resumed
    bool isStarted() const;

private:
    ...
};

As you can see, the interface is quite simple. Here’s a variant of using it:

void coro()
{
    std::cout << '2';
    yield();
    std::cout << '4';
}
std::cout << '1';
Coro c(coro);
std::cout << '3';
c.resume();
std::cout << '5';

The following should be seen on the screen:

12345

Let’s begin with the start method:

void Coro::start(Handler handler)
{
    VERIFY(!isStarted(), "Trying to start already started coro");
    context = boost::context::make_fcontext(&stack.back(), stack.size(), &starterWrapper0);
    jump0(reinterpret_cast<intptr_t>(&handler));
}

Here, boost::context::make_fcontext creates the context and passes the static method starterWrapper0 as a starting function:

TLS Coro* t_coro;
void Coro::starterWrapper0(intptr_t p)
{
    t_coro->starter0(p);
}

that simply redirects to the starter0 method, extracting the current Coro instance from TLS. All the magic of context switching is in the private method jump0:

void Coro::jump0(intptr_t p)
{
    Coro* old = this;
    std::swap(old, t_coro);
    running = true;
    boost::context::jump_fcontext(&savedContext, context, p);
    running = false;
    std::swap(old, t_coro);
    if (exc != std::exception_ptr())
        std::rethrow_exception(exc);
}

Here we replace the old TLS value of t_coro with a new one (this is needed for recursive switching between several coroutines), then set the flags and switch the context using boost::context::jump_fcontext. On the way back we restore the old values and rethrow the stored exception, if any.

Now, let’s have a look at the private starter0 method that invokes the necessary handler:

void Coro::starter0(intptr_t p)
{
    started = true;
    try
    {
        Handler handler = std::move(*reinterpret_cast<Handler*>(p));
        handler();
    }
    catch (...)
    {
        exc = std::current_exception();
    }
    started = false;
    yield0();
}

I’d like to highlight one interesting point: if we did not save the handler inside the coroutine (before calling it), the program could crash on a later return, because the handler stores state that may be destroyed at some point.

Let’s have a look at other functions:

// return the control from the coroutine
void yield()
{
    VERIFY(isInsideCoro(), "yield() outside coro");
    t_coro->yield0();
}

// check if we are inside the coroutine
bool isInsideCoro()
{
    return t_coro != nullptr;
}

// resume the coroutine after yield
void Coro::resume()
{
    VERIFY(started, "Cannot resume: not started");
    VERIFY(!running, "Cannot resume: in running state");
    jump0();
}

// check if the coroutine is still running
bool Coro::isStarted() const
{
    return started || running;
}

// go to the preserved context
void Coro::yield0()
{
    boost::context::jump_fcontext(context, &savedContext, 0);
}
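One consequence of this design is worth spelling out: an exception thrown inside a coroutine is caught in starter0, stored in exc, and rethrown by jump0, so it surfaces at the resume() call site. A sketch:

Coro c([] {
    yield();                          // stop here on the first pass
    throw std::runtime_error("boom"); // thrown on the second pass
});
try
{
    c.resume(); // the coroutine resumes and throws; jump0 rethrows here
}
catch (std::exception& e)
{
    std::cout << "caught: " << e.what() << std::endl; // caught: boom
}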

Synca: async vice versa

Asynchrony: Back to the Future - Figure 5

It’s time to implement asynchrony on top of coroutines. A trivial implementation option is shown in the following diagram:

Asynchrony: Back to the Future - Figure 6

We create a coroutine, then it starts an asynchronous operation and completes its execution using yield() function. Upon completion of the operation, the coroutine continues its execution by calling resume() method.

Everything would be fine if it weren’t for the notorious multithreading which, as always, brings some turbulence. Because of it, the approach shown above will not work properly, as the following diagram clearly illustrates:

Asynchrony: Back to the Future - Figure 7

Right after the operation is scheduled, its completion handler may fire in another thread and try to continue execution before we have even exited the coroutine. That certainly wasn’t the plan. Therefore, we have to complicate the sequence:

Asynchrony: Back to the Future - Figure 8

The difference is that we run the scheduling not inside the coroutine but outside of it, which rules out the scenario described above. At the same time, the coroutine may be resumed in another thread, which is quite normal behavior: coroutines are designed so that we can move them back and forth, preserving the execution context.

Small Remark

Surprisingly enough, boost.asio already has support for coroutines. It uses io_service::strand to solve the problem mentioned above, but that’s another story. Besides, it’s always more interesting to write something yourself, and the result obtained in this article is more convenient to use.

The Implementation

Let’s begin with the implementation of the go function:

void go(Handler handler)
{
    LOG("synca::go");
    async::go([handler] {
        coro::Coro* coro = new coro::Coro(std::move(handler));
        onCoroComplete(coro);
    });
}

Instead of simply invoking the handler, we create a coroutine and run the handler inside it. The onCoroComplete function is also of interest here: it checks whether something needs to be scheduled:

typedef std::function<void(coro::Coro*)> CoroHandler;
TLS CoroHandler* t_deferHandler;
void onCoroComplete(coro::Coro* coro)
{
    VERIFY(!coro::isInsideCoro(), "Complete inside coro");
    VERIFY(coro->isStarted() == (t_deferHandler != nullptr), "Unexpected condition in defer/started state");
    if (t_deferHandler != nullptr)
    {
        LOG("invoking defer handler");
        (*t_deferHandler)(coro);
        t_deferHandler = nullptr;
        LOG("completed defer handler");
    }
    else
    {
        LOG("nothing to do, deleting coro");
        delete coro;
    }
}

Our actions are simple: we check whether there is something to schedule. If there is, we run it. If there isn’t, the coroutine has completed its execution and we can delete it.

The next question is: how does t_deferHandler get filled in? Like this:

TLS const Error* t_error;

void handleError()
{
    if (t_error)
        throw boost::system::system_error(*t_error, "synca");
}

void defer(CoroHandler handler)
{
    VERIFY(coro::isInsideCoro(), "defer() outside coro");
    VERIFY(t_deferHandler == nullptr, "There is unexecuted defer handler");
    t_deferHandler = &handler;
    coro::yield();
    handleError();
}

This function is always called inside a coroutine. We pass it a handler that deals with scheduling the operation, that is, with starting the asynchronous call. The handler is preserved so that we can trigger it after exiting the coroutine (coro::yield). Right after we leave the coroutine, onCoroComplete is invoked and triggers our «deferred» handler. Here’s how defer is used, with Acceptor::accept as the example:

void onComplete(coro::Coro* coro, const Error& error)
{
    LOG("async completed, coro: " << coro << ", error: " << error.message());
    VERIFY(coro != nullptr, "Coro is null");
    VERIFY(!coro::isInsideCoro(), "Completion inside coro");
    t_error = error ? &error : nullptr;
    coro->resume();
    LOG("after resume");
    onCoroComplete(coro);
}

async::IoHandler onCompleteHandler(coro::Coro* coro)
{
    return [coro](const Error& error) {
        onComplete(coro, error);
    };
}

void Acceptor::accept(Socket& socket)
{
    VERIFY(coro::isInsideCoro(), "accept must be called inside coro");
    defer([this, &socket](coro::Coro* coro) {
        VERIFY(!coro::isInsideCoro(), "accept completion must be called outside coro");
        acceptor.accept(socket.socket, onCompleteHandler(coro));
        LOG("accept scheduled");
    });
}

onCompleteHandler returns the asynchronous handler that processes the completion of the asynchronous operation. Inside it, the error is stored in t_error so that we can later throw an exception inside our coroutine (see handleError inside defer). Then coro->resume() continues the coroutine’s execution, returning into defer right after the yield() call. The diagram below shows the sequence of calls and the interaction between the various entities:

Asynchrony: Back to the Future - Figure 9

Other functions are implemented in a similar way:

void Socket::readSome(Buffer& buffer)
{
    VERIFY(coro::isInsideCoro(), "readSome must be called inside coro");
    defer([this, &buffer](coro::Coro* coro) {
        VERIFY(!coro::isInsideCoro(), "readSome completion must be called outside coro");
        socket.readSome(buffer, onCompleteHandler(coro));
        LOG("readSome scheduled");
    });
}

void Socket::readUntil(Buffer& buffer, Buffer until)
{
    VERIFY(coro::isInsideCoro(), "readUntil must be called inside coro");
    defer([this, &buffer, until](coro::Coro* coro) {
        VERIFY(!coro::isInsideCoro(), "readUntil completion must be called outside coro");
        socket.readUntil(buffer, std::move(until), onCompleteHandler(coro));
        LOG("readUntil scheduled");
    });
}

void Socket::write(const Buffer& buffer)
{
    VERIFY(coro::isInsideCoro(), "write must be called inside coro");
    defer([this, &buffer](coro::Coro* coro) {
        VERIFY(!coro::isInsideCoro(), "write completion must be called outside coro");
        socket.write(buffer, onCompleteHandler(coro));
        LOG("write scheduled");
    });
}

In the implementation I use asynchronous objects async::Socket and async::Acceptor that have been described in the part about asynchronousness.

How To Use It

Let’s move on to the usage of our functionality. Everything is much simpler and smarter here:

Acceptor acceptor(8800);
LOG("accepting");
go([&acceptor] {
    while (true)
    {
        Socket* toAccept = new Socket;
        acceptor.accept(*toAccept);
        LOG("accepted");
        go([toAccept] {
            try
            {
                Socket socket = std::move(*toAccept);
                delete toAccept;
                Buffer buffer;
                while (true)
                {
                    buffer.resize(4000);
                    socket.readUntil(buffer, HTTP_DELIM_BODY);
                    socket.write(httpContent("

Hello synca!

")); } } catch (std::exception& e) { LOG("error: " << e.what()); } }); } }); dispatch();

The provided code resembles something… Right! It’s almost our synchronous code:

Sync vs Synca Code

There’s just one difference here. In the synchronous implementation, socket acceptance takes place in the main thread, which is why there is no dispatch. However, we could make the approaches absolutely identical: the synchronous implementation would then also accept sockets in a separate thread via go, while its dispatch function would wait for all the threads to complete.
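For completeness, a sketch of what such a joining sync::dispatch could look like (hypothetical, not part of the article’s code; a real version would need a mutex if go can be called from several threads):

#include <functional>
#include <thread>
#include <vector>

std::vector<std::thread> g_threads;

// Spawn a handler in its own thread, keeping the handle for later.
void go(std::function<void()> handler)
{
    g_threads.emplace_back(std::move(handler));
}

// Wait for every spawned handler instead of detaching the threads.
void dispatch()
{
    for (auto& t : g_threads)
        t.join();
}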

But the difference in implementation is fundamentally important: the resulting code uses asynchronous network interaction, which makes it a much more effective implementation. This is where our goal is achieved: we wanted a symbiosis of the asynchronous and synchronous approaches, taking the best from both worlds (the simplicity of the synchronous approach and the performance of the asynchronous one).

Enhancement

Asynchrony: Back to the Future - Figure 9

Let’s have a look at how to enhance the process of accepting sockets. After accepting a connection, there are usually two execution routes: the acceptor continues to accept, while the new socket is handled in a separate execution context. Therefore, we are going to create a new goAccept method:

async::IoHandler onCompleteGoHandler(coro::Coro* coro, Handler handler)
{
    return [coro, handler](const Error& error) {
        if (!error)
            go(std::move(handler));
        onComplete(coro, error);
    };
}

struct Acceptor
{
    typedef std::function<void(Socket&)> Handler;
    // ...
};

void Acceptor::goAccept(Handler handler)
{
    VERIFY(coro::isInsideCoro(), "goAccept must be called inside coro");
    defer([this, handler](coro::Coro* coro) {
        VERIFY(!coro::isInsideCoro(), "goAccept completion must be called outside coro");
        Socket* socket = new Socket;
        acceptor.accept(socket->socket, onCompleteGoHandler(coro, [socket, handler] {
            Socket s = std::move(*socket);
            delete socket;
            handler(s);
        }));
        LOG("accept scheduled");
    });
}

So now our server will look like this:

Acceptor acceptor(8800);
LOG("accepting");
go([&acceptor] {
    while (true)
    {
        acceptor.goAccept([](Socket& socket) {
            try
            {
                Buffer buffer;
                while (true)
                {
                    buffer.resize(4000);
                    socket.readUntil(buffer, HTTP_DELIM_BODY);
                    socket.write(httpContent("

Hello synca!

")); } } catch (std::exception& e) { LOG("error: " << e.what()); } }); } }); dispatch();

This is much easier to understand and use.

Question No.1. What about performance?

Unlike the pure asynchronous approach, there is additional overhead here for creating and switching contexts and related bookkeeping.

At first, I wanted to test the limit loads, but it then turned out that a Gigabit network (rather than the CPU) gets fully saturated by one (!!!) thread. Therefore, I carried out the following test instead:

  1. The server runs under a constant load of 30K RPS (30,000 requests per second).
  2. We measure the CPU load for async and synca.

The results are provided below:

Method   Requests per Second   Number of Threads   CPU Core Load
async    30000                 1                   75±5%
synca    30000                 1                   80±5%
It should be noted that the spread in the obtained values comes from fluctuations over the course of a single test run, most likely caused by unevenness in channel load and processing.

Nevertheless, we can see that despite the additional context switches, and despite throwing exceptions instead of using return codes (an exception is generated every time a socket closes, that is, on every new request), the overhead is negligible. And what if we add code that honestly parses the HTTP message, plus code that no less honestly processes the request and does something important and necessary? It’s safe to say the difference in performance would vanish entirely.

Question No.2. Okay, maybe. But is it possible to solve more complex asynchronous tasks this way?

Theorem. Any asynchronous task can be solved with the help of coroutines.


Proof.

First, take a function that uses asynchronous calls. Any function can be converted to a coroutine, since a function is a special case of a coroutine. Then take any asynchronous call inside the converted coroutine. We can represent this call in the following form:

// code before the call
async(..., handler);
// code after the call

Let’s consider the case when there’s no code after the call:

// code before the call
async(..., handler);

In terms of the coroutine, such code is equivalent to the following:

// code before the call
synca(...);
handler();

Meaning that inside synca we call the corresponding asynchronous function async, which returns control to the coroutine upon completion of the operation, and then handler() is called explicitly. The result is absolutely the same.

Now consider the more general case, when there is code after the asynchronous call. That code is equivalent to:

// code before the call
go {
    async(..., handler);
}
// code after the call

Using the fact that there is now no code after the async call inside go, we can apply the previous case and get:

// code before the call
go {
    synca(...);
    handler();
}
// code after the call

This means we have one asynchronous call fewer. Applying this step to every asynchronous call of the function, and then to every function, we can rewrite the entire code with coroutines. QED.
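A self-contained toy instance of this rewriting step (async_op stands in for any asynchronous call; it invokes its handler directly so the snippet runs on its own):

#include <functional>
#include <iostream>

// Stand-in for an asynchronous operation: it simply calls the handler.
void async_op(std::function<void()> handler) { handler(); }

// Before: the continuation is trapped inside the handler.
void beforeRewrite()
{
    std::cout << "code before the call\n";
    async_op([] { std::cout << "handler body\n"; });
}

// After: with a coroutine-backed synca-style call, the operation would
// suspend and resume, so the former handler body becomes straight-line code.
void afterRewrite()
{
    std::cout << "code before the call\n";
    // synca(...) would yield here and resume upon completion
    std::cout << "handler body\n";
}

int main()
{
    beforeRewrite();
    afterRewrite(); // both functions print the same two lines
}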

Summary

Asynchronous programming is bursting into the lives of programmers. The complications that arise when writing asynchronous code can drive even the most experienced ones crazy. However, we should not forget the good old synchronous code. In smart hands, asynchrony turns into smart coroutines.

In the next article, we’re going to review a much more complex example that will reveal all the power and potential of coroutines!

See you soon!

P.S. The entire code is here: bitbucket:gridem/synca
