Saturday, April 15, 2017

My sugar findings

The posts in this blog is usually about technology subjects. However I'm on vacation for the last week and have spent several days reading about sugar and products containing it, mostly from Wikipedia. Below is the summary of my findings. Please note that I did not study neither chemistry not biology since 9th grade, so please bear with me for possible inaccuracies.

Appetizer: In the year of 2015, the world has produced 177 million tons of sugar (all types combined). This is 24 kilograms per person per year, or 70 gram per day, and surely much higher in industrialized countries.


AKA “Simple sugars”. These are the most basic types of sugar - they can not be further hydrolyzed to simpler compounds. Those relevant for humans are glucose, fructose and galactose - they are the only ones that human body can directly absorb through small intestine. Glucose can be used directly by body cells, while fructose and galactose are directed to liver for further pre-processing.

Glucose is not “bad” per-se - it’s a fuel of most living organisms on earth, including humans. However high amounts of glucose, as well as other monosaccharides, can lead to insulin resistance (diabetes) and obesity. Another problem related to intake of simple sugars, is that they are fueling acid-producing bacteria living in mouth that leads to dental caries.


Primary sources of monosaccharides in human diet are fruits (both fresh and dried), honey and, recently, HFCS - High Fructose Corn Syrup. On top of that, inverted sugar is also in use, but I will cover it separately later on.

While fruits contain high percentage of fructose, it comes together with good amount of other beneficial nutrients, e.g. dietary fiber, vitamin C and potassium. For that, fruits should not be discarded because of their fructose content - they overall are healthy products and commonly are not a reason for overweight or obesity. For example, two thirds of Australians are overweight or obese, while an average Australian eats only about one piece of fruit a day.

Note: It’s quite common in the food industry to treat dried fruits with sulfur dioxide, which is a toxic gas in its natural form. The health effects of this substance are still disputed, but since it’s done to increase shelf life and enhance visual appeal of the product, i.e. to benefit producer and not end user, I do not see a reason to buy dried fruits treated with it. Moreover, I’ve seen products labeled as organic, that still contained sulfur dioxide, i.e. the fruits themselves were from organic origin, but were treated with sulfur dioxide.

Honey, one the other hand, while generally perceived as “healthy food” is actually a bunch of empty calories. An average honey consists of 80% of sugars and 17% of water, particularly, 38% of fructose and 31% of glucose. Since honey is supersaturated liquid, containing more sugar than water, glucose tends to crystallize into solid granules floating in fructose syrup.

Note: one interesting source of honey is a honeydew secretion.

Finally, HFCS, is a sweetener produced from corn starch by breaking its carbohydrates into glucose and fructose. The resulting solution is about 50/50% on glucose/fructose (in their free form), but may vary between manufactures. This sweetener is generally available since 1970, shortly after discovery of enzymes necessary for its manufacturing process. There were some health concerns about HFCS, however nowadays they are generally dismissed - i.e. HFCS is not better of worth than any other added sugar, which, again, in case of excess intake can lead to obesity and diabetes.


Disaccharide is a sugar that is formed by two joined monosaccharides. The most common examples are:
  • Lactose: glucose + galactose
  • Maltose: glucose + glucose
  • Sucrose: glucose + fructose
Disaccharides can not be absorbed by human body as they are, but require to be broken down, or hydrolyzed, to monosaccharides. To speed up the process and allow fast enough absorption, enzymes are secreted by small intestine, where disaccharides are hydrolyzed and absorbed. Dedicated enzyme is secreted for each disaccharide type, e.g. lactase, maltase and sucrase. Insufficient secretion, or lack thereof, results in body intolerance to a certain types of disaccharides, i.e. inability to absorb them in small intestine. In such case they are passed on into large intestine, where various bacteria metabolize them and the resulting fermentation process produces gases leading to detrimental health effects.

Another issue with disaccharides is that they, together with monosaccharides, provide food food to acid-producing bacteria leading to dental caries. Sucrose particularly shines here allowing anaerobic environments that boost acid production by the bacteria.

Lactose is naturally found in dairy products, but some sources say that it’s often added to bread, snacks, cereals, etc. I don’t quite remember lactose being listed on products, at least in Israel, and though I did not research on the subject, my guess is this is because it will convert products to milk-kosher, and thus can limit their consumption by end user. I did not study lactose any further. Maltose is a major component of brown rice syrup - this is how I’ve stumbled upon it initially.

Sucrose, or “table sugar”, or just “sugar” is the king of disaccharides, and all of the sweeteners together. The rest of this post will be mainly dedicated to it, but let's finish with maltose first.


My discovery to maltose started with reading nutrition facts of organic, i.e. perceived “healthy”, candy saying “rice syrup”. Reading further, I found out that it’s a sweetener produced by breaking down starch of the whole brown rice. The traditional way to produce the syrup is to cook the rice and then to add small amount of sprouted barley grains - something that I should definitely try at home some time. Most of the current production is performed using industrial methods, as one would expect.

The outcome is, again, sweet, empty calories, for good and for bad of it. Traditionally prepared syrup can contain up to 10% of protein, however it’s usually removed in industrial products. Other than that, again, - empty calories.


Without further adieu, let's get to sucrose, most common of all sugars. Since Wikipedia has quite good and succinct article on sucrose, I will only mention topics that particularly thrilled me.

Note: Interestingly enough, before introduction of industrial sugar manufacturing methods, honey was the primary source of sweeteners in most parts of the world.

Humans extract sucrose from cane sugar from about 500BC. The process is quite laborious and involves juice extraction from crushed canes, boiling it to reduce water content, then, while cooling, sucrose crystallizes out. Such sugar is considered Non-centrifugal cane sugar (NCS). Today processes are quite optimized and use agents like lime (don’t confuse with lemon), and activated carbon for purification and filtering. The result is raw sugar, which is then further purified up to pure sucrose and molasses (residues).

