Topics

A Queue With Publishers and Subscribers

A topic-based system is one where messages are published to a named channel (a topic) and delivered to one or more subscribers. The difference from a queue is that every subscriber receives each message, whereas a queue hands each message to a single consumer.
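The fan-out behavior of a topic can be pictured with a few lines of plain JavaScript. This is only a conceptual sketch: the `Topic` constructor and its `subscribe` and `publish` methods are hypothetical names made up for illustration and are not part of MongoDB.

```javascript
// Hypothetical in-memory topic, used only to illustrate fan-out
function Topic(name) {
  this.name = name;
  this.subscribers = [];
}

// Register a callback that receives every published message
Topic.prototype.subscribe = function (handler) {
  this.subscribers.push(handler);
};

// Deliver the same message to all subscribers
Topic.prototype.publish = function (message) {
  this.subscribers.forEach(function (handler) {
    handler(message);
  });
};

var topic = new Topic("ticker");
var receivedByA = [];
var receivedByB = [];

topic.subscribe(function (message) { receivedByA.push(message); });
topic.subscribe(function (message) { receivedByB.push(message); });

topic.publish({ ticker: "PIP", price: 4.45 });

console.log(receivedByA.length, receivedByB.length); // 1 1
```

Both subscribers see the published ticker, which is the property we want to reproduce with MongoDB.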

MongoDB has a special type of collection called a capped collection, and a special type of cursor called a tailable cursor, that together let you model a topic-based publish and subscribe system simply and elegantly.

A capped collection is basically a ring buffer with a fixed size in bytes. As we insert documents into the capped collection, they are added one after the other in the buffer until we reach the end of the buffer. Once the end is reached, the collection wraps around and starts overwriting the oldest documents at the start of the buffer.
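To make the wrap-around behavior concrete, here is a minimal ring buffer in plain JavaScript. It is a simplification and not how MongoDB implements capped collections: the hypothetical `RingBuffer` below counts its capacity in documents, while a capped collection is sized in bytes.

```javascript
// Hypothetical fixed-capacity ring buffer (capacity counted in documents)
function RingBuffer(capacity) {
  this.capacity = capacity;
  this.slots = new Array(capacity);
  this.count = 0; // total number of inserts so far
}

RingBuffer.prototype.insert = function (doc) {
  // The write position wraps around once the end of the buffer is reached
  this.slots[this.count % this.capacity] = doc;
  this.count++;
};

// Return the surviving documents in insertion order, oldest first
RingBuffer.prototype.toArray = function () {
  var result = [];
  var start = Math.max(0, this.count - this.capacity);
  for (var i = start; i < this.count; i++) {
    result.push(this.slots[i % this.capacity]);
  }
  return result;
};

var buffer = new RingBuffer(3);
for (var n = 1; n <= 5; n++) {
  buffer.insert({ seq: n });
}

var remaining = buffer.toArray().map(function (doc) { return doc.seq; });
console.log(remaining); // [ 3, 4, 5 ]
```

After five inserts into a buffer of three slots, the two oldest documents have been overwritten and only the three newest remain.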

The main benefit of a capped collection is that it allows for a tailable cursor, meaning applications can get notified as new documents are inserted into the collection.

The first limitation of a capped collection is that inserted documents cannot grow. If you need to change the shape of a document, you will need to insert a pre-padded document to ensure your document does not grow outside the allocated space.
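One way to picture pre-padding is to reserve a fixed-width field when the document is first inserted, so that later values fit into the space already allocated. The sketch below is plain JavaScript rather than a MongoDB API. The `note` field, its width of 32 characters and the `padTo` helper are assumptions made for illustration, and the sketch only considers single-byte ASCII characters.

```javascript
// Hypothetical helper: pad a string with spaces to a fixed width so that
// every value stored in the field has the same number of characters
function padTo(value, width) {
  if (value.length > width) {
    throw new Error("value does not fit in the reserved space");
  }
  return value + new Array(width - value.length + 1).join(" ");
}

var NOTE_WIDTH = 32; // assumed reserved width for the illustrative note field

// Document as first inserted: the note is empty but already full width
var inserted = { ticker: "PIP", price: 4.45, note: padTo("", NOTE_WIDTH) };

// Later version: the new value replaces the padding without growing the field
var updated = { ticker: "PIP", price: 4.45, note: padTo("trading halted", NOTE_WIDTH) };

console.log(inserted.note.length === updated.note.length); // true
```

Because both versions of the field have the same length, replacing one with the other would not make the document larger.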

The second limitation of capped collections is their fixed size. Capped collections are stored on disk like any other collection, so their content survives a server restart, but once the collection wraps around, the oldest documents are overwritten and cannot be recovered. Capped collections do, however, replicate to the secondaries in a replica set. If you need failover support, it’s recommended to deploy a replica set.

A Ring Buffer

In this chapter we will use a stock ticker for our topic schema example.

Schema Observations

  • Leverages capped collections
  • Works with replication
  • Performs well
  • Too many listeners impact CPU usage on MongoDB (MongoDB 3.0 or earlier).
  • Collection only retains a bounded history. Once the capped collection is full, the oldest documents are overwritten. Use a replica set if you need failover.
  • Potentially read and write heavy.

Schema

Schema attribute    Description
Optimized For       Write Performance
Pre-Allocation      Only if the documents need to grow

We will use a very simplified stock ticker schema to showcase how topics can be created and used with MongoDB.

{
  "time": ISODate("2014-01-01T10:01:22Z"),
  "ticker": "PIP",
  "price": 4.45
}

Example 1: A simple ticker document

Operations

Create the capped collection

To create the capped collection, we need to use the createCollection command, passing the capped collection options.

var db = db.getSisterDB("finance");
db.createCollection("ticker", {capped:true, size:100000, max: 100000})

Example 2: Create a capped collection

The available options are:

Option            Type     Description
capped            boolean  Signals that we want to create a capped collection
autoIndexId       boolean  Create an index on the _id field. Do not set to false for a collection that is replicated
size              number   The maximum size of the capped collection in bytes before wrapping around
max               number   The maximum number of documents in the collection before wrapping around
usePowerOf2Sizes  boolean  Default allocation strategy from 2.6 and on
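The size option is always required and takes precedence over max: if the collection fills up its bytes before it reaches max documents, it starts wrapping anyway. In Example 2 both values are 100000, so the byte limit will be reached long before 100000 documents are stored. A rough way to pick a size is to multiply the number of documents we want to retain by an estimated average document size. The `cappedSizeFor` helper and the figure of 100 bytes per document below are assumptions for illustration, not measured values.

```javascript
// Hypothetical helper: estimate the size option needed to retain a
// given number of documents of a given average size in bytes
function cappedSizeFor(maxDocuments, averageDocumentBytes) {
  return maxDocuments * averageDocumentBytes;
}

var maxDocuments = 100000;
var averageDocumentBytes = 100; // assumed average size of a ticker document

// Options object in the shape expected by createCollection
var options = {
  capped: true,
  size: cappedSizeFor(maxDocuments, averageDocumentBytes),
  max: maxDocuments
};

console.log(options.size); // 10000000
```

In practice it is worth measuring the real average document size and leaving some headroom, since this estimate ignores any storage overhead.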

Creating Random Ticker Documents

Let’s create some activity in our capped collection, by writing a publisher script. First boot up a new mongo shell and run a publisher that creates a new ticker document with a random price once a second.

var db = db.getSisterDB("finance");
while(true) {
  db.ticker.insert({
    time: new Date(),
    ticker: "PIP",
    price: 100 * Math.random()
  })

  sleep(1000);
}

Example 3: Create random ticker document every second

Ticker Consumer

To consume the messages being published to the topic, let’s write a consumer script. Boot up another mongo shell, and run a script that will consume messages from the topic.

var db = db.getSisterDB("finance");
var cursor = db.ticker.find({time: {$gte: new Date()}}).addOption(DBQuery.Option.tailable).addOption(DBQuery.Option.awaitData)

while(true) {
  // hasNext() waits briefly for new documents on a tailable awaitData cursor
  if(cursor.hasNext()) {
    print(JSON.stringify(cursor.next(), null, 2))
  }
}

Example 4: Start up a consumer

The script will only receive tickers that are newer than the date and time the script was started.
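The effect of the time filter can be illustrated without a server. The sketch below uses a plain array as a stand-in for the capped collection and a hypothetical `TailCursor` that mimics the `hasNext` and `next` methods of the shell cursor. It is a conceptual model only and does not reflect how MongoDB implements tailable cursors.

```javascript
// Plain array standing in for the capped collection (illustration only)
var collection = [
  { time: new Date("2014-01-01T10:01:20Z"), ticker: "PIP", price: 4.40 },
  { time: new Date("2014-01-01T10:01:21Z"), ticker: "PIP", price: 4.42 }
];

// Hypothetical cursor that only yields documents at or after a start time
function TailCursor(docs, since) {
  this.docs = docs;
  this.since = since;
  this.position = 0;
}

TailCursor.prototype.hasNext = function () {
  // Skip over documents that are older than the start time
  while (this.position < this.docs.length &&
         this.docs[this.position].time < this.since) {
    this.position++;
  }
  return this.position < this.docs.length;
};

TailCursor.prototype.next = function () {
  if (!this.hasNext()) {
    throw new Error("no document available");
  }
  return this.docs[this.position++];
};

// The consumer starts after the two existing tickers were published
var startedAt = new Date("2014-01-01T10:01:22Z");
var cursor = new TailCursor(collection, startedAt);
var hadDataAtStart = cursor.hasNext();

// A publisher adds a new ticker after the consumer has started
collection.push({ time: new Date("2014-01-01T10:01:22Z"), ticker: "PIP", price: 4.45 });

var received = [];
while (cursor.hasNext()) {
  received.push(cursor.next());
}

console.log(hadDataAtStart, received.length); // false 1
```

The two tickers published before the consumer started are skipped, and only the ticker published afterwards is delivered.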

Indexes

The indexes for this schema will vary by how you query the data in your consumers. In this simple stock ticker publisher/consumer, we can see that we query the collection by the time field. To avoid collection scans, we create an index on the time field.

var col = db.getSisterDB("finance").ticker;
col.createIndex({time: 1});

Example 5: Create the time index

Scaling

Secondary Reads

Secondary reads can help scale the number of subscribers. This allows an application to increase the number of listeners by distributing the read load across multiple secondaries.

Sharding

Capped collections do not support sharding.

Performance

Inserts into a capped collection are simple appends in insertion order, which makes writes fast. Unfortunately, there is a performance limit caused by the way tailable cursors are currently implemented. As you add more and more tailable cursors, MongoDB’s CPU usage increases markedly. This means that it’s important to keep the number of listeners low to get better throughput.

If you are just pulling data off the topic and not modifying the documents, using secondary reads with a replica set might help scale the number of listeners you can deploy without creating undue CPU usage on the servers.

A simple exploration of the performance on a single machine with MongoDB 3.0 shows the difference between MMAP and WiredTiger for a narrow simulation using the schema simulation framework mongodb-schema-simulator.

Scenario

https://github.com/christkv/mongodb-schema-simulator/blob/master/examples/simulations/topic_simulation.js

MongoDB runs locally on a MacBook Pro Retina 2015 with an SSD and 16 GB of RAM. The simulation runs with the following parameters against a single mongod instance under OS X 10.10 Yosemite.

Parameters                                       Value
processes                                        4
poolSize per process                             50
type                                             linear
Resolution in milliseconds                       1000
Iterations run                                   25
Total number of users publishing per iteration   1000
Total number of users reading per iteration      40
Execution strategy                               slicetime, custom

MMAP

The MMAP engine is run using the default settings on MongoDB 3.0.1.

Topics Simulation

publish_to_topics scenario results

Statistics            Value
Runtime               44.39 seconds
Mean                  2.616 milliseconds
Standard Deviation    7.824 milliseconds
75 percentile         2.318 milliseconds
95 percentile         5.523 milliseconds
99 percentile         21.792 milliseconds
Minimum               0.703 milliseconds
Maximum               420.747 milliseconds

fetch_from_topics scenario results

Statistics            Value
Runtime               44.39 seconds
Mean                  0.001 milliseconds
Standard Deviation    0.003 milliseconds
75 percentile         0.002 milliseconds
95 percentile         0.004 milliseconds
99 percentile         0.02 milliseconds
Minimum               ~0.00 milliseconds
Maximum               0.323 milliseconds

There doesn’t seem to be much to note here. The number of writes per second is pretty much exactly 1000, mirroring the number of concurrent users we have defined in the simulation. The writing speed appears to be limited mainly by MongoDB’s lock handling and memory performance.

One thing to keep in mind is that before MongoDB 3.0 the MMAP engine only supported database-level locking, which meant it was better to place a high-throughput capped collection in its own separate database. In MongoDB 3.0 the MMAP engine lowers the lock level to collection level, which means a capped collection can coexist with other collections in the same database without causing lock contention for them.

WiredTiger

The WiredTiger engine is run using the default settings on MongoDB 3.0.1.

Topics Simulation

publish_to_topics scenario results

Statistics            Value
Runtime               45.39 seconds
Mean                  2.848 milliseconds
Standard Deviation    11.15 milliseconds
75 percentile         2.432 milliseconds
95 percentile         5.699 milliseconds
99 percentile         22.597 milliseconds
Minimum               0.740 milliseconds
Maximum               522.21 milliseconds

fetch_from_topics scenario results

Statistics            Value
Runtime               45.39 seconds
Mean                  0.001 milliseconds
Standard Deviation    0.004 milliseconds
75 percentile         0.002 milliseconds
95 percentile         0.004 milliseconds
99 percentile         0.020 milliseconds
Minimum               ~0.00 milliseconds
Maximum               0.363 milliseconds

WiredTiger does not make a big difference compared to MMAP for this workload. Remember that WiredTiger supports document-level locking, so there is no need to place the capped collection in its own separate database to avoid lock contention, as there was for MMAP before MongoDB 3.0.

Notes

Capped collections can be extremely useful for creating topic-like systems with multiple publishers and subscribers. However, due to the current limitations in how tailable cursors work in MongoDB, it’s important to balance the number of active listeners against the CPU usage.