Using Kafka to keep secondary datastores in sync with your primary datastore

Simply stated, the primary purpose of KafkaSync is to keep your secondary datastores (ElasticSearch, cache stores, etc.) consistent with the models stored in your primary datastore (MySQL, Postgres, etc.).

Getting started is easy:

class MyModel < ActiveRecord::Base
  include KafkaSync::Model
end


kafka_sync installs model lifecycle callbacks, i.e. after_save, after_touch, after_destroy and, most importantly, after_commit. The callbacks send messages to Kafka with a (customizable) payload:

def kafka_payload
  { id: id }
end
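The payload can be customized by overriding kafka_payload. A minimal sketch of what a customized payload might look like — the Product class and the extra type and updated_at fields are illustrative, not part of the gem:

```ruby
# Hypothetical model demonstrating a customized payload.
# Only `id` is what the default payload carries; `type` and
# `updated_at` are illustrative additions.
class Product
  attr_reader :id, :updated_at

  def initialize(id:, updated_at:)
    @id = id
    @updated_at = updated_at
  end

  def kafka_payload
    { id: id, type: self.class.name, updated_at: updated_at }
  end
end

payload = Product.new(id: 1, updated_at: "2024-01-01").kafka_payload
```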

Now, background workers can fetch the messages in batches and update the secondary datastores. However, after_save, after_touch and after_destroy only send "delay messages" to Kafka. These delay messages should not be fetched immediately; instead, they should be fetched after a certain delay, e.g. 5 minutes. Only the after_commit callback sends messages to Kafka which can be fetched immediately by background workers.

The delay messages provide a safety net for cases when something crashes between the database commit and the after_commit callback. In contrast, the purpose of the messages sent from within the after_commit callback is to keep the secondary datastores updated in near-realtime when everything is working without issues. Due to this combination of delay messages and instant messages, you won't have to do a full re-index after a server crash again, because your secondary datastores will be self-healing.
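Because each message only carries an id, consumers re-read the current record state from the primary datastore, which makes processing idempotent: a delayed duplicate simply re-applies the same state. A minimal sketch of such a batch handler, detached from the gem (PRIMARY and INDEX are stand-ins for your primary datastore and a secondary index):

```ruby
# Minimal sketch of an idempotent batch handler, assuming messages
# carry only ids (as in the default kafka_payload).
PRIMARY = { 1 => { id: 1, title: "Apple" }, 2 => { id: 2, title: "Banana" } }
INDEX = {}

def handle_batch(messages)
  messages.each do |message|
    id = message[:id]
    record = PRIMARY[id]
    if record
      INDEX[id] = record # upsert: re-applying the same state is harmless
    else
      INDEX.delete(id)   # record was destroyed in the meantime
    end
  end
end

handle_batch([{ id: 1 }, { id: 2 }])
handle_batch([{ id: 1 }]) # a delayed duplicate changes nothing
```

This is why the delayed redelivery is safe: re-processing a message never produces a different result than processing it once.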


Installation

Add this line to your application's Gemfile:

gem 'kafka_sync'

And then execute:

$ bundle

Or install it yourself as:

$ gem install kafka_sync

Afterwards, you need to specify how to connect to Kafka as well as Zookeeper:

KafkaSync.seed_brokers = [""]
KafkaSync.zk_hosts = ""
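For example — the host names below are purely illustrative placeholders, to be replaced with your actual brokers and Zookeeper hosts:

```ruby
# Illustrative values only.
KafkaSync.seed_brokers = ["kafka1.example.com:9092", "kafka2.example.com:9092"]
KafkaSync.zk_hosts = "zk1.example.com:2181,zk2.example.com:2181"
```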

Reference Docs

The reference docs can be found at https://www.rubydoc.info/github/mrkamel/kafka_sync/master.


KafkaSync::Model

The KafkaSync::Model module installs the model lifecycle callbacks.

class MyModel < ActiveRecord::Base
  include KafkaSync::Model
end



KafkaSync::Consumer

A KafkaSync::Consumer fetches the messages from Kafka in batches and yields them to the given block:

require "logger"

DefaultLogger = Logger.new(STDOUT)

KafkaSync::Consumer.new(topic: "products", partition: 0, name: "consumer", logger: DefaultLogger).run do |messages|
  # ...
end

You should run a consumer per (topic, partition, name) tuple on multiple hosts for high availability. The consumers perform a leader election using Zookeeper, such that per tuple only one of them is actively consuming messages while the others are hot standbys, i.e. if the leader dies, another instance will take over leadership.
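Zookeeper-based leader elections typically work by having each candidate create an ephemeral sequential node, with the lowest sequence number becoming leader. A minimal illustration of that selection rule, detached from any real Zookeeper (the node names are made up):

```ruby
# Minimal illustration of the "lowest sequence number wins" rule used
# in typical Zookeeper leader elections. Node names are hypothetical.
def leader(znodes)
  znodes.min_by { |name| name[/\d+$/].to_i }
end

candidates = ["consumer-0000000012", "consumer-0000000007", "consumer-0000000009"]
```

Because the nodes are ephemeral, a crashed leader's node disappears and the next-lowest candidate takes over automatically.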

Please note: if you have multiple kinds of consumers for a single model/topic, you must use distinct names. Assume you have an indexer, which updates a search index for a model, and a cacher, which updates a cache store for it:

KafkaSync::Consumer.new(topic: MyModel.kafka_topic, partition: 0, name: "indexer", logger: DefaultLogger).run do |messages|
  # ...
end

KafkaSync::Consumer.new(topic: MyModel.kafka_topic, partition: 0, name: "cacher", logger: DefaultLogger).run do |messages|
  # ...
end


KafkaSync::Delayer

The delayer fetches the delay messages, i.e. messages from the specified delay topic, and checks whether enough time has passed since each message was produced. If not, it sleeps until enough time has passed. Afterwards, the delayer re-sends the messages to the desired topic, where a consumer (e.g. an indexer) can fetch and process them as usual.

KafkaSync::Delayer.new(topic: MyModel.kafka_topic, partition: 0, delay: 300, logger: DefaultLogger).run

Again, you should run a delayer per (topic, partition) tuple on multiple hosts to ensure high availability.
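The waiting logic can be sketched independently of the gem. A simplified model, not the gem's actual implementation, assuming each delay message records when it was produced:

```ruby
# Simplified model of the delayer's decision: a message becomes due
# once the configured delay has elapsed since it was produced.
DELAY = 300 # seconds

# Returns how long to sleep before a message may be forwarded
# (zero if the delay has already elapsed).
def seconds_until_due(message, now)
  due_at = message[:produced_at] + DELAY
  [due_at - now, 0].max
end

now = 1_000
old_message    = { id: 1, produced_at: now - 400 } # delay already elapsed
recent_message = { id: 2, produced_at: now - 100 } # must wait 200s more
```

Since messages within a partition are ordered by time, sleeping until the head message is due is sufficient: all messages behind it are at least as recent.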


KafkaSync::Streamer

The KafkaSync::Streamer actually sends the delay as well as the instant messages to Kafka and is required for cases where you're using #update_all, #delete_all, etc. As you might know, #update_all and friends bypass the model lifecycle callbacks, such that you need to tell KafkaSync about those updates yourself.

More concretely, you need to change:

Product.where(on_stock: true).update_all(featured: true)

to the following:

KafkaStreamer = KafkaSync::Streamer.new

Product.where(on_stock: true).find_in_batches do |products|
  KafkaStreamer.bulk products do
    Product.where(id: products.map(&:id)).update_all(featured: true)
  end
end

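The bulk-notification pattern this follows can be sketched without the gem — an illustrative model, not the gem's actual implementation: announce the affected ids before the write as a safety net, perform the write, then announce them again for near-realtime consumers.

```ruby
# Illustrative sketch of the bulk-notification pattern. SENT is a
# stand-in for messages produced to Kafka.
SENT = []

def bulk(records)
  ids = records.map { |r| r[:id] }
  ids.each { |id| SENT << [:delay, id] }   # safety net before the write
  yield                                    # perform the actual update
  ids.each { |id| SENT << [:instant, id] } # near-realtime notification
end

products = [{ id: 1 }, { id: 2 }]
bulk(products) do
  # e.g. Product.where(id: ids).update_all(featured: true)
end
```

If the process crashes inside the block, the delay messages already sent ensure consumers eventually re-check the affected records.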

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/mrkamel/kafka_sync.


License

The gem is available as open source under the terms of the MIT License.