How we keep our Elasticsearch index updated with data from Microsoft SQL Server

My previous article became redundant when Elasticsearch announced the deprecation of rivers. We stopped using rivers and built an application that queries a database and indexes this data into Elasticsearch.

My colleague Jacob and I went back to the drawing board and created a module that came to be known as the Feeder. It queries an MS SQL database using TinyTDS and indexes the data using the elasticsearch-ruby gem. I will show you how to set this up yourself in a few simple steps.
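
If you want to set this up yourself, the only two gems needed for the feeding part are the two mentioned above; a minimal Gemfile sketch (gem names as published on RubyGems):

# Gemfile
gem 'tiny_tds'       # TinyTDS, the MS SQL Server client
gem 'elasticsearch'  # the elasticsearch-ruby client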

The feeder concept

We have a database with products that will be continuously updated. Our application wants to search through all this data, for which we use Elasticsearch. The search index should be updated with new entries, deletions and changes made to the database.

So, how to do this?

First, we retrieve all product_hashes from Elasticsearch. We create and store hashes ourselves and use them to quickly see if a product has changed. We compare hashes stored in the search index with hashes computed from the products in the database. This saves the overhead of retrieving complete documents.

We iterate over all the products in the database and compute new hashes. If the stored hash is missing or differs from the newly computed one, the product is new or has been updated: we index it. Afterwards, we take all the product hashes that we did not encounter (these represent deleted products or previous versions of products) and delete the corresponding documents from the index.

The hash comparison process has four possible outcomes, as shown in the following graphic:

Comparison between the old hash in the index and the new one from the database. There are four different outcomes, each resulting in a different action.
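
Expressed in code, the decision per product looks roughly like this (a sketch; stored_hash and new_hash are hypothetical variables for the hash from the index and the hash computed from the database row):

action =
  if stored_hash.nil?            # not in the index yet
    :index                       # new product
  elsif stored_hash != new_hash  # hashes differ
    :index                       # changed product
  else
    :skip                        # unchanged product
  end
# Stored hashes that never match a database row belong to products that no
# longer exist; those documents are deleted from the index.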

Here is a basic implementation of the complete feeder process. The following sections elaborate on the database and the Elasticsearch parts.

module Feeder
  def self.feed(options)
    database      = DB.new(options[:database])
    elasticsearch = ES.new(options[:elasticsearch])

    hashes = elasticsearch.product_hashes

    database.each_product do |product|
      # Index the product when it is new or its hash has changed.
      unless hashes.delete(product.id) == product.hash_code
        elasticsearch.index(product)
      end
    end

    # Anything left over no longer exists in the database; remove it from the index.
    hashes.keys.each do |key|
      elasticsearch.delete(key)
    end
  end
end

Getting products from the database

We define a simple model for a Product. The document method returns a hash of the product attributes, including the computed hash_code:

class Product
  attr_reader :attributes

  def initialize(attributes)
    @attributes = attributes
  end

  def id
    attributes['id']
  end

  def hash_code
    # Any stable digest of the attributes works; the exact scheme is up to you.
    Digest::MD5.hexdigest(attributes.to_s)
  end

  def document
    attributes.merge({ 'hash_code' => hash_code })
  end
end

It mostly speaks for itself. Note that we add the product hash to the document so it will be included in the search index.
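
For example, with the columns we select later on (id, name and price), a document could look roughly like this (values and hash are made up):

product = Product.new('id' => 42, 'name' => 'Kettle', 'price' => 19.95)
product.document
# => { 'id' => 42, 'name' => 'Kettle', 'price' => 19.95,
#      'hash_code' => '3f2b9c0b…' }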

We use the each_product method to iterate over all products in the database:

class DB
  def initialize(options)
    @client = TinyTds::Client.new(options)
  end

  def each_product
    result = @client.execute('SELECT id, name, price FROM products ORDER BY id')
    result.each(cache_rows: false) do |row|
      yield Product.new(row)
    end
    result.cancel if result
  end
end

The TinyTDS option cache_rows: false makes sure that yielded rows are forgotten immediately after being used, freeing up their memory. In our situation we have to deal with a lot of rows, which makes this absolutely necessary.
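
The options passed to DB.new go straight to TinyTds::Client, so they are the usual connection settings; for example (host and credentials are placeholders):

DB.new(
  host:     'sqlserver.example.com',
  username: 'feeder',
  password: 'secret',
  database: 'shop'
)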

Feeding products to Elasticsearch

Now that we're able to iterate over all products, we need to (re-)index them or delete them. Below are the index and delete functions that are called from the first code sample:

class ES
  def initialize(options)
    @client = Elasticsearch::Client.new(options)
  end

  def index(product)
    @client.index(index: 'products', type: 'product',
                  id: product.id, body: product.document)
  end

  def delete(id)
    @client.delete index: 'products', type: 'product', id: id
  end
end

We index products using product.id as the document id. This allows us to retrieve them by that id later on. If you don't need this, you might as well index products using their hash as the id, simplifying the feed code.
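
To sketch that alternative: with the hash as document id, the presence of an id in the index already means the document is up to date, so the comparison in feed collapses to a presence check.

# Variant: index by hash instead of by product id (illustrative only).
def index(product)
  @client.index(index: 'products', type: 'product',
                id: product.hash_code, body: product.document)
end

# With product_hashes keyed by the hash strings, the feed loop becomes:
database.each_product do |product|
  elasticsearch.index(product) unless hashes.delete(product.hash_code)
end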

Retrieving all hashes from Elasticsearch proved to be more of a challenge. We optimized the product_hashes function using scan and scroll. A scrolled search can be compared to using a database cursor. Setting search_type: 'scan' disables sorting of the results:

def product_hashes
  result = @client.search index: 'products', search_type: 'scan',
    scroll: '1m', size: 100, body: {
      filter: {
        match_all: {}
      },
      fields: ['hash_code']
    }

  hashes = {}
  while result = @client.scroll(scroll_id: result['_scroll_id'], scroll: '1m')
    result['hits']['hits'].each do |product|
      hashes[product['_id'].to_i] = product['fields']['hash_code'].first
    end
    break if result['hits']['hits'].empty?
  end
  hashes
end

Using these cool Elasticsearch features, we were able to quickly get all hash_codes for our products.
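
The result is a plain Ruby hash that maps each product id to its stored hash, something like (made-up values):

elasticsearch.product_hashes
# => { 1 => '00a3f1…', 2 => '9c4be0…', 3 => '57d2aa…' }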

Running the feed as a rake task

And to complete the circle: we run the Feeder by calling a rake task from a cron job. In practice we distinguish between a full and a partial update, but in essence it looks like this:

namespace :product do
  desc 'Index products in elasticsearch'
  # Depends on :environment so Rails.configuration is loaded.
  task feed: :environment do
    Feeder.feed({
      elasticsearch: Rails.configuration.elasticsearch,
      database: Rails.configuration.database,
    })
  end
end
C'est tout!

Of course, the above code is a simplified version of our actual implementation. We've refactored a couple of times and have meanwhile created an Indexable concern that does all the work in a standardized way in our Ruby on Rails application. But that's something for another time!