Metric Collection Stack for Distributed Systems
In our previous post we discussed keeping logs in a central place and viewing aggregated data from all the nodes, in order to figure out what happened across the system at a certain point in time. In addition to log information, it is important to have all the metrics at your disposal, both at instance level (OS metrics from the system) and at application level (metrics that the application or tool exposes). This blog post presents what we learned while working on a complex use case with a tight SLA, where every piece of information counts. We had a 9-node Cassandra cluster with 3 application nodes on top. Everything was deployed on AWS.
Types of Metrics
Each application or tool that you use is deployed on some (AWS) instance. It is important to have instance metrics (OS metrics) that you can collect over time with a configurable resolution. When you use AWS you have CloudWatch at your disposal, but the problem with it is the maximal resolution of 1 minute, which is practically useless for use cases with a high request rate (in our case 20k per second). Most of the time you need more frequent measurements, but then you must take care not to consume too many resources collecting these metrics (and possibly slow down the instance).
The second type of metrics that you can collect is metrics closer to the application stack. Most libraries and tools nowadays expose built-in metrics, which provide valuable insights. In addition, you can generate application-level metrics which carry business value for the application (such as the rate at which some method in the application is called, the number of exceptions of a certain type and so on).
In the distributed logging blog post we already presented part of the stack we use on the monitoring machine (an AWS EC2 instance dedicated to collecting metrics data and running monitoring tasks). It can be seen in the monitoring application we provision using Ansible. Grafana is our choice for visualizing data over time and correlating data from different data sources. We needed to add something that could collect metrics and store them, in addition to the existing ELK stack we use for collecting log information. We decided to go with Riemann, a tool that aggregates events coming from many nodes: Riemann client instances on the nodes send events to a centralized place, the Riemann server. It has good performance characteristics and ready-made extensions for the things that interest us (OS metrics and Cassandra metrics).
After deciding how to collect events, we needed to choose where to store them. We decided to go with InfluxDB, a database built specifically for storing time series data. It has a notion of a measurement - a time series of values of the same type - and stores points inside it, where each point is a single measurement with a value and a timestamp. Riemann has good support for InfluxDB integration, so all events can be saved in InfluxDB with little or no manipulation of data. Grafana also has good support for InfluxDB integration - you can easily add it as a data source and start querying and visualizing data.
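To make the measurement and point model concrete, here is a minimal sketch that renders one point in InfluxDB's text-based line protocol (measurement, optional tags, fields, timestamp). The function name, the measurement name `slow_query` and the tag names are hypothetical and chosen only for illustration; in our stack Riemann does this work, so treat this as a picture of the data model rather than code we ran.

```python
def _escape(value, chars):
    """Backslash-escape each character of `chars` that occurs in `value`."""
    for ch in chars:
        value = value.replace(ch, "\\" + ch)
    return value


def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Render a single point as one line of InfluxDB line protocol."""
    head = _escape(measurement, ", ")
    # Tags identify the series (for example, which host sent the point).
    for key in sorted(tags):
        head += "," + _escape(key, ",= ") + "=" + _escape(str(tags[key]), ",= ")

    rendered = []
    for key in sorted(fields):
        value = fields[key]
        if isinstance(value, bool):
            text = "true" if value else "false"
        elif isinstance(value, int):
            text = f"{value}i"  # integer fields carry an "i" suffix
        elif isinstance(value, float):
            text = repr(value)
        else:
            # String fields are double-quoted; escape backslash first, then quote.
            text = '"' + _escape(str(value), '\\"') + '"'
        rendered.append(_escape(key, ",= ") + "=" + text)

    return f"{head} {','.join(rendered)} {timestamp_ns}"


# Hypothetical point: one slow query observed by the driver on one host.
line = to_line_protocol(
    "slow_query",
    {"host": "cassandra-1", "source": "driver"},
    {"latency_ms": 37.5, "count": 1},
    1500000000000000000,
)
print(line)
```

Each such line becomes one point inside the `slow_query` measurement, which Grafana can then query by tag and time range.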
I must note that, at the time we decided to go with Riemann, Telegraf had not yet gained popularity. Telegraf is an InfluxDB adapter with a lot of supported input sources, so you can just drop in the Telegraf Cassandra adapter and ingest metrics from Cassandra into InfluxDB without Riemann in between. However, we were working on a Java application which needed to be robust while sending a lot of events per second, and Riemann had nice filtering and throttling features and a decent Java client.
First, we decided to tackle the problem of OS metrics. As stated in the previous part, AWS CloudWatch metrics do not have the desired frequency, and you had to pay to get more frequent measurements. Riemann tools has a nice selection of libraries for collecting stats, but we could not run them as a single service, and we wanted more control over how measurements are collected. We created SmartCat OS metrics, a Python application which collects the OS metrics we need and sends them with the Riemann Python client to the monitoring machine. The focus was on obtaining precise metrics for block devices and network statistics, apart from the standard CPU and memory measurements. It was fun to revisit the /proc filesystem and format measurement data as we saw fit.
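As an illustration of the kind of /proc parsing involved, the sketch below turns two snapshots of /proc/diskstats into per-second block device rates. It is not the code of SmartCat OS metrics; the function names and the one-second interval are assumptions made for the example, and sending the result to Riemann is left out.

```python
import time

SECTOR_BYTES = 512  # /proc/diskstats counts sectors in 512-byte units


def parse_diskstats(text):
    """Map device name -> cumulative counters parsed from /proc/diskstats."""
    stats = {}
    for line in text.splitlines():
        parts = line.split()
        if len(parts) < 14:
            continue  # skip blank or unexpected lines
        stats[parts[2]] = {
            "reads": int(parts[3]),                       # reads completed
            "read_bytes": int(parts[5]) * SECTOR_BYTES,   # sectors read
            "writes": int(parts[7]),                      # writes completed
            "write_bytes": int(parts[9]) * SECTOR_BYTES,  # sectors written
            "io_ms": int(parts[12]),                      # ms spent doing I/O
        }
    return stats


def rates(previous, current, interval_s):
    """Per-second rates between two snapshots of the same device.

    The io_ms rate divided by 1000 approximates device utilization.
    """
    return {key: (current[key] - previous[key]) / interval_s for key in current}


def sample_device(device, interval_s=1.0, path="/proc/diskstats"):
    """Take two snapshots `interval_s` apart and return rates for `device`."""
    with open(path) as handle:
        before = parse_diskstats(handle.read())
    time.sleep(interval_s)
    with open(path) as handle:
        after = parse_diskstats(handle.read())
    return rates(before[device], after[device], interval_s)
```

On a Linux host, `sample_device("sda")` would return the read and write rates for that device over the last second. Because the counters are cumulative, the collector only needs to keep the previous snapshot, which keeps the overhead on the instance low.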
In order to understand our needs, it is important to explain the use case in more detail. We had a tight SLA: 99.999% of all the queries needed to be below a certain threshold. So we needed two metrics: how many queries we had above that threshold (slow queries) and the latency of those queries.
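It helps to put numbers on what five nines means at the request rate mentioned earlier (20k per second). The sketch below computes how many slow queries the SLA tolerates and checks a batch of latencies against it. The function names, the sample data and the 20 ms threshold are hypothetical, and treating a query exactly at the threshold as not slow is an assumption.

```python
from fractions import Fraction

FIVE_NINES = Fraction(99999, 100000)


def slow_query_budget(total_queries, sla=FIVE_NINES):
    """Largest number of slow queries that still satisfies the SLA."""
    # Exact rational arithmetic avoids float rounding; int() floors the result.
    return int(total_queries * (1 - sla))


def slow_query_report(latencies_ms, threshold_ms, sla=FIVE_NINES):
    """Count slow queries in a batch and compare them with the budget."""
    slow = [latency for latency in latencies_ms if latency > threshold_ms]
    budget = slow_query_budget(len(latencies_ms), sla)
    return {
        "slow_count": len(slow),
        "budget": budget,
        "worst_ms": max(slow, default=None),
        "sla_met": len(slow) <= budget,
    }


# At 20k queries per second, one minute of traffic is 1,200,000 queries.
per_minute = slow_query_budget(20_000 * 60)
per_day = slow_query_budget(20_000 * 60 * 60 * 24)
print(per_minute, per_day)
```

With these inputs the budget is 12 slow queries per minute, or 17,280 per day. That is why both the count of slow queries and the latency of each one matter: a handful of extra slow queries in a minute is enough to break the SLA.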
The application uses the Java driver for Cassandra, which has QueryLogger, a nice interface that is useful for query logging. Basically, you tell it which queries to log (normal or slow) and set a threshold for slow queries, and the driver starts producing a report which covers all queries above the given threshold. This is nice: it covers all queries above the threshold, which makes it a good fit for the high nines use case. You need a certain amount of custom coding if you want those log messages plotted somewhere (e.g. Grafana), but you can always write your own implementation of LatencyTracker and send the metrics wherever you want. Since we have the Riemann server on the monitoring machine and a Java Riemann client, we decided to write RiemannLatencyTracker, which reports slow queries as Riemann events to the server. From there they go to InfluxDB, and we plot graphs in Grafana.
This is all nice and exactly what we want from the application perspective, but we have an Amazon deployment and we are losing precious milliseconds between the cluster and the driver, so we are basically blind to those slow queries that show up on the driver but are not seen at the cluster level (those are the ones that point to problems with configuration, network and hardware). We needed a better way of storing and representing slow queries both on the cluster and on the driver, so we could correlate information from both sources and pinpoint the exact moments when we have problems that need further investigation.
Cassandra exposes a lot of useful metrics over JMX. Using the metrics reporter config with RiemannReporter, internal Cassandra metrics were shipped from every Cassandra node to the Riemann server. This solved the problem of metrics coming from the cluster level, but only partially. Cassandra exposes latency measurements up to "only" four nines (99.99%). We needed five nines, so we had to build this feature on our own, and this is how Cassandra Diagnostics came to life. It is a Java agent which hooks into the query path and measures latency, sending the metrics to an external reporter. We found Byte Buddy, a great project which enabled us to hook into the Cassandra code without changing the code itself and create a wrapper around the query processor which measures latency. We wrapped that into a project and placed the jar in Cassandra's library folder. Full details of our project, along with other diagnostic modules, can be found on our GitHub page.
In a nutshell, the project consists of a number of parts/modules. The diagnostics connector is a changeable component which you choose based on the Cassandra version you use (currently we support 2.1 and 3.x). The newest addition to the project is a driver connector which hooks into the query path at driver level and reuses the logic for measuring slow queries on the application side as well. The full logic is wrapped in the diagnostics core module. The core holds different submodules (slow query is just one of them) and applies different logic for different modules, reporting the result to a configured reporter.
By default, slow queries are reported to standard log output, but you have the option to configure different reporters as well. Currently, we support two more reporter implementations - a Riemann reporter which outputs slow queries as Riemann events, and an InfluxDB reporter which sends measurement points directly into InfluxDB.
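The idea of wrapping the query processor and handing slow measurements to a configurable reporter can be sketched in a few lines. Cassandra Diagnostics does this on the JVM with Byte Buddy; the Python decorator below is only an analogue of the pattern, and every name in it (`measure_slow`, `log_reporter`, `process_query`, the 20 ms threshold) is hypothetical.

```python
import functools
import time


def log_reporter(event):
    """Default reporter: write the slow query event to standard output."""
    print(f"slow query: {event}")


def measure_slow(threshold_ms, reporter=log_reporter, clock=time.perf_counter):
    """Wrap a function and report every call slower than `threshold_ms`."""
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            started = clock()
            try:
                return func(*args, **kwargs)
            finally:
                # Measure even when the wrapped call raises an exception.
                elapsed_ms = (clock() - started) * 1000.0
                if elapsed_ms > threshold_ms:
                    reporter({"name": func.__name__, "latency_ms": elapsed_ms})
        return wrapper
    return decorate


# A different reporter is just another callable, here one that collects events.
collected = []


@measure_slow(threshold_ms=20, reporter=collected.append)
def process_query(statement):
    """Stand-in for the real query processor."""
    return f"executed: {statement}"
```

The wrapped function is left untouched, which mirrors the goal of instrumenting Cassandra without changing its code, and swapping `collected.append` for a function that sends events to Riemann or InfluxDB corresponds to choosing a different reporter.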
This brought us exactly to the place we wanted to be. We had all slow queries above a certain threshold from the application machine reported by the driver, and all slow queries on cluster nodes reported by our Cassandra Diagnostics tool, and we could correlate these two measurements.
This is interesting, since we can spot all those slow queries seen at application level and not seen on the cluster, which point to problems on the application machines. Those are the timestamps that need further investigation, and you can correlate them with OS metrics and AWS metrics to figure out whether there was a glitch in AWS or the JVM, high disk latency, or a problem with the networking stack.
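The correlation itself can be as simple as matching timestamps within a small window. The sketch below lists the driver-side slow queries that have no cluster-side slow query nearby. It assumes timestamps in milliseconds from reasonably synchronized clocks; the function name, the sample data and the 50 ms window are hypothetical. In our setup this comparison was done visually in Grafana.

```python
from bisect import bisect_left


def driver_only_slow_queries(driver_ts, cluster_ts, window_ms):
    """Driver timestamps with no cluster slow query within +/- window_ms."""
    cluster_sorted = sorted(cluster_ts)
    unmatched = []
    for ts in sorted(driver_ts):
        # First cluster timestamp that is not earlier than the window start.
        index = bisect_left(cluster_sorted, ts - window_ms)
        if index == len(cluster_sorted) or cluster_sorted[index] > ts + window_ms:
            unmatched.append(ts)
    return unmatched


# Hypothetical slow query timestamps in milliseconds.
driver = [1000, 2000, 3500]
cluster = [990, 3600]
suspects = driver_only_slow_queries(driver, cluster, window_ms=50)
print(suspects)
```

The timestamps that come back are the ones worth checking against OS and AWS metrics for the same moment.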
In distributed systems, it is hard to reason about problems, especially if you are blindfolded. There is more than one place that can be the root cause. What is even worse is that sometimes a combination of events or instances causes the problem (e.g. a Cassandra node doing a flush and blocking while an application node is trying to send a request). It is of utmost importance to have all the information at your disposal when the problem happens. It is even better to have alerting in place which can send a message when some measurements have out-of-the-ordinary values, but that could be a topic for another blog post. In-depth measurements are now where proper testing was a couple of years ago: the first thing to get skipped when deadlines are pressing. We think that more effort must be spent on creating a good monitoring stack in order to have peace of mind when the system goes live.