Two years ago I was given the task of building a real-time business intelligence tool for call centers. It had to deliver both current and historical reports in real time.
It is installed on the client's premises and sometimes has to work with very limited resources. It consists of a web service and a MongoDB cluster. Customers who install this product have to familiarize themselves with NoSQL technology, because in this market reporting is usually implemented on an RDBMS. SQL databases can do almost everything we want from them. However, there are use cases where a NoSQL database can replace a SQL database: real-time content, cheap scalability and availability, and the web.
With such limited resources, there was little room for performance ideas that require additional processing or caching. I had to concentrate on stream mechanics, data modelling and code optimization.
The tool operates on streams. Stream types give you a way to prioritize streams and help you choose the proper caching strategy. In my case, I implemented three different caching approaches for three different types of streams.
First, I looked for events that are crucial from both the historical and the real-time point of view. I found them and promoted them to a separate statistic stream.
There was no option: all events from this stream have to be persisted in a local queue. By persisting documents in BSON format, we speed up deserialization. The processor (the transforming part) treats BSON as its generic format, so messages go straight from being deserialized to being processed. We have to make sure that, whatever storage technology we use, speed does not degrade as the queue grows.
Another type of stream is the real-time status stream. It is meaningful only when there is a live link between the reporting tool and the data source. We can jump to the beginning of the real-time status stream at any point in time and recreate the object graph in memory. This object graph is the second cache. It has a fixed size and keeps references to real objects instead of a list of updates. At the same time, we don't lose any event and the queue never exceeds a certain limit.
What do we achieve? If we want to have the latest updates in MongoDB, we send updates as they come. However, if the statistic stream falls behind, we can wait and send updates in bursts (batch updates). We can postpone updates because they accumulate in the objects.
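The idea of accumulating updates in objects can be sketched roughly as follows. This is only an illustration, not the tool's actual code: the `StatusCache` class, its method names and the agent fields are hypothetical. The point is that the cache holds one merged state per object, so its size depends on the number of live objects and not on the number of events received.

```python
from collections import OrderedDict


class StatusCache:
    """Hypothetical sketch: keeps the latest merged state per object."""

    def __init__(self):
        self._objects = {}            # object id -> current merged state
        self._dirty = OrderedDict()   # ids changed since the last flush

    def apply(self, object_id, changes):
        # Merge the event into the object instead of queueing the event.
        state = self._objects.setdefault(object_id, {})
        state.update(changes)
        self._dirty[object_id] = True

    def flush(self):
        # One pending update per changed object, however many events arrived.
        batch = [(oid, dict(self._objects[oid])) for oid in self._dirty]
        self._dirty.clear()
        return batch


cache = StatusCache()
cache.apply("agent-1", {"status": "ready"})
cache.apply("agent-1", {"status": "talking", "queue": "sales"})
cache.apply("agent-2", {"status": "away"})
batch = cache.flush()   # could be sent to the database as one batch update
```

Three events collapse into two pending updates here; calling `flush()` after every event gives the "send as they come" behaviour, while calling it less often gives bursts.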
The third stream is used only occasionally: after an outage or a human error, or when we want to apply a new data transformation rule. It is used to replay or recover events. It uses a small in-memory cache because it feeds on events from a SQL database or log files, which are persisted anyway.
The processor executes operations as if it were a CPU operating on a very long vector of memory. It pre-aggregates and pre-groups reports by creating time slots that describe what happened within a given period of time. It computes not only counts but also averages, rates, minimums, maximums and more. All of these operations are done client side, so only $set is used to set document fields. As a result, documents are bigger than documents that store only counters.
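A minimal sketch of such a client-side time-slot update might look like this. The hourly slot length, the field names (`count`, `sum`, `avg`, `min`, `max`) and the helper names are my assumptions for the example, not the real schema. The database call is replaced by a plain dictionary update so the snippet runs on its own.

```python
from datetime import datetime, timezone

SLOT_SECONDS = 3600  # assumed slot length: one hour


def slot_start(ts: datetime) -> datetime:
    """Round a timestamp down to the start of its time slot."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % SLOT_SECONDS, tz=timezone.utc)


def apply_event(slot: dict, value: float) -> dict:
    """Fold one value into a slot document and return the fields to $set."""
    count = slot.get("count", 0) + 1
    total = slot.get("sum", 0.0) + value
    return {
        "count": count,
        "sum": total,
        "avg": total / count,
        "min": min(slot.get("min", value), value),
        "max": max(slot.get("max", value), value),
    }


slot = {}
for duration in (30.0, 90.0, 60.0):      # e.g. call durations in seconds
    fields = apply_event(slot, duration)
    slot.update(fields)  # stands in for an update with {"$set": fields}
```

After the three events the slot holds a count of 3, an average of 60.0, a minimum of 30.0 and a maximum of 90.0. Keeping `sum` next to `avg` is what makes the slot bigger than a pure counter document, but it also lets slots be combined later.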
Before performing any operation that is not supported by MongoDB atomic updates, the processor has to read the document. By doing that we warm up the selected document, but we also issue an additional request and thus introduce latency. To avoid this latency we can cache (in memory, or in memcache if possible) the documents that were touched last. This is the next step I will take, as I expect it to increase performance by about 30-40%.
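Since this cache is not implemented yet, the following is only a sketch of what a "last touched documents" cache could look like: a small least-recently-used cache in front of the read. The `DocumentCache` class and the `fake_load` function are hypothetical, and `fake_load` stands in for the real database read.

```python
from collections import OrderedDict


class DocumentCache:
    """Hypothetical LRU cache for the most recently touched documents."""

    def __init__(self, load, capacity=1000):
        self._load = load            # function that reads a document by key
        self._capacity = capacity
        self._docs = OrderedDict()   # least recently used entries come first

    def get(self, key):
        if key in self._docs:
            self._docs.move_to_end(key)      # mark as most recently used
            return self._docs[key]
        doc = self._load(key)                # cache miss: one extra request
        self._docs[key] = doc
        if len(self._docs) > self._capacity:
            self._docs.popitem(last=False)   # evict the least recently used
        return doc


reads = []


def fake_load(key):
    # Stand-in for the database read; records every request it receives.
    reads.append(key)
    return {"_id": key}


cache = DocumentCache(fake_load, capacity=2)
for key in ("a", "b", "a", "c", "a", "b"):
    cache.get(key)
```

Six lookups cause only four reads in this run. A cache like this is safe only if the processor is the sole writer of those documents and updates the cached copy together with each write; that is an assumption of the sketch.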
The processor runs MapReduce when a client wants to see a combined view of what is happening, or what happened, within a time period that spans more than one time slice. The processor then reduces many time slices into one that can be updated as if it were a regular time slot. However, instead of map-reducing millions of documents, we reduce only a few hundred.
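The reduce step over pre-aggregated slices can be illustrated in plain Python. This is not the MapReduce job itself, only a sketch of the merge logic, and it assumes each slice carries hypothetical `count`, `sum`, `min` and `max` fields. The average is recomputed from the merged sum and count, because averaging the per-slice averages would give a wrong result when slices have different counts.

```python
def merge_slots(slots):
    """Reduce many time-slot documents into one combined slot."""
    merged = {"count": 0, "sum": 0.0, "min": None, "max": None}
    for s in slots:
        if s["count"] == 0:
            continue  # empty slots contribute nothing
        merged["count"] += s["count"]
        merged["sum"] += s["sum"]
        merged["min"] = s["min"] if merged["min"] is None else min(merged["min"], s["min"])
        merged["max"] = s["max"] if merged["max"] is None else max(merged["max"], s["max"])
    # Recompute the average from totals instead of averaging averages.
    merged["avg"] = merged["sum"] / merged["count"] if merged["count"] else None
    return merged


hourly = [
    {"count": 3, "sum": 180.0, "min": 30.0, "max": 90.0},
    {"count": 1, "sum": 20.0, "min": 20.0, "max": 20.0},
]
combined = merge_slots(hourly)
```

The combined slot has the same shape as a regular one, so it can keep receiving updates like any other time slot.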
A potential next step could be to generate slots of various lengths, e.g. monthly, daily and hourly. An “execution plan” could then optimize queries and resources, for example by providing 500 monthly reports instead of 80000 hourly reports for the same month to the MapReduce engine or to the new aggregation framework.
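One way such an execution plan could work is a greedy cover: walk through the requested period and always pick the coarsest slot that is aligned and still fits. The sketch below is an assumption about how this might be done, not an existing feature. The `plan` and `next_month` helpers are hypothetical, and the period boundaries are assumed to be hour-aligned.

```python
from datetime import datetime, timedelta


def next_month(ts):
    """First day of the following month (ts must be the first of a month)."""
    return ts.replace(year=ts.year + ts.month // 12, month=ts.month % 12 + 1)


def plan(start, end):
    """Cover [start, end) with the coarsest aligned slots available."""
    slots = []
    cur = start
    while cur < end:
        if cur.day == 1 and cur.hour == 0 and next_month(cur) <= end:
            slots.append(("monthly", cur))
            cur = next_month(cur)
        elif cur.hour == 0 and cur + timedelta(days=1) <= end:
            slots.append(("daily", cur))
            cur += timedelta(days=1)
        else:
            slots.append(("hourly", cur))
            cur += timedelta(hours=1)
    return slots


whole_month = plan(datetime(2013, 1, 1), datetime(2013, 2, 1))
ragged = plan(datetime(2013, 1, 31, 22), datetime(2013, 3, 1, 2))
```

For the whole of January the plan is a single monthly slot per report instead of 744 hourly ones. For the ragged period it is two hourly slots, one monthly slot for February, and two more hourly slots.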
In order to keep up the pace, only data from MongoDB are processed. Calls to external data sources are forbidden, as they would slow down the entire process. If one wants to provide external data, it has to be preloaded into MongoDB, or events have to be enriched before they reach the queues and hence the processor.
As you can see, I started off with data modelling. The time slots approach (I found a similar approach here: http://docs.mongodb.org/manual/use-cases/pre-aggregated-reports/) can save a lot of memory and improve performance. Then I differentiated the streams and gave them priorities. Finally, external requests during event processing are forbidden; I achieved this by making documents self-sufficient in terms of data. In the future, additional document caching will help a lot.
Nine hundred agents generate about 160 statistical and 200 status events per second. This amount of traffic is successfully handled by two MongoDB shards installed on a virtual server.