Syncing data from integrations in an asynchronous way

One of NomNom’s core features is the ability to integrate with many customer-facing tools to synchronize customer feedback data. Whether it’s communication tools like Intercom, ticketing systems like Zendesk or various NPS services, we always face the same question: how do we get all this customer data into NomNom as quickly and reliably as possible?

In most cases, syncing data between external tools and NomNom requires thousands of API requests. This can take a significant amount of time, mainly because of the sheer number of requests and the heavy rate limiting on integration endpoints (e.g. many Intercom accounts only allow 60 requests per second). On top of that, requests can fail due to network errors, maintenance windows, etc. That is why our usual algorithm for syncing data consists of the following steps:

  1. When a user connects an integration, we create a job for the initial historical data fetch.
  2. This job is picked up by a worker, which is responsible for fetching the most recent pieces of feedback (Intercom conversations, Zendesk tickets etc.). For each piece, it creates a new job to be processed.
  3. Each item is then imported as a document, and the data is further processed in a separate processing pipeline.
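To make the fan-out concrete, here is a minimal in-memory sketch of the three steps in Clojure. It assumes jobs are plain maps, uses a local `clojure.lang.PersistentQueue` in place of RabbitMQ, and relies on a hypothetical `fetch-ids` function standing in for the integration's list endpoint. It illustrates the job structure only; it is not NomNom's production code.

```clojure
;; Hypothetical in-memory model of the sync flow. Jobs are plain maps and
;; `fetch-ids` stands in for the integration API; no RabbitMQ is involved.

(defn process-job
  "Processes one job and returns a vector of follow-up jobs.
  :initial-fetch fans out into one :import job per item id;
  :import is a leaf job that records the item in the `imported` atom."
  [job fetch-ids imported]
  (case (:type job)
    :initial-fetch
    (vec (for [id (fetch-ids (:integration-id job))]
           {:type           :import
            :integration-id (:integration-id job)
            :item-id        id}))
    :import
    (do (swap! imported conj [(:integration-id job) (:item-id job)])
        [])))

(defn run-sync
  "Drains a FIFO queue that starts with a single :initial-fetch job and
  returns the imported items in processing order."
  [integration-id fetch-ids]
  (let [imported (atom [])]
    (loop [queue (conj clojure.lang.PersistentQueue/EMPTY
                       {:type :initial-fetch :integration-id integration-id})]
      (if-let [job (peek queue)]
        ;; follow-up jobs go to the back of the queue
        (recur (into (pop queue) (process-job job fetch-ids imported)))
        @imported))))

;; Usage: an integration whose API returns three conversation ids.
(run-sync 1 (fn [_] ["c1" "c2" "c3"]))
;; => [[1 "c1"] [1 "c2"] [1 "c3"]]
```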

See the example data flow for syncing Intercom conversations:

Besides implementing the message flow, we had the following list of requirements:

  1. Asynchronous processing: Each document is fetched and processed asynchronously in a separate job. This ensures the user can see the imported data in their account as soon as possible.
  2. Manage the number of concurrent jobs: To improve performance, we should allow jobs to be processed concurrently. But as mentioned, some of the integrations are heavily rate-limited, so we need an easy way to manage the number of concurrent jobs.
  3. Automatic retries: Jobs can fail for various reasons (e.g. network failures) and should then be retried automatically. At the same time, we don’t want to retry failures ad infinitum: once a certain threshold is reached, failed jobs need to be stored in a separate place so they can be reviewed and either retried or discarded.
  4. Monitor job statistics and track errors: There should be an easy way to track metrics and errors for job processing.
  5. Simple production setup: If we decide to use third-party software, it should be easy to deploy, manage and monitor in our production environment.
  6. Ability to use multiple programming languages: Even though the majority of our backend is written in Clojure, we have a Ruby on Rails application which also needs to push jobs to the queues.

Why we decided to go with RabbitMQ

Given our requirements, it was obvious that we would need some kind of message queue. There are many to choose from: RabbitMQ, HornetQ/ActiveMQ, Kafka and NATS, to name a few. RabbitMQ fulfilled most of the requirements above, and it was also the one our engineering team was most familiar with.

From our list of requirements, RabbitMQ solves numbers 1, 2, 5 and 6 because:

  • Messaging in RabbitMQ is asynchronous.
  • You can define multiple consumers for a given queue and use them to control concurrency.
  • RabbitMQ has great monitoring tools (a web interface, an HTTP API and command-line tools; see the official documentation for details).
  • RabbitMQ is easy to host yourself, even in a production environment (it supports distributed deployment, high availability, etc.; you can find more details in the server documentation). You can also use a hosted solution such as CloudAMQP.
  • RabbitMQ has many mature clients for all major programming languages.
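Requirement 2 comes down to this: the number of consumers (threads) reading from a queue is the knob that caps how many jobs run at once. The sketch below illustrates the idea with a plain JVM fixed thread pool rather than RabbitMQ. `run-with-workers` is a hypothetical helper that reports the peak number of jobs it observed running concurrently, which should never exceed the pool size.

```clojure
(import '(java.util.concurrent Executors TimeUnit)
        '(java.util.concurrent.atomic AtomicInteger))

(defn run-with-workers
  "Runs job-fn on every job using a fixed pool of n-workers threads and
  returns the peak number of jobs that were running at the same time."
  [n-workers jobs job-fn]
  (let [pool    (Executors/newFixedThreadPool (int n-workers))
        running (AtomicInteger. 0)
        peak    (AtomicInteger. 0)]
    (doseq [job jobs]
      (.execute pool
                (fn []
                  (let [now (.incrementAndGet running)]
                    ;; raise the recorded peak if this job pushed concurrency higher
                    (loop []
                      (let [p (.get peak)]
                        (when (and (> now p)
                                   (not (.compareAndSet peak p now)))
                          (recur))))
                    (try
                      (job-fn job)
                      (finally
                        (.decrementAndGet running)))))))
    ;; stop accepting work and wait for the submitted jobs to finish
    (.shutdown pool)
    (.awaitTermination pool 1 TimeUnit/MINUTES)
    (.get peak)))

;; Usage: 20 slow jobs, but never more than 4 running at once.
(run-with-workers 4 (range 20) (fn [_] (Thread/sleep 10)))
```

Lowering the worker count is the same lever we pull for heavily rate-limited integrations: fewer consumers means fewer concurrent requests against the external API.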

That leaves two problems to solve: how to support automatic retries (requirement 3) and how to enable monitoring in a simple way (requirement 4).

Message processing with automatic retries

Originally, NomNom started as a Ruby application. There we used Sneakers, a simple framework for defining RabbitMQ consumers. One of the nice features of Sneakers is automatic failure handling: the consumers automatically set up retry and error queues/exchanges and a dead letter exchange to allow job retries.

Let’s walk through the implemented message flow. Each time a consumer processes a message, the result is one of the following scenarios:

  1. Success: The message was processed successfully. In that case, we acknowledge the message and remove it from the original queue.
  2. Error: Processing failed for a known reason, and retrying would lead to the same result (e.g. a 401 HTTP response from an external service). The original message is acknowledged but also pushed to the error queue for further examination.
  3. Retry: Processing failed for a known reason, but a retry should succeed (e.g. a network failure). The original message is acknowledged but also pushed to the retry queue. After a configured delay, the message is routed back to the main queue for re-processing via a dead letter exchange. We also configure how many times a message can be retried before it is pushed to the error queue.
  4. Timeout: When processing fails because of a timeout, we apply the same handling as in the case of a retry.
  5. Exception: When processing fails with an unknown exception, we apply the same handling as in the case of a retry.
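The five outcomes above can be summarized as a single routing decision. This is a sketch under the assumption that `max-retries` counts retries after the first attempt; `route-message` and `simulate` are hypothetical helpers, not functions from Sneakers or Bunnicula, and the returned keywords only name the destination.

```clojure
(defn route-message
  "Decides where a message goes after one processing attempt.
  outcome is one of :success, :error, :retry, :timeout or :exception;
  retry-count is how many times the message has already been retried."
  [outcome retry-count max-retries]
  (case outcome
    :success :ack
    :error   :error-queue
    ;; retry, timeout and unknown exceptions share the same handling
    (:retry :timeout :exception)
    (if (< retry-count max-retries)
      :retry-queue
      :error-queue)))

(defn simulate
  "Replays the outcomes of successive attempts for one message and returns
  the destinations it passed through, stopping once it leaves the retry loop."
  [outcomes max-retries]
  (loop [[outcome & more] outcomes
         retries 0
         path    []]
    (let [dest (route-message outcome retries max-retries)
          path (conj path dest)]
      (if (and (= dest :retry-queue) (seq more))
        (recur more (inc retries) path)
        path))))

;; A message that keeps failing is retried 3 times, then parked.
(simulate (repeat :retry) 3)
;; => [:retry-queue :retry-queue :retry-queue :error-queue]

;; A timeout followed by a successful retry.
(simulate [:timeout :success] 3)
;; => [:retry-queue :ack]
```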

The graph below may make the message flow clearer:

Monitoring

When monitoring consumer processing we are interested in a few things:

  • How long it took to process the message.
  • What was the result of the message processing (success, timeout etc.).
  • Which exceptions occurred and how often.

We like to use smart solutions built by other people so we can concentrate on domain-specific problems. We use StatsD and Collectd for collecting metrics (and forwarding them to Google’s Stackdriver) and Rollbar for exception tracking. This could easily change in the future if we find better tools worth switching to. Our monitoring requirement was therefore twofold: job processing should be easy to instrument, and the monitoring itself should be replaceable or improvable without rewriting the code of every single consumer we had defined.
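One way to meet that requirement is to wrap every message handler in a single instrumenting function, so consumers never contain monitoring code themselves. The sketch below is hypothetical and is not Bunnicula's monitoring API: `with-monitoring` records duration, result and exception counts in an atom where a real setup would forward them to StatsD and Rollbar, and, matching the exception scenario described earlier, it returns `:retry` when the handler throws.

```clojure
(defn with-monitoring
  "Wraps a message handler so that every call records its duration, its
  result and any exception in the `stats` atom. The handler itself is
  unchanged; a real setup would forward these numbers to StatsD/Rollbar."
  [handler stats]
  (fn [& args]
    (let [start (System/nanoTime)
          ;; status is what we record, result is what we return
          [status result]
          (try
            (let [r (apply handler args)]
              [r r])
            (catch Exception e
              (swap! stats update-in [:exceptions (.getName (class e))]
                     (fnil inc 0))
              ;; unknown exceptions are handled like a retry
              [:exception :retry]))
          elapsed-ms (/ (- (System/nanoTime) start) 1e6)]
      (swap! stats (fn [s]
                     (-> s
                         (update-in [:results status] (fnil inc 0))
                         (update :timings-ms (fnil conj []) elapsed-ms))))
      result)))

;; Usage: the same wrapper works for any handler with any signature.
(def stats (atom {}))
(def monitored-handler
  (with-monitoring (fn [_body _parsed _envelope _components] :ack) stats))
(monitored-handler nil {:integration_id 1} nil nil)
(:results @stats)
;; => {:ack 1}
```

Because instrumentation lives in one place, switching from StatsD to another backend would mean changing the wrapper, not every consumer.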

The majority of our backend services are written in Clojure. We also rely heavily on component architecture as defined in Stuart Sierra's component library.

While Clojure has multiple client libraries for RabbitMQ (most notably Langohr, maintained by one of the RabbitMQ core team members), most of them act only as an interface to the RabbitMQ API. Initially, we built our own library with a common set of functions for creating consumer components with retry functionality. We quickly noticed that every time we used the library, a lot of boilerplate code was required for consumer processing, and monitoring wasn’t standardized. We needed a framework that would define the skeleton for building a consumer and its related functionality.

Please welcome to the stage, Bunnicula

We are proud to announce the release of nomnom/bunnicula, the framework for asynchronous messaging with RabbitMQ. With Bunnicula, you can now implement consumers with automatic retries just by specifying a function for message processing and a small set of configuration options. For convenience, you can also use the library for publishing messages to RabbitMQ.

Bunnicula provides four components:

  • connection
  • publisher
  • consumer-with-retry
  • base-monitoring

Please see the library documentation for full details; here we will quickly walk through how to set up components for publishing and consuming messages.

Building your messaging system

Connection component

First, you need to define a connection component, which represents a RabbitMQ connection.

(require '[bunnicula.component.connection :as connection])

(def connection
  (connection/create
    {:username "rabbit"
     :password "password"
     :host "127.0.0.1"
     :port 5672
     :vhost "/main"}))

Publisher component

For the publisher, you can optionally specify an exchange to which messages will be published. Out of the box, messages are serialized as JSON, but you can also specify your own serialization function. At NomNom we’ve started switching from JSON to Avro, which has many advantages such as type safety and a guarantee that both publishers and consumers use the same message schema.

(require '[bunnicula.component.publisher :as publisher])  
(def publisher (publisher/create {}))
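How a custom serializer is passed to the publisher is not shown here; see the publisher documentation for the option name and expected signature. To illustrate what such a function pair does, here is a sketch that encodes a message map as UTF-8 EDN bytes and decodes it again, using only Clojure's standard library instead of JSON or Avro. `edn-serialize` and `edn-deserialize` are hypothetical names.

```clojure
(require '[clojure.edn :as edn])

(defn edn-serialize
  "Hypothetical serializer: turns a Clojure value into UTF-8 encoded EDN bytes."
  [message]
  (.getBytes (pr-str message) "UTF-8"))

(defn edn-deserialize
  "Inverse of edn-serialize: reads UTF-8 encoded EDN bytes back into a value."
  [^bytes payload]
  (edn/read-string (String. payload "UTF-8")))

;; Round trip of the message used later in this post.
(edn-deserialize (edn-serialize {:integration_id 1 :message_id "123"}))
;; => {:integration_id 1, :message_id "123"}
```

Whatever format you pick, publishers and consumers have to agree on it, which is exactly the guarantee a schema-based format like Avro makes explicit.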

Consumer-with-retry component

For the consumer, you need to specify a message-handler-fn and a set of configuration values. The most important configuration values are queue-name and exchange-name, which specify which queue to set up and which exchange it will be bound to. The rest of the configuration values are optional; you can find more details in the consumer documentation.

(require '[bunnicula.component.consumer-with-retry :as consumer])

(defn import-conversation-handler
  [_body parsed _envelope _components]
  (let [{:keys [integration_id message_id]} parsed]
    ;; ... some code ...
    ;; e.g. import the Intercom conversation for
    ;; the given integration_id & message_id
    ;; must return one of the allowed values (:ack, :error, :retry)
    :ack))

(def consumer
  (consumer/create {:message-handler-fn import-conversation-handler
                    :options {:queue-name "intercom.conversation-import"
                              :exchange-name ""
                              :timeout-seconds 120
                              :backoff-interval-seconds 60
                              :consumer-threads 4
                              :max-retries 3}}))

This approach means that your message processing code is a simple function which doesn’t need to know anything about RabbitMQ. The only requirements are that it has the expected signature and returns one of the predefined values signifying the status of message processing. This makes it very easy to test your consumer functions without worrying about the underlying messaging infrastructure.
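As an example of such a test, here is a sketch of a handler built around an injected import function. `make-import-handler` and its `import-fn` argument are hypothetical, and mapping a 401 response to `:error` and other `ex-info` failures to `:retry` simply mirrors the scenarios described earlier. The handler is exercised with plain function calls, with no broker involved.

```clojure
(defn make-import-handler
  "Builds a handler with the [body parsed envelope components] signature.
  import-fn is a hypothetical function doing the actual Intercom import;
  injecting it keeps the handler testable without RabbitMQ."
  [import-fn]
  (fn [_body parsed _envelope _components]
    (let [{:keys [integration_id message_id]} parsed]
      (try
        (import-fn integration_id message_id)
        :ack
        (catch clojure.lang.ExceptionInfo e
          ;; a 401 will not go away on retry, anything else might;
          ;; other exception types propagate and are treated as exceptions
          (if (= 401 (:status (ex-data e)))
            :error
            :retry))))))

(def msg {:integration_id 1 :message_id "123"})

(def ok-handler (make-import-handler (fn [_ _] :imported)))
(def unauthorized-handler
  (make-import-handler (fn [_ _] (throw (ex-info "unauthorized" {:status 401})))))
(def unavailable-handler
  (make-import-handler (fn [_ _] (throw (ex-info "unavailable" {:status 503})))))

(ok-handler nil msg nil nil)           ;; => :ack
(unauthorized-handler nil msg nil nil) ;; => :error
(unavailable-handler nil msg nil nil)  ;; => :retry
```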

Components system

Now you can put all those blocks together and build your messaging system. Both publisher and consumer require connection as a component dependency in the system.

The consumer also requires a monitoring component as a dependency. By default, you can use the BaseMonitoring component provided by the library, which logs the results of message processing.

Once you start the system, the following things will happen:

  • The RabbitMQ connection is opened.
  • A new RabbitMQ channel is opened for the publisher component to publish messages.
  • Queues and exchanges are created for the specified queue name to support the message flow with retries.
  • The configured number of consumer threads is started.

And that’s it: you are all set. All you need to do now is use the publisher component to send messages to the appropriate RabbitMQ queue, and the rest is taken care of.


(require '[bunnicula.protocol :as protocol])

;; app is the running component system that contains the :publisher component
(protocol/publish (:publisher app)
                  "intercom.conversation-import"
                  {:integration_id 1 :message_id "123"})

… and clap your hands for Bunnicula Monitoring

As mentioned, the Bunnicula library itself contains a simple monitoring component which logs the results of message processing. You can define your own tracking by creating your own monitoring component. We have also open-sourced nomnom/bunnicula.monitoring, which reports job timing and other metrics to StatsD via nomnom/stature, and exceptions to Rollbar via nomnom/caliban.

Here’s an example that sets up a consumer with the monitoring component:


(require '[bunnicula.monitoring :as monitoring]
         '[caliban.tracker :as tracker]
         '[stature.metrics :as metrics]
         '[com.stuartsierra.component :as component])

(def tracker (tracker/create
              {:token "abc123"
               :environment "production"}))

(def statsd (metrics/create
             {:host "localhost"
              :port 8125
              :prefix "test"}))

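(def system
  (component/system-map
   ;; both the publisher and the consumer depend on the RabbitMQ connection
   :publisher (component/using publisher [:rmq-connection])
   :rmq-connection connection
   :exception-tracker tracker
   :statsd statsd
   :monitoring (component/using
                (monitoring/create {:consumer-name "intercom.import"})
                [:exception-tracker :statsd])
   :consumer (component/using
              consumer
              [:rmq-connection :monitoring])))

;; start everything with (component/start system)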

This approach lets you inject your own error reporting service, as long as your exception tracker component implements the existing Exception Tracker protocol. The same approach can be used to report metrics to InfluxDB instead of StatsD.
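To illustrate protocol-based injection in general terms, the sketch below defines its own `ExceptionReporter` protocol and an in-memory implementation. Both are hypothetical and are not the actual Exception Tracker protocol from nomnom/caliban, whose name and methods may differ. The point is only that code written against a protocol can be handed any implementation: a Rollbar-backed tracker in production, an in-memory one in tests.

```clojure
;; Hypothetical protocol: NOT the Exception Tracker protocol from
;; nomnom/caliban, whose name and methods may differ.
(defprotocol ExceptionReporter
  (report-exception [this exception context]))

;; An in-memory implementation, e.g. for tests or local development.
(defrecord InMemoryReporter [reports]
  ExceptionReporter
  (report-exception [_ exception context]
    (swap! reports conj {:message (.getMessage ^Throwable exception)
                         :context context})))

;; Monitoring code depends only on the protocol, not on a concrete service.
(defn handle-failure [reporter exception consumer-name]
  (report-exception reporter exception {:consumer consumer-name})
  :retry)

(def reporter (->InMemoryReporter (atom [])))
(handle-failure reporter (ex-info "boom" {}) "intercom.import")
@(:reports reporter)
;; => [{:message "boom", :context {:consumer "intercom.import"}}]
```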

Next steps for Bunnicula

Bunnicula and its related components have been used in production at NomNom for over a year and power most of our infrastructure: interacting with external services, feedback data processing, event tracking, and typical background tasks such as sending emails. We hope Bunnicula can help you with your own distributed, asynchronous job processing. In the near future, we hope to extend the library with the following functionality:

All libraries mentioned in this post are available via Clojars - if you use them or if you find any issues, please let us know on GitHub!