Tuesday, November 22, 2016

Elasticsearch pipeline aggregations - monitoring used capacity

Let's say I want to set up a simple monitoring system for my desktop. The desktop uses LVM and has three volumes, v1, v2 and v3, all belonging to the vg1 volume group. I would like to monitor the used capacity of these volumes, and of the whole system, over time. It's easy to write a script that samples the used capacity of each volume and pushes it to Elasticsearch. All I need to store is:

{
  "name": "v1",
  "ts": 1479762877,
  "used_capacity": 1288404287488
}
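Here is a minimal sketch of the document-building part of such a script. It assumes the used bytes per volume have already been collected somehow (reading them from LVM is left out), and the function names `build_docs` and `to_bulk_payload` are made up for illustration. The payload follows the newline-delimited layout of the Elasticsearch bulk API, with empty "index" actions that assume the target index is given in the request URL.

```python
import json
import time


def build_docs(used_bytes_by_volume, ts=None):
    """Build one document per volume, all sharing the same timestamp."""
    if ts is None:
        ts = int(time.time())  # epoch seconds, as in the sample document
    return [
        {"name": name, "ts": ts, "used_capacity": used}
        for name, used in sorted(used_bytes_by_volume.items())
    ]


def to_bulk_payload(docs):
    """Serialize documents as newline-delimited JSON for the bulk API.

    Each document is preceded by an empty "index" action, which assumes
    the target index is part of the request URL.
    """
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # the bulk body must end with a newline


if __name__ == "__main__":
    sample = {"v1": 1288404287488, "v2": 1073741824000, "v3": 1072459894784}
    print(to_bulk_payload(build_docs(sample, ts=1479762877)), end="")
```

Taking the timestamp once per run, rather than once per volume, is what gives every document in a batch the same ts value.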

OK, so I've put the script into cron to run every 5 minutes and the data starts pouring in. Let's do some BI on it! The first thing to find out is how full my desktop is, i.e. the total used capacity of all volumes. Sounds like an easy job for Kibana, doesn't it? Well, not really.

Part 1: Naive failure

Let's say each of my volumes is ~1TB full. Charting an area visualization in Kibana with an Average aggregation over used_capacity returns useless results:

The real total used capacity is ~3TB, but Kibana, rightfully, shows that AVG(v1, v2, v3) => AVG(1TB, 1TB, 1TB) => 1TB. So maybe I need Sum? Not good either:

I got a capacity of ~17TB, which is not even close to reality. This happens because Kibana uses a simple Date Histogram with a nested Sum aggregation, i.e.

  • Divide the selected date range into ts buckets (30 minutes in my example).
  • Calculate the Sum of the used_capacity values of all documents that fall into each bucket.

With samples every 5 minutes, a 30-minute bucket holds about six documents per volume, so each volume is counted about six times. That's why the larger the bucket, the weirder the results look.

This happens because Kibana is only capable of either:

$$ \underbrace{\text{SUM}\left(\begin{array}{c}v1, v1, v1,...\\ v2, v2, v2,...\\ v3, v3, v3,...\\ \end{array}\right)}_{ts\ bucket} \quad\text{or}\quad \underbrace{\text{AVG}\left(\begin{array}{c}v1, v1, v1,...\\ v2, v2, v2,...\\ v3, v3, v3,...\\ \end{array}\right)}_{ts\ bucket} $$

While what I need is:

$$ \underbrace{\text{SUM}\left(\begin{array}{c}\text{AVG}(v1, v1, v1,...)\\ \text{AVG}(v2, v2, v2,...)\\ \text{AVG}(v3, v3, v3,...)\\ \end{array}\right)}_{ts\ bucket} $$

So how to achieve this?
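To make the difference concrete, here is a small pure-Python sketch of the three calculations on a single 30-minute bucket. It assumes three volumes at exactly 1TB each (decimal, for round numbers) sampled every 5 minutes, so the bucket holds 18 documents; the helper names are mine, not anything from Elasticsearch or Kibana.

```python
from collections import defaultdict

TB = 10**12  # decimal terabyte, used here only for round numbers


def make_bucket(volumes=("v1", "v2", "v3"), samples_per_volume=6, used=TB):
    """Documents falling into one 30-minute bucket at a 5-minute sample rate."""
    return [
        {"name": name, "used_capacity": used}
        for name in volumes
        for _ in range(samples_per_volume)
    ]


def naive_sum(docs):
    """SUM over every document in the bucket."""
    return sum(d["used_capacity"] for d in docs)


def naive_avg(docs):
    """AVG over every document in the bucket."""
    return sum(d["used_capacity"] for d in docs) / len(docs)


def sum_of_volume_averages(docs):
    """SUM over volumes of the per-volume AVG, which is what I actually need."""
    per_volume = defaultdict(list)
    for d in docs:
        per_volume[d["name"]].append(d["used_capacity"])
    return sum(sum(vals) / len(vals) for vals in per_volume.values())


if __name__ == "__main__":
    bucket = make_bucket()
    print(naive_sum(bucket) / TB)               # 18.0
    print(naive_avg(bucket) / TB)               # 1.0
    print(sum_of_volume_averages(bucket) / TB)  # 3.0
```

The naive sum lands at 18TB with these idealized numbers, the same ballpark as the ~17TB Kibana showed, while only the sum of per-volume averages gives the expected 3TB.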

Part 2: Poor man's solution

The post title promised pipeline aggregations, and I'll get there. The problem with pipeline aggregations is that they are not supported in Kibana. So, is there still a way to get along with Kibana? Sort of. I can leverage the fact that my sampling script takes the capacity values of all volumes at exactly the same time, i.e. each batch of volume metrics is pushed to ES with the same ts value. Now, if I force Kibana to use a ts bucket length of 1 minute, I can guarantee that any given bucket will only contain documents belonging to a single sample batch (because I send measurements to ES every 5 minutes, which is much longer than the 1m bucket size).

One can argue that this generates LOTS of buckets, and that's right, but there is one optimization to consider. The ES Date Histogram aggregation supports automatic pruning of buckets that do not have a minimum number of documents (min_doc_count). The default is 0, which means empty buckets are returned, but Kibana wisely sets it to 1. Now let's say I want to see a capacity chart for the last 7 days, which is 7*24*60=10080 points (buckets); however, since I take measurements only every 5 minutes, most of the buckets will be pruned and we are left with only ~2000, which is fair enough for a Full HD screen. A nice side effect is that it forces Kibana to draw really smooth charts :) Let's see it in action:
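The bucket arithmetic can be checked with a short sketch. It assumes perfectly regular sampling, which a cron job only approximates, and `count_buckets` is a hypothetical helper written for this post.

```python
def count_buckets(range_seconds, bucket_seconds, sample_period_seconds):
    """Return (total_buckets, non_empty_buckets) for evenly spaced samples."""
    total = range_seconds // bucket_seconds
    # Bucket index of every sample taken during the range.
    non_empty = {
        t // bucket_seconds
        for t in range(0, range_seconds, sample_period_seconds)
    }
    return total, len(non_empty)


if __name__ == "__main__":
    week = 7 * 24 * 60 * 60
    print(count_buckets(week, 60, 300))  # (10080, 2016)
```

So out of 10080 one-minute buckets, only 2016 survive the pruning.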

The above graph shows capacity data for the last 7 days. The key point is to open the Advanced section of the X-Axis dialog and put {"interval": "1m"} in the JSON Input field - this overrides Kibana's automatic interval. The bottom legend, which says "ts per 3 hours", is lying, but it's the least of evils. Also note how smooth the graph line is.
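For reference, the aggregation behind this chart is roughly equivalent to the request body built below. This is my approximation, not a capture of what Kibana sends: the real request also carries a time range filter and other details, and the aggregation names "ts" and "total" are made up.

```python
import json


def histogram_with_sum(interval="1m", ts_field="ts", value_field="used_capacity"):
    """Date histogram with a nested sum, pruning buckets without documents."""
    return {
        "size": 0,  # only aggregations are needed, no hits
        "aggs": {
            "ts": {
                "date_histogram": {
                    "field": ts_field,
                    "interval": interval,
                    "min_doc_count": 1,  # drop empty buckets
                },
                "aggs": {"total": {"sum": {"field": value_field}}},
            }
        },
    }


if __name__ == "__main__":
    print(json.dumps(histogram_with_sum(), indent=2))
```

Because each one-minute bucket contains at most one sample batch, the nested sum adds up exactly one value per volume.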

Part 3: Pipeline aggregations!

The above solution works, but does not scale well beyond a single system - getting measurements from multiple systems at exactly the same time is tricky. Another drawback is that trying to look at several months of data will result in tens of thousands of buckets, which will burden ES, the network and Kibana.

The right solution is to implement the correct formula. I need something like this:

SELECT SUM(used_capacity), ts FROM
    (SELECT AVG(used_capacity) AS used_capacity, DATE(ts) AS ts FROM capacity_history GROUP BY DATE(ts), name) AS per_volume
GROUP BY ts
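To sanity-check the idea of summing per-volume averages, here is a sketch that runs an equivalent query against an in-memory SQLite database. It buckets by hour using integer arithmetic on epoch seconds instead of DATE(ts), and the table layout, tiny capacity values and helper names are assumptions made for the example.

```python
import sqlite3

# Inner query: average per (hour, volume). Outer query: sum those averages per hour.
QUERY = """
SELECT hour, SUM(avg_cap) AS total_cap
FROM (
    SELECT (ts / 3600) * 3600 AS hour, name, AVG(used_capacity) AS avg_cap
    FROM capacity_history
    GROUP BY hour, name
) AS per_volume
GROUP BY hour
ORDER BY hour
"""


def total_per_hour(rows):
    """rows: list of (name, ts, used_capacity) tuples, ts in epoch seconds."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE capacity_history "
            "(name TEXT, ts INTEGER, used_capacity INTEGER)"
        )
        conn.executemany("INSERT INTO capacity_history VALUES (?, ?, ?)", rows)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()


def sample_rows(start=1479600000):
    """One hour of 5-minute samples for three volumes (toy values)."""
    rows = []
    for i in range(12):
        ts = start + i * 300
        rows.append(("v1", ts, 100))
        rows.append(("v2", ts, 200))
        rows.append(("v3", ts, 290 if i % 2 == 0 else 310))  # averages to 300
    return rows


if __name__ == "__main__":
    print(total_per_hour(sample_rows()))  # [(1479600000, 600.0)]
```

The 36 rows in the hour collapse into a single total of 600, i.e. 100 + 200 + 300, no matter how many samples each volume contributed.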

Elasticsearch supports this since version 2.0 with Pipeline aggregations:

GET capacity_history/_search
{
  "size": 0,
  "aggs": {
    "ts": {
      "date_histogram": {"interval": "1h", "field": "ts"},
      "aggs": {
        "vols": {
          "terms": {"field": "name.raw", "size": 0},
          "aggs": {
            "cap": {
              "avg": {"field": "used_capacity"}
            }
          }
        },
        "total_cap": {
          "sum_bucket": {
            "buckets_path": "vols>cap"
          }
        }
      }
    }
  }
}

Response

  "aggregations": {
    "ts": {
      "buckets": [
        {
          "key_as_string": "1479600000",
          "key": 1479600000000,
          "doc_count": 36,
          "vols": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": "v1",
                "doc_count": 12,
                "cap": {
                  "value": 1073741824000
                }
              },
              {
                "key": "v2",
                "doc_count": 12,
                "cap": {
                  "value": 1073741824000
                }
              },
              {
                "key": "v3",
                "doc_count": 12,
                "cap": {
                  "value": 1072459894784
                }
              }
            ]
          },
          "total_cap": {
            "value": 3219943542784
          }
        },
        ...
Since we only need the ts bucket key and the value of the total_cap aggregation, we can ask ES to filter the response to include only the data we need. With lots of volumes, this can reduce the amount of returned data by orders of magnitude!
GET capacity_history/_search?filter_path=aggregations.ts.buckets.key,aggregations.ts.buckets.total_cap.value,took,_shards,timed_out
...
{
  "took": 92,
  "timed_out": false,
  "_shards": {
    "total": 70,
    "successful": 70,
    "failed": 0
  },
  "aggregations": {
    "ts": {
      "buckets": [
        {
          "key": 1479600000000,
          "total_cap": {
            "value": 3219943542784
          }
        },
        {
          "key": 1479603600000,
          "total_cap": {
            "value": 3220228083712
          }
        },
        ...
NOTE: I suggest always returning the timed_out and _shards meta fields to make sure you do not get partial data.
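On the client side, that check might look like the sketch below. It assumes the filtered response has already been parsed into a Python dict, and `extract_series` is a hypothetical helper; it refuses to return data when the meta fields are missing or indicate a partial result.

```python
def extract_series(response):
    """Return [(epoch_millis, total_cap), ...], or raise if data may be partial."""
    if "timed_out" not in response or "_shards" not in response:
        raise ValueError("response lacks timed_out/_shards, cannot verify completeness")
    shards = response["_shards"]
    if response["timed_out"] or shards.get("failed", 0) > 0:
        raise RuntimeError(
            "partial result: timed_out=%r, shards=%r" % (response["timed_out"], shards)
        )
    buckets = response["aggregations"]["ts"]["buckets"]
    return [(b["key"], b["total_cap"]["value"]) for b in buckets]


if __name__ == "__main__":
    response = {
        "took": 92,
        "timed_out": False,
        "_shards": {"total": 70, "successful": 70, "failed": 0},
        "aggregations": {"ts": {"buckets": [
            {"key": 1479600000000, "total_cap": {"value": 3219943542784}},
            {"key": 1479603600000, "total_cap": {"value": 3220228083712}},
        ]}},
    }
    print(extract_series(response))
    # [(1479600000000, 3219943542784), (1479603600000, 3220228083712)]
```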

This method is generic and works regardless of the time alignment of the samples; the bucket size can be adjusted to return the same number of data points for any time range. The major drawback is that it is not supported by stock Kibana, so you will need your own custom framework to visualize the results.
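One possible way to adjust the bucket size is sketched below: pick the smallest interval from a fixed list that keeps the number of buckets under a limit. The candidate list, the 2000-point limit and the `pick_interval` name are my assumptions. The sketch also never goes below the sampling period, since a bucket shorter than that could miss some volumes and underreport the total.

```python
import math

# Candidate bucket sizes in seconds with their date_histogram interval strings.
CANDIDATES = [
    (60, "1m"), (300, "5m"), (900, "15m"), (3600, "1h"),
    (3 * 3600, "3h"), (12 * 3600, "12h"), (86400, "1d"),
]


def pick_interval(range_seconds, max_points=2000, sample_period_seconds=300):
    """Smallest candidate interval that yields at most max_points buckets."""
    for seconds, label in CANDIDATES:
        if seconds < sample_period_seconds:
            continue  # a shorter bucket might not contain every volume
        if math.ceil(range_seconds / seconds) <= max_points:
            return label
    return CANDIDATES[-1][1]  # fall back to the coarsest interval


if __name__ == "__main__":
    day = 86400
    print(pick_interval(1 * day))   # 5m
    print(pick_interval(7 * day))   # 15m
    print(pick_interval(90 * day))  # 3h
```

The returned string can be used as the "interval" value of the date_histogram in the query above.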