
Elasticsearch Data Engineering

Last month, our team hosted a hackathon for about a dozen Data Scientists who work for one of our customers. The Scientists are a mixed group of econometricians and general statisticians, and we wanted to see what they could do with some time series data – in particular, news articles and social media (mostly tweets).

A few days before the start of the hackathon, one of our customer contacts asked if I would load a few billion tweets into a temporary Elasticsearch cluster so that the hackathon participants could use it when they arrived.

I quickly violated some well-learned muscle memory:

  1. I said: “yes, I can do that.”
  2. I chose to install the most recent version of Elasticsearch (2.0 at the time.)

Installing 2.0 goes about as you would expect if you have installed ES before. The most notable differences are how you install Marvel and the new Marvel user interface. I want to use the Marvel screenshot below to tell this story.

Indexing

To ingest the data, I wrote a Python script that iterated through a massive repository of zipped GNIP files in Amazon's Simple Storage Service (S3). The first pain point with an ingest like this is defining the Elasticsearch mapping. I thought I had it just right, let the ingest script rip, then checked back in on it in the morning. It turns out I missed quite a few fields (GNIP data has shifted formats over time), so I had to reingest about 40 million tweets. You can see my frustration in the first peak on the indexing rate graph below (also mirrored in the Index Size and Lucene Memory graphs).

20151120-marvel-hackathon.png
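
The original ingest script isn't included in this post, but a minimal sketch of the approach looks something like the following. The bucket, prefix, and index names are hypothetical, and the real script needed far more error handling and mapping care than this:

import gzip
import json

import boto3
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch(['http://localhost:9200'])
s3 = boto3.client('s3')

def tweet_actions(bucket, prefix):
    """Yield a bulk-index action for every tweet in every gzipped GNIP file."""
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            raw = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read()
            for line in gzip.decompress(raw).decode('utf-8').splitlines():
                if line.strip():
                    yield {'_index': 'tweets',   # hypothetical index name
                           '_type': 'tweet',     # mapping types still exist in ES 2.0
                           '_source': json.loads(line)}

# 'my-gnip-archive' and 'twitter/' are placeholders for the real bucket and prefix.
bulk(es, tweet_actions('my-gnip-archive', 'twitter/'), chunk_size=1000)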

After ingesting all weekend, it was clear that I was never going to make it to a billion tweets by the start of the hackathon. I reached out to some of the HumanGeo Gurus and got some advice on how to tweak the cluster to improve ingest speed. The one visible piece of advice in the graph is in the blue circles: at that point I set index.number_of_replicas=0. You can tell that the size on disk became dramatically smaller (expected), and there is a small inflection point in the rate of ingest which you can only see in the document count line graph. Very disconcertingly, the indexing rate (blue rectangle) decreased! Yet the document count was clearly growing faster. I think this is because Marvel double(ish)-counts the indexing rate when replicas are enabled, since it sees the same documents being indexed into the primary and replica shards at the same time.
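
Dropping replicas is a one-line change against the index settings API. A sketch with the official Python client (the index name is assumed) looks like this:

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])

# Turn off replicas while bulk loading; no redundant copies get written.
es.indices.put_settings(index='tweets', body={'index': {'number_of_replicas': 0}})

# Once the load is finished, turn replicas back on for durability and search throughput.
# es.indices.put_settings(index='tweets', body={'index': {'number_of_replicas': 1}})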

Trouble

I was now resigned to having a database of ~800 million tweets, which was still great but short of my 1bn personal goal. Additional fun occurred at the red circle. One of the nodes ran out of disk space and, in doing so, corrupted its transaction log. This ground indexing to a halt – bulk indexing was no longer completing cleanly because Elasticsearch was missing shards. The cluster was missing shards because the failed node had the only copy (remember when I turned off replicas?!) and that copy was now offline.

The transaction log is one of those data store concepts that is supposed to save you from situations like this. I was running the cluster on AWS EC2, so the first thing I did was stop Elasticsearch on that node, provision a larger EBS volume, move the index onto it, and start Elasticsearch back up. Elasticsearch tried to recover the shard by reading the transaction log, discovered that it was corrupted, gave up, and then repeated the process.

One of the tools in your arsenal at this point is to say: forget about those transactions. So I removed the transaction log and restarted Elasticsearch. No dice: because of this bug in Elasticsearch 2.0, Elasticsearch can neither recover from a corrupted transaction log nor reinitialize a missing one.

My goal of 800 million tweets was now 250 million shy. But those tweets were still indexed! I was just being held up by a few bad eggs in that transaction log!

Several paragraphs ago, the harder-core reader was already punching their monitor in frustration because I hadn't considered trying to hack around the transaction log. The transaction log is a binary format specific to Elasticsearch, and Elasticsearch creates a new one whenever you create a new index. What if I created an empty transaction log? Could I get my shard back online?

To get started, I created a new index in Elasticsearch, which dropped a pristine transaction log on disk. I copied that transaction log into the right place in my broken index and restarted Elasticsearch. Elasticsearch complained that the UUID in the copied transaction log didn't match the UUID that the index was expecting. A UUID is just a unique identifier – the transaction log in a brand new index is otherwise identical to the transaction log in any other brand new index. In the log, Elasticsearch said it saw a UUID of 0xDEADBEEF when it expected 0x00C0FFEE. I opened the transaction log in hexedit and could see 0xDEADBEEF. I copied the correct UUID over the incorrect one, saved the file, and restarted Elasticsearch. The shard came online, and the gap in the red circle was filled in!
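
For the record, the hexedit step boils down to a byte-for-byte swap. A rough Python sketch of the same edit follows; the path and UUID values are stand-ins rather than real ones, and the replacement has to be exactly the same length since this is a fixed binary format.

from pathlib import Path

# Stand-in path and UUIDs; take them from your own cluster and its logs.
# Back the file up before touching it!
translog = Path('/data/elasticsearch/tweets/0/translog/translog-1.tlog')
found_uuid = b'DEADBEEF'      # UUID actually present in the copied translog
expected_uuid = b'00C0FFEE'   # UUID the broken index expects

data = translog.read_bytes()
assert data.count(found_uuid) == 1, 'expected exactly one occurrence of the UUID'
assert len(found_uuid) == len(expected_uuid), 'replacement must be the same length'
translog.write_bytes(data.replace(found_uuid, expected_uuid))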

With a repaired transaction log and all shards online I was able to turn on the replicas to get back to a cluster with some durability and better search performance. Elasticsearch took a few more hours to build the replicas and balance the cluster.

The hackathon went live at the green circle. Immediately, our sharp Scientists started issuing queries that completely blew up the fielddata in Elasticsearch. Funny thing about fielddata – it's the region of heap memory that Elasticsearch uses to keep aggregations fast. Loading data into fielddata is slow, so Elasticsearch tries to do it only once. By default, the cache is unbounded: fielddata grows until you're out of memory and never evicts anything. That basically means that once an aggregation has pushed data into fielddata, it stays there until the underlying data is deleted or you restart Elasticsearch. So if you don't actually have enough RAM to hold all of the possible fielddata, the cache will, by definition, be full after a hard query, and newer (possibly more relevant) data can never fit in.
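
If you want to see which fields are filling the cache, the cat API reports fielddata usage per node and per field. With the Python client (nothing here is specific to my cluster), that's:

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])
# Print per-node, per-field fielddata sizes so you can spot the offenders.
print(es.cat.fielddata(v=True))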

I think in many cases that default makes sense, but I didn't have the luxury of adding additional resources. So I set indices.fielddata.cache.size=55%. The 55% is deliberate, since the hard limit (the fielddata circuit breaker) rejects requests outright at 60% of the heap. When you do this, you're accepting that ES will evict the least recently used fielddata when it comes under memory pressure. I suspect that many of our users' early queries were basically bad ideas, and I didn't want to punish future generations of queries with the mistakes of the past. (That sounds weirdly political.) Anyway, you can see the huge spike drop once I put that policy in place.
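
For reference, that cache limit is a static, node-level setting; it goes in each data node's elasticsearch.yml and needs a restart to take effect:

# config/elasticsearch.yml on each data node (restart required)
indices.fielddata.cache.size: 55%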

Phew!

Hopefully the above helps you if you end up in similar situations. Once the hackathon was underway it was a great success. I even overheard one of the participants say how great it was that there was already data available in the database – that’s not normally how these things go, he said.

Some key takeaways from my recent Elasticsearch experience:

  • For ingest performance, turn off replicas
    • Beware! You are flying without an autopilot here. If something goes wrong, it will go very wrong.
  • For greenfield data exploration clusters, set indices.fielddata.cache.size=55% (or get moar RAM.)
  • Learn the internals of your database. This was one such exercise.
    • We see this one all the time with our clients. If you don’t know the failure modes, the query considerations, and the ingest concerns of your database, you’ll be buried under the weight of an early, non-data-driven decision.

Sounds Fun?

We’re hiring. Do Data Science using Elasticsearch and Social Media with us.

Engineering Coda

If you’re the kind of person who likes to be able to make computers do things while also writing the code that makes computers do things, I can’t say enough about two tools that help me do this stuff. One is a clusterssh clone for iTerm2 called i2cssh and the other is htop. i2cssh lets you log in to a bunch of machines at the same time and have your typing go to all of them simultaneously (or individually). Check out the Matrix-like screenshots below, where I’m running htop on 4 nodes and then 7 nodes (I increased the size of the cluster at one point).
20151120-htop-hackathon1.png
20151120-htop-hackathon2.png

Thanks to Michele Bueno, Scott Fairgrieve, and Aaron Gussman for reviewing drafts of this post.

Drawing Boundaries In Python

As a technologist at HumanGeo, you’re often asked to perform some kind of analysis on geospatial data, and quickly! We frequently work on short turnaround times for our customers so anything that gives us a boost is welcome, which is probably why so many of us love Python. As evidenced by the volume of scientific talks at PyCon 2014, we can also lean on the great work of the scientific community. Python lets us go from zero to answer within hours or days, not weeks.

I recently had to do some science on the way we can observe clusters of points on the map – to show how regions of social significance emerge. Luckily, I was able to lean heavily on Shapely, which is a fantastic Python library for performing geometric operations on points, shapes, lines, etc. As an aside, if you are doing any sort of geospatial work with Python, you’ll want to pip install shapely. Once we found a cluster of points that we believed identified a unique region, we needed to draw a boundary around the region so that it could be more easily digested by a geospatial analyst. Boundaries are just polygons that enclose something, so I’ll walk through some of your options and attempt to provide complete code examples.

The first step towards geospatial analysis in Python is loading your data. In the example below, I have a shapefile containing a number of points which I generated manually with QGIS. I’ll use the fiona library to read the file in, and then create point objects with shapely.

import fiona
import shapely.geometry as geometry
input_shapefile = 'concave_demo_points.shp'
shapefile = fiona.open(input_shapefile)
points = [geometry.shape(point['geometry'])
          for point in shapefile]

The points list can now be manipulated with Shapely. First, let’s plot the points to see what we have.

import pylab as pl
x = [p.coords.xy[0] for p in points]
y = [p.coords.xy[1] for p in points]
pl.figure(figsize=(10,10))
_ = pl.plot(x,y,'o', color='#f16824')

H_sparse

We can now interrogate the collection. Many Shapely operations result in a different kind of geometry from the one you’re currently working with. Since our geometry is a collection of points, I can instantiate a MultiPoint and then ask that MultiPoint for its envelope, which is a Polygon. Easily done like so:

point_collection = geometry.MultiPoint(list(points))
point_collection.envelope

We should take a look at that envelope. matplotlib can help us out, but polygons aren’t functions, so we need to use PolygonPatch.

from descartes import PolygonPatch

def plot_polygon(polygon):
    fig = pl.figure(figsize=(10,10))
    ax = fig.add_subplot(111)
    margin = .3

    x_min, y_min, x_max, y_max = polygon.bounds

    ax.set_xlim([x_min-margin, x_max+margin])
    ax.set_ylim([y_min-margin, y_max+margin])
    patch = PolygonPatch(polygon, fc='#999999',
                         ec='#000000', fill=True,
                         zorder=-1)
    ax.add_patch(patch)
    return fig

_ = plot_polygon(point_collection.envelope)
_ = pl.plot(x,y,'o', color='#f16824')

H_envelope

So without a whole lot of code, we were able to get the envelope of the points, which is the smallest rectangle that contains all of the points. In the real world, boundaries are rarely so uniform and straight, so we were naturally led to experiment with the convex hull of the points. Convex hulls are polygons drawn around points too – as if you took a pencil and connected the dots on the outer-most points. Shapely has convex hull as a built in function so let’s try that out on our points.

convex_hull_polygon = point_collection.convex_hull
_ = plot_polygon(convex_hull_polygon)
_ = pl.plot(x,y,'o', color='#f16824')

H_convex

A tighter boundary, but it ignores those places in the “H” where the points dip inward. For many applications this is probably good enough, but we wanted to explore one more option, known as a concave hull or alpha shape. At this point we’ve left the built-in functions of Shapely and we’ll have to write some more code. Thankfully, smart people like Sean Gillies, the author of Shapely and fiona, have done the heavy lifting. His post on the fading shape of alpha gave me a great place to start. I had to fill in some gaps that Sean left, so I’ll recreate the entire working function here.


from shapely.ops import cascaded_union, polygonize
from scipy.spatial import Delaunay
import numpy as np
import math

def alpha_shape(points, alpha):
    """
    Compute the alpha shape (concave hull) of a set
    of points.

    @param points: Iterable container of points.
    @param alpha: alpha value to influence the
        gooeyness of the border. Smaller numbers
        don't fall inward as much as larger numbers.
        Too large, and you lose everything!
    """
    if len(points) < 4:
        # When you have a triangle, there is no sense
        # in computing an alpha shape.
        return geometry.MultiPoint(list(points)).convex_hull

    def add_edge(edges, edge_points, coords, i, j):
        """
        Add a line between the i-th and j-th points,
        if not in the list already
        """
        if (i, j) in edges or (j, i) in edges:
            # already added
            return
        edges.add((i, j))
        edge_points.append(coords[[i, j]])

    coords = np.array([point.coords[0]
                       for point in points])

    tri = Delaunay(coords)
    edges = set()
    edge_points = []
    # loop over triangles:
    # ia, ib, ic = indices of corner points of the
    # triangle
    for ia, ib, ic in tri.vertices:
        pa = coords[ia]
        pb = coords[ib]
        pc = coords[ic]

        # Lengths of sides of triangle
        a = math.sqrt((pa[0]-pb[0])**2 + (pa[1]-pb[1])**2)
        b = math.sqrt((pb[0]-pc[0])**2 + (pb[1]-pc[1])**2)
        c = math.sqrt((pc[0]-pa[0])**2 + (pc[1]-pa[1])**2)

        # Semiperimeter of triangle
        s = (a + b + c)/2.0

        # Area of triangle by Heron's formula
        area = math.sqrt(s*(s-a)*(s-b)*(s-c))
        circum_r = a*b*c/(4.0*area)

        # Here's the radius filter.
        #print circum_r
        if circum_r < 1.0/alpha:
            add_edge(edges, edge_points, coords, ia, ib)
            add_edge(edges, edge_points, coords, ib, ic)
            add_edge(edges, edge_points, coords, ic, ia)

    m = geometry.MultiLineString(edge_points)
    triangles = list(polygonize(m))
    return cascaded_union(triangles), edge_points

concave_hull, edge_points = alpha_shape(points,
                                        alpha=1.87)

_ = plot_polygon(concave_hull)
_ = pl.plot(x,y,'o', color='#f16824')

That’s a mouthful, but the gist is this: we compute the Delaunay triangulation, which connects each point to its nearby points, and then we throw away the triangles whose circumscribed circles are too big (that is, triangles whose corner points are too spread out). The removal part is key. By discarding those triangles, we are saying that their points are too far apart to use those connections as part of the boundary. The result looks like this.

H_concave_womp_womp

Better, but not great.

It turns out that the alpha value and the scale of the points matter a lot when it comes to how well the Delaunay triangulation method will work. You can usually play with the alpha value to find a suitable result, but unless you can scale up your points it might not help. For the sake of a good example, I’ll do both: scale up the “H” and try some different alpha values.
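
To see why scale matters, remember the filter inside alpha_shape: a triangle’s edges are only kept when its circumradius is below 1/alpha, so the right alpha depends entirely on how far apart your points sit. A quick check with made-up numbers:

import math

# An equilateral triangle with unit-length sides has circumradius ~0.577.
a = b = c = 1.0
s = (a + b + c) / 2.0
area = math.sqrt(s * (s - a) * (s - b) * (s - c))
circum_r = a * b * c / (4.0 * area)

print(circum_r < 1.0 / 1.87)   # False: rejected at alpha=1.87 (cutoff ~0.535)
print(circum_r < 1.0 / 0.4)    # True: kept at alpha=0.4 (cutoff 2.5)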

To get more points, I opened up QGIS, drew an “H” like polygon, used the tool to generate regular points, and then spatially joined them to remove any points outside the “H”. My new dataset looks like this:


input_shapefile = 'demo_poly_scaled_points_join.shp'
new_shapefile = fiona.open(input_shapefile)
new_points = [geometry.shape(point['geometry'])
              for point in new_shapefile]
x = [p.coords.xy[0] for p in new_points]
y = [p.coords.xy[1] for p in new_points]
pl.figure(figsize=(10,10))
_ = pl.plot(x,y,'o', color='#f16824')

H_dense

When we try the alpha shape transformation on these points we get a much more satisfying boundary. We can try a few permutations to find the best alpha value for these points with the following code. I combined each plot into an animated gif below.

from matplotlib.collections import LineCollection

for i in range(9):
    alpha = (i+1)*.1
    concave_hull, edge_points = alpha_shape(new_points,
                                            alpha=alpha)

    #print concave_hull
    lines = LineCollection(edge_points)
    pl.figure(figsize=(10,10))
    pl.title('Alpha={0} Delaunay triangulation'.format(
        alpha))
    pl.gca().add_collection(lines)
    delaunay_points = np.array([point.coords[0]
                                for point in new_points])
    pl.plot(delaunay_points[:,0], delaunay_points[:,1],
            'o', hold=1, color='#f16824')

    _ = plot_polygon(concave_hull)
    _ = pl.plot(x,y,'o', color='#f16824')

H_concave_animated_optimized

So in this case, an alpha of about 0.4 looks pretty good. We can use Shapely’s buffer operation to clean up that polygon a bit and smooth out any of the jagged edges.

alpha = .4
concave_hull, edge_points = alpha_shape(new_points,
                                        alpha=alpha)

plot_polygon(concave_hull)
_ = pl.plot(x,y,'o', color='#f16824')
plot_polygon(concave_hull.buffer(1))
_ = pl.plot(x,y,'o', color='#f16824')

H_0.4_concave H_0.4_concave_buffer

And there you have it. Hopefully this has been a useful tour through some of your geometric boundary options in Python. I recommend exploring the Shapely manual to find out about all of the other easy geometric operations you have at your fingertips. Also, if you dig Python and playing with maps, we want to hear from you.