Tuesday, November 22, 2016

Elasticsearch pipeline aggregations - monitoring used capacity

Let's say I want to set up a simple monitoring system for my desktop. The desktop uses LVM and has three volumes v1, v2 and v3, all belonging to the vg1 volume group. I would like to monitor the used capacity of these volumes, and of the whole system, over time. It's easy to write a script that samples the used capacity of the volumes and pushes it to ElasticSearch. All I need to store is:

{
  "name": "v1",
  "ts": 1479762877,
  "used_capacity": 1288404287488
}
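
A minimal sketch of such a sampling script (my own illustration: it assumes the volumes are mounted and uses filesystem usage from statvfs as a good-enough proxy for LVM used capacity; the mount points and the "sample" doc type are made up, while the capacity_history index name matches the queries later in the post):

#!/usr/bin/env python
# Sample the used capacity of each volume and push it to Elasticsearch.
import os
import time

import requests  # assumes the requests library is installed

ES_URL = "http://localhost:9200/capacity_history/sample"
VOLUMES = {  # volume name -> mount point; adjust to your layout
    "v1": "/mnt/v1",
    "v2": "/mnt/v2",
    "v3": "/mnt/v3",
}

def used_bytes(mount_point):
    """Used capacity of the filesystem behind mount_point, in bytes."""
    st = os.statvfs(mount_point)
    return (st.f_blocks - st.f_bfree) * st.f_frsize

ts = int(time.time())
for name, mount_point in VOLUMES.items():
    doc = {"name": name, "ts": ts, "used_capacity": used_bytes(mount_point)}
    requests.post(ES_URL, json=doc).raise_for_status()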

OK, so I've put the script into cron to run every 5 minutes and the data starts pouring in. Let's do some BI on it! The first thing to find out is how full my desktop is, i.e. the total used capacity across all volumes. Sounds like an easy job for Kibana, doesn't it? Well, not really.

Part 1: Naive failure

Let's say each of my volumes is ~1TB full. Trying to chart an area viz in Kibana with the Average aggregation over used_capacity returns useless results:

The real total system capacity is ~3TB, but Kibana, rightfully, shows that AVG(v1, v2, v3) => AVG(1TB, 1TB, 1TB) => 1TB. So maybe I need Sum? Not good either:

I got a ~17TB capacity number, which is not even close to reality. This happens because Kibana uses a simple Date Histogram with a nested Sum aggregation, i.e.:

  • Divide the selected date range into ts buckets - 30 minutes each in my example.
  • Calculate the Sum of the used_capacity values of all documents that fall into each bucket.
That's why the larger the bucket, the weirder the results look: with 30-minute buckets and 5-minute samples, each bucket holds about 6 samples per volume, so the Sum is roughly 6 × 3 × ~1TB ≈ 18TB - right in the ballpark of the ~17TB above.

This happens because Kibana is only capable of either: $$ \underbrace{\text{SUM}\left(\begin{array}{c}v1, v1, v1,...\\ v2, v2, v2,...\\ v3, v3, v3,...\\ \end{array}\right)}_{ts\ bucket} \quad\text{or}\quad \underbrace{\text{AVG}\left(\begin{array}{c}v1, v1, v1,...\\ v2, v2, v2,...\\ v3, v3, v3,...\\ \end{array}\right)}_{ts\ bucket} $$ While what I need is: $$ \underbrace{\text{SUM}\left(\begin{array}{c}\text{AVG}(v1, v1, v1,...)\\ \text{AVG}(v2, v2, v2,...)\\ \text{AVG}(v3, v3, v3,...)\\ \end{array}\right)}_{ts\ bucket} $$ So how to achieve this?

Part 2: Poor man's solution

The post title promised pipeline aggregations and I'll get there. The problem with pipeline aggregations is that they are not supported in Kibana. So, is there still a way to get along with Kibana? Sort of. I can leverage the fact that my sampling script takes capacity values of all volumes at exactly the same time, i.e. each batch of volume metrics is pushed to ES with the same ts value. Now, if I force Kibana to use a ts bucket length of 1 minute, I can guarantee that any given bucket will only contain documents belonging to a single sample batch (since I send measurements to ES every 5 minutes, which is much larger than the 1m bucket size).

One can argue that this generates LOTS of buckets - and that's true, but there is one optimization to consider. The ES Date Histogram aggregation supports automatic pruning of buckets that do not have a minimum number of documents. The default is 0, which means empty buckets are returned, but Kibana wisely sets it to 1. Now let's say I want to see a capacity chart for the last 7 days, which is 7*24*60=10080 points (buckets); however, since I take measurements only every 5 minutes, most of the buckets are pruned and we are left with only ~2000, which is fine for a Full HD screen. The nice side effect is that it forces Kibana to draw really smooth charts :) Let's see it in action:

The above graph shows capacity data for the last 7 days. The key point is to open the Advanced section of the X-Axis dialog and put {"interval": "1m"} in the JSON Input field - this overrides Kibana's automatic interval. The bottom legend, which says "ts per 3 hours", is lying, but that's the least of evils. Also note how smooth the graph line is.

Part 3: Pipeline aggregations!

The above solution works, but does not scale well beyond a single system - getting measurements from multiple systems at exactly the same time is tricky. Another drawback is that looking at several months of data results in tens of thousands of buckets, which burdens ES, the network and Kibana.

The right solution is to implement the correct formula. I need something like this:

SELECT SUM(used_capacity), ts FROM
    (SELECT AVG(used_capacity) AS used_capacity, DATE(ts) AS ts FROM capacity_history GROUP BY DATE(ts), name)
GROUP BY ts
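
To make the intent concrete, here is the same two-level computation spelled out in plain Python over raw samples (an illustration only - the bucket keys and numbers are made up; the Elasticsearch query below is the real solution):

from collections import defaultdict

# (ts_bucket, volume_name, used_capacity) samples, with one-hour buckets
samples = [
    ("2016-11-21T00", "v1", 1.0e12), ("2016-11-21T00", "v1", 1.1e12),
    ("2016-11-21T00", "v2", 1.0e12),
    ("2016-11-21T00", "v3", 1.0e12), ("2016-11-21T00", "v3", 0.9e12),
]

# Inner aggregation: AVG of used_capacity per (bucket, volume)
per_volume = defaultdict(list)
for bucket, name, used in samples:
    per_volume[(bucket, name)].append(used)
averages = {key: sum(vals) / len(vals) for key, vals in per_volume.items()}

# Outer aggregation: SUM of the per-volume averages within each bucket
totals = defaultdict(float)
for (bucket, name), value in averages.items():
    totals[bucket] += value

print(dict(totals))  # {'2016-11-21T00': 3000000000000.0}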

Elasticsearch has supported this since version 2.0 with pipeline aggregations:

GET capacity_history/_search
{
  "size": 0,
  "aggs": {
    "ts": {
      "date_histogram": {"interval": "1h", "field": "ts"},
      "aggs": {
        "vols": {
          "terms": {"field": "name.raw", "size": 0},
          "aggs": {
            "cap": {
              "avg": {"field": "logical_capacity"}
            }
          }
        },
        "total_cap": {
          "sum_bucket": {
            "buckets_path": "vols>cap"
}}}}}}

Response

  "aggregations": {
    "ts": {
      "buckets": [
        {
          "key_as_string": "1479600000",
          "key": 1479600000000,
          "doc_count": 36,
          "vols": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": "v1",
                "doc_count": 12,
                "cap": {
                  "value": 1073741824000
                }
              },
              {
                "key": "v2",
                "doc_count": 12,
                "cap": {
                  "value": 1073741824000
                }
              },
              {
                "key": "v3",
                "doc_count": 12,
                "cap": {
                  "value": 1072459894784
                }
              }
            ]
          },
          "total_cap": {
            "value": 3219943542784
          }
        },
        ...
Since we only need the ts bucket key and the value of the total_cap aggregation, we can ask ES to filter the results to include only the data we need. If we have lots of volumes, this can reduce the amount of returned data by orders of magnitude!
GET capacity_history/_search?filter_path=aggregations.ts.buckets.key,aggregations.ts.buckets.total_cap.value,took,_shards,timed_out
...
{
  "took": 92,
  "timed_out": false,
  "_shards": {
    "total": 70,
    "successful": 70,
    "failed": 0
  },
  "aggregations": {
    "ts": {
      "buckets": [
        {
          "key": 1479600000000,
          "total_cap": {
            "value": 3219943542784
          }
        },
        {
          "key": 1479603600000,
          "total_cap": {
            "value": 3220228083712
          }
        },
        ...
NOTE: I suggest always returning the meta timed_out and _shards fields to make sure you do not get partial data.

This method is generic and works regardless of the time alignment of the samples; the bucket size can be adjusted to return the same amount of data points regardless of the date range. The major drawback is that it is not supported by stock Kibana, so you will need your own custom framework to visualize it.
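
For example, a small script feeding such a custom visualization could look like this (a sketch of mine, assuming the requests library and the same index and field names as above):

import requests

QUERY = {
    "size": 0,
    "aggs": {
        "ts": {
            "date_histogram": {"interval": "1h", "field": "ts"},
            "aggs": {
                "vols": {
                    "terms": {"field": "name.raw", "size": 0},
                    "aggs": {"cap": {"avg": {"field": "used_capacity"}}},
                },
                "total_cap": {"sum_bucket": {"buckets_path": "vols>cap"}},
            },
        }
    },
}

FILTER_PATH = ("aggregations.ts.buckets.key,"
               "aggregations.ts.buckets.total_cap.value,"
               "took,_shards,timed_out")

resp = requests.post("http://localhost:9200/capacity_history/_search",
                     params={"filter_path": FILTER_PATH},
                     json=QUERY)
resp.raise_for_status()
data = resp.json()
# Refuse partial data - see the NOTE above
assert not data["timed_out"] and data["_shards"]["failed"] == 0

points = [(b["key"], b["total_cap"]["value"])
          for b in data["aggregations"]["ts"]["buckets"]]
print(points)  # [(1479600000000, 3219943542784.0), ...]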

Thursday, July 14, 2016

You better have persistent storage for ElasticSearch master nodes

This is a followup to my previous post about whether ElasticSearch master nodes should have persistent storage - they'd better! The rest of the post demonstrates how you can suffer spectacular data loss with ES if your master nodes do not save their state to persistent storage.

The theory

Let's say you have the following cluster with a single index (a single primary shard). You also have an application that constantly writes data to the index.

Now what happens if all your master nodes evaporate? Well, you relaunch them with clean disks. The moment the masters are up, the cluster is red, since there are no data nodes yet, and your application cannot index data.

Now the data nodes start to join. In our example, the second one joins slightly before the first. The cluster becomes green, since the fresh masters have no idea that there is another data node that has data and is about to join.

Your application happily continues to index data, into a newly created index on data node 2.

Now data node 1 joins - the masters see that it carries some old version of our index and discard it. Data loss!!!

Sounds too esoteric to happen in real life? Here is a sad-but-true story - back in the day we ran our ES master nodes in Kubernetes without persistent disks, i.e. on local EmptyDir volumes only. One day there was a short network outage - less than an hour. Kubelets lost connection to the K8s master node and killed the pods. Once the network was back, the pods were started - with clean disk volumes! - and our application resumed running. The only catch is that we'd lost tons of data :)

The reproduction

Let's try to simulate this in practice to see what happens. I'll use a minimal ES cluster, just running three ES instances on my laptop:

  • 1 master node that also serves as a client node
  • 2 data nodes. Let's call them dnode1 and dnode2

Open three shells and let's go:

  1. Start the nodes - each in separate shell
    Master:
    /usr/share/elasticsearch/bin/elasticsearch -Des.node.data=false -Des.node.master=true -Des.node.name=master-client --path.conf=/etc/elasticsearch --default.path.logs=/tmp/master-client/logs --default.path.data=/tmp/master-client
    
    Data 01:
    /usr/share/elasticsearch/bin/elasticsearch -Des.http.enabled=false -Des.node.data=true -Des.node.master=false -Des.node.name=data-01 --path.conf=/etc/elasticsearch --default.path.logs=/tmp/data-01/logs --default.path.data=/tmp/data-01
    
    Data 02:
    /usr/share/elasticsearch/bin/elasticsearch -Des.http.enabled=false -Des.node.data=true -Des.node.master=false -Des.node.name=data-02   --path.conf=/etc/elasticsearch --default.path.logs=/tmp/data-02/logs --default.path.data=/tmp/data-02
    
  2. Index a document:
    curl -XPUT 127.0.0.1:9200/users?pretty -d '{"settings": {"number_of_shards": 1, "number_of_replicas": 0}}'
    curl -XPUT 127.0.0.1:9200/users/user/1 -d '{"name": "Zaar"}'
    
  3. Check which data node the index has landed on. In my case, it was dnode2. Shut down this data node and the master node (just hit CTRL-C in the shells).
  4. Simulate master data loss by issuing rm -rf /tmp/master-client/
  5. Bring master back (launch the same command)
  6. Index another document:
    curl -XPUT 127.0.0.1:9200/users?pretty -d '{"settings": {"number_of_shards": 1, "number_of_replicas":0}}'
    curl -XPUT 127.0.0.1:9200/users/user/2 -d '{"name": "Hai"}'
    

Now, while dnode2 is still down, we can see that the users index exists in the data directories of both nodes:

$ ls /tmp/data-0*/elasticsearch/nodes/0/indices/
/tmp/data-01/elasticsearch/nodes/0/indices/:
users

/tmp/data-02/elasticsearch/nodes/0/indices/:
users

However, the data on dnode2 is now in a "Schrödinger's cat" state - not exactly dead, but not exactly alive either.

Let's bring data node 2 back and see what happens (I've also set the gateway loglevel to TRACE in /etc/elasticsearch/logging.yml for better visibility):

$ /usr/share/elasticsearch/bin/elasticsearch -Des.http.enabled=false -Des.node.data=true -Des.node.master=false -Des.node.name=data-02   --path.conf=/etc/elasticsearch --default.path.logs=/tmp/data-02/logs --default.path.data=/tmp/data-02
[2016-07-01 17:07:13,528][INFO ][node                     ] [data-02] version[2.3.3], pid[11826], build[218bdf1/2016-05-17T15:40:04Z]
[2016-07-01 17:07:13,529][INFO ][node                     ] [data-02] initializing ...
[2016-07-01 17:07:14,265][INFO ][plugins                  ] [data-02] modules [reindex, lang-expression, lang-groovy], plugins [kopf], sites [kopf]
[2016-07-01 17:07:14,296][INFO ][env                      ] [data-02] using [1] data paths, mounts [[/ (/dev/mapper/kubuntu--vg-root)]], net usable_space [21.9gb], net total_space [212.1gb], spins? [no], types [ext4]
[2016-07-01 17:07:14,296][INFO ][env                      ] [data-02] heap size [990.7mb], compressed ordinary object pointers [true]
[2016-07-01 17:07:14,296][WARN ][env                      ] [data-02] max file descriptors [4096] for elasticsearch process likely too low, consider increasing to at least [65536]
[2016-07-01 17:07:16,285][DEBUG][gateway                  ] [data-02] using initial_shards [quorum]
[2016-07-01 17:07:16,513][DEBUG][indices.recovery         ] [data-02] using max_bytes_per_sec[40mb], concurrent_streams [3], file_chunk_size [512kb], translog_size [512kb], translog_ops [1000], and compress [true]
[2016-07-01 17:07:16,563][TRACE][gateway                  ] [data-02] [upgrade]: processing [global-7.st]
[2016-07-01 17:07:16,564][TRACE][gateway                  ] [data-02] found state file: [id:7, legacy:false, file:/tmp/data-02/elasticsearch/nodes/0/_state/global-7.st]
[2016-07-01 17:07:16,588][TRACE][gateway                  ] [data-02] state id [7] read from [global-7.st]
[2016-07-01 17:07:16,589][TRACE][gateway                  ] [data-02] found state file: [id:1, legacy:false, file:/tmp/data-02/elasticsearch/nodes/0/indices/users/_state/state-1.st]
[2016-07-01 17:07:16,598][TRACE][gateway                  ] [data-02] state id [1] read from [state-1.st]
[2016-07-01 17:07:16,599][TRACE][gateway                  ] [data-02] found state file: [id:7, legacy:false, file:/tmp/data-02/elasticsearch/nodes/0/_state/global-7.st]
[2016-07-01 17:07:16,602][TRACE][gateway                  ] [data-02] state id [7] read from [global-7.st]
[2016-07-01 17:07:16,602][TRACE][gateway                  ] [data-02] found state file: [id:1, legacy:false, file:/tmp/data-02/elasticsearch/nodes/0/indices/users/_state/state-1.st]
[2016-07-01 17:07:16,604][TRACE][gateway                  ] [data-02] state id [1] read from [state-1.st]
[2016-07-01 17:07:16,605][DEBUG][gateway                  ] [data-02] took 5ms to load state
[2016-07-01 17:07:16,613][INFO ][node                     ] [data-02] initialized
[2016-07-01 17:07:16,614][INFO ][node                     ] [data-02] starting ...
[2016-07-01 17:07:16,714][INFO ][transport                ] [data-02] publish_address {127.0.0.1:9302}, bound_addresses {[::1]:9302}, {127.0.0.1:9302}
[2016-07-01 17:07:16,721][INFO ][discovery                ] [data-02] elasticsearch/zcQx-01tRrWQuXli-eHCTQ
[2016-07-01 17:07:19,848][INFO ][cluster.service          ] [data-02] detected_master {master-client}{V1gaCRB8S9yj_nWFsq7uCg}{127.0.0.1}{127.0.0.1:9300}{data=false, master=true}, added {{data-01}{FnGrtAwDSDSO2j_B53I4Xg}{127.0.0.1}{127.0.0.1:9301}{master=false},{master-client}{V1gaCRB8S9yj_nWFsq7uCg}{127.0.0.1}{127.0.0.1:9300}{data=false, master=true},}, reason: zen-disco-receive(from master [{master-client}{V1gaCRB8S9yj_nWFsq7uCg}{127.0.0.1}{127.0.0.1:9300}{data=false, master=true}])
[2016-07-01 17:07:19,868][TRACE][gateway                  ] [data-02] [_global] writing state, reason [changed]
[2016-07-01 17:07:19,905][INFO ][node                     ] [data-02] started

At 17:07:16 we see the node found some data on its own disk, but discarded it at 17:07:19 after joining the cluster. Its data dir is in fact empty:

$ ls /tmp/data-0*/elasticsearch/nodes/0/indices/
/tmp/data-01/elasticsearch/nodes/0/indices/:
users

/tmp/data-02/elasticsearch/nodes/0/indices/:

Invoking stat confirms that the data directory was changed right after the "writing state" message above:

$ stat /tmp/data-02/elasticsearch/nodes/0/indices/
  File: ‘/tmp/data-02/elasticsearch/nodes/0/indices/’
  Size: 4096            Blocks: 8          IO Block: 4096   directory
Device: fc01h/64513d    Inode: 1122720     Links: 2
Access: (0775/drwxrwxr-x)  Uid: ( 1000/ haizaar)   Gid: ( 1000/ haizaar)
Access: 2016-07-01 17:08:39.093619141 +0300
Modify: 2016-07-01 17:07:19.920869352 +0300
Change: 2016-07-01 17:07:19.920869352 +0300
 Birth: -

Conclusions

  • Masters' cluster state is at least as important as data. Make sure your master node disks are backed up.
  • If running on K8s - use persistent external volumes (GCEPersistentDisk if running on GKE).
  • If possible, pause indexing after complete master outages until all of the data nodes come back.

Tuesday, April 26, 2016

How Kubernetes applies resource limits

We are building one of our products in the cloud and decided to run it entirely on a Kubernetes cluster. One of the big pains relieved by containers is resource separation between the different processes (modules) of your system. Let's say we have a product that comprises several services that talk to each other ("microservices", as it is now fashionably called). Before containers, or, to be more precise, before Linux kernel control groups were introduced, we had several options to try to ensure that they do not step on each other:

  • Run each microservice on a separate VM, which is usually wasteful
  • Play with CPU affinity for each microservice, on each VM - this saves you only from CPU hogs, but not from memory leeches, fork bombs, I/O swappers, etc.

This is where containers come into play - they allow you to share your machine between different applications by allocating the required portion of resources to each of them.

Back to Kubernetes

Kubernetes supports defining limit enforcement on two resource types: CPU and RAM. For each container you can provide a requested (i.e. minimum required) amount of CPU and memory, and a limit that the container should not pass. The requested amount is also used for pod scheduling, to ensure that a node provides the minimum amount of resources that the pod requested. All these parameters are of course translated to docker parameters under the hood.

Since Kubernetes is quite a new gorilla on the block, I decided to test how enforcement behaves, to get first-hand experience with it.

So first I created a container cluster on GKE with Kubernetes 1.1.8:

gcloud container clusters create limits-test --machine-type n1-highcpu-4 --num-nodes 1

Now let's see what we got on our node:

$ kubectl describe nodes
Non-terminated Pods:            (5 in total)
  Namespace                     Name                                                                    CPU Requests    CPU Limits      Memory Requests Memory Limits
  ─────────                     ────                                                                    ────────────    ──────────      ─────────────── ─────────────
  kube-system                   fluentd-cloud-logging-gke-limits-test-aec280e3-node-2tdw                100m (2%)       100m (2%)       200Mi (5%)      200Mi (5%)
  kube-system                   heapster-v11-9rqvl                                                      100m (2%)       100m (2%)       212Mi (5%)      212Mi (5%)
  kube-system                   kube-dns-v9-kbzpd                                                       310m (7%)       310m (7%)       170Mi (4%)      170Mi (4%)
  kube-system                   kube-ui-v4-7q12m                                                        100m (2%)       100m (2%)       50Mi (1%)       50Mi (1%)
  kube-system                   l7-lb-controller-v0.5.2-imjry                                           110m (2%)       110m (2%)       70Mi (1%)       120Mi (3%)
Allocated resources:
  (Total limits may be over 100%, i.e., overcommitted...)
  CPU Requests  CPU Limits      Memory Requests Memory Limits
  ────────────  ──────────      ─────────────── ─────────────
  720m (18%)    720m (18%)      702Mi (19%)     752Mi (21%)

That's quite interesting already - the minimal resource overhead of Kubernetes is 720 millicores of CPU and 702 megabytes of RAM (not including kubelet and kube-proxy, of course). However, the second node onwards will only run one daemon pod - fluentd for log collection - so its resource reservation will be significantly lower.

CPU

Kubernetes defines the CPU resource as compressible, i.e. a pod can get a larger share of the CPU if there is CPU available, and this can be changed back on the fly, without a process restart/kill.

I've created a simple CPU loader that calculates squares of the integers from 1 to 1000 in a loop on every core and prints a loops/second number; packaged it into a docker image; and launched it into k8s using the following pod file:

apiVersion: v1
kind: Pod
metadata:
  name: cpu-small
spec:
  containers:
  - image: docker.io/haizaar/cpu-loader:1.1
    name: cpu-small
    resources:
      requests:
        cpu: "500m"

I've created another pod similar to this - just called it cpu-large. Attaching to pods shortly afterwards, I saw that they get a fair share of CPU:

$ kubectl attach cpu-small
13448 loops/sec
13841 loops/sec
13365 loops/sec
13818 loops/sec
14937 loops/sec

$ kubectl attach cpu-large
14615 loops/sec
14448 loops/sec
14089 loops/sec
13755 loops/sec
14267 loops/sec

That makes sense - they both requested only 0.5 cores and the rest was split between them, since nobody else was interested. So in total this node can crunch ~30k loops/second. Now let's make cpu-large really large and reserve at least 2.5 cores for it, by changing its requests.cpu to 2500m and re-launching it into k8s. According to our settings, this pod should now be able to crunch at least ~25k loops/sec:

$ kubectl attach cpu-large
23310 loops/sec
23000 loops/sec
25822 loops/sec
23834 loops/sec
25153 loops/sec
24741 loops/sec

And this is indeed the case. Let's see what happened to cpu-small:

$ kubectl attach cpu-small
30091 loops/sec
28609 loops/sec
30219 loops/sec
27051 loops/sec
27885 loops/sec
29091 loops/sec
28699 loops/sec
18216 loops/sec
4213 loops/sec
4188 loops/sec
4296 loops/sec
4347 loops/sec
4141 loops/sec

First it got all of the CPU while I was re-launching cpu-large, but once the latter was up, the CPU share for cpu-small was reduced. Together they still produce the same ~30k loops/second, but now we control the share ratio.

What about limits? Well, it turns out that currently CPU limits are not enforced. This is not a big problem for us, because in our deployment strategy we prefer to provide the minimum required CPU share for every pod and, for the rest - be my guest. However, at this point I was glad I did this test, since the documentation was misleading with regard to CPU limits.

RAM

The RAM resource is incompressible, because there is no way to throttle a process on memory usage or ask it gently to unmalloc some of it. That's why, if a process reaches its RAM limit, it is simply killed.

To see how it's enforced in practice, I, again, created a simple script that allocates memory in chunks up to a predefined limit.
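
A rough sketch of such a memory loader (again, my own illustration, driven by the MAXMEM environment variable that the pod definitions below pass in; not necessarily the exact code of the haizaar/mem-loader image):

#!/usr/bin/env python
# Allocate memory in 1MB chunks until MAXMEM bytes are reached, then idle.
import os
import time

MAXMEM = int(os.environ.get("MAXMEM", 2 * 1024 ** 3))  # bytes
CHUNK = 1024 * 1024

chunks = []
allocated = 0
while allocated < MAXMEM:
    chunks.append(bytearray(CHUNK))  # bytearray really touches the pages
    allocated += CHUNK
    print("Reached %d megabytes" % (allocated // CHUNK))
    time.sleep(0.01)

time.sleep(3600)  # keep holding the memory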

First I tested how requests.memory is enforced. I created the following mem-small pod:

apiVersion: v1
kind: Pod
metadata:
  name: mem-small
spec:
  containers:
  - image: docker.io/haizaar/mem-loader:1.2
    name: mem-small
    resources:
      requests:
        memory: "100Mi"
    env:
    - name: MAXMEM
      value: "2147483648"

and launched it. It happily allocated 2GB of RAM and stood by. Then I created a mem-large pod with a similar configuration, where requests.memory is set to "2000Mi". After I launched the large pod, the following happened:

  • mem-large started allocating its desired 2GB of RAM.
  • Since my k8s node only had 3.6GB of RAM, the system froze for a dozen seconds or so.
  • Since there was no free memory in the system, the kernel Out Of Memory Killer kicked in and killed the mem-small pod:
[  609.739039] Out of memory: Kill process 5410 (python) score 1270 or sacrifice child
[  609.746918] Killed process 5410 (python) total-vm:1095580kB, anon-rss:1088056kB, file-rss:0kB

I.e. enforcement took place and my small pod was killed, since it consumed more RAM than it had requested while another pod was legitimately asking for memory. However, such behavior is unsuitable in practice, since it causes a "stop-the-world" effect for everything that runs on that particular k8s node.

Now let's see how resources.limits are enforced. To verify that, I killed both of my pods and changed mem-small as follows:

apiVersion: v1
kind: Pod
metadata:
  name: mem-small
spec:
  containers:
  - image: docker.io/haizaar/mem-loader:1.2
    name: mem-small
    resources:
      requests:
        memory: "100Mi"
      limits:
        memory: "100Mi"
    env:
    - name: MAXMEM
      value: "2147483648"

After launching it, I saw the following in its output:

Reached 94 megabytes
Reached 95 megabytes
Reached 96 megabytes
Reached 97 megabytes
Reached 98 megabytes
Reached 99 megabytes
Reached 99 megabytes
Reached 99 megabytes
Killed

I.e. the process was immediately killed after reaching its RAM limit. There is nice evidence of that in the dmesg output:

[  898.665335] Task in /214bc7c0bcdb1d0bf10b8ab4cff06b451850f9af0894472a403412ea295324ea killed as a result of limit of /214bc7c0bcdb1d0bf10b8ab4cff06b451850f9af0894472a403412ea295324ea
[  898.689794] memory: usage 102400kB, limit 102400kB, failcnt 612
[  898.697490] memory+swap: usage 0kB, limit 18014398509481983kB, failcnt 0
[  898.705930] kmem: usage 0kB, limit 18014398509481983kB, failcnt 0
[  898.713672] Memory cgroup stats for /214bc7c0bcdb1d0bf10b8ab4cff06b451850f9af0894472a403412ea295324ea: cache:84KB rss:102316KB rss_huge:0KB mapped_file:4KB writeback:0KB inactive_anon:4KB active_anon:102340KB inactive_file:20KB active_file:16KB unevictable:0KB
[  898.759180] [ pid ]   uid  tgid total_vm      rss nr_ptes swapents oom_score_adj name
[  898.768961] [ 6679]     0  6679      377        1       6        0          -999 sh
[  898.778387] [ 6683]     0  6683    27423    25682      57        0          -999 python
[  898.788280] Memory cgroup out of memory: Kill process 6683 (python) score 29 or sacrifice child

Conclusions

The Kubernetes documentation is a bit misleading with regard to CPU limits (resources.limits.cpu). Nevertheless, this mechanism looks perfectly useful for our application. All of the code and configuration used in this post is available in the following gists:

Sunday, April 17, 2016

Kubernetes cluster access by fixed IP

If you:

  • Have a Kubernetes cluster running in GKE
  • Have connected GKE to your company network through a VPN
  • Are puzzled about how to assign a fixed IP to a particular k8s service

Then read on.

Prologue

The ideal solution would be to configure the k8s service to use a GCP LoadBalancer and have the latter provide a private IP only. However, as of April 2016, LoadBalancers on GCP do not provide an option for a private IP only, though GCP solution engineers said this feature "is coming".

Therefore the only option we have is to run a dedicated VM with a fixed IP and proxy traffic through it.

The approach

A Kubernetes service itself provides two relevant ways to access the pods behind it:
ClusterIP
By default, every service has a virtual ClusterIP (which can be manually set to a predefined address) that can be used to access the pods behind the service. However, for this to work, a client has to have kube-proxy running on its host, as explained here.
NodePort
A k8s service can be configured to expose a certain port on every k8s node, which will be redirected to the service's pods (this comes on top of ClusterIP).

The ClusterIP approach obviously is not feasible outside the k8s cluster, so we are left with the NodePort approach only. The problem is that k8s node IPs are not static and may change. That's why we need a dedicated VM which has a fixed IP.

After we have a VM, we can either

  • Join it to the k8s cluster, so the service's NodePort will be exposed on the VM's fixed IP as well.
  • Run a reverse HTTP proxy on the VM that forwards traffic to the k8s nodes, together with a script that monitors the k8s nodes and updates the proxy configuration when necessary.

I chose the second option because it allows a single VM to proxy requests for multiple k8s clusters and is easier to set up.

The setup

Create an instance

Let's create a VM and assign it a static IP. Below is my interpretation of the official guide.

Create an instance first:

gcloud compute instances create fixed-ip-proxy --can-ip-forward
The last switch is crucial here.

I chose the IP for my testing cluster to be 10.10.1.1. Let's add it to the instance:

cat <<EOF >>/etc/network/interfaces.d/eth0-0
auto eth0:0
iface eth0:0 inet static
address 10.10.1.1
netmask 255.255.255.255
EOF

Now change /etc/network/interfaces and make sure that source-directory /etc/network/interfaces.d line comes last. Apply your new configuration by running:

sudo service networking restart

The final step is to instruct GCE to forward traffic destined for 10.10.1.1 to the new instance:

gcloud compute routes create fixed-ip-production \
                                --next-hop-instance fixed-ip-proxy \
                                --next-hop-instance-zone us-central1-b \
                                --destination-range 10.10.1.1/32

To add more IPs (adding a dedicated IP per cluster is a good practice), add another file under /etc/network/interfaces.d/ and add a GCE route.

NGINX configuration

Install NGINX:
sudo apt-get install nginx

Install Google Cloud Python SDK:

sudo easy_install pip
sudo pip install --upgrade google-api-python-client

Now download the IP watcher script:

sudo wget -O /root/nginx-ip-watch https://gist.githubusercontent.com/haizaar/f19bdf9e5a6e278c57b96cce945b4fd9/raw/79f11225825607ba78ba84221d27439c1669a492/nginx-ip-watch
sudo chmod 755 /root/nginx-ip-watch

NOTE: You are downloading my script that will run as root on your machine - read its contents first!
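
For context, the core idea is just listing GKE node IPs by instance name prefix via the Google Cloud Python SDK, roughly like this (a simplified sketch of mine, not the actual gist contents):

from googleapiclient import discovery

def gke_node_ips(project, zone, gke_prefix):
    """Return the internal IPs of instances whose name starts with gke_prefix."""
    compute = discovery.build("compute", "v1")
    result = compute.instances().list(project=project, zone=zone).execute()
    return sorted(inst["networkInterfaces"][0]["networkIP"]
                  for inst in result.get("items", [])
                  if inst["name"].startswith(gke_prefix))

# The actual script then updates the NGINX configuration with these IPs
# when they change - see the gist for details.
print(gke_node_ips("my-project", "us-central1-a", "gke-testing"))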

Test the script:

$ sudo /root/nginx-ip-watch -h 
usage: Watch GKE node IPs for changes [-h] -p PROJECT -z ZONES
                                      name gke-prefix listen-ip listen-port
                                      target-port

positional arguments:
  name                  Meaningful name of your forwarding rule
  gke-prefix            GKE node prefix to monitor and forward to
  listen-ip             IP listen on
  listen-port           Port to listen on
  target-port           IP listen on

optional arguments:
  -h, --help            show this help message and exit
  -p PROJECT, --project PROJECT
                        Project to list instances for
  -z ZONES, --zones ZONES
                        Zones to list instances for

Now let's set up NGINX to listen for HTTP traffic on 10.10.1.1:5601 and forward it to the GKE testing cluster nodes on port 30601, by adding the following to /etc/cron.d/nginx-ip-watch:

PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

* * * * * root /root/nginx-ip-watch kibana-testing -p my-project -z us-central1-a gke-testing 10.10.1.1 5601 30601

After that, within one minute, your forwarding should be up and running. For more services, just keep adding more lines in the cron file. This will work well for a dozen or so services. After that, I would refactor the solution to issue only one gcloud compute instances list command per minute.

Since we are using NGINX in load-balancer mode, checking the GKE hosts only once a minute is good enough even during cluster upgrades - NGINX will detect and blacklist a shutting-down GKE node by itself.

Epilogue

Create a snapshot of your instance to keep a backup of your work every time you change it. Don't forget to issue the sync command on the system before taking a snapshot of the disk.

Update

  • The first version of my script used the gcloud command-line util to fetch the instance list. It turned out that gcloud writes logs to ~/.config/gcloud/logs and spits out 500KB on every invocation. To mitigate this, I've updated my script to use the Google Cloud Python SDK and bypass the gcloud util completely.
  • As Vadim points out below, you can now specify a fixed internal IP at instance creation time. Though you'll still need the setup above if you want to have more than one IP per instance.

Tuesday, April 5, 2016

Persistent storage for ElasticSearch master nodes?

ElasticSearch master nodes hold the cluster state. I was trying to understand whether these nodes are required to have persistent storage, or whether they can recover from whatever exists on the data nodes. The short answer is: probably yes, they can recover - but see the update below.

Update

You better have persistent disks for your ES master nodes - read here why.

Below I describe the tests I've done. But before that - some background on how I got to this question in the first place.

ElasticSearch on Kubernetes

We are working on running ElasticSearch 2.x on Kubernetes on Google Container Engine. There are two options to store data for a container:
EmptyDir
Part of the local storage on the Kubernetes node is allocated for the pod. If the pod's container restarts, the data survives. If the pod is killed - the data is lost.
gcePersistentDisk
A Compute Engine persistent disk (created in advance) can be attached to a pod. The data persists. However, there is a limitation - as of Kubernetes 1.2, a ReplicaSet cannot attach a different disk to each pod it creates, thus to run ElasticSearch data nodes, for example, you need to create a separate ReplicaSet (of size 1) for each ES data node.

ES data nodes should have persistent disks - this is a no-brainer. However, with regard to ES master nodes it's not clear. I've tried to understand where master nodes persist cluster state, and this thread states "on every node including client nodes". There is also a resolved issue about storing index metadata on data nodes.

Run, Kill, Repeat

So let's see how it behaves in reality.

I created a two-node Kubernetes 1.2 cluster running n1-standard-2 instances (2 CPUs, 7.5GB RAM) and used Paulo Pires' Kubernetes setup for ElasticSearch:

$ git clone https://github.com/pires/kubernetes-elasticsearch-cluster.git
$ cd kubernetes-elasticsearch-cluster
$ vim es-data-rc.yaml  # set replicas to 2
$ vim es-master-rc.yaml  # set replicas to 3
$ vim es-svc.yaml  # set type to ClusterIP

Let's launch it into the air:

$ for i in *.yaml; do kubectl create -f $i; done
$ sleep 1m; kubectl get pods
NAME              READY     STATUS    RESTARTS   AGE
es-client-ats2b   1/1       Running   0          1h
es-data-teodq     1/1       Running   0          1h
es-data-zwml2     1/1       Running   0          1h
es-master-3bosq   1/1       Running   0          1h
es-master-a47om   1/1       Running   0          1h
es-master-c1dy1   1/1       Running   0          1h

We are all good. Let's ingest some data and alter the cluster settings:

$ CLIENTIP=$(kubectl describe pods es-client |grep '^IP' |head -n 1|awk '{print $2}')
$ curl -XPUT $CLIENTIP:9200/_cluster/settings?pretty -d '{"transient": {"discovery.zen.minimum_master_nodes": 2}}'
{
  "acknowledged" : true,
  "persistent" : { },
  "transient" : {
    "discovery" : {
      "zen" : {
        "minimum_master_nodes" : "2"
      }
    }
  }
}
$ curl -XPUT $CLIENTIP:9200/tweets/tweet/1 -d '{"foo": "bar"}'          
{
  "_id": "1",
  "_index": "tweets",
  "_shards": {
    "failed": 0,
    "successful": 2,
    "total": 2
  },
  "_type": "tweet",
  "_version": 1,
  "created": true
}
$ curl $CLIENTIP:9200/_cluster/health?pretty
{
  "cluster_name" : "myesdb",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 6,
  "number_of_data_nodes" : 2,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

The data is there and the cluster is green. Now let's kill all of the masters and recreate them (their data disks will be lost):

$ kubectl delete -f es-master-rc.yaml 
replicationcontroller "es-master" deleted
$ kubectl create -f es-master-rc.yaml       
replicationcontroller "es-master" created

After a few dozen seconds, the new masters are up again and we see the following in the leader's log:

[2016-04-04 15:57:44,880][INFO ][cluster.service          ] [Elaine Grey] new_master {Elaine Grey}{5NIL5jBbTYadefGzjDLb5A}{10.224.1.7}{10.224.1.7:9300}{data=false, master=true}, added {{Lyja}{CUyGl7w-R86qcOsNSj0xPA}{10.224.1.4}{10.224.1.4:9300}{master=false},{Typhoid Mary}{cWmlEtHuSdCImjHMNM6FsA}{10.224.1.3}{10.224.1.3:9300}{master=false},{Slug}{fQPe2C1FSH2UkuveBFJtbw}{10.224.1.5}{10.224.1.5:9300}{data=false, master=false},}, reason: zen-disco-join(elected_as_master, [0] joins received)
[2016-04-04 15:57:45,056][INFO ][node                     ] [Elaine Grey] started
[2016-04-04 15:57:45,058][INFO ][cluster.service          ] [Elaine Grey] added {{Stacy X}{U4sn1pkWRlGV-zVMW1OeAA}{10.224.1.8}{10.224.1.8:9300}{data=false, master=true},}, reason: zen-disco-join(pending joins after accumulation stop [election closed])
[2016-04-04 15:57:45,711][INFO ][gateway                  ] [Elaine Grey] recovered [0] indices into cluster_state
[2016-04-04 15:57:45,712][INFO ][cluster.service          ] [Elaine Grey] added {{Kiss}{MaNkKlQWR82QHFVhz38Ohg}{10.224.1.6}{10.224.1.6:9300}{data=false, master=true},}, reason: zen-disco-join(join from node[{Kiss}{MaNkKlQWR82QHFVhz38Ohg}{10.224.1.6}{10.224.1.6:9300}{data=false, master=true}])
[2016-04-04 15:57:46,077][INFO ][gateway                  ] [Elaine Grey] auto importing dangled indices [tweets/OPEN] from [{Lyja}{CUyGl7w-R86qcOsNSj0xPA}{10.224.1.4}{10.224.1.4:9300}{master=false}]
[2016-04-04 15:57:47,073][INFO ][cluster.routing.allocation] [Elaine Grey] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[tweets][3]] ...]).
[2016-04-04 15:57:47,567][INFO ][cluster.routing.allocation] [Elaine Grey] Cluster health status changed from [YELLOW] to [GREEN] (reason: [shards started [[tweets][3]] ...]).
[2016-04-04 15:58:16,355][INFO ][io.fabric8.elasticsearch.discovery.kubernetes.KubernetesDiscovery] [Elaine Grey] updating discovery.zen.minimum_master_nodes from [-1] to [2]

So we see that:

  • The new master recovered 0 indices from the cluster state - i.e. the cluster state was indeed lost.
  • The new master auto-imported the existing indices, which were "dangling" in ES terminology.
  • It also restored our transient quorum setting.

And our cluster is green:


$ curl $CLIENTIP:9200/_cluster/health?pretty
{
  "cluster_name" : "myesdb",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 6,
  "number_of_data_nodes" : 2,
  "active_primary_shards" : 5,
  "active_shards" : 10,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

Now let's kill the masters without destroying their disks and see if it behaves any differently:

$ for pod in $(kubectl get pods |grep es-master |awk '{print $1}'); do kubectl exec $pod killall java & done
The master log shows the following:
[2016-04-04 16:33:55,695][INFO ][cluster.service          ] [Neptune] new_master {Neptune}{51QU1wf4T2Ky9QUzK5MkEw}{10.224.1.7}{10.224.1.7:9300}{data=false, master=true}, added {{Slug}{fQPe2C1FSH2UkuveBFJtbw}{10.224.1.5}{10.224.1.5:9300}{data=false, master=false},{Typhoid Mary}{cWmlEtHuSdCImjHMNM6FsA}{10.224.1.3}{10.224.1.3:9300}{master=false},{Lyja}{CUyGl7w-R86qcOsNSj0xPA}{10.224.1.4}{10.224.1.4:9300}{master=false},{Comet Man}{cOLRxsOKTC2OYR6Kuiplxw}{10.224.1.6}{10.224.1.6:9300}{data=false, master=true},}, reason: zen-disco-join(elected_as_master, [1] joins received)
[2016-04-04 16:33:55,867][INFO ][node                     ] [Neptune] started
[2016-04-04 16:33:56,428][INFO ][gateway                  ] [Neptune] recovered [1] indices into cluster_state
[2016-04-04 16:33:57,233][INFO ][cluster.routing.allocation] [Neptune] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[tweets][0], [tweets][0]] ...]).
[2016-04-04 16:33:57,745][INFO ][cluster.routing.allocation] [Neptune] Cluster health status changed from [YELLOW] to [GREEN] (reason: [shards started [[tweets][4]] ...]).
[2016-04-04 16:34:01,417][INFO ][cluster.service          ] [Neptune] added {{Nicole St. Croix}{5VcQT4H8RHeAf3R0PW3K4A}{10.224.1.8}{10.224.1.8:9300}{data=false, master=true},}, reason: zen-disco-join(join from node[{Nicole St. Croix}{5VcQT4H8RHeAf3R0PW3K4A}{10.224.1.8}{10.224.1.8:9300}{data=false, master=true}])
[2016-04-04 17:01:55,811][INFO ][io.fabric8.elasticsearch.discovery.kubernetes.KubernetesDiscovery] [Neptune] updating discovery.zen.minimum_master_nodes from [-1] to [2]

So 1 index was recovered from the cluster state and there are no dangling indices this time.

Conclusions

While the master nodes were able to recover both the indices and the transient cluster settings, I was testing only the simplest scenario. This is not enough to decide whether we should maintain persistent disks for master nodes. On the other hand, if we do have persistent disks for masters - do we need to back up the metadata? And what about ES 5.x? One of the promised features is that the master will hold an ID of the latest index change, to prevent stale data nodes from becoming primaries during network partitioning. This kind of metadata cannot be stored on data nodes.

I'll update this post when I have the answers.

Friday, March 25, 2016

Caveat with ElasticSearch nGram tokenizer

Finally got some time to blog about ElasticSearch. I've been using it extensively for the last two years, but my findings are rather lengthy. Finally I've got something small to share.

The ElasticSearch nGram tokenizer is very useful for efficient substring matching (at the cost of index size, of course). For example, I have an event message field like this:

/dev/sda1 has failed due to ...
and I would like to find all failure events for all SCSI disks. One option is to store the message field as a not-analyzed string (i.e. one single term) and use a wildcard query:
GET /events
{
  "query": {
    "wildcard": {
      "message.raw": {
        "value": "/dev/sd?? has failed*"
      }
    }
  }
}
This will do the job perfectly, but to complete it, ElasticSearch has to scan every value of the message field looking for the pattern at search time. Once the number of documents gets big enough, this becomes slow.

One solution is to split the message into substrings at indexing time, with (2, 20) as the (min, max) gram length in our example:

# Analyzer definition in settings
"analysis": {
    "analyzer": {
        "substrings": {
            "tokenizer": "standard",
            "filter": ["lowercase", "thengram"]
        }   
    },  
    "filter": {
        "thengram": {
            "type": "nGram",
            "min_gram": 2,
            "max_gram": 20
        }   
    }   
} 

# message field definition in mappings
"message": {
    "type": "string",
    "index": "analyzed",
    "analyzer": "substrings"
}
and use match_phrase query:
GET events/_search
{
  "query": {
    "match": {
      "message": {
        "query": "dev sd has failed",
        "type": "phrase",
      }
    }
  }
}

The caveat

The above query will return weirdly irrelevant results and, at first glance, it's not obvious why. The caveat is that our custom analyzer is applied both during indexing and during search. So instead of searching for the sequence of terms "dev", "sd", "has", "failed", we are searching for the sequence "de", "ev", "dev", "sd", "ha", "as", "has", etc. To fix this we need to tell Elastic to use a different analyzer during search (and search only). This can be done either by adding "analyzer": "standard" to the query itself (which is error-prone, since it can easily be forgotten) or by specifying it in the mapping definition:
"message": {
    "type": "string",
    "index": "analyzed",
    "analyzer": "substrings",
    "search_analyzer": "standard"
}
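
A quick way to see the difference is to run both analyzers through the _analyze API and compare the emitted tokens (a sketch using the requests library, with the index and analyzer names from above):

import requests

text = "/dev/sda1 has failed"
for analyzer in ("substrings", "standard"):
    resp = requests.get("http://localhost:9200/events/_analyze",
                        params={"analyzer": analyzer, "text": text})
    resp.raise_for_status()
    print(analyzer, [t["token"] for t in resp.json()["tokens"]])

# "substrings" emits all the 2..20-character grams ("de", "dev", "ev", ...),
# while "standard" emits whole words - which is what we want at search time.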

Worth it?

I took a sample data set of 1,000,000 events and ran both the wildcard and the phrase query, each matching a 1,000-doc subset of it. While both are fast for such a small data set, the difference is quite striking nevertheless:
  • wildcard query - 30ms
  • phrase query - 5ms

A 6x speed-up! Another bonus of using the phrase query is that you get result highlighting (which is not supported for wildcard queries).

Thursday, February 25, 2016

Patching binaries

A friend of mine asked for help - he has a legacy system that he wants to migrate to new hardware. His Linux OS is 10 years old and it is becoming more and more challenging to find hardware to run it on. Long story short, I was asked to make his ten-year-old binaries run on a modern Ubuntu.

Fortunately, Linux has very impressive ABI compatibility, so my job came down to arranging the executables and their dependent libraries. Well, almost.

There are three ways of telling an executable (or actually its ld.so interpreter) where to search for its libraries:

  • Setting rpath on the executable itself.
  • Setting LD_LIBRARY_PATH environment variable.
  • Changing system wide configuration for ld.so to look into additional directories.

The binaries were setuid, and thus LD_LIBRARY_PATH was ruled out.

Next, I tried putting the libraries in /opt/old-stuff/lib and adding that directory to /etc/ld.so.conf.d/z-old-stuff.conf. This gave me some progress, but I hit a wall with naming collisions - my old binary relied on an older libreadline and I had two libreadline.so.5 libs - one in /lib and one in /opt/old-stuff/lib. The latter was obviously further down the search path, since otherwise it would break practically every command-line tool in the system.

So I needed to make my binary use its own specific version of libreadline while leaving the others using the default one. The only way to go was rpath. Fortunately, there is a nifty utility out there called patchelf:

patchelf --set-rpath /opt/old-stuff/lib /opt/old-stuff/bin/foo
That almost did the trick. The caveat was that foo was using another library, and only that library itself used libreadline. So the solution was to set rpath on all the libraries as well:
for file in /opt/old-stuff/lib/*; do
    patchelf --set-rpath /opt/old-stuff/lib "$file"
done
Overall, that was quite a shift from my current daily programming routine. I have not had to think about linkers for quite a while now, and it was fun to get a taste of this stuff again.