How we keep our Elasticsearch index updated with data from Microsoft SQL Server

My previous article became redundant when Elasticsearch announced the deprecation of rivers. We stopped using rivers and built an application that queries a database and indexes this data into Elasticsearch.

My colleague Jacob and I went back to the drawing board and created a module that came to be known as the Feeder. It queries an MS SQL Server database using TinyTDS and indexes the data using the elasticsearch-ruby gem. I will show you how to set this up yourself in a few simple steps.

The feeder concept

We have a database with products that will be continuously updated. Our application wants to search through all this data, for which we use Elasticsearch. The search index should be updated with new entries, deletions and changes made to the database.

So, how to do this?

First, we retrieve all product_hashes from Elasticsearch. We create and store hashes ourselves and use them to quickly see if a product has changed. We compare hashes stored in the search index with hashes computed from the products in the database. This saves the overhead of retrieving complete documents.

We iterate over all the products in the database and compute new hashes. If the hash is not present in the product_hashes it means the product is new or has been updated: we will index it. Afterwards, we take all the product hashes that we did not encounter (these represent deleted products or previous versions of products) and delete them from the index.

The hash comparison process has four possible outcomes, as illustrated by the following graphic:

Comparison between the old hash in the index and the new one from the database. There are four different outcomes, each resulting in a different action.

In short: a product whose id is not yet in the index is new and gets indexed; a product whose stored hash differs from the newly computed one has changed and gets re-indexed; a product whose hashes match is left alone; and every hash that is left over afterwards belongs to a product that was deleted from the database (or to an outdated version of one), so its document is removed from the index.

Here is a basic implementation of the complete feeder process. Subsequent sections in this article elaborate on the specific database and the Elasticsearch parts.

              class Feeder
                def initialize(config)
                  @database = DB.new(config[:database])
                  @elasticsearch = ES.new(config[:elasticsearch])
                end

                def feed
                  # All hash_codes currently in the search index, keyed by product id.
                  hashes = @elasticsearch.product_hashes

                  @database.each_product do |product|
                    # New or changed product: (re-)index it.
                    unless hashes.delete(product.id) == product.hash_code
                      @elasticsearch.index(product)
                    end
                  end

                  # Whatever is left over no longer exists in the database.
                  hashes.keys.each do |key|
                    @elasticsearch.delete(key)
                  end
                end
              end
              

Getting products from the database

We define a simple model for a Product. It wraps a row of attributes, computes a SHA1 hash of them, and builds the document that we index:

              require 'digest'
              require 'json'

              class Product
                attr_reader :attributes

                def initialize(attributes)
                  @attributes = attributes
                end

                # The database id, also used as the Elasticsearch document id.
                def id
                  attributes['id']
                end

                def hash_code
                  Digest::SHA1.hexdigest(attributes.to_json)
                end

                def document
                  attributes.merge('hash_code' => hash_code)
                end
              end
              

It mostly speaks for itself. Note that we add the product hash to the document, so it will be included in the search index.
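
To make this concrete, a product built from made-up attributes would produce a document like the following (the values, including the truncated SHA1 digest, are purely illustrative):

              # Made-up attributes, just for illustration.
              product = Product.new('id' => 42, 'name' => 'Coffee mug', 'price' => 9.95)

              product.hash_code # => "3c9a1f..." (SHA1 of the attributes serialized as JSON)
              product.document  # => { "id" => 42, "name" => "Coffee mug",
                                #      "price" => 9.95, "hash_code" => "3c9a1f..." }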

We use the each_product method to iterate over all products in the database:

              require 'tiny_tds'

              class DB
                def initialize(options)
                  @client = TinyTds::Client.new(options)
                end

                def each_product
                  result = @client.execute('SELECT id, name, price FROM products ORDER BY id')
                  result.each(cache_rows: false) do |row|
                    yield Product.new(row)
                  end
                ensure
                  result.cancel if result
                end
              end
              

The TinyTDS option cache_rows: false makes sure that rows are discarded as soon as they have been yielded, instead of being kept in memory for the whole result set. We have to deal with a lot of rows, so this is absolutely necessary for us.
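
Using the DB class then looks like this; the connection options are of course placeholders for your own TinyTDS settings:

              # Placeholder connection settings.
              db = DB.new(username: 'feeder', password: 'secret',
                          host: 'sqlserver.example.com', database: 'shop')

              db.each_product do |product|
                puts "#{product.id}: #{product.hash_code}"
              end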

Feeding products to Elasticsearch

Now that we're able to iterate over all products, we need to (re-)index them or delete them. Below are the index and delete methods that are called from the first code sample:

              require 'elasticsearch'

              class ES
                def initialize(options)
                  @client = Elasticsearch::Client.new(options)
                end

                def index(product)
                  @client.index(index: 'products', type: 'product',
                                id: product.id, body: product.document)
                end

                def delete(id)
                  @client.delete(index: 'products', type: 'product', id: id)
                end
              end
              

We index products using product.id as the document id, which allows us to retrieve them by that id later. If you don't need this, you might as well use the product hash as the id, which simplifies the feed code.
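
A lookup by id could then be a one-liner on top of the same client. The get_product method below is not part of our actual class, just an illustration:

              class ES
                # Illustrative helper, not part of our actual class:
                # fetch a single indexed document by product id.
                def get_product(id)
                  @client.get(index: 'products', type: 'product', id: id)['_source']
                end
              end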

Retrieving all hashes from Elasticsearch proved to be more of a challenge. We optimized the product_hashes method using scan and scroll. A scrolled search can be compared to using a database cursor, and search_type: 'scan' disables sorting of the results:

              def product_hashes
                # Start a scan search: no sorting, results are returned in batches
                # of 100 per shard via the scroll API.
                result = @client.search(index: 'products', search_type: 'scan',
                  scroll: '1m', size: 100, body: {
                    filter: {
                      match_all: {}
                    },
                    fields: ['hash_code']
                  }
                )

                hashes = {}
                # Keep scrolling until a batch comes back empty.
                while result = @client.scroll(scroll_id: result['_scroll_id'], scroll: '1m')
                  break if result['hits']['hits'].empty?
                  result['hits']['hits'].each do |product|
                    hashes[product['_id'].to_i] = product['fields']['hash_code'].first
                  end
                end
                hashes
              end
              

Using these cool Elasticsearch features, we were able to quickly get all hash_codes for our products.
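
Calling product_hashes on an ES instance returns a plain Ruby hash mapping product ids to their stored hash codes, something like this (ids and digests are made up):

              elasticsearch.product_hashes
              # => { 42 => "3c9a1f...", 43 => "9b81d2...", 44 => "c0ffee..." }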

Running the feed as a rake task

And to complete the circle: we run the Feeder by calling a rake task (rake product:feed) from a cron job. In practice we distinguish between a full and a partial update, but in essence it looks like this:

              namespace :product do
                desc 'Index products in elasticsearch'
                task :feed do
                  Feeder.new({
                    elasticsearch: Rails.configuration.elasticsearch,
                    database: Rails.configuration.database,
                  }).feed
                end
              end
              

That's all!

Of course, the above code is a simplified version of our actual implementation. We've refactored a couple of times and have since created an Indexable concern that does all the work in a standardized way in our Ruby on Rails application. But that's something for another time!