In 19th century, sugar beet plant joined the sugar party. Slightly different process is used, but it also results in sucrose and molasses. Beet’s molasses are considered unpalatable by humans, while cane molasses are heavily used in food industry.

While it’s generally agreed that regular white sugar (sucrose) is “bad”, in recent years there is trend to substitute it with various kinds of brown sugars, which are considered healthier. Let’s explore what brown sugars are.

Brown sugar is a sucrose based sugar that has a distinctive brown color due to presence of molasses. It’s either obtained by stopping refinement process at different stages, or by re-adding molasses to pure white sugar. Regardless of the method, the only non-sugar nutritional value of brown sugars comes from their molasses, and since typical brown sugar does not contain more than 10% of molasses, its difference to white sugar is negligible, nutrition wise. Bottom line - use brown sugars, e.g. demerara, muscovado, panela, etc. because you like their taste and not because they are healthier.

This leads to conclusion that molasses is the only health-beneficial product of sugar industry. The strongest, blackstrap molasses, contains significant amount of vitamin B6 and minerals like calcium, magnesium, iron, and manganese, with one tablespoon providing 20% of daily value.

The only outstanding detrimental effect of sucrose that I have discovered (compared to other sugars) is its increased effect on tooth decay.



Heating sugars, particularly sucrose, produces caramel. Sucrose first gets decomposed into glucose and fructose and then builds up new compounds. Surprisingly enough, this process is not well understood.

Inverted sugar

Inverted sugar syrup is produced by splitting sucrose into its components - fructose and glucose. The resulting product is alluringly sweet, even compared to sucrose. The simplest way to obtain inverted sugar is to dissolve some sucrose in water and heat it. Citric acid (1g per kg of sugar) can be added to catalyze the process. Baking soda can be used later to neutralize the acid and thus remove the sour taste.

Sucrose inversion occurs when preparing jams, since fruits naturally contain acids. Inverted sugar provides strong preserving qualities for products that use it - this is what gives jams relatively long shelf life even without additional preservatives.

Tuesday, November 22, 2016

Elasticsearch pipeline aggreagtions - monitoring used capacity

Lets say I want to setup a simple monitoring system for my desktop. The desktop uses LVM and has three volumes v1, v2 and v3, all belonging to vg1 volume group. I would like to monitor used capacity of these volumes, and the whole system, over time. It's easy to write a script that samples used capacity of the volumes and pushes it to ElasticSearch. All I need to store is:

  "name": "v1",
  "ts": 1479762877,
  "used_capacity": 1288404287488

OK, so I've put the script into cron to run every 5 minutes and the data starts pouring in. Lets do some BI on it! First thing to find out is how full my desktop is, i.e. the total capacity of all volumes. Sounds like a easy job for Kibana, isn't it? Well, not really.

Part 1: Naive failure

Let's say each of my volumes is ~1TB full. Trying to chart area viz in Kibana with Average aggregation over used_capacity returns useless results (click the below image to enlarge):

The real total system capacity is ~3TB, but Kibana, rightfully, shows that AVG(v1, v2, v3) => AVG(1TB, 1TB, 1TB) => 1TB. So may be I need Sum? Not good either:

I got ~17TB capacity number which not even close to reality. This happens because Kibana uses simple Date Histogram with nested Sum aggregation, i.e.

  • Divide selected date range into ts buckets. 30 minutes in my example.
  • Calculate Sum of used_capacity values of all documents that fall in bucket.
That's why the larger is the bucket, the more weird the results would look.

This happens because Kibana is only capable of either: $$ \underbrace{\text{SUM}\left(\begin{array}{c}v1, v1, v1,...\\ v2, v2, v2,...\\ v3, v3, v3,...\\ \end{array}\right)}_{ts\ bucket} \quad\text{or}\quad \underbrace{\text{AVG}\left(\begin{array}{c}v1, v1, v1,...\\ v2, v2, v2,...\\ v3, v3, v3,...\\ \end{array}\right)}_{ts\ bucket} $$ While what I need is: $$ \underbrace{\text{SUM}\left(\begin{array}{c}\text{AVG}(v1, v1, v1,...)\\ \text{AVG}(v2, v2, v2,...)\\ \text{AVG}(v3, v3, v3,...)\\ \end{array}\right)}_{ts\ bucket} $$ So how to achieve this?

Part 2: Poor man's solution

The post title promised pipeline aggregations and I'll get there. The problem with pipeline aggregations is that they are not supported in Kibana. So, is there still a way to get along with Kibana? - sort of. I can leverage on the fact that my sampling script takes capacity values of all volumes at exactly the same time, i.e. each bunch of volume metrics is pushed to ES with the same ts value. Now, if I force Kibana to use ts bucket length of 1 minute, I can guarantee that in any given bucket, I will only have documents belonging to a single sample batch (that's because I send measurements to ES every 5 minutes, which is much larger than the 1m bucket size).

One can argue that it generates LOTS of buckets - and he is right, but there is one optimization point to consider. ES Date histogram aggregation supports automatic pruning of buckets that do not have a minimum number of documents. The default is 0, which means empty buckets are returned, but Kibana wisely sets it to 1. Now lets say I want to see capacity data chart for last 7 days, which is 7*24*60=10080 points (buckets); however since I take measurements only every 5 minutes, most of the buckets will be pruned and we are left only with 2000, which is fare enough for Full HD screen. The nice side-effect of this is that it forces Kibana to draw really smooth charts :) Let's see it in action:

The above graph shows capacity data for last 7 days. The key point is to open and Advanced section of X-Axis dialog and put {"interval": "1m"} in JSON Input field - this overrides Kibana's automatic interval. The bottom legend, that says "ts per 3 hours", is lying, but it's the least of evils. Also note how smooth is the graph line.

Part 3: Pipeline aggregations!

The above solution works, but does not scale well beyond a single system - getting measurements from multiple systems at exactly the same time is tricky. Another drawback is that trying to looks at several months of data will result in tens of thousands of buckets which will burden both on ES, on the network and Kibana.

