Asynchronous Programming Part 2: Teleportation through Portals
C++Finally, I have finished another article about asynchronous programming. It develops the ideas of the previous one [1]. Today we are going to discuss quite a difficult task that will reveal the power and flexibility of using coroutines in various nontrivial scenarios. At the end, we are going to consider two tasks on race-condition, and also a small bonus. By the way, the first article on asynchronous programming has become quite popular.
So, let’s get started!
The Task
The initial statement is quite plain and simple:
Receive a heavy object through the network and pass it to UI.
We are going to complicate the task by adding a few “interesting” requirements to the UI:
- An action is created from a UI thread via some event.
- The result must be returned back to UI.
- We don’t want to lock UI. Therefore, the operation should be performed asynchronously.
Let’s add some “fun” conditions for receiving an object:
- Objects will be cached — network operations are slow.
- We want to have a persistent cache so that objects are safe after the restart.
- For a better response time, we also want to cache objects in memory.
Now as for the performance aspects:
- Writing to a cache (both persistent and in-memory) should be parallel, not sequential.
- Reading from a cache should also be parallel. If the value has been found in one of the caches, we should use it immediately, without waiting for a response from another cache.
- Network operations should in no way interfere with caches. So, if caches are slow, this should not affect network interactions.
- It would also be good to support a large number of connections in a limited number of threads. That is, I want asynchronous network interaction for a more careful treatment of resources.
Let’s complicate it with some logic:
- We’ll need cancelable operation.
- In addition, if we have received our object through the network, then we shouldn’t cancel further operations for updating the cache. That is, we need to implement “cancelable cancel” for some set of actions.
If this isn’t hardcore enough, let’s add more requirements:
- Implement timeouts for operations. Moreover, timeouts should be for an entire operation as well as some parts of it. For example:
- timeout for the entire networking: connection, request, response;
- timeout for the entire operation, including networking and the work with the cache.
- Operation schedulers can be both our own and some other ones (e.g. the UI thread scheduler).
- No operations should be locked by threads. This means that the use of mutexes and other methods of synchronization are not allowed as they will lock our threads.
Now it seems enough. If someone can immediately come up with an answer, I will be glad to read it. As for now, I’m providing my own solution below. It’s obvious that I’m not going to emphasize the implementation of caches and persistency, but instead, I will highlight concrete parallel and asynchronous interaction, taking into account requirements for locks and schedulers.
The Solution
To solve this task, we are going to use the following pattern:
Let’s see what’s going on here:
- UI, Mem Cache, Disk Cache, Network — objects that perform corresponding operations on the newly created Handler.
- Handler is responsible for a simple sequence:
- In parallel, it runs an operation to get data from Mem Cache and Disk Cache objects. In the case of success (i.e. there’s a response with a found result from at least one cache), the result will be returned immediately. In the case of failure (as in the diagram), the execution continues.
- After waiting for no result from both of cashes, Handler tries to receive an object through the Network. For this purpose, we connect to the server (connect), send a request (send) and receive a response (receive). Such operations are performed asynchronously and do not lock other network interactions.
- The object received from Network component is written into both caches.
- After waiting for completion of writing to caches, a return of the value to a UI thread takes place.
- The program has the following schedulers and objects associated with them:
- A UI thread that initiates the asynchronous Handler operation. It is also the place where the result should return to.
- A common thread pool where all basic operations are performed, including Mem Cache и Disk Cache.
- A network thread pool for Network. It is created separately from the main thread pool so that the main pool load would not affect the network thread pool.
As I’ve said before, we will implement objects in a simple way as it doesn’t really matter for the aspects of asynchronous programming:
// stub: disc cache
struct DiskCache
{
boost::optional<:string> get(const std::string& key)
{
JLOG("get: " ();
}
void set(const std::string& key, const std::string& val)
{
JLOG("set: " get(const std::string& key)
{
auto it = map.find(key);
return it == map.end()
? boost::optional<:string>()
: boost::optional<:string>(it->second);
}
void set(const std::string& key, const std::string& val)
{
map[key] = val;
}
private:
std::unordered_map<:string std::string> map;
};
struct Network
{
// get an object via the network
std::string get(const std::string& key)
{
net::Socket socket;
JLOG("connecting");
socket.connect(address, port);
// the first byte is the size of the string
Buffer sz(1, char(key.size()));
socket.write(sz);
// next - the string
socket.write(key);
// get the size of the result
socket.read(sz);
Buffer val(size_t(sz[0]), 0);
// get the result itself
socket.read(val);
JLOG("val received");
return val;
}
private:
std::string address;
int port;
// ...
};
// UI-object: interaction with the UI
struct UI : IScheduler
{
void schedule(Handler handler)
{
// schedule an operation in the UI thread
// ...
}
void handleResult(const std::string& key, const std::string& val)
{
TLOG("UI result inside UI thread: "
As a rule, all UI frameworks contain a method that allows invoking necessary action in a UI thread (for example, in Android: Activity.runOnUiThread, Ultimate++: PostCallback, Qt: through the signal-slot mechanism). These are the methods to be used in the implementation of the UI::schedule method.
Initialization of all of this is in an imperative style:
// create a thread pool for common actions
ThreadPool cpu(3, "cpu");
// create a thread pool for network actions
ThreadPool net(2, "net");
// scheduler for the serialization of actions with the disc
Alone diskStorage(cpu, "disk storage");
// scheduler for the serialization of actions with memory
Alone memStorage(cpu, "mem storage");
// setting the scheduler by default
scheduler().attach(cpu);
// attaching the network service to the network pool
service().attach(net);
// attaching timeout handling to the common pool
service().attach(cpu);
// attaching a disc portal to a disc scheduler
portal().attach(diskStorage);
// attaching a memory portal to a corresponding scheduler
portal().attach(memStorage);
// attaching a network portal to a network pool
portal().attach(net);
UI& ui = single();
// attaching a UI portal to a UI scheduler
portal().attach(ui);
For some user action, we will perform the following in a UI thread:
go([key] {
// timeout for all operations: 1s=1000 ms
Timeout t(1000);
std::string val;
// get results from caches in parallel
boost::optional<:string> result = goAnyResult<:string>({
[&key] {
return portal()->get(key);
}, [&key] {
return portal()->get(key);
}
});
if (result)
{
// we've got an object
val = std::move(*result);
JLOG("cache val: " ()->get(key);
}
JLOG("net val: " ()->set(key, val);
}, [&key, &val] {
portal()->set(key, val);
}
});
JLOG("cache updated");
}
// move on to the UI and process the result
portal()->handleResult(key, val);
});
The Implementation of Primitives
As a careful reader might have noticed, I have used a considerable number of primitives, the implementation of which we can only guess. Therefore, I’m providing the description of used approach and classes. I think this will clarify what portals are and how we should use them, as well as answer the question about teleportation.
Waiting Primitives
Let’s start with the simplest thing: waiting primitives.
goWait: running an asynchronous operation and waiting for its completion
First, we will implement a function that will run an operation asynchronously and wait for its completion:
void goWait(Handler);
Invoking the handler in the current coroutine will be fine for the implementation, but it will not be enough in more complex scenarios. That’s why we are going to create a new coroutine for the implementation of this function:
void goWait(Handler handler) {
deferProceed([&handler](Handler proceed) {
go([proceed, &handler] { // create a new coroutine
handler();
proceed(); // continue the coroutine execution
});
});
}
Let’s describe in a few words what’s going on here. goWait function accepts a handler that should be invoked in a new coroutine. To perform necessary operations, we use deferProceed function that is implemented the following way:
typedef std::function ProceedHandler;
void deferProceed(ProceedHandler proceed) {
auto& coro = currentCoro();
defer([&coro, proceed] {
proceed([&coro] { coro.resume(); });
});
}
What does this function do? It actually wraps the defer call for a more convenient use (what defer is and why we should use it is described in my previous article). Namely, it accepts ProceedHandler that receives Handler as an input parameter to continue the coroutine execution. Actually, proceed stores a reference to the current coroutine and invokes coro.resume(). Thus, we encapsulate the entire work with coroutines, and the user has to work with proceed handler only.
Let’s get back to goWait function. Calling deferProceed, we have proceed that should be invoked upon the completion of the operation in handler. All we should do is create a new coroutine and trigger our handler handler in it. When the handler completes, we’ll immediately call proceed that will invoke coro.resume() inside itself. By doing this, we’ll continue the execution of the initial coroutine.
This provides us with waiting without locking the thread: calling goWait is like we put operations in the current coroutine on hold. When the passed handler completes, we will continue the execution as if nothing had happened.
goWait: running several asynchronous operations and waiting for their completion
Now, let’s implement a function that invokes a whole bundle of asynchronous operations and waits for their completion:
void goWait(std::initializer_list
Here the function accepts a list of handlers to be triggered asynchronously. That is, each handler will be invoked in its coroutine. A significant difference from the previous function lies in the fact that we should continue the execution of the initial coroutine only after completion of all handlers. Some may use various mutexes and condition variablesfor this purpose (some people actually implement like this!), but we shouldn’t do so (see the requirements). Therefore, we will be looking for other ways of implementation.
The idea is actually quite trivial. We should have a counter that will invoke proceed when a certain value is reached. Each handler will update the counter after its completion. Thus, the last handler will continue the execution of the coroutine. However, there’s a small complexity here: we must share the counter between the invoked coroutines. The last handler shouldn’t only call proceed but also remove this counter from memory. We can implement all of this the following way:
void goWait(std::initializer_list handlers)
{
deferProceed([&handlers](Handler proceed) {
std::shared_ptr proceeder(nullptr, [proceed](void*) { proceed(); });
for (const auto& handler: handlers)
{
go([proceeder, &handler] {
handler();
});
}
});
}
At the very beginning, we invoke the good old deferProceed with some magic hidden inside. Few people know that when building shared_ptr we can pass not only a pointer to a data, but deleter as well. The latter will delete an object by means of calling not delete ptr, but the handler. That’s where we will put the proceed call, so to continue the initial coroutine at the end. There is no need to delete the object itself as we put “nothing” there: nullptr. After that, everything is simple. We’ll loop through all handlers and invoke them in the created coroutines. There’s also something to mention here: we capture our proceeder by value, which will lead to its copy. This means our atomic reference counter inside shared_ptr will increase. When handler completes, our lambda with the captured proceeder will be removed, which will result in counter decrease. A handler that will decrease the counter to zero and remove proceeder object will invoke deleter for the shared_ptr, which means it will call coro.proceed() in the end.
For clarity, I’m providing a sequence of operations by the example of triggering two handlers in different threads:
Example: recursive parallelism with Fibonacci numbers
To illustrate it all, let’s take a look at the following example. Let’s say we’ve decided to calculate the Fibonacci series recursively and in parallel. No problem:
int fibo (int v)
{
if (v
I should say that a stack overflow will never occur here. Each call of fibo function takes place in its own coroutine.
Waiter: invoking several asynchronous operations and waiting for their completion
We often need to not just wait for a fixed set of handlers but also do something useful, and only then wait. Sometimes we don’t even know how many handlers we’re going to need. This means that we create them in the course of operations performance. In fact, we should operate with a group of handlers as a single entity. For this purpose we can use Waiter primitive with the following interface:
struct Waiter
{
Waiter& go(Handler);
void wait();
};
There are only two methods here:
- go: invoke another handler;
- wait: wait for all invoked handlers.
We can run the above methods several times during the lifetime of the Waiter object.
The idea of implementation is absolutely the same: we should have a proceeder, that will continue the execution of our coroutine. However, there appears one thing: now is proceeder shared between invoked coroutines and the Waiter object. Thus, at the moment of calling wait method we should get rid of the copy inside Waiter itself. Here’s how this can be done:
void Waiter::wait()
{
if (proceeder.unique())
{
// only Waiter owns proceeder =>
JLOG("everything done, nothing to do");
return;
}
defer([this] {
// move proceeder outside the coroutine
auto toDestroy = std::move(proceeder);
// the shared proceeder will be removed either here
// or in any coroutine of the handler
});
// proceeder has been deleted,
// let's restore it for later use
init0();
}
And again, we don’t need to do anything! Thanks for that to shared_ptr. Amen!
Example: recursive parallelism with Fibonacci numbers
To consolidate the material, let’s consider an alternative implementation of the things we want using Waiter:
int fibo (int v)
{
if (v
Another variant:
int fibo (int v)
{
if (v
Choose what you want.
goAnyWait: running several asynchronous operations and waiting for the completion of at least one of them
As before, we will run multiple operations in parallel. We are going to wait until at least one operation completes:
size_t goAnyWait(std::initializer_list
The function accepts a list of handlers and returns a number of a handler that was the first one to complete.
To implement this primitive, we are going to update our approach a little bit. Now we will share quite a specific atomic counter counter, instead of void* ptr == nullptr. At the very beginning, it is initialized by 0 value. At the end of execution, each handler increases the counter and if there occurs a change of value from 0 to 1, it will be the only one to call proceed():
size_t goAnyWait(std::initializer_list handlers)
{
VERIFY(handlers.size() >= 1, "Handlers amount must be positive");
size_t index = static_cast(-1);
deferProceed([&handlers, &index](Handler proceed) {
std::shared_ptr<:atomic>> counter =
std::make_shared<:atomic>>();
size_t i = 0;
for (const auto& handler: handlers)
{
go([counter, proceed, &handler, i, &index] {
handler();
if (++ *counter == 1)
{
// gotcha!
index = i;
proceed();
}
});
++ i;
}
});
VERIFY(index
As you might have guessed, we can also use this trick for cases when it is necessary to wait for two, three or more handlers.
goAnyResult: running several asynchronous operations and waiting to get at least one result
Now, let’s move on to the most interesting part that is essential for our task. Namely: run several operations and wait for the result we need. In addition, any handler can return no result. It will finish its execution but will say “I did the best I could”.
An additional complication occurs with this approach. All handlers can complete their execution but we didn’t get any result. First of all, we will have to check whether the necessary result has been obtained. Secondly, we will return an “empty” result. To indicate emptiness, we are going to use boost::optional. In addition, goAnyResult will be with the following simple prototype:
template
boost::optional goAnyResult(
std::initializer_list()
>
> handlers)
There’s nothing scary here: we just pass a list of handlers that optionally return our T_result. That is, handlers should have the following signature:
boost::optional
Comparing to the previous primitive, the situation is slightly different. The counter remains the same, but when we delete it, we must check counter. If we get 1 when increasing it, then an “empty” value should be returned, as the counter hasn’t been reset by anyone else before. Thus, we have a whole Counter object instead of a simple atomic value for counter:
template
boost::optional goAnyResult(
std::initializer_list()
>
> handlers)
{
typedef boost::optional Result;
typedef std::function ResultHandler;
struct Counter
{
Counter(ResultHandler proceed_) : proceed(std::move(proceed_)) {}
~Counter()
{
tryProceed(Result()); // we always proceed in destructor
}
void tryProceed(Result&& result)
{
if (++ counter == 1)
proceed(std::move(result));
}
private:
std::atomic counter;
ResultHandler proceed;
};
Result result;
deferProceed([&handlers, &result](Handler proceed) {
std::shared_ptr counter = std::make_shared(
[&result, proceed](Result&& res) {
result = std::move(res);
proceed();
}
);
for (const auto& handler: handlers)
{
go([counter, &handler] {
Result result = handler();
if (result) // try to proceed only if we have some result
counter->tryProceed(std::move(result));
});
}
});
return result;
}
The intrigue here is that std::move moves the result only when a condition inside tryProceed is performed. It’s all because std::move does not perform the move as it is, no matter how much we want this to happen. It is just a cast operation on references.
Alright, we’re good with waitings, and now let’s move on to schedulers and thread pools.
Scheduler, Pools, Synchronization
The Scheduler Interface
After reviewing the foundations, we will move on to the dessert.
Let’s introduce the scheduler interface:
struct IScheduler : IObject
{
virtual void schedule(Handler handler) = 0;
};
Its task is to execute handlers. Please note that the scheduler interface has neither timeouts nor cancelables, nor deferred operations. The scheduler interface should be crystal-clean so that we could easily join it with various frameworks (see [2]: i.e. Cancelables, actors, delays — all of that is very easy to use with UI schedulers)
Thread Pool
To perform various actions, we are going to need a thread pool that will implement the scheduler interface:
typedef boost::asio::io_service IoService;
struct IService : IObject
{
virtual IoService& ioService() = 0;
};
struct ThreadPool : IScheduler, IService
{
ThreadPool(size_t threadCount);
void schedule(Handler handler)
{
service.post(std::move(handler));
}
private:
IoService& ioService();
std::unique_ptr<:asio::io_service::work> work;
boost::asio::io_service service;
std::vector<:thread> threads;
};
What do we have here?
- A builder that will set the number of threads
- Implementation of the scheduler interface using boost::asio::io_service::post.
- A member of a work class, that holds io_service’s event processing loop. In case there are no events, the loop will terminate and threads will go poof.
- An array of threads.
In addition, our class implements (privately) some strange interface IService using ioService method that returns IoService, which is boost::asio::io_service. All of this looks really strange but I’ll try to explain what it’s all about.
The thing is, we need an advanced scheduler interface to work with network sockets and timeouts. Actually, this interface is hidden inside boost::asio::io_service. Other components that I’m going to use in the future should somehow get access to boost::asio::io_service instance. In order to prevent easy access to this class, I have introduced IService interface allowing to get the desired instance. However, this method is private in the implementation. This provides some level of protection from improper use, as in order to move this object outside, we should at first convert ThreadPool to IService, and then call the necessary method. Using friend classes could be an alternative here but I didn’t want to spoil ThreadPool with knowledge of possible use cases. That’s why I have decided that the applied approach is a reasonable price for encapsulation.
Class of the Coroutine
After introducing the thread pool and scheduler, it’s time to introduce another class for manipulations on coroutines. Strange as it may seem, it is called Journey (I’ll explain you later why):
struct Journey
{
void proceed();
Handler proceedHandler();
void defer(Handler handler);
void deferProceed(ProceedHandler proceed);
static void create(Handler handler, mt::IScheduler& s);
private:
Journey(mt::IScheduler& s);
struct CoroGuard
{
CoroGuard(Journey& j_) : j(j_) { j.onEnter0(); }
~CoroGuard() { j.onExit0(); }
coro::Coro* operator->() { return &j.coro; }
private:
Journey& j;
};
void start0(Handler handler);
void schedule0(Handler handler);
CoroGuard guardedCoro0();
void proceed0();
void onEnter0();
void onExit0();
mt::IScheduler* sched;
coro::Coro coro;
Handler deferHandler;
};
What’s so striking here?
- Private constructor. It is invoked by a static public method create.
- Journey contains the following inside itself: a pointer to sched scheduler, a coroutine itself and deferHandler that is invoked inside defer.
- CoroGuard is a proxy class. For each operation on a coroutine, it automatically performs onEnter0 action when entering the coroutine and onExit0 when exiting it.
To understand how this works, let’s take a look at the implementation of a few simple examples:
void Journey::schedule0(Handler handler)
{
VERIFY(sched != nullptr, "Scheduler must be set in journey");
sched->schedule(std::move(handler));
}
void Journey::proceed0()
{
// we use a guard to resume the coroutine
guardedCoro0()->resume();
}
Journey::CoroGuard Journey::guardedCoro0()
{
return CoroGuard(*this);
}
// we must use a scheduler to go back to a program
void Journey::proceed()
{
schedule0([this] {
proceed0();
});
}
// this is the handler, that returns control to a coroutine
Handler Journey::proceedHandler()
{
return [this] {
proceed();
};
}
// start a new coroutine
void Journey::start0(Handler handler)
{
schedule0([handler, this] {
// we use guards here once again
guardedCoro0()->start([handler] {
JLOG("started");
// let's not forget about exceptions
try
{
handler();
}
catch (std::exception& e)
{
(void) e;
JLOG("exception in coro: "
Now, let’s see how defer works:
void Journey::defer(Handler handler)
{
// preserve the handler
deferHandler = handler;
// and exit the current coroutine
coro::yield();
}
// deferProceed that has been used before
void Journey::deferProceed(ProceedHandler proceed)
{
defer([this, proceed] {
proceed(proceedHandler());
});
}
It’s simple! Now, we should understand where deferred deferHandler handlers are invoked.
TLS Journey* t_journey = nullptr;
void Journey::onEnter0()
{
t_journey = this;
}
void Journey::onExit0()
{
if (deferHandler == nullptr)
{
// no handler => no more actions, so we can selfdestruct
delete this;
}
else
{
// otherwise, perform a deferred operation
deferHandler();
deferHandler = nullptr;
}
// restore the value as we are now outside the coroutine
t_journey = nullptr;
}
Finally, let’s take a look at the implementation of a static function create:
void Journey::create(Handler handler, mt::IScheduler& s)
{
(new Journey(s))->start0(std::move(handler));
}
It is worth noting that the user has no way to explicitly create Journey. I mean he doesn’t even know this class exists. But let’s talk about it later, and now…
Teleportation
Let’s get to the most interesting part! Teleportation… We will talk about a primitive that can be implemented only using coroutines. This primitive is so powerful and so simple that we should take a closer look at it as it’s really interesting!
It would be easier to begin with the implementation first:
void Journey::teleport(mt::IScheduler& s)
{
if (&s == sched)
{
JLOG("the same destination, skipping teleport " name() "
Two things are done here:
- Check whether the coroutine scheduler differs from the scheduler that has been passed to the method. If they are equal, we don’t have to do anything as the scheduler is the one we need.
- Otherwise, we change the coroutine scheduler and reenter the coroutine. defer performs the function that results in exiting the coroutine and invoking the handler for a sooner continuation of the coroutine. However, we are going to use a new scheduler for the return. That’s why we will enter the coroutine in a new thread pool.
The diagram below illustrates the process of switching the coroutine execution from Scheduler/Thread to Scheduler2/Thread2:
What does this give us? In fact, it provides us with switching between thread pools and, by the way, between schedulers. In particular, we can switch between a UI thread and calculation threads, so that UI wouldn’t be slow:
auto result = someCalculations();
teleport(uiScheduler);
showResult(result);
teleport(calcScheduler);
auto newResult = continueSmartCalculations(result);
teleport(uiScheduler);
updateResult(newResult);
//…
That is, to go to UI, we should simply teleport to the required thread. Everything will be thread-safe, in accordance with requirements to the development of UI applications. It is possible to apply the same approach when it is necessary to go to, say, a thread pool or a database thread pool, or any place where we can use the scheduler interface.
Portals
Let’s move on to another interesting part. As you might have noticed, to update the state of a UI application, we had to perform teleportation first to a UI scheduler, and then back. To avoid doing this every time, let’s create a portal that will perform the inverse return automatically as soon as the necessary action completes.
struct Portal
{
Portal(mt::IScheduler& destination) :
source(journey().scheduler())
{
JLOG("creating portal " "
This means that we preserve a source (current scheduler of the coroutine) in the constructor, and then teleport the coroutine in the known direction. As for the destructor, teleportation to the initial scheduler takes place in it.
Thanks to such RAII idiom, we don’t have to worry that we can suddenly turn out to be in some other place (for instance, won’t perform some heavy calculations in a UI thread or in a network thread pool), everything will be performed automatically.
Let’s look at an example:
ThreadPool tp1(1, "tp1");
ThreadPool tp2(1, "tp2");
go([&tp2] {
Portal p(tp2);
JLOG("throwing exception");
throw std::runtime_error("exception occur");
}, tp1);
The coroutine starts in tp1, and then a portal is created, and switching to tp2 takes place. After an exception is generated, we call the portal destructor that actually freezes further exceptions, teleports to tp1 and continues the coroutine that will rethrow an exception in another thread. It’s all for free!
Let’s complicate the use of portals even more:
struct Scheduler
{
Scheduler();
void attach(mt::IScheduler& s)
{
scheduler = &s;
}
void detach()
{
scheduler = nullptr;
}
operator mt::IScheduler&() const
{
VERIFY(scheduler != nullptr, "Scheduler is not attached");
return *scheduler;
}
private:
mt::IScheduler* scheduler;
};
struct DefaultTag;
template
Scheduler& scheduler()
{
return single();
}
template
struct WithPortal : Scheduler
{
struct Access : Portal
{
Access(Scheduler& s) : Portal(s) {}
T* operator->() { return &single(); }
};
Access operator->() { return *this; }
};
template
WithPortal& portal()
{
return single>();
}
This allows us to attach portals to classes, like in the following example:
ThreadPool tp1(1, "tp1");
ThreadPool tp2(1, "tp2");
struct X
{
void op() {}
};
portal().attach(tp2);
go([] {
portal()->op();
}, tp1);
In the given example, we have attached portal x to thread pool tp2. Thus, during each call of the method of the only instance of class x (used in return &single()), the coroutine will switch to the required thread pool. Our execution context Journey will be traveling back and forth teleporting through portals of used objects!
This gives us an opportunity to monitor where we should call methods of our classes. Classes will take care of switching to the required thread and returning back. This is exactly the approach that has been used at the very beginning when we were solving the initial task. It allowed us to obtain high clarity of code and use all the power of coroutines, teleportation, and portals.
Non-locking Mutexes
Mutexes are often used for working with shared resources. Well, there’s no surprise here: such primitive is easy to use and it proves itself in most cases.
But what happens to a mutex when someone has already captured the resource? In this case, waiting on a mutex is observed till the moment the resource is available. In addition, the thread is locked and stops performing useful work.
What would we want? From the point of view of performance, we would like threads to be a bit more than fully involved, and we don’t want them to be distracted by waiting. “It shall be done” said the coroutine and smirked.
There are different ways to implement non-locking mutexes using coroutines. I’m going to apply an elegant method allowing to reuse the available functionality with a minimal number of additions. Let’s create a new scheduler for this purpose:
struct Alone : mt::IScheduler
{
Alone(mt::IService& service);
void schedule(Handler handler)
{
strand.post(std::move(handler));
}
private:
boost::asio::io_service::strand strand;
};
In the constructor of Alone, IService interface is used as an input parameter that allows to properly initialize io_service::strand from boost.asio. Actually, it is another boost.asio scheduler. It guarantees that no more than one scheduler will be invoked at the same time. This is precisely what we think a mutex is (mutual exclusion).
Since Alone idiom implements the scheduler interface, we can easily use all the power of our teleportations and portals as if it is necessary.
To consolidate the material, let’s consider the following code:
struct MemCache
{
boost::optional<:string> get(const std::string& key);
void set(const std::string& key, const std::string& val);
};
// initialization
ThreadPool common_pool(3); // common thread pool
Alone mem_alone(common_pool); // serialization of actions with memory
portal().Attach(mem_alone); // attaching the portal
// now, let’s perform the necessary operations
auto value = portal()->get(key);
// or
portal()->set(anotherKey, anotherValue);
Access to the object will be serialized automatically. At the same time, the thread will not be locked in case of simultaneous access to the object. What a magic!
External Events
It would be nice to perform operations regardless of the environment but life doesn’t care about our desired minimalism. Therefore, we have no choice but to move on, in spite of all difficulties and losses.
What awaits us in asynchronous programming? Actions that seem immutable today should be reconsidered a moment later with appropriate adjustments (see “cancelable”). That is, we want to cancel our actions depending on the current situation.
We should not only take into account the variable execution conditions but also have an opportunity to respond to network factors — properly handle timeouts. It’s good when we managed to get a result. But if a result has not been received in time, it may not need it anymore. There’s no use to learn the subject today if the exam was yesterday and we didn’t show up.
All these requirements are a heavy burden on existing frameworks. As a rule, programmers don’t give a damn about them and then deal with problems at the production when something hangs up, slows down, when resources are not available, and the action continues in spite of the useless result. Let’s try to find approach here.
First of all, we will introduce types of external events and associated exceptions:
enum EventStatus
{
ES_NORMAL,
ES_CANCELLED,
ES_TIMEDOUT,
};
struct EventException : std::runtime_error
{
EventException(EventStatus s);
EventStatus status();
private:
EventStatus st;
};
To control a coroutine from the outside, we need some object sharing the state between the calling and the called.
struct Goer
{
Goer();
EventStatus reset();
bool cancel();
bool timedout();
private:
struct State
{
State() : status(ES_NORMAL) {}
EventStatus status;
};
bool setStatus0(EventStatus s);
State& state0();
std::shared_ptr state;
};
Everything is quite trivial here. We have a smart state pointer we can check and change.
Let’s add event handling to our Journey class:
void Journey::handleEvents()
{
// callable from the destructor
if (!eventsAllowed || std::uncaught_exception())
return;
auto s = gr.reset();
if (s == ES_NORMAL)
return; // нет событий
throw EventException(s);
}
void Journey::disableEvents()
{
handleEvents();
eventsAllowed = false;
}
void Journey::enableEvents()
{
eventsAllowed = true;
handleEvents();
}
We should pay attention to adding a flag of whether we should handle the event or no. Sometimes we should perform some important actions before throwing an exception. A guard is meant for this purpose:
struct EventsGuard
{
EventsGuard(); // invokes disableEvents()
~EventsGuard(); // invokes enableEvents()
};
When do we call handleEvents? Here’s when:
void Journey::defer(Handler handler)
{
// add before exiting the coroutine
handleEvents();
deferHandler = handler;
coro::yield();
// and right after waking up
handleEvents();
}
That is, at the moment of any context switching, for example during an asynchronous operation or teleportation. When performing heavy synchronous operations we should add additional calls for handleEvents to our handlers for a faster response to events. This will solve the problem of operation responsiveness to external events.
Now, let’s implement the start of the coroutine:
Goer go(Handler handler, mt::IScheduler& scheduler)
{
return Journey::create(std::move(handler), scheduler);
}
Journey::create returns a shared Goer state to provide the reaction to external events:
struct Journey
{
// …
Goer goer() const
{
return gr;
}
// …
private:
// …
Goer gr;
};
Goer Journey::create(Handler handler, mt::IScheduler& s)
{
return (new Journey(s))->start0(std::move(handler));
}
// see our first task
Goer Journey::start0(Handler handler)
{
// …
return goer();
}
// A small example of use
Goer op = go(myMegaHandler);
// …
If (weDontNeedMegaHandlerAnymore)
op.cancel();
The state will be changed right after op.cancel() is called and the cancellation will start its work after the subsequent handleEvents() call.
As you might have noticed, the creation of Journey traveler that will go back and forth, teleporting through portals, takes place implicitly inside function go. As a result, the user does not even know that has to deal with a hidden object. He just separately calls go, defer, deferProceed, and other methods that by using TLS restore Journey’s instance inside themselves.
Handling Timeouts
Let’s take a look at the implementation of nested timeouts:
struct Timeout
{
Timeout(int ms);
~Timeout();
private:
boost::asio::deadline_timer timer;
};
We will use boost::asio::deadline_timer:
Timeout::Timeout(int ms) :
timer(service(), boost::posix_time::milliseconds(ms))
{
// get the current shared state
Goer goer = journey().goer();
// trigger the asynchronous handler
timer.async_wait([goer](const Error& error) mutable {
// mutable, since we do not change the captured state goer
if (!error) // if the timer has not been canceled, then time it out
goer.timedout();
});
}
Timeout::~Timeout()
{
// cancel the started timer
timer.cancel_one();
// check whether an event has happened
handleEvents();
}
Using the RAII idiom, you can nest timeout processing into each other independently, without any limits.
Here’s a trivial example:
// inside the coroutine
Timeout t(100); // 100 ms
for (auto element: container)
{
performOperation(element);
handleEvents();
}
Couldn’t do it 100 ms? Bye!
Another example illustrates another feature of nested timeouts:
// set 200 ms timeout on all operations
Timeout outer(200);
portal()->performOp();
{
// set the timeout 100 ms
// only for operations within visibility
Timeout inner(100);
portal()->performAnotherOp();
// let’s protect this operation from invasion
EventsGuard guard;
performGuardedAction();
}
Tasks
I’ve got two tasks on race condition. Actually, catching a race condition is a challenging task. Therefore, I’m giving you an opportunity to think about them.
What’s the purpose of this exercise? Let’s see:
- Behavior analysis will help us understand more precisely how coroutines work.
- Concurrency and asynchrony have specific consequences. It’s important to know about any pitfalls so that we would be ready for them.
- Finally, it’s a nice opportunity to stretch your brain. They say it works better afterwards.
Task 1
Task number 1.
Given a function for running a coroutine:
Goer Journey::start0(Handler handler)
{
schedule0([handler, this] {
guardedCoro0()->start([handler] {
JLOG("started");
try
{
handler();
}
catch (std::exception& e)
{
(void) e;
JLOG("exception in coro: "
It has a race condition. Where is it? What should we change to fix this fatal drawback?
The Answer
Goer Journey::start0(Handler handler)
{
+ Goer gr = goer();
schedule0([handler, this] {
guardedCoro0()->start([handler] {
JLOG("started");
@@ -121,7 +122,7 @@
JLOG("ended");
});
});
- return goer();
+ return gr;
}
Task 2
Same conditions. Given the code:
void Journey::onExit0()
{
if (deferHandler == nullptr)
{
delete this;
}
else
{
deferHandler();
deferHandler = nullptr;
}
t_journey = nullptr;
}
Where’s the mistake and how can we fix it?
The Answer
{
@@ -153,8 +154,8 @@
- deferHandler();
- deferHandler = nullptr;
+ Handler handler = std::move(deferHandler);
+ handler();
}
In addition to the problem of replacing defer handler, this handler can be destroyed twice.
Bonus: Garbage Collector (GC)
Yes, we’ll make the simplest GC on our coroutines. Let’s begin with the following example:
struct A { ~A() { TLOG("~A"); } };
struct B:A { ~B() { TLOG("~B"); } };
struct C { ~C() { TLOG("~C"); } }
;
ThreadPool tp(1, "tp");
go([] {
A* a = gcnew();
C* c = gcnew();
}, tp);
Output in the console:
tp#1: ~C
tp#1: ~B
tp#1: ~A
Take note of non-virtual destructors and a proper object destruction! Although some people say we should always use virtual destructors for inheritance.
As always, all the magic is hidden inside:
template
T* gcnew(V&&... v) {
return gc().add(new T(std::forward(v)...));
}
GC& gc() { return journey().gc; }
struct GC {
~GC()
{
// delete in reverse order
for (auto& deleter: boost::adaptors::reverse(deleters))
deleter();
}
template T* add(T* t)
{
// add a deleter of T type
deleters.emplace_back([t] { delete t; });
return t;
}
private:
std::vector deleters;
};
GC instance is stored inside Journey that will be destroyed when the coroutine completes. There’s a restriction: such objects should not be shared, and they can only be used inside a coroutine.
Summary
So, we have reviewed several extremely useful primitives for building quite complex applications:
- Non-locking primitives of waiting the completion of actions/results.
- Thread pools and schedulers.
- Non-locking synchronization.
- Teleportation, meaning switching between different schedulers.
- Portals. A powerful and flexible abstraction of performing actions in a specified environment: thread, pool thread, a group of threads, consistently in a group of threads, etc.
The suggested approaches can significantly simplify code, without sacrificing performance. Waiting primitives do not lock threads, which is good for processors. As for using non-locking mutexes, they bring synchronization up to an incredible level.
Portals help us to ignore requirements to the caller. It is also good when working in a heterogeneous environment: database, network, disk, UI, shared data, heavy computational operations. That is, when solving tasks, in which data processing and moving between various producers and consumers take place.
Actually, it’s only an introduction to asynchronous programming on coroutines. The most interesting things are yet to come! Hope this article has given you a programming pleasure.
Code github.com/gridem/Synca bitbucket.org/gridem/synca
Presentation C++ Party, Yandex (in Russian) tech.yandex.ru/events/cpp-party/march-msk/talks/1761
Presentation C++ User Group (in Russian) youtu.be/uUQX5QS1CCg habrahabr.ru/post/212793
Comments