Thursday, 7 May 2020

Highways England traffic flow data and Covid-19

Introduction

I collected and ingested 15-minute Highways England road flow reports from their API for around 400 sites across England. The data spans from 2019 to April 2020, i.e. it covers reports from before and including the beginning of the Covid-19 pandemic. The idea was to see what new patterns emerged during the new situation.

Data collection and ingestion

I used Python to collect the data. After collection, latitude, longitude, road names and a few more columns were added to make the dataset richer.

Apache Druid unified console showing the new columns in INFORMATION_SCHEMA
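For illustration, below is a minimal Python sketch of that collection and enrichment step. The endpoint URL, query parameters and response field names are placeholders rather than the actual Highways England API, and the site metadata values are made up:

import csv
import requests

# Placeholder endpoint and parameters - the real Highways England API
# paths, site identifiers and date formats differ.
BASE_URL = "https://example.com/api/v1/reports/daily"

def fetch_site_reports(site_id, start_date, end_date):
    """Download the 15-minute flow reports for one site as a list of dicts."""
    response = requests.get(BASE_URL, params={
        "sites": site_id,
        "start_date": start_date,
        "end_date": end_date,
    })
    response.raise_for_status()
    # "Rows" is an assumed name for the payload list.
    return response.json().get("Rows", [])

def enrich(rows, site_meta):
    """Add latitude, longitude and road name columns to every report row."""
    for row in rows:
        row.update(site_meta)
    return rows

site_meta = {"latitude": 51.5, "longitude": -0.1, "road": "M1"}  # example values
rows = enrich(fetch_site_reports("1234", "01012019", "30042020"), site_meta)
with open("site_1234.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=rows[0].keys())
    writer.writeheader()
    writer.writerows(rows)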


The data was ingested into Apache Druid using Druid's UI. As my set-up was a single micro machine, I encountered some issues when loading the larger data sets. To address that, I first increased the JVM memory for the MiddleManager, Overlord and Historical nodes, to stop the garbage collector falling into the vicious circle of exceeding its overhead limit. That was not enough, though, so I also had to partition the ingested data set by site and timespan.
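The partitioning itself can be done before ingestion. A minimal pandas sketch of the idea, assuming the collected reports sit in a single CSV; the column names are illustrative:

import os
import pandas as pd

reports = pd.read_csv("all_reports.csv", parse_dates=["report_date"])

# Split the big file into per-site, per-month chunks so that each Druid
# ingestion task fits into the limited memory of a micro instance.
os.makedirs("partitions", exist_ok=True)
for (site, period), chunk in reports.groupby(
        ["site_id", reports["report_date"].dt.to_period("M")]):
    chunk.to_csv(f"partitions/{site}_{period}.csv", index=False)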

Data quality

Once the reports were ingested into Druid I decided to measure the data quality. The simplest thing I could do was to check whether all days have the same number of reports. I was looking for ninety-six reports per day, as they come in fifteen-minute intervals. I used Druid's grouping capabilities and created a traffic_counts dataset from the traffic dataset, which provides the finest granularity of the source data. Druid rolled up the traffic dataset instantaneously and created a new metric with the report count per day. The last step was to visualise the new dataset in Superset.

Y axis is day of year, X axis is sites

In the picture above, white spaces show missing measurements (the site did not deliver data). As we can see, some of these sites were not working for quite some time, but the volume of missing data was not huge and did not stop me from generating the reports I intended. The dark turquoise colour represents the correct number of reports per day for a site, i.e. ninety-six. The most interesting features are the three horizontal lines: they show that the number of reports is consistently wrong for all sites on the same days of the year. This may be caused by a faulty database export process and is something that Highways England should investigate.
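The same completeness check can also be reproduced outside Druid. A small pandas sketch, again assuming the raw reports sit in one CSV with illustrative column names:

import pandas as pd

reports = pd.read_csv("all_reports.csv", parse_dates=["report_date"])

# Count reports per site per day; a complete day of fifteen-minute
# intervals should contain exactly 96 rows.
counts = (reports
          .groupby(["site_id", reports["report_date"].dt.date])
          .size()
          .rename("report_count")
          .reset_index())

incomplete = counts[counts["report_count"] != 96]
print(incomplete.head())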

First pattern found

During March 2019 the volume of vehicles using the four selected main roads was constant, hence the pattern shown in the figure below was stable. People travelled mostly on Mondays and came back home on Fridays. The least travel was observed at weekends.

Average volume of vehicles in March 2019
For the four selected main roads; X axis is March, Y axis is volume per day

In 2020 the pattern is different. People started to follow the government advice to reduce travel early in March, but the deepest drop in travel can be observed on 27 March, just after the country-wide lockdown began.

Average volume of vehicles in March 2020
For the four selected main roads; X axis is March, Y axis is volume per day

What is next? 

As of today, the data for April 2020 has not been published yet. The next step could be to compare March and April 2020 to see how people followed the new rules. Another idea is to drill further into the roads and see, for example, how traffic flows between specific junctions.

Sunday, 3 July 2016

Converting MongoDB semi-structured to structured schema - Part I


How do you convert a semi-structured MongoDB schema into a fully structured tabular/SQL schema? As we know, MongoDB is a schemaless database that stores JSON documents in so-called collections. Schemaless means that each document in a collection carries its own schema: the names and types of its fields. JSON does not directly support the types that are needed to store data for fast reads and writes, hence MongoDB storage is BSON based. BSON is a strongly typed serialization format and the binary counterpart of JSON. Having stored JSON documents, we can ask MongoDB how many documents have a field of a specific type, for example:

db.collection.find({'field_name': {$type: 18}}).count()

will return the number of documents where field_name is of type long (BSON type 18). Most MongoDB types can be easily converted to their SQL counterparts: int -> int, string -> varchar(), ISODate -> datetime, and so on.
When converting a semi-structured MongoDB schema to a structured one, two constructs need special attention: embedded documents and embedded arrays.
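The per-field type check and the type mapping can also be scripted. Below is a small pymongo sketch; the database, collection and field names are placeholders and the mapping only covers the most common cases:

from pymongo import MongoClient

# Illustrative BSON-to-SQL type mapping for the most common cases.
BSON_TO_SQL = {
    "int": "INT",
    "long": "BIGINT",
    "double": "DOUBLE",
    "string": "VARCHAR(255)",
    "date": "DATETIME",
    "bool": "BOOLEAN",
}

collection = MongoClient()["mydb"]["mycollection"]  # placeholder names

for bson_type, sql_type in BSON_TO_SQL.items():
    # $type also accepts string aliases, not only numeric codes.
    count = collection.count_documents({"field_name": {"$type": bson_type}})
    if count:
        print(f"field_name is {bson_type} in {count} docs -> SQL {sql_type}")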

Flattening JSON documents
Below I demonstrate a simple JavaScript-based approach to flattening JSON documents in MongoDB. The script opens a MongoDB connection, selects a database and a collection, and starts iterating over all the documents. I'm not a JavaScript developer, hence I had to scour the internet for my initial script. After a few amendments I came up with something like the script below. First, load it via the MongoDB shell:

function convert(jobName, databaseName, collectionName) {
    // db is assigned without 'var' on purpose, so that flatten() below
    // creates the child collections in the same database.
    conn       = new Mongo();
    db         = conn.getDB(databaseName);
    collection = db.getCollection(collectionName);
    cursor     = collection.find();

    while (cursor.hasNext()) {
        flatten(cursor.next(),
                db.getCollection(jobName),
                jobName,
                -1);
    }
}

var curID = 0;
function getNextId() {
    return curID++;
}

function flatten(data, coll, parentName, parentId) {
    var currentId = getNextId();
    var result    = {"id": currentId, "parent": parentId};

    function recurse(cur, prop) {
        if (typeof(cur) == "function")
            return;

        if (Object(cur) !== cur) {
            // scalar value - store it under the concatenated field name
            result[prop] = cur;
        } else if (Array.isArray(cur)) {
            // arrays are not flattened into the current object -
            // in the case of maps this prevents creating massive schemas
            var childColl = db.getCollection(prop.replace(/-/g, "_"));
            for (var i = 0, l = cur.length; i < l; i++) {
                // spawn another flatten for each array element
                flatten(cur[i], childColl, prop, currentId);
            }
        } else {
            // embedded document - recurse into each field, prefixing its name
            var isEmpty = true;
            for (var p in cur) {
                isEmpty = false;
                recurse(cur[p], prop ? prop + "_" + p : p);
            }
            if (isEmpty && prop)
                result[prop] = {};
        }
    }

    // bootstrap the recursion with the parent/job name as the prefix
    recurse(data, parentName);
    coll.insert(result);
    return result;
}

Then simply call the convert function, providing a unique job name, a database name and a collection name. When the function has finished, you will have a whole bunch of new collections in your working database with names starting with the job's name and '_'. If your job name is, for example, 'job', your main collection will be named 'job'.
Each field inside the flattened documents is named by concatenating the field names along its path. For example, the document:
  {
   "_id" : ObjectId("57682626c3ea9b6005dc0949"),
   "methodId" : "65c05a05-f63e-4889-bdc0-2a7787a57cf9",
   "method" : {
      "methodDate" : 20160602,
      "version" : 2
   }
  }
will result in:
> db.job.find()[0]
  {
     "_id" : ObjectId("57682626c3ea9b6005dc0946"),
     "id" : 0,        // new field - records the parent-child relation
     "parent" : -1,   // new field - -1 indicates this is the top-level collection
     "job_methodId" : "65c05a05-f63e-4889-bdc0-2a7787a57cf9",
     "job_method_methodDate" : 20160602,
     "job_method_version" : 2
  }

JSON arrays
What do we do when we see an array in a document? Each array gets its own collection; for example, job_flags will be a collection derived from the array that sits in the main document under the field name 'flags'. We just have to create another table and link the current table to the new one using reference keys (a parent-child relation). We then keep analyzing what is in the array and, if we find another array inside, we apply the same trick. When we finish, we should have an SQL schema that resembles the snowflake schema well known from data warehousing, except that in this instance we don't have facts and dimensions, just simple relationships. If we want a flatter structure we can solve it in two ways: use SQL views, or check the length of the array and, when it is very short, extend the current table by adding new columns instead of creating a child table. Be aware, though, that most SQL databases have constraints on the number of columns in a table.
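To make the parent-child link concrete, here is a small pymongo sketch that walks from each flattened parent row to its array rows. It assumes a job named 'job' whose documents contained a 'flags' array; the database name is a placeholder:

from pymongo import MongoClient

db = MongoClient()["mydb"]  # placeholder database name

# Each flattened parent document carries a synthetic "id"; rows produced
# from the "flags" array point back to it through their "parent" field.
for parent in db["job"].find():
    children = list(db["job_flags"].find({"parent": parent["id"]}))
    print(parent["id"], "has", len(children), "flag rows")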


Caveats - maps
If every document in a collection looks different from the others, then there is no way to convert the whole collection into SQL tables with a finite number of columns. One would have to use a wide-table database like HBase, which deals very well with sparse data in a columnar store.

Another caveat occurs when developers use maps for serialization, for example Map<String, Struct<...>>, to store data via a DAO (Data Access Object) where the string is a key instead of a fixed field name: instead of using ContractId as the name of the field, developers may use the actual contract id value, e.g. '00024535-8603-4dc7-9dc6-e7a55b3ce7d3'. That approach also makes the conversion difficult, as there is no algorithm that can discover whether a field name is a proper name in the schemaless collection or just a key value. This sort of problem falls into the data modelling domain; Mongo allows it but, as I said before, it makes the MongoDB schema difficult to analyze. One remedy could be to provide the analyzing algorithm with a list of the maps in use. Another is to check the format of the field name: if it matches the GUID/UUID format, then we can assume the field is a key and move it to a child table instead of extending the current table.
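The GUID/UUID check is easy to script. A short sketch; the regular expression covers the canonical 8-4-4-4-12 hexadecimal format:

import re

UUID_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
    re.IGNORECASE)

def is_map_key(field_name):
    """Treat UUID-shaped field names as map keys rather than schema fields."""
    return bool(UUID_RE.match(field_name))

print(is_map_key("00024535-8603-4dc7-9dc6-e7a55b3ce7d3"))  # True  -> child table
print(is_map_key("ContractId"))                            # False -> ordinary column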



Sunday, 5 August 2012

MongoDB in a call center

Two years ago I was given the task of building a real-time business intelligence tool for a call center. It has to deliver both current and historical reports in real time.
It is installed on clients' premises and sometimes has to work with very limited resources. It consists of a web service and a MongoDB cluster. Customers who install this product have to familiarize themselves with NoSQL technology. In this market, reporting is usually implemented on an RDBMS, and SQL databases can do almost everything we want from them. However, there are use cases where a NoSQL database can replace a SQL one: real-time content, cheap scalability and availability, and the web.

There was little scope for performance improvements that involve additional processing or caching, so I had to concentrate on stream mechanics, data modelling and code optimization.

The tool operates on streams. Stream types give you a way to prioritize streams and help you choose the right caching. In my case, I implemented three different caching approaches for three different types of stream.

First, I looked for events which are crucial from both a historical and a real-time point of view. I found them and was able to promote them to a separate statistics stream.
There was no choice here: all events from this stream have to be persisted in a local queue. By persisting documents in BSON format, we speed up deserialization: the processor (the transforming part) treats BSON as its generic format, so messages go straight from deserialization to processing. We have to make sure that, whatever storage technology we use, there is no degradation in speed when the queue grows.

Another type of stream was the real-time status stream. The real-time status stream makes sense only when there is a live link between the reporting tool and the data source. We can jump to the beginning of the real-time status stream at any point in time and recreate the object graph in memory. This object graph is the second cache: it has a fixed length and keeps references to real objects instead of a list of updates. At the same time, we don't lose any events and the queue never exceeds a certain limit.

What do we achieve? If we want to have the latest updates in MongoDB, we send updates as they come. However, if the statistics stream falls behind, we can wait and send updates in bursts (batch updates). We can postpone updates because they are accumulated in the objects.

The third stream comes into play only occasionally: when there was an outage or a human error, or when we want to apply a new data transformation rule. It is used to replay or recover events. It uses a small in-memory cache because it feeds on events from a SQL database or log files, which are persisted anyway.

The processor executes operations as if it were a CPU operating on a very long vector of memory. It pre-aggregates and pre-groups reports by creating time slots that describe what happened within a given period of time. It does not only count but also calculates average, rate, min, max and more. All operations are done client side, so only $set is used to set document fields. The documents are bigger than documents storing only counters.

Before performing any of these operations that are not supported by MongoDB's atomic updates, the processor has to read the document. By doing that we warm up the selected document, but we also issue an additional request and thus introduce latency. To avoid this latency we can cache (in memory, or in memcached if possible) the documents that were touched most recently. This will be my next step, as it should increase performance by about 30-40%.
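A minimal pymongo sketch of that read-then-$set cycle, with a plain dictionary standing in for the "recently touched" cache; the database, collection and field names are illustrative:

from pymongo import MongoClient

slots = MongoClient()["callcenter"]["slots"]  # placeholder names
slot_cache = {}  # in-memory cache of recently touched slot documents

def apply_event(slot_id, duration):
    """Fold one event into its time-slot document using client-side aggregation."""
    doc = slot_cache.get(slot_id) or slots.find_one({"_id": slot_id}) or {
        "_id": slot_id, "count": 0, "total": 0, "min": None, "max": None}

    doc["count"] += 1
    doc["total"] += duration
    doc["avg"] = doc["total"] / doc["count"]
    doc["min"] = duration if doc["min"] is None else min(doc["min"], duration)
    doc["max"] = duration if doc["max"] is None else max(doc["max"], duration)
    slot_cache[slot_id] = doc

    # Only $set is used - every aggregate was already computed client side.
    slots.update_one({"_id": slot_id},
                     {"$set": {k: v for k, v in doc.items() if k != "_id"}},
                     upsert=True)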

The processor runs MapReduce when a client wants to see a combined view of everything that has happened, or of what happened within a time period that spans more than one time slot. The processor then reduces many time slots into one that can be updated as if it were a regular time slot. However, instead of map-reducing millions of documents, we reduce only a few hundred.
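The same reduction can also be expressed with the aggregation framework. A sketch that folds the hourly slots of one month into a single summary document; the field names and date format are illustrative:

from pymongo import MongoClient

slots = MongoClient()["callcenter"]["slots"]  # placeholder names

# Combine all hourly slots in the requested window into one synthetic slot.
pipeline = [
    {"$match": {"start": {"$gte": "2012-07-01T00:00", "$lt": "2012-08-01T00:00"}}},
    {"$group": {
        "_id": None,
        "count": {"$sum": "$count"},
        "total": {"$sum": "$total"},
        "min": {"$min": "$min"},
        "max": {"$max": "$max"},
    }},
]
combined = list(slots.aggregate(pipeline))
print(combined)  # the average can be derived as total / count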

A potential next step could be to generate slots of various lengths, e.g. monthly, daily and hourly. An "execution plan" could then optimize queries and resources, for example by feeding 500 monthly reports instead of 80,000 hourly reports for the same month to the MapReduce engine or to the new aggregation framework.

In order to keep up the pace, only data already in MongoDB is processed. Calls to external data sources are forbidden, as they would slow down the entire process. If someone wants to bring in external data, it has to be preloaded into MongoDB, or the events have to be enriched before they reach the queues and hence the processor.

As you can see, I started off with data modelling. The time-slot approach (I found a similar approach here: http://docs.mongodb.org/manual/use-cases/pre-aggregated-reports/) can save a lot of memory and improve performance. Then I differentiated the streams and gave them priorities. Finally, external requests during event processing are forbidden; I achieved this by making the documents self-sufficient in terms of data. In the future, additional document caching will help a lot.


Nine hundred agents generate about 160 statistical and 200 status events per second. This amount of traffic is successfully handled by two MongoDB shards installed on a virtual server.