At the HighLoad++ conference in 2016, Vadim Madison, development manager at “M-Tex”, talked about growing from a system in which a hundred microservices seemed like a huge number to a high-load project in which a few thousand microservices are commonplace.
I will tell you how we launched microservices on quite a high-load project. It’s rather an aggregate experience but since I work for M-Tex, let me tell you a couple of words about who we are.
To cut a long story short, we deal with video delivery: we provide video in real time. We’re the video platform for the “NTV-Plus” and “Match TV” channels. That’s 300,000 concurrent users and 300 terabytes of content delivered per hour. Quite an interesting task. How do we maintain all of this?
What’s this story about? It’s about how our project grew and developed, and about how we rethought some of its parts and the way they interact. One way or another, it’s about scaling a project: withstanding even more load and providing our clients with even more functionality without going down or losing key features. In general, we want our clients to be satisfied. It’s also a bit about the way we did it and how it all began.
The Very Beginning

* 2 servers in a Docker cluster
* DB runs on the same machines, in containers
* There’s no dedicated storage
* The infrastructure is minimal
This is the starting point, when there were 2 servers in the Docker cluster. At that time, databases used to run in the same cluster. There wasn’t anything dedicated in our infrastructure. The infrastructure was minimal.
If we look at the main pieces of our infrastructure at that time, they were Docker and TeamCity, the system we used to build and deliver code.
The next stage, the one I call the middle of our path, was a period of quite serious growth. We already had 80 servers and had built a separate, dedicated cluster for databases on special machines. We began to move towards distributed storage based on Ceph, and we started thinking that perhaps it was time to reconsider the way our services interacted with each other and to change our monitoring system.
Now, let’s see where we are today. There are several hundred servers in the Docker cluster and hundreds of running microservices. At the moment, we are dividing our system into service subsystems at the level of data buses and at the level of logical separation. When there were too many microservices, we decided to split the system so that we could maintain it better and understand it better in general.
This chart is a small part of our system: the part that allows us to cut video. I showed a similar chart half a year ago at “RIT++”. There were 17 green microservices at the time. There are 28 green microservices today. That’s approximately 1/20 of our system, which gives you an idea of our approximate scale.
One of the interesting things is the communication between our services. As a rule, the inter-services communication should be as efficient as possible. We also thought so and decided that protobuf was exactly what we needed.
It looked something like this:
There is a Load Balancer in front of all microservices. The request comes either to the front-end or services that directly represent the JSON API. protobuf is used for talking between all the internal services.
protobuf is quite a good thing. It really does make messages more compact. There are quite fast implementations today that allow us to serialize and deserialize data with minimal overhead. We can consider it a conditionally typed request.
But if we look at it in terms of microservices, it’s easy to notice that we end up with a kind of proprietary protocol between services. When there are 1, 2 or 5 services, you can simply run a console utility for each microservice that lets you access the service and check what it returns, so if something is wrong, you can always check the service. As the number of services grows, this complicates working with them in terms of support.
Up to a certain stage, this wasn’t a serious problem, as there weren’t that many services. Plus, the guys from Google had released gRPC. We saw that it did everything we needed at that moment, so we slowly migrated to it. That’s how another piece appeared in the stack.
There’s also quite an interesting thing about the implementation. gRPC is based on HTTP/2, which works out of the box. If you don’t have a very dynamic environment, that is, if your instances do not change and do not move to other machines, it can be quite useful. Besides, it supports lots of client-side and server-side languages.
Now, let’s look at this in terms of microservices. On the one hand, gRPC is good, but on the other hand, it’s a thing in itself. So much so that when we started standardizing our logs in order to aggregate them in a single system, we found that it’s impossible to get logs directly from gRPC in a convenient form.
As a result, we decided to write our own logging system. It parsed messages and converted them to the required format. Only after that were we able to do some decent logging. Besides, when you first describe a service and its types and then compile them, this increases the dependency of services on each other. That’s a problem for microservices, as is the added complexity of versioning.
As you might have guessed, we began to think about JSON. For a long time, we couldn’t believe that after a compact, conditionally binary protocol we’ll get back to JSON. But one day, we came across an article by guys from DailyMotion who wrote almost about the same thing: “We know how to work with JSON, people from all over the world can work with it. Why create additional difficulties?”
As a result, we slowly migrated from gRPC to JSON in what is more or less our own implementation. That is, we kept HTTP/2 and took quite fast implementations for working with JSON.
So, we got all the features we have now. We can access our services via cURL. Our QA team uses Postman, so they are also happy. Everything has become simple at every stage of working with these services. On the one hand it’s a controversial decision; on the other hand, it helps a lot.
By and large, the only drawback of JSON we can name right now is that it is less compact. The 30% difference that is commonly claimed in comparison with MessagePack or anything else is not that big, according to our measurements. It’s also not always critical when we talk about a supportable system.
Besides, we got extra features after the transition to JSON, such as protocol versioning. With protobuf, there were situations when we described a new version of the protocol, and the clients, the consumers of this particular service, also had to move to it. If you have several hundred services and even 10% of them have to make the transition, that is already a big cascade effect. You change something in one service, and 10 more need to be changed.
As a result, we faced a situation where the service developer had already released the fifth, sixth, seventh version, but the actual load in production was still on the fourth one, because developers of related services had their own deadlines and priorities. They simply were not able to constantly rebuild their services and migrate to a new version of the protocol. New versions were released, but they were not in demand, and meanwhile we had to fix bugs in old versions in strange ways. This complicated support.
In the end, we decided to stop producing new versions of protocols. We provided a basic version, within which it’s possible to add some properties, but within rather narrow limits. Consumer services began to use JSON Schema.
It looks something like this:
Instead of 1, 2 and 3, we have version 1 and the schema that refers to it.
Here’s a typical response from one of our services. It’s the Content Manager. It returned information about the broadcast. Here’s, for example, a schema of one of the consumers.
The most interesting line here is the last one, with the required block. Looking at it, we see that this service needs only 4 fields from all this data: id, content, date and status. If we apply this schema, the consumer service depends on this data only.
These fields are available in every version, in every variation of the first version of the protocol. This fact has simplified the transition to new versions. We produced new releases, and migrating consumers to them was much simpler.
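The consumer-side idea can be sketched roughly as follows. This is a minimal illustration that assumes a schema carrying only a `required` list; the response fields other than id, content, date and status, and the helper name `extract_required`, are made up for the example and are not taken from the actual Content Manager.

```python
import json

# Hypothetical consumer schema: only the fields this consumer actually reads.
CONSUMER_SCHEMA = {"required": ["id", "content", "date", "status"]}


def extract_required(response_body: str, schema: dict) -> dict:
    """Return only the required fields; raise if any of them is missing."""
    payload = json.loads(response_body)
    missing = [field for field in schema["required"] if field not in payload]
    if missing:
        raise ValueError(f"response is missing required fields: {missing}")
    # Extra fields added by newer revisions of version 1 are simply ignored.
    return {field: payload[field] for field in schema["required"]}


# A made-up producer response that already carries newer, optional fields.
response = json.dumps({
    "id": 42,
    "content": "match-broadcast",
    "date": "2016-11-07",
    "status": "live",
    "preview_url": "/p/42.jpg",
    "rating": 4.7,
})
view = extract_required(response, CONSUMER_SCHEMA)
```

Because the consumer checks only what it needs, the producer can add properties without forcing this consumer to be rebuilt.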
The next important thing arises when we talk about microservices or actually any system (the thing is that we can feel it stronger and faster in microservices). These are situations when the system becomes unstable.
It’s all good when you have a chain of calls for 1–2 services. You don’t see a global difference between a monolithic and a distributed application. But when the chain grows to 5 to 7 services, something breaks down at some stage, and you really don’t know why it happened or what to do about it. Debugging is quite difficult in this situation. With a monolith, you can simply debug step by step and find the error, whereas with microservices such factors as network instability or unstable performance under load come into play. In a distributed system with a bunch of nodes, such things become quite noticeable.
At first we went the classic route. We decided to monitor everything, understand what breaks down and where, and then try to deal with it quickly. We sent metrics from our microservices and collected them in a single database. Using Diamond, we collected system metrics. cAdvisor helped us analyze resource usage and performance characteristics of running containers. All the results were stored in InfluxDB, and we built dashboards in Grafana.
So now we have three more pieces in our growing infrastructure.
Yes, we’ve become more aware of what is happening. We react faster to things that go down. But things did not stop falling apart.
Strange as it may seem, the main problem of a microservice architecture is that you have services that are unstable. A service can work today and not work tomorrow, and there can be plenty of reasons for this. For example, a service is overloaded, and you keep sending additional load to it. It goes down for a while. As it doesn’t serve anything for some time, the load falls and the service comes back up. This flapping makes it really hard to support such a system and understand what’s wrong with it.
After all of that, we decided that it’s better for such a service to go down than to leap back and forth like this. This led us to change the way we implement our services.
Here’s one of the important things. We added a limit on the number of incoming requests to each service. Each service knows how many clients it is able to serve (we’ll talk about this in detail later). If the limit is reached, the service returns 503 Service Unavailable. The caller understands that it should choose another node, as this one cannot serve it.
By doing so, we reduce the request time in case there’s something wrong with the system. On the other hand, we increase its stability.
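A minimal sketch of such a limit, assuming an in-process counter of in-flight requests. The names `RequestLimiter` and `handle` are hypothetical, and the limit value would come from the load testing described later in the talk rather than being hard-coded.

```python
import threading


class RequestLimiter:
    """Counts in-flight requests and refuses new ones above a fixed limit."""

    def __init__(self, max_in_flight: int):
        self._max = max_in_flight
        self._current = 0
        self._lock = threading.Lock()

    def try_acquire(self) -> bool:
        with self._lock:
            if self._current >= self._max:
                return False
            self._current += 1
            return True

    def release(self) -> None:
        with self._lock:
            self._current = max(0, self._current - 1)


def handle(limiter: RequestLimiter, work):
    """Return (status, body); answer 503 when the service is at capacity."""
    if not limiter.try_acquire():
        return 503, "Service Unavailable"
    try:
        return 200, work()
    finally:
        limiter.release()
```

Answering 503 immediately is what lets the caller move on to another node instead of waiting for a timeout.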
Now, the second pattern we introduced was Circuit Breaker. It’s the pattern we, roughly speaking, implement on the client’s side.
Say we have Service A, which has 4 instances of Service B as possible points of access. It goes to the registry and says: “Give me the addresses of these services”. It gets a response that there are 4 of them. Service A goes to the first one. The first one responds that everything is fine, and Service A marks it: “yes, I can access it”. Then it goes to the second one, which does not respond within the required time. We ban it for some time and move on to the next one. That one returns, say, an incorrect protocol version, no matter why. We ban it too, and go to the fourth one.
As a result, 50% of the instances are able to serve the client. Service A will keep going to these two, and for some time it bans the other two, which did not satisfy its needs for some reason.
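The client-side behaviour described here can be sketched as a simple ban list. This is a simplified illustration and not how Hystrix implements the pattern (a full circuit breaker also tracks failure thresholds and a half-open state); the class name, the `send` callback and the ban duration are assumptions made for the example.

```python
import time


class BanningClient:
    """Tries instances in order and temporarily bans those that fail."""

    def __init__(self, instances, ban_seconds=30.0, clock=time.monotonic):
        self._instances = list(instances)
        self._ban_seconds = ban_seconds
        self._clock = clock
        self._banned_until = {}

    def available(self):
        """Instances that are not currently banned."""
        now = self._clock()
        return [i for i in self._instances
                if self._banned_until.get(i, 0.0) <= now]

    def call(self, request, send):
        """send(instance, request) is expected to raise on a timeout or bad reply."""
        for instance in self.available():
            try:
                return send(instance, request)
            except Exception:
                # Any failure bans the instance for a while; try the next one.
                self._banned_until[instance] = self._clock() + self._ban_seconds
        raise RuntimeError("no healthy instance available")
```

Once the ban expires, the instance becomes eligible again and is tried on a later call.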
This allowed us to noticeably increase stability in general. If there’s something wrong with a service, we shut it down, an alert is raised about the shutdown, and then we try to find out what went wrong.
With the introduction of the Circuit Breaker pattern, we got one more thing in our infrastructure: Hystrix.
Guys from Netflix not only implemented support for this pattern, but also made it easy to understand if there’s something wrong with your system:
The size of the circle shows how much traffic you have relative to others. Color shows how good or bad the system feels. If you’ve got a green circle, then, probably, everything’s fine. If it’s red — something’s wrong.
Here is what it looks like when a service should be completely shut down. The circuit is open.
As a result, our system became more or less stable. Each service has at least two instances, so that we can shut one down and switch to another. But this hasn’t given us an understanding of what is happening with our system. How do we understand that something broke down during the execution of a request?
Here’s a standard request:
This is what the chain of execution looks like. A user sends a request to the first service, which passes it to the second one. From the second service, the chain branches out to the third and the fourth ones.
Bam! And one of the branches is gone, for no obvious reason. After facing this situation and trying to understand how we could improve visibility, we found Appdash. It’s a tracing service.
Here’s what it looks like:
I should say that we just wanted to try it and understand whether it’s the right thing for us. It was really simple to implement it into our system as we were using Go by that time. Appdash had a ready-to-go library for us. We decided that it was good but the implementation itself wasn’t really suitable for us.
So, we decided to use Zipkin instead of Appdash. Guys from Twitter created it. It looks like this:
I think it’s a bit clearer. We can see that there’s a certain number of services. We can see how our request passes through the chain. We can also see how much time the request spends in each of the services. On the one hand, we can see the total time and its breakdown by service. On the other hand, nothing prevents us from adding information about what happens inside the service.
So, some useful load, database calls, file system reads, cache accesses: you can add all of this here and see which part contributes most to the total time of the request. The TraceID is what makes this possible. I’ll talk about it later.
That’s how we began to understand what happened in a particular request and why it went down for a particular client. All was fine and then, all of a sudden, something’s wrong in one of them. We saw some basic context and understood what was happening with the service.
Not so long ago, a standard for tracing systems, OpenTracing, was developed. It’s a kind of agreement between the main suppliers of tracing systems on how to implement the client API and client libraries in order to make adoption as simple as possible. There’s already an implementation of OpenTracing for almost all mainstream languages. Feel free to use it.
We have learned to understand which of the services suddenly prevented us from serving the customer. We can see that one of the parts is struggling to do its work, but it is not always clear why. Context is not enough.
We have logging. Yes, it’s a pretty standard thing: Elasticsearch, Logstash, and Kibana (ELK), perhaps with small variations of our own.
We don’t ship logs directly through lots of Logstash forwarders. We first pass them to syslog, which helps us aggregate logs on the collecting machines. From there, they are forwarded into Elasticsearch and Kibana. Quite a standard process. What’s the trick?
The trick is as follows: wherever possible, we began to add the Zipkin TraceID to these logs.
As a result, we can see the full execution context for a particular user on a Kibana dashboard. Obviously, if a service got into prod, it is more or less ready for operation. It has passed autotests, and the QA team has already looked at it, if necessary. It should work. If it doesn’t work in a particular situation, then there apparently were some preconditions. These preconditions show up in the detailed log that we get by filtering on a certain trace for a certain request. They help us understand much faster what’s wrong in this particular situation. As a result, we have significantly reduced the time it takes to understand the causes of a problem.
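One way to attach a trace identifier to every log line can be sketched with the standard logging module. The JSON field names, the service name and the sample trace id below are assumptions made for the illustration; the talk does not describe the actual log format.

```python
import io
import json
import logging


class JsonTraceFormatter(logging.Formatter):
    """Formats each record as one JSON object that carries the trace id."""

    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "service": record.name,
            "message": record.getMessage(),
            # Set via `extra=`; None when the caller had no trace context.
            "trace_id": getattr(record, "trace_id", None),
        })


def make_logger(service_name, stream):
    logger = logging.getLogger(service_name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonTraceFormatter())
    logger.handlers = [handler]
    return logger


stream = io.StringIO()
log = make_logger("content-manager", stream)
log.info("broadcast loaded", extra={"trace_id": "463ac35c9f6413ad"})
```

With the identifier in every line, filtering the log store by that one value yields the whole path of a request across services.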
The next interesting thing: we’ve introduced a dynamic debug mode. The volume of logs is not huge now, about 100 to 150 gigabytes; I don’t remember the exact number. But that’s in the basic logging mode. If we wrote everything in more detail, it would be terabytes, and it would be extremely expensive to process them.
Therefore, when we see that there’s a problem, we go to certain services, turn on the debug mode (there is an API), and see what happens. Sometimes we take the problematic service out without shutting it down, turn on the debug mode on it, and then try to find out what was wrong with it.
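The switch itself can be very small. The sketch below assumes the debug API ends up calling a function like the hypothetical `set_debug` shown here; the HTTP endpoint around it and the logger name are left out or invented for the example.

```python
import logging


def set_debug(logger: logging.Logger, enabled: bool) -> str:
    """Switch a running service between basic and debug logging."""
    logger.setLevel(logging.DEBUG if enabled else logging.INFO)
    return logging.getLevelName(logger.level)


log = logging.getLogger("video-cutter")
set_debug(log, False)  # basic mode: debug messages are dropped
```

Since the level is changed on the live logger object, no restart or redeploy is needed to get the detailed output.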
In the end, this helps us a lot with the ELK stack, which is quite resource-consuming. We also aggregate errors on critical services. That is, the service understands which errors are critical and which are not, and then passes all of this to Sentry.
Sentry can aggregate errors quite smartly, group them according to certain metrics, and do some basic filtering. We use it on a number of services. Moreover, we have been using it since the time when we had monolithic applications, which are still with us today.
Now the most interesting thing. How do we scale all of this? Some introduction is necessary here. We treat every machine that serves the project as a black box.
We have an orchestration system. We started with Nomad. Oh, actually, we started with Ansible and our own scripts. At some point, this wasn’t enough. By that time, there was a version of Nomad that won us over with its simplicity. We decided that it was something we could migrate to right away.
So, all machines have become approximately the same. A machine has Docker, and it has the Consul and Nomad agents. By and large, it’s a ready-made machine that can be copied and put into operation at the right time. When machines become unnecessary, we can take them out of service. Especially if you have a cloud, this allows you to prepare a machine beforehand, turn it on during peak times, and then turn it off when the load falls. It’s a fairly serious saving.
At some point, we decided to move from Nomad to Kubernetes, and Consul began to play the role of a central configuration system for our services, with all the consequences.
So, there’s some stack to scale automatically. How do we do this?
Step 1. We introduced some limits on three characteristics: memory, CPU, network.
We’ve made three gradations for each of these variables. Cut some bricks. Here’s an example:
R3-C2-N1. We have limited a service and given it a tiny bit of network, a bit more CPU, and lots of memory. The service is really resource-consuming.
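A small sketch of how such a mnemonic could be read programmatically. It assumes R stands for memory, C for CPU and N for network, as the example above suggests, and it returns only the gradation numbers; the concrete resource values behind each gradation are not reproduced here.

```python
import re

# Pattern for mnemonics such as "R3-C2-N1" (memory, CPU, network gradations).
PROFILE_RE = re.compile(r"^R(\d)-C(\d)-N(\d)$")


def parse_profile(mnemonic: str) -> dict:
    """Split a resource mnemonic into its three gradation levels."""
    match = PROFILE_RE.match(mnemonic)
    if match is None:
        raise ValueError(f"not a valid resource profile: {mnemonic!r}")
    memory, cpu, network = (int(group) for group in match.groups())
    return {"memory": memory, "cpu": cpu, "network": network}


profile = parse_profile("R3-C2-N1")
```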
We introduced these mnemonics because the particular values behind them can be set dynamically, within quite a wide range, by what we call the Decision service in our system. At the moment, these values are approximately like this:
In fact, we also have C4 and R4, but these values go beyond the limits of these standards. They are negotiated separately. It looks something like this:
The next stage is preparatory. We look at what type of scalability the service has.
The easiest type is when your service is completely independent. You can scale this service linearly. When there are 2 times more users, you simply run 2 times more instances. Everything is fine again.
The second type is when your scalability depends on external resources. Roughly speaking, this service depends on a database. The database can serve only a certain number of clients, and you should take this into account. Either you should know at what point the system starts to degrade, so that adding more instances no longer helps, or you should somehow understand how close you are to that point at the moment.
The third and most interesting option is when you’re limited by an external system, for example, an external billing system. You know that it can’t serve more than 500 requests, even if you run 100 services. We need to take these limits into account. When we understand what type a service belongs to and set the appropriate flag, it’s time to see how all of this goes through our pipeline.
We have built and run some unit-tests on the CI server. We have run integration tests on the test environment, and our QA team have checked something. After this, we move on to the load testing in pre-production.
In case our service belongs to the first type, we take an instance and run it in this isolated environment, and give it the maximum load. After running a few rounds, we take the minimum number of the obtained values. We put it in InfluxDB and say that it is the limit possible for this service.
If our service belongs to the second type, we run instances in increments of a certain size until we see that the system has started to degrade. We estimate how fast or slow this happens. If we know the specific load on our systems, we decide whether it’s enough. Is there the headroom we need? If there isn’t, we raise an alert at this stage and do not release the service to production. We tell developers: “Guys, you either need to shard something or introduce another tool that will allow you to scale this service more linearly.”
For a service of the third type, we know the limit of the external system and run one copy of our service. We apply load and see how many users this instance can serve. If we know that, say, the billing limit is 1,000 requests and 1 instance serves 200, we understand that 5 instances are the maximum that can be served correctly.
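The arithmetic behind the first and third types can be sketched as two small functions. The function names and the sample round values are hypothetical; only the "take the minimum of the rounds" rule and the 1,000 / 200 example come from the talk.

```python
def measured_limit(round_results):
    """Type 1: the limit is the minimum value seen across load-test rounds."""
    if not round_results:
        raise ValueError("at least one load-test round is required")
    return min(round_results)


def max_useful_instances(external_limit, per_instance_capacity):
    """Type 3: instances beyond this number cannot be served by the external system."""
    if per_instance_capacity <= 0:
        raise ValueError("per-instance capacity must be positive")
    return external_limit // per_instance_capacity


# Made-up results of three load-test rounds for one instance.
limit = measured_limit([215, 200, 208])
# Billing serves 1,000 requests, one instance serves 200.
instances = max_useful_instances(1000, limit)
```

Taking the minimum rather than the average is the conservative choice: the stored limit is one the service reached in every round.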
We have saved all this information in InfluxDB. Now the Decision service appears on the scene. It looks at two boundaries: the upper and the lower. When the upper boundary is exceeded, it understands that it’s necessary to add instances, or maybe even machines for these instances. The reverse is also true. When the load falls (at night), we don’t need so many machines and can reduce the number of instances of some services, as well as turn off machines and thus save some money.
A general scheme looks like this:
Using its own metrics, each service regularly reports its current load. The load data goes to InfluxDB. When the Decision service sees that we’re reaching the limit for the given version of a given instance, it tells Nomad or Kubernetes to add new instances. Perhaps it starts a new service in the cloud before this, or maybe performs some other preparatory work. But the point is that it signals the need to raise a new instance.
If it can see that the limit will soon be reached for one of the limited services, it raises a corresponding alert. Yes, we cannot do anything about it, except for increasing the queue or something similar. But at least we know that we may soon face this problem and can begin to prepare for it now.
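The decision step can be sketched as a pure function over the stored limit and the reported load. Everything specific here is an assumption for illustration: the function name, the 0.8 and 0.3 boundaries, and the floor of two instances (chosen because the talk says each service runs at least two).

```python
import math


def decide(current_load, per_instance_limit, instances,
           upper=0.8, lower=0.3, max_instances=None, min_instances=2):
    """Return (target_instances, alert) for one service.

    alert is True when the service needs more instances than its
    external limit allows, so scaling alone cannot help.
    """
    utilisation = current_load / (per_instance_limit * instances)
    target = instances
    if utilisation > upper or utilisation < lower:
        # Size the pool so that utilisation lands at the upper boundary or below.
        target = max(min_instances,
                     math.ceil(current_load / (per_instance_limit * upper)))
    if max_instances is not None and target > max_instances:
        return max_instances, True
    return target, False
```

A scheduler would then pass the target to the orchestrator, and route the alert to people when the cap is hit.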
This is what I wanted to tell you about scalability in general terms. And all of this has allowed us to take a look at one more thing: GitLab CI.
We’ve traditionally built our services via TeamCity. At some point we realized that there’s one template for all services, while each service is unique and knows how to put itself into a container. It became quite difficult to maintain all these projects, as there were lots of them. It turned out that it’s quite convenient to describe the build in a yml file and keep it together with the service itself. So far we have adopted this only to a very small extent, but the possibilities are quite interesting.
And now, I’ll say something I wanted to tell myself when we started all of this.
When talking about the development of microservices, the first thing I would advise is to start with some orchestration system. It can be the simplest one, like Nomad, that you run with the nomad agent -dev command and get a completely ready orchestration system with the running Consul, with Nomad itself, and all of this stuff.
This gives you an understanding that you are working in a kind of black box. You try to avoid being tied to a particular machine or attached to the file system on a particular machine. It immediately rearranges the way you think.
At the development stage, you should plan for each service to have at least two instances; otherwise you won’t be able to shut down problem services, or other things that create difficulties for you, so easily and smoothly.
I should also mention certain architectural things. In terms of microservices, one of the most important objects is the message bus.
Here’s a classic example: you have user registration. How do you implement it in the simplest way? For the registration, it is necessary to create an account, create a user in billing, create an avatar for the user, and something else. You have a certain number of services, and a request comes to a super service that begins to distribute requests to its team. As a result, it knows more and more each time about which services it should trigger to complete the registration.
It’s much easier, more reliable and more efficient to do this in a different way. Leave 1 service that performs the registration. It registers a user. Then you throw an event to this common bus “I have registered a user, the ID is such-and-such, the minimum information is such-and-such”. All services that need this information will receive it. One will create an account in billing, and another one will send a welcome email.
As a result, your system will lose its tight coupling. You won’t have super services that know everything about everyone. Actually, this makes such a system very easy to handle.
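The registration example can be sketched with a tiny in-process bus. A real message bus would be a separate networked component; the `EventBus` class, the topic name `user.registered` and the two subscribers are all hypothetical and only illustrate the direction of the dependencies.

```python
from collections import defaultdict


class EventBus:
    """Minimal in-process stand-in for a shared message bus."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    def publish(self, topic, event):
        for handler in self._subscribers[topic]:
            handler(event)


bus = EventBus()
actions = []

# Interested services subscribe on their own; registration does not know them.
bus.subscribe("user.registered", lambda e: actions.append(("billing", e["user_id"])))
bus.subscribe("user.registered", lambda e: actions.append(("welcome_email", e["user_id"])))


def register_user(user_id, email):
    """Registers the user, then announces the fact with minimal information."""
    bus.publish("user.registered", {"user_id": user_id, "email": email})


register_user(7, "user@example.com")
```

Adding a third reaction to registration means adding a subscriber, with no change to `register_user`.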
Now I’ll tell something I’ve already mentioned before. Don’t try to fix these services. If there’s a problem with a certain instance, try to localize it, then move its traffic to other, perhaps, just raised instances. After this, you can find out what’s wrong. The viability of the system will significantly improve if you do so.
It goes without saying that you need to collect metrics in order to understand what’s going on with your system.
There’s an important thing to say here: if you do not understand some metric, if you don’t know how to use it, if it says nothing to you, then don’t collect it. Because at some point, there will be a billion such metrics. You will spend tons of CPU time on picking out what you need from them and tons of time on filtering out what you do not need. It’s just dead weight.
If you realize that you need a metric, start collecting it. If you don’t need something, don’t collect it. This greatly simplifies the handling of this data, as it piles up really fast.
If you see a problem, don’t hurry to fix every trifle. In most cases, the system itself should react somehow. You need an alert only when it requires action from you. If you don’t need to run somewhere at night to fix it, then it’s not an alert. It’s just a kind of warning that you take note of and handle under normal conditions.
Well, I guess that’s it. Thank you.