The right solution is to implement the correct formula. I need something like this:

SELECT AVG(used_capacity), ts FROM
    (SELECT SUM(used_capacity) AS used_capacity, DATE(ts) AS ts FROM capacity_history GROUP BY DATE(ts), name)

Elasticsearch supports this since version 2.0 with Pipeline aggregations:

GET capacity_history/_search
  "size": 0,
  "aggs": {
    "ts": {
      "date_histogram": {"interval": "1h", "field": "ts"},
      "aggs": {
        "vols": {
          "terms": {"field": "name.raw", "size": 0},
          "aggs": {
            "cap": {
              "avg": {"field": "logical_capacity"}
        "total_cap": {
          "sum_bucket": {
            "buckets_path": "vols>cap"


  "aggregations": {
    "ts": {
      "buckets": [
          "key_as_string": "1479600000",
          "key": 1479600000000,
          "doc_count": 36,
          "vols": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
                "key": "v1",
                "doc_count": 12,
                "cap": {
                  "value": 1073741824000
                "key": "v2",
                "doc_count": 12,
                "cap": {
                  "value": 1073741824000
                "key": "v3",
                "doc_count": 12,
                "cap": {
                  "value": 1072459894784
          "total_cap": {
            "value": 3219943542784
Since we only need ts bucket key and value of total_cap aggregation, we can ask ES to filter the results to include only the data we need. In case we have lots of volumes it can reduce the amount of returned data by orders of magnitude!
GET capacity_history/_search?filter_path=aggregations.ts.buckets.key,aggregations.ts.buckets.total_cap.value,took,_shards,timed_out
  "took": 92,
  "timed_out": false,
  "_shards": {
    "total": 70,
    "successful": 70,
    "failed": 0
  "aggregations": {
    "ts": {
      "buckets": [
          "key": 1479600000000,
          "total_cap": {
            "value": 3219943542784
          "key": 1479603600000,
          "total_cap": {
            "value": 3220228083712
NOTE: I suggest always to return meta timed_out and _shards fields to make sure you do not get partial data.

This method is generic and will work regardless of time alignment of the samples; bucket size can be adjusted to return a same amount of data points. The major drawback is that it is not supported by stock Kibana and thus you will need your own custom framework to visualize this.

Thursday, July 14, 2016

You better have persistent storage for ElasticSearch master nodes

This is followup for my previous post about whether ElasticSearch master nodes should have persistent storage - they better do!. The rest of the post demonstrates how you can have spectacular data loss with ES if master nodes do not save their state to persistent storage.

The theory

Let's say you have the following cluster with single index (single primary shard). You also have an application that constantly writes data to the index

Now what happens if all your master nodes evaporate? Well, you relaunch them with clean disks. The moment masters are up, the cluster is red, since there are no data nodes, and your application can not index data.

Now data nodes start to join. In our example, the second one joins slightly before the first. What happens is that cluster becomes green, since fresh masters do not have any idea that there is other data node that has data and is about to join.

You application happily continues to index data, into newly created index on data node 2.

Now data nodes 1 joins - masters discover that they have some old version of our index and discard it. Data loss!!!

Sounds too esoteric to happen in real life? Here is sad&true story - back in a time we ran our ES master nodes in Kubernetes without persistent disk, i.e. on local EmptyDir volumes only. One day there was short network outage - for less than an hour. Kubelets lost connection to K8s master node and killed the pods. Once the network was back, the pods were started - with clean disk volumes! - and our application resumed running. The only catch is we've lost tons data :)

The reproduction

Let's try to simulate this in practice to see what happens. I'll use the minimal ES cluster by just running three ES instances on my laptop:

  • 1 master node that also servers as a client node
  • 2 data nodes. Lets call them dnode1 and dnode2

Open three shells and lets go:

  1. Start the nodes - each in separate shell
    /usr/share/elasticsearch/bin/elasticsearch -Des.node.master=true --path.conf=/etc/elasticsearch --default.path.logs=/tmp/master-client/logs
    Data 01:
    /usr/share/elasticsearch/bin/elasticsearch -Des.http.enabled=false -Des.node.master=false --path.conf=/etc/elasticsearch --default.path.logs=/tmp/data-01/logs
    Data 02:
    /usr/share/elasticsearch/bin/elasticsearch -Des.http.enabled=false -Des.node.master=false   --path.conf=/etc/elasticsearch --default.path.logs=/tmp/data-02/logs
  2. Index a document:
    curl -XPUT -d '{"settings": {"number_of_shards": 1, "number_of_replicas": 0}}'
    curl -XPUT -d '{"name": "Zaar"}'
  3. Check on which data node the index has landed. In my case, it was dnode2. Shutdown this data node and the master node (just hit CTRL-C in the shells)
  4. Simulate master data loss by issuing rm -rf /tmp/master-client/
  5. Bring master back (launch the same command)
  6. Index another document:
    curl -XPUT -d '{"settings": {"number_of_shards": 1, "number_of_replicas":0}}'
    curl -XPUT -d '{"name": "Hai"}'

Now, while dnode2 is still down, we can see that index file exists in data directories of both nodes:

$ ls /tmp/data-0*/elasticsearch/nodes/0/indices/


However data on dnode2 is now in "Schrodinger's cat" state - neither dead, but not exactly alive either.

Let's bring back the node two and see what happens (I've also set gateway loglevel to TRACE in /etc/elasticsearch/logging.yml for better visibility):

$ /usr/share/elasticsearch/bin/elasticsearch -Des.http.enabled=false -Des.node.master=false   --path.conf=/etc/elasticsearch --default.path.logs=/tmp/data-02/logs
[2016-07-01 17:07:13,528][INFO ][node                     ] [data-02] version[2.3.3], pid[11826], build[218bdf1/2016-05-17T15:40:04Z]
[2016-07-01 17:07:13,529][INFO ][node                     ] [data-02] initializing ...
[2016-07-01 17:07:14,265][INFO ][plugins                  ] [data-02] modules [reindex, lang-expression, lang-groovy], plugins [kopf], sites [kopf]
[2016-07-01 17:07:14,296][INFO ][env                      ] [data-02] using [1] data paths, mounts [[/ (/dev/mapper/kubuntu--vg-root)]], net usable_space [21.9gb], net total_space [212.1gb], spins? [no], types [ext4]
[2016-07-01 17:07:14,296][INFO ][env                      ] [data-02] heap size [990.7mb], compressed ordinary object pointers [true]
[2016-07-01 17:07:14,296][WARN ][env                      ] [data-02] max file descriptors [4096] for elasticsearch process likely too low, consider increasing to at least [65536]
[2016-07-01 17:07:16,285][DEBUG][gateway                  ] [data-02] using initial_shards [quorum]
[2016-07-01 17:07:16,513][DEBUG][indices.recovery         ] [data-02] using max_bytes_per_sec[40mb], concurrent_streams [3], file_chunk_size [512kb], translog_size [512kb], translog_ops [1000], and compress [true]
[2016-07-01 17:07:16,563][TRACE][gateway                  ] [data-02] [upgrade]: processing []
[2016-07-01 17:07:16,564][TRACE][gateway                  ] [data-02] found state file: [id:7, legacy:false, file:/tmp/data-02/elasticsearch/nodes/0/_state/]
[2016-07-01 17:07:16,588][TRACE][gateway                  ] [data-02] state id [7] read from []
[2016-07-01 17:07:16,589][TRACE][gateway                  ] [data-02] found state file: [id:1, legacy:false, file:/tmp/data-02/elasticsearch/nodes/0/indices/users/_state/]
[2016-07-01 17:07:16,598][TRACE][gateway                  ] [data-02] state id [1] read from []
[2016-07-01 17:07:16,599][TRACE][gateway                  ] [data-02] found state file: [id:7, legacy:false, file:/tmp/data-02/elasticsearch/nodes/0/_state/]
[2016-07-01 17:07:16,602][TRACE][gateway                  ] [data-02] state id [7] read from []
[2016-07-01 17:07:16,602][TRACE][gateway                  ] [data-02] found state file: [id:1, legacy:false, file:/tmp/data-02/elasticsearch/nodes/0/indices/users/_state/]
[2016-07-01 17:07:16,604][TRACE][gateway                  ] [data-02] state id [1] read from []
[2016-07-01 17:07:16,605][DEBUG][gateway                  ] [data-02] took 5ms to load state
[2016-07-01 17:07:16,613][INFO ][node                     ] [data-02] initialized
[2016-07-01 17:07:16,614][INFO ][node                     ] [data-02] starting ...
[2016-07-01 17:07:16,714][INFO ][transport                ] [data-02] publish_address {}, bound_addresses {[::1]:9302}, {}
[2016-07-01 17:07:16,721][INFO ][discovery                ] [data-02] elasticsearch/zcQx-01tRrWQuXli-eHCTQ
[2016-07-01 17:07:19,848][INFO ][cluster.service          ] [data-02] detected_master {master-client}{V1gaCRB8S9yj_nWFsq7uCg}{}{}{data=false, master=true}, added {{data-01}{FnGrtAwDSDSO2j_B53I4Xg}{}{}{master=false},{master-client}{V1gaCRB8S9yj_nWFsq7uCg}{}{}{data=false, master=true},}, reason: zen-disco-receive(from master [{master-client}{V1gaCRB8S9yj_nWFsq7uCg}{}{}{data=false, master=true}])
[2016-07-01 17:07:19,868][TRACE][gateway                  ] [data-02] [_global] writing state, reason [changed]
[2016-07-01 17:07:19,905][INFO ][node                     ] [data-02] started

At 17:07:16 we see the node found some data on it's own disk, but discarded it at 17:07:19 after joining the cluster. It's data dir is in fact empty:

$ ls /tmp/data-0*/elasticsearch/nodes/0/indices/


Invoking stat confirms that data directory was changed right after "writing state" message above:

$ stat /tmp/data-02/elasticsearch/nodes/0/indices/
  File: ‘/tmp/data-02/elasticsearch/nodes/0/indices/’
  Size: 4096            Blocks: 8          IO Block: 4096   directory
Device: fc01h/64513d    Inode: 1122720     Links: 2
Access: (0775/drwxrwxr-x)  Uid: ( 1000/ haizaar)   Gid: ( 1000/ haizaar)
Access: 2016-07-01 17:08:39.093619141 +0300
Modify: 2016-07-01 17:07:19.920869352 +0300
Change: 2016-07-01 17:07:19.920869352 +0300
 Birth: -


  • Masters' cluster state is at least as important as data. Make sure your master node disks are backed up.
  • If running on K8s - use persistent external volumes (GCEPersistentDisk if running on GKE).
  • If possible, pause indexing after complete master outages until all of the data nodes come back.

Tuesday, April 26, 2016

How Kubernetes applies resource limits

We are building one of our products on a cloud and decided to run it entirely on Kubernetes cluster. One of the big pains that is relieved by containers is resource separation between different processes (modules) of your system. Let's say we have a product that comprises of several services that talk to each other ("microservices" as it is now fashionably called). Before containers, or, to be more precise, before Linux kernel control groups were introduced, we had several options to try to ensure that they do not step on each other:

  • Run each microservice on a separate VM, which is usually wasteful
  • Play with CPU affinity for each microservice, on each VM - this saves you only from CPU hogs, but not from memory leeches, fork bombs, I/O swappers, etc.

This is where containers come into play - this allows you share your machine between different applications by allocating required portion of resources for each of them.

Back to Kubernetes

Kubernetes supports defining limit enforcement on two resource types: CPU and RAM. For each container you can provide requested, i.e. minimum required, amount of CPU and memory and a limit that container should not pass. Requested is also used for pod scheduling to ensure that a node will provide minimum amount of resources that pod requested. All these parameters are of course translated to docker parameters under the hood.

Since Kubernetes is quite a new gorilla in the block, I decided to test how enforcement behaves to get first hand experience with it.

So first I created a container cluster on GKE with Kubernetes 1.1.8:

gcloud container clusters create limits-test --machine-type n1-highcpu-4 --num-nodes 1

Now lets see what we got on our node (scroll right):

$ kubectl describe nodes
Non-terminated Pods:            (5 in total)
  Namespace                     Name                                                                    CPU Requests    CPU Limits      Memory Requests Memory Limits
  ─────────                     ────                                                                    ────────────    ──────────      ─────────────── ─────────────
  kube-system                   fluentd-cloud-logging-gke-limits-test-aec280e3-node-2tdw                100m (2%)       100m (2%)       200Mi (5%)      200Mi (5%)
  kube-system                   heapster-v11-9rqvl                                                      100m (2%)       100m (2%)       212Mi (5%)      212Mi (5%)
  kube-system                   kube-dns-v9-kbzpd                                                       310m (7%)       310m (7%)       170Mi (4%)      170Mi (4%)
  kube-system                   kube-ui-v4-7q12m                                                        100m (2%)       100m (2%)       50Mi (1%)       50Mi (1%)
  kube-system                   l7-lb-controller-v0.5.2-imjry                                           110m (2%)       110m (2%)       70Mi (1%)       120Mi (3%)
Allocated resources:
  (Total limits may be over 100%, i.e., overcommitted...)
  CPU Requests  CPU Limits      Memory Requests Memory Limits
  ────────────  ──────────      ─────────────── ─────────────
  720m (18%)    720m (18%)      702Mi (19%)     752Mi (21%)

That's quite interesting already - the minimal resource overhead of Kubernetes is 720 millicores of CPU and 702 megabytes of RAM (not including kubelet and kube-proxy of course). However second node and on will only run one daemon pod - fluentd for log collection, so the resource reservation will be significantly lower.


Kubernetes defines CPU resource as compressible, i.e. a pod can get larger part of CPU share if there is available CPU and this can be changed back on the fly, without process restart/kill.

I've created a simple CPU loader that calculates squares of integers from 1 to 1000 in loop on every core and prints loops/seconds number; packaged it into a docker image and launched into k8s using the following pod file:

apiVersion: v1
kind: Pod
  name: cpu-small
  - image:
    name: cpu-small
        cpu: "500m"

I've created another pod similar to this - just called it cpu-large. Attaching to pods shortly afterwards, I saw that they get a fair share of CPU:

$ kubectl attach cpu-small
13448 loops/sec
13841 loops/sec
13365 loops/sec
13818 loops/sec
14937 loops/sec

$ kubectl attach cpu-large
14615 loops/sec
14448 loops/sec
14089 loops/sec
13755 loops/sec
14267 loops/sec

That makes sense - they both requested only .5 cores and the rest was split between them, since nobody else was interested. So in total this ode can crunch ~30k loops/second. Now lets make cpu-large to be really large and reserve at least 2.5 cores for it by changing its requests.cpu to 2500m and re-launching it into k8s. According to our settings, this pod now should be able to crunch at least ~25k loops/sec:

$ kubectl attach cpu-large
23310 loops/sec
23000 loops/sec
25822 loops/sec
23834 loops/sec
25153 loops/sec
24741 loops/sec

And this is indeed the case. Lets see what happened to cpu-small:

$ kubectl attach cpu-small
30091 loops/sec
28609 loops/sec
30219 loops/sec
27051 loops/sec
27885 loops/sec
29091 loops/sec
28699 loops/sec
18216 loops/sec
4213 loops/sec
4188 loops/sec
4296 loops/sec
4347 loops/sec
4141 loops/sec

First it got all of the CPU while I was re-launching cpu-large, but once the latter was up, the CPU share for cpu-small was reduced. Together they will produce the same ~30k loops/second, but we now control the share ratio.

What about limits? Well, turns out that currently limits are not enforced. This is not a big problem for us, because in our deployment strategy we prefer to provide minimum required CPU share for every pod and for the rest - be my guest. However at this point I was glad I did this test, since the documentation was misleading with regards to CPU limits.


The RAM resource is uncompressible, because there is no way to throttle process on memory usage or ask it gently to unmalloc some of it. That's why if a process reaches RAM limit, it's simply killed.

To see how it's enforced in practice, I, again, created a simple script that allocates memory in chunks up to predefined layout.

First I've tested how requests.memory are enforced. I've created the following mem-small pod:

apiVersion: v1
kind: Pod
  name: mem-small
  - image:
    name: mem-small
        memory: "100Mi"
    - name: MAXMEM
      value: "2147483648"

and launched it. I happily allocated 2GB of RAM and stood by. Then I created mem-large pod with similar configuration where requests.memory is set to "2000Mi". After I launched the large pod, the following happened:

  • cpu-large started allocating the desired 2GB RAM.
  • Since my k8s node only had 3.6GB RAM, system froze for dozen seconds or so.
  • Since there was no free memory in the system, kernel Out Of Memory Killer kicked in and killed mem-small pod:
[  609.739039] Out of memory: Kill process 5410 (python) score 1270 or sacrifice child
[  609.746918] Killed process 5410 (python) total-vm:1095580kB, anon-rss:1088056kB, file-rss:0kB

I.e. enforcement took place and my small pod was killed, since it consumed more RAM than requested and other pod was eligibly requesting memory. However such behavior is unsuitable in practice since it causes "stop-the-world" effect for everything that runs on particular k8s node.

Now lets see how resource.limits are enforced. To verify that, I've killed by of my pods, and changed mem-small as follows:

apiVersion: v1
kind: Pod
  name: mem-small
  - image:
    name: mem-small
        memory: "100Mi"
        memory: "100Mi"
    - name: MAXMEM
      value: "2147483648"

After launching it I saw the following on it's output:

Reached 94 megabytes
Reached 95 megabytes
Reached 96 megabytes
Reached 97 megabytes
Reached 98 megabytes
Reached 99 megabytes
Reached 99 megabytes
Reached 99 megabytes

I.e. The process was immediately killed after reaching its RAM limit. There is a nice evidence to that in dmesg output:

[  898.665335] Task in /214bc7c0bcdb1d0bf10b8ab4cff06b451850f9af0894472a403412ea295324ea killed as a result of limit of /214bc7c0bcdb1d0bf10b8ab4cff06b451850f9af0894472a403412ea295324ea
[  898.689794] memory: usage 102400kB, limit 102400kB, failcnt 612
[  898.697490] memory+swap: usage 0kB, limit 18014398509481983kB, failcnt 0
[  898.705930] kmem: usage 0kB, limit 18014398509481983kB, failcnt 0
[  898.713672] Memory cgroup stats for /214bc7c0bcdb1d0bf10b8ab4cff06b451850f9af0894472a403412ea295324ea: cache:84KB rss:102316KB rss_huge:0KB mapped_file:4KB writeback:0KB inactive_anon:4KB active_anon:102340KB inactive_file:20KB active_file:16KB unevictable:0KB
[  898.759180] [ pid ]   uid  tgid total_vm      rss nr_ptes swapents oom_score_adj name
[  898.768961] [ 6679]     0  6679      377        1       6        0          -999 sh
[  898.778387] [ 6683]     0  6683    27423    25682      57        0          -999 python
[  898.788280] Memory cgroup out of memory: Kill process 6683 (python) score 29 or sacrifice child


Kubernetes documentation is a bit misleading with regards to requests.limits.cpu. Nevertheless this mechanism looks perfectly useful for application. All of the code and configuration used in this post is available in the following gists:

Sunday, April 17, 2016

Kubernetes cluster access by fixed IP

If you:

  • Have Kubernetes cluster running in GKE
  • Connected GKE to your company network through VPN
  • Puzzled how to assign a fixed IP to particular k8s service

Then read on.


The ideal solution would be to configure k8s service to use GCP LoadBalancer and have the latter to provide private IP only. However as of April 2016, LoadBalancers on GCP do not provide an option for private IP only, though GCP solution engineers said this feature "is coming".

Therefore the only option we have it to run a dedicated VM with fixed IP and proxy traffic through it.

The approach

Kubernetes service itself provides two relevant ways to access pods behind it:
By default, every service has a virtual ClusterIP (which can be manually set to a predefined address) which can be used to access pods behind the service. However for this to work, a client has to have kube-proxy running on its host as explained here.
A k8s service can be configured to expose a certain pod on every k8s node which will redirected to the service's pods (this comes on top of ClusterIP).

ClusterIP approach obviously is not feasible outside the h8s cluster, so we only have left with NodePort approach. The problem is that k8s node IPs are not static and may change. That's why we need a dedicated VM which has a fixed IP.

After we have a VM, we can either

  • Join it to the k8s cluster, so service's NodePort will be exposed on the VM's fixed IP as well.
  • Run a reverse HTTP proxy on VM to forward traffic on k8s nodes together with a script that monitors k8s nodes and updates proxy configuration when necessary.

I chose the second option because it allows a single VM to proxy requests for multiple k8s clusters and is easier to setup.

The setup

Create an instance

Lets create an VM and assign it a static IP. The below is my interpretation of the official guide.

Create an instance first:

gcloud compute instances create fixed-ip-proxy --can-ip-forward
The last switch is crucial here.

I chose IP for my testing cluster to be Lets add it to the instance:

cat <<EOF >>/etc/network/interfaces.d/eth0-0
auto eth0:0
iface eth0:0 inet static

Now change /etc/network/interfaces and make sure that source-directory /etc/network/interfaces.d line comes last. Apply your new configuration by running:

sudo service networking restart

The final step is to instruct GCE to forward traffic destined to to the new instance:

gcloud compute routes create fixed-ip-production \
                                --next-hop-instance fixed-ip-proxy \
                                --next-hop-instance-zone us-central1-b \

To add more IPs (adding dedicated IP per cluster is a good practice), add another file under /etc/network/interfaces.d/ and add a GCE route.

NGINX configuration

Install NGINX:
sudp apt-get install nginx

Install Google Cloud Python SDK:

sudo easy_install pip
sudo pip install --upgrade google-api-python-client

Now download the IP watcher script:

sudo wget -O /root/nginx-ip-watch
sudo chmod 755 /root/nginx-ip-watch

NOTE: You are downloading my script that will run as root on your machine - read its contents first!

Test the script:

$ sudo /root/nginx-ip-watch -h 
usage: Watch GKE node IPs for changes [-h] -p PROJECT -z ZONES
                                      name gke-prefix listen-ip listen-port

positional arguments:
  name                  Meaningful name of your forwarding rule
  gke-prefix            GKE node prefix to monitor and forward to
  listen-ip             IP listen on
  listen-port           Port to listen on
  target-port           IP listen on

optional arguments:
  -h, --help            show this help message and exit
  -p PROJECT, --project PROJECT
                        Project to list instances for
  -z ZONES, --zones ZONES
                        Zones to list instances for

Now lets setup NGINX to listen for HTTP traffic on and forward it to GKE testing cluster nodes on port 30601 by adding the following to /etc/cron.d/nginx-ip-watch:


* * * * * root /root/nginx-ip-watch kibana-testing -p my-project -z us-central1-a gke-testing 5601 30601

After that, within one minute, your forwarding should be up and running. For more services, just keep adding more lines in the cron file. This will work well for a dozen or so services. After that, I would refactor the solution to issue only one gcloud compute instances list command per minute.

Since we are using NGINX in load-balancer mode, checking GKE hosts only once a minute is good-enough even during cluster upgrades - NGINX will detect and blacklist a shutting down GKE node by itself.


Create a snapshot of your instance to keep a backup of your work every time you change it. Don't forget to issue sync command on the system before taking snapshot of the disk.


  • The first version of my script used gcloud command line util to fetch instances list. It turned out that gcloud performs logging to ~/.config/gcloud/logs and spits 500KB on every invocation. To mitigate this, I've updated my script to use Google Cloud Python SDK to bypass gcloud util completely.
  • As Vadim points out below, you can now specify fixed internal IP during instance creation time. Though you'll still need the setup above if you want to have more then one IP per instance.

Tuesday, April 5, 2016

Persistent storage for ElasticSearch master nodes?

ElasticSearch master nodes hold cluster state. I was trying to understand whether these nodes are required to have persistent storage or they can recover from whatever exists on data nodes? The short answer is: probably yes.


You better have persistent disk for your ES data nodes - read here why.

Below I describe tests what I've done. But before that - some background on how did I get to this question at first place.

ElasticSearch on Kubernetes

We are working on running ElasticSearch 2.x on Kubernetes on Google Container Engine. There are two options to store a data for a container:
Part of the local storage on Kubernetes node is allocated for the pod. If pod's controller restarts, the data survives. If pod is killed - the data is lost.
Compute engine persistent disk (created in advance) can be attached to a pod. The data persists. However there is a limitation - as of Kubernetes 1.2, a ReplicaSet can not attach a different disks to each pods that it creates, thus to run ElasticSearch data nodes, for example, you need to create a separate ReplicaSet (with size of 1) for each ES data node.

ES data nodes should have persistent disk - this is no brainer. However with regards to ES master nodes it's not clear. I've tried to understand where master nodes persist cluster state, and this thread states "on every node including client nodes". There is also a resolved issue about storing index metadata on data nodes.

Run, Kill, Repeat

So lets how it behaves in reality.

I created two node Kubernetes 1.2 cluster running n1-standard-2 instances (2 CPUs, 7.5GB RAM). And used Paulo Pires Kubernetes setup for ElasticSearch:

$ git clone
$ cd kubernetes-elasticsearch-cluster
$ vim es-data-rc.yaml  # set replicas to 2
$ vim es-master-rc.yaml  # set replicas to 3
$ vim es-svc.yaml  # set type to ClusterIP

Lets launch it in the air:

$ for i in *.yaml; do kubectl create -f $i; done
$ sleep 1m; kubectl get pods
NAME              READY     STATUS    RESTARTS   AGE
es-client-ats2b   1/1       Running   0          1h
es-data-teodq     1/1       Running   0          1h
es-data-zwml2     1/1       Running   0          1h
es-master-3bosq   1/1       Running   0          1h
es-master-a47om   1/1       Running   0          1h
es-master-c1dy1   1/1       Running   0          1h

We are all good. Lets ingest some data and alter cluster settings:

$ CLIENTIP=$(kubectl describe pods es-client |grep '^IP' |head -n 1|awk '{print $2}')
$ curl -XPUT $CLIENTIP:9200/_cluster/settings?pretty -d '{"transient": {"discovery.zen.minimum_master_nodes": 2}}'
  "acknowledged" : true,
  "persistent" : { },
  "transient" : {
    "discovery" : {
      "zen" : {
        "minimum_master_nodes" : "2"
$ curl -XPUT $CLIENTIP:9200/tweets/tweet/1 -d '{"foo": "bar"}'          
  "_id": "1",
  "_index": "tweets",
  "_shards": {
    "failed": 0,
    "successful": 2,
    "total": 2
  "_type": "tweet",
  "_version": 1,
  "created": true
$ curl $CLIENTIP:9200/_cluster/health?pretty
  "cluster_name" : "myesdb",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 6,
  "number_of_data_nodes" : 2,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0

The data is there and the cluster is green. Now lets kill all of the masters and recreate them (their data disk will be lost):

$ kubectl delete -f es-master-rc.yaml 
replicationcontroller "es-master" deleted
$ kubectl create -f es-master-rc.yaml       
replicationcontroller "es-master" created

After dozens of seconds, new masters will be up again and we'll see the following in the leader's log:

[2016-04-04 15:57:44,880][INFO ][cluster.service          ] [Elaine Grey] new_master {Elaine Grey}{5NIL5jBbTYadefGzjDLb5A}{}{}{data=false, master=true}, added {{Lyja}{CUyGl7w-R86qcOsNSj0xPA}{}{}{master=false},{Typhoid Mary}{cWmlEtHuSdCImjHMNM6FsA}{}{}{master=false},{Slug}{fQPe2C1FSH2UkuveBFJtbw}{}{}{data=false, master=false},}, reason: zen-disco-join(elected_as_master, [0] joins received)
[2016-04-04 15:57:45,056][INFO ][node                     ] [Elaine Grey] started
[2016-04-04 15:57:45,058][INFO ][cluster.service          ] [Elaine Grey] added {{Stacy X}{U4sn1pkWRlGV-zVMW1OeAA}{}{}{data=false, master=true},}, reason: zen-disco-join(pending joins after accumulation stop [election closed])
[2016-04-04 15:57:45,711][INFO ][gateway                  ] [Elaine Grey] recovered [0] indices into cluster_state
[2016-04-04 15:57:45,712][INFO ][cluster.service          ] [Elaine Grey] added {{Kiss}{MaNkKlQWR82QHFVhz38Ohg}{}{}{data=false, master=true},}, reason: zen-disco-join(join from node[{Kiss}{MaNkKlQWR82QHFVhz38Ohg}{}{}{data=false, master=true}])
[2016-04-04 15:57:46,077][INFO ][gateway                  ] [Elaine Grey] auto importing dangled indices [tweets/OPEN] from [{Lyja}{CUyGl7w-R86qcOsNSj0xPA}{}{}{master=false}]
[2016-04-04 15:57:47,073][INFO ][cluster.routing.allocation] [Elaine Grey] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[tweets][3]] ...]).
[2016-04-04 15:57:47,567][INFO ][cluster.routing.allocation] [Elaine Grey] Cluster health status changed from [YELLOW] to [GREEN] (reason: [shards started [[tweets][3]] ...]).
[2016-04-04 15:58:16,355][INFO ][io.fabric8.elasticsearch.discovery.kubernetes.KubernetesDiscovery] [Elaine Grey] updating discovery.zen.minimum_master_nodes from [-1] to [2]

So we see that:

  • The new master recovered 0 indices from cluster state - i.e. the cluster state was indeed lost.
  • The new master auto imported existing indices, which were "dangling" in ES terminology.
  • It also restored our transient quorum setting

And our cluster is green:

$ curl $CLIENTIP:9200/_cluster/health?pretty
  "cluster_name" : "myesdb",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 6,
  "number_of_data_nodes" : 2,
  "active_primary_shards" : 5,
  "active_shards" : 10,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0

Now lets kill master without destroying their disks and see it behaves any differently:

$ for pod in $(kubectl get pods |grep es-master |awk '{print $1}'); do do kubectl exec $pod killall java & done
The master log shows the following:
[2016-04-04 16:33:55,695][INFO ][cluster.service          ] [Neptune] new_master {Neptune}{51QU1wf4T2Ky9QUzK5MkEw}{}{}{data=false, master=true}, added {{Slug}{fQPe2C1FSH2UkuveBFJtbw}{}{}{data=false, master=false},{Typhoid Mary}{cWmlEtHuSdCImjHMNM6FsA}{}{}{master=false},{Lyja}{CUyGl7w-R86qcOsNSj0xPA}{}{}{master=false},{Comet Man}{cOLRxsOKTC2OYR6Kuiplxw}{}{}{data=false, master=true},}, reason: zen-disco-join(elected_as_master, [1] joins received)
[2016-04-04 16:33:55,867][INFO ][node                     ] [Neptune] started
[2016-04-04 16:33:56,428][INFO ][gateway                  ] [Neptune] recovered [1] indices into cluster_state
[2016-04-04 16:33:57,233][INFO ][cluster.routing.allocation] [Neptune] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[tweets][0], [tweets][0]] ...]).
[2016-04-04 16:33:57,745][INFO ][cluster.routing.allocation] [Neptune] Cluster health status changed from [YELLOW] to [GREEN] (reason: [shards started [[tweets][4]] ...]).
[2016-04-04 16:34:01,417][INFO ][cluster.service          ] [Neptune] added {{Nicole St. Croix}{5VcQT4H8RHeAf3R0PW3K4A}{}{}{data=false, master=true},}, reason: zen-disco-join(join from node[{Nicole St. Croix}{5VcQT4H8RHeAf3R0PW3K4A}{}{}{data=false, master=true}])
[2016-04-04 17:01:55,811][INFO ][io.fabric8.elasticsearch.discovery.kubernetes.KubernetesDiscovery] [Neptune] updating discovery.zen.minimum_master_nodes from [-1] to [2]

So 1 index was recovered from the cluster state and there are no dangling indices this time.


While master nodes were able to recover both indices and cluster transient settings, I was testing only the most simple scenario. This is not enough to take a decision on whether we should maintain persistent disks for master nodes. On the other hand, if we do have persistent disks for master - do we need to backup the metadata? And what about ES 5.x? - one of the promised features is that master will hold an ID of the latest index change to prevent stale data nodes becoming primaries during network partitioning. This kind of metadata can not be stored on data nodes.

I'll update this post when I'll have the answers.

Friday, March 25, 2016

Caveat with ElasticSearch nGram tokenizer

Finally got some time to blog about ElasticSearch. I use it extensively during the last two years, but my findings are rather lengthy. Finally I've got something small to share.

ElasticSearch nGram tokernizer is very useful for efficient substring matching (at cost of index size of course). For example, I have an event message field like this

/dev/sda1 has failed due to ...
and I would like to find all events of failure for all SCSI disks. One option is to store message field as not analyzed string (i.e. a one single term) and use wildcard query:
GET /events
  "query": {
    "wildcard": {
      "message.raw": {
        "value": "/dev/sd?? has failed*"
This will do the work perfectly, but to complete it, ElasticSearch will scan every value of message field looking for the pattern during search time. Once number of documents gets big enough, it will become slow.

One solution is to split message to substrings during indexing time, with (2,20) for (min, max) in our example:

# Analyzer definition in settings
"analysis": {
    "analyzer": {
        "substrings": {
            "tokenizer": "standard",
            "filter": ["lowercase", "thengram"]
    "filter": {
        "thengram": {
            "type": "nGram",
            "min_gram": 2,
            "max_gram": 20

# message field definition in mappings
"message": {
    "type": "string",
    "index": "analyzed",
    "analyzer": "substrings"
and use match_phrase query:
GET events/_search
  "query": {
    "match": {
      "message": {
        "query": "dev sd has failed",
        "type": "phrase",

The caveat

The above query will return weirdly unrelevant results and, at first glance, it's not obvious why. The caveat is, that our custom analyzer is applied both during indexing and search. So instead of searching for sequence of terms "dev", "sd", "has", "failed"; we are searching for sequence "de", "ev", "dev", "sd", "ha", "as", "has", etc. To fix this we need to tell Elastic to use different tokernizer during search (and search only). This can be done either by adding "analyzer": "standard" to query itself (which is error phone, since can be easily forgotten) or specified in mapping definition:
"message": {
    "type": "string",
    "index": "analyzed",
    "analyzer": "substrings",
    "search_analyzer": "standard"

Worth it?

I took 1,000,000 events sample data and run both wildcard and phrase queries that match 1,000 doc subset out of it. While for such a small data set, both are fast, the difference it quite striking nevertheless:
  • wildcard query - 30ms
  • phrase query - 5ms

Times 6 speed up! Another bonus for using phrase query is that you can get results highlighting (that is not supported for wildcard queries).