Blog

Message Distribution / Routing

This blog explains the inner workings and rationale behind our system, focusing on how messages are distributed within our infrastructure.

  • December 16, 2021
  • Vlad Cealicu

Our first version of message distribution was a Socket.io server that all the publishers sent messages to, and everyone interested could subscribe from. It was an obvious single point of failure and also a bandwidth and CPU bottleneck. Throughout time we scaled it up and split the traffic by type of data. Our benchmark prices (CCCAGG) get distributed through one server, the index data has its own distribution server, and the individual exchange data is distributed through a third server. Each of these has a few clients in a tree-like structure.

In order to make the system described above more scalable, we tried to reduce the CPU load on the Socket.io internal routing servers. To do that we decided to not have granular subscriptions internally and just route based on the message type. A client (a streamer or an index calculation instance) subscribes to all exchange messages, you can’t pick and choose, this saves CPU cyles on the routing server level as there are a lot fewer subscriptions per client and the matching of subscription to the client is quite efficient. This tradeoff means clients generally have a lot more data than they need and it comes at a cost of CPU cycles (to drop the data that’s not needed) and an increase in bandwidth. Our goal in this type of architecture was to try to reduce CPU cycles at the data distribution and routing level.

In more recent times, we’ve been experimenting a lot with Redis Pub-Sub. As you might have read in our other blog posts, we use Redis quite extensively in our infrastructure. As part of our order book data distribution, we decided to publish order book data through Redis Pub-Sub instead of NSQ queues and so far it’s been performing much better. In our architecture, we have services that listen to raw data through queues so we can guarantee that internally we are not losing any messages. If we end up missing sequences due to the distribution layer having issues we just send a new snapshot to the clients so the recovery from missing a message is automatic for our external clients.

We benchmarked a Redis instance (4 CPUs 7GB of RAM) with one publishersending over 2 million messages a second and one client handling all of it. We tried to do the same with our own NodeJs WebSockets message routing implementation and the results were two orders of magnitude lower.

Fig 1. Message distribution diagram with 3 layers of depth

While Redis instances provided the best message / CPU ratio there are some caveats.

Using pattern subscriptions reduces performance

The more patterns you have the more checks happen against each message that is published. Even if the pattern does not match the channel the server still needs to do a regex against it to figure out that it does not match. So if you do want very high throughput, then don’t use patterns for subscribing to channels, just use direct subscriptions as much as possible. The pattern subscriptions are very good for lower levels of messages as they can simplify the code on the consumer side quite a lot (if you use a good channel naming convention).

Use replicas of replicas

Redis allows you to have a replica server that republishes everythingpublished on the main server. The good news with Redis is that a replica publishing server can also have other replicas associated with it. This allows for multi-layer tree structures for message distribution. To scale to a lot of consumers, in a high throughput environment, you can use replica instances and replicas of replicas.

Find a main / replica number that works for you

This is not an easy number to figure out as it relies heavily on your expected throughput. In our case, the magic number seems to be 4 as it gives us space to push it to 5 or 6 depending on the number of clients and it also allows us to try the 3 layer replica of replica architecture. This leads to 16 edge distribution servers each supporting 100 / 16 ~ 6 clients (Fig 1).

The main distribution server has 4 replicas that have 4 replicas each (3 layers). One publish on the main, ends up with 1 publish on each of the replicas so in total our message inflation rate is 4 per layer.

One message published on the main server leads to 4 messages being passed on from the main server to the layer two servers and 16 messages passed on from the layer 2 servers to layer 3 servers. We have around 100 clients of the layer three servers (streaming instances, index calculation instances, etc.) that can subscribe to either a subset of the data or all the data. Worst case scenario, if everyone is listening to a channel (coinbase~BTC-USD for example) we publish one message from the coinbase instance, the first layer (the main server) publishes it 4 more times (5 publishes in total), the second layer publishes it 16 times (21 publishes in total) and the third layer, assuming there are 100 streaming clients, publishes it 100 times (121 publishes in total). Assuming each streaming server has 400 clients (website users, API clients, etc) and all the clients are listening to coinbase~BTC-USD, the message gets published another 40,000 times (100 servers * 400 clients each) so in total, assuming all the streaming clients are interested in the coinbase~BTC-USD instrument, we end up doing 40,121 publishes. The message inflation per layer is a very important metric to stay on top, if not carefully monitored and accounted for, it can easily grow out of control and slow down the system.

Make sure you have a fallback and redundancy

We decided to shard our message publishing based on the message type. If we can’t publish to the main distribution instance that handles that type of message, we publish on the backup(Fig 1). This way if we lose the server at layer 1 then all the data flows to the secondary.

Connect edge clients to main and backup edge distributors

We have our internal clients (streamers, index calculators, etc.) connected to both the main and the backup edge distributor servers. We make sure we subscribe to the required messages on both sides, this guarantees that we receive data even if the main server is unavailable, assuming at least one of the layer one servers (the main and the backup) are up. The layer one servers are part of the same availability group so we generally have quite high guarantees from Azure that they would not be restarted at the same time. This does not cover the scenario of losing a layer two replica. The layer two replicas are part of the same availability group as well so we have high guarantees that they won’t all go down at the same time. If Azure has critical security patches to roll out then all the servers in an availability group will be restarted one by one. To handle layer 2 and layer 3 failures we are employing two approaches, both with different tradeoffs.

Approach one is to double the connections and deduplicate the messages. So instead of connecting to one layer 3 replica on the primary and secondary sides, we connect to two on each side, each corresponding to a different layer 2 replica. With this approach, we are guaranteed to receive the messages at least once if a layer one server, a layer 2, or a layer 3 server goes down. This approach unfortunately has a very high overhead as in 99.95% of the time, when all layer 2 and layer 3 servers are up, we are receiving the same message twice and have to deduplicate it. We reserve this approach for mission-critical systems that need to have as high guarantees as possible. Another benefit of this approach is that we can lose a layer 3 server and still not have data loss, we just pick another one to connect to. Having said that,these systems are generally better served by specialized queue software(RabbitMQ, Kafka, etc.) with guarantees on message deliveries.

Approach two is to listen to heartbeat messages and if a heartbeat message is missed, switch to a layer 3 group belonging to a different layer 2 replica. This way, depending on the frequency of heartbeats, we can switch quite quickly. This design has a much smaller overhead on the consumer, we never receive a message more than once. The downside is that it leads to data loss from the time we lose the layer 2 replica to the time we figure out, based on the missed heartbeat messages, that we need to move. It also means that, while losing a layer 3 replica is obvious (our connection is broken) and we can just switch to another, it still leads to data loss until we make the new connection and subscribe to the correct channels. There is another trick we use here where we open a connection to the layer 2 replica that our main connection on layer 3 belongs to, if we lose that connection, or stop getting heartbeats then that is also a clear indicator that the layer 2 replica is having issues. The difference between the layer 2 and layer 3 connection is that on layer 2 we only listen for heartbeats, on layer 3 we subscribe to all the required channels.

Designing systems for very high throughput and no data loss is hard, on a busy second our numbers of trades from all the 300 exchanges we have integrated might increase drastically from 10,000 trades per second to 200,000 trades per second and our system needs to be able to handle that peak load at any time. On the order book side, we could go from 100,000 messages per second to 2,000,000 messages per second in just a matter of moments as well.

Unlike any of the big exchanges, our systems don’t need to only handle traffic from one set of users, but from everyone trading crypto at any time in the world.

Disclaimer: Please note that the content of this blog post was created prior to our company's rebranding from CryptoCompare to CCData.

Message Distribution / Routing

Our first version of message distribution was a Socket.io server that all the publishers sent messages to, and everyone interested could subscribe from. It was an obvious single point of failure and also a bandwidth and CPU bottleneck. Throughout time we scaled it up and split the traffic by type of data. Our benchmark prices (CCCAGG) get distributed through one server, the index data has its own distribution server, and the individual exchange data is distributed through a third server. Each of these has a few clients in a tree-like structure.

In order to make the system described above more scalable, we tried to reduce the CPU load on the Socket.io internal routing servers. To do that we decided to not have granular subscriptions internally and just route based on the message type. A client (a streamer or an index calculation instance) subscribes to all exchange messages, you can’t pick and choose, this saves CPU cyles on the routing server level as there are a lot fewer subscriptions per client and the matching of subscription to the client is quite efficient. This tradeoff means clients generally have a lot more data than they need and it comes at a cost of CPU cycles (to drop the data that’s not needed) and an increase in bandwidth. Our goal in this type of architecture was to try to reduce CPU cycles at the data distribution and routing level.

In more recent times, we’ve been experimenting a lot with Redis Pub-Sub. As you might have read in our other blog posts, we use Redis quite extensively in our infrastructure. As part of our order book data distribution, we decided to publish order book data through Redis Pub-Sub instead of NSQ queues and so far it’s been performing much better. In our architecture, we have services that listen to raw data through queues so we can guarantee that internally we are not losing any messages. If we end up missing sequences due to the distribution layer having issues we just send a new snapshot to the clients so the recovery from missing a message is automatic for our external clients.

We benchmarked a Redis instance (4 CPUs 7GB of RAM) with one publishersending over 2 million messages a second and one client handling all of it. We tried to do the same with our own NodeJs WebSockets message routing implementation and the results were two orders of magnitude lower.

Fig 1. Message distribution diagram with 3 layers of depth

While Redis instances provided the best message / CPU ratio there are some caveats.

Using pattern subscriptions reduces performance

The more patterns you have the more checks happen against each message that is published. Even if the pattern does not match the channel the server still needs to do a regex against it to figure out that it does not match. So if you do want very high throughput, then don’t use patterns for subscribing to channels, just use direct subscriptions as much as possible. The pattern subscriptions are very good for lower levels of messages as they can simplify the code on the consumer side quite a lot (if you use a good channel naming convention).

Use replicas of replicas

Redis allows you to have a replica server that republishes everythingpublished on the main server. The good news with Redis is that a replica publishing server can also have other replicas associated with it. This allows for multi-layer tree structures for message distribution. To scale to a lot of consumers, in a high throughput environment, you can use replica instances and replicas of replicas.

Find a main / replica number that works for you

This is not an easy number to figure out as it relies heavily on your expected throughput. In our case, the magic number seems to be 4 as it gives us space to push it to 5 or 6 depending on the number of clients and it also allows us to try the 3 layer replica of replica architecture. This leads to 16 edge distribution servers each supporting 100 / 16 ~ 6 clients (Fig 1).

The main distribution server has 4 replicas that have 4 replicas each (3 layers). One publish on the main, ends up with 1 publish on each of the replicas so in total our message inflation rate is 4 per layer.

One message published on the main server leads to 4 messages being passed on from the main server to the layer two servers and 16 messages passed on from the layer 2 servers to layer 3 servers. We have around 100 clients of the layer three servers (streaming instances, index calculation instances, etc.) that can subscribe to either a subset of the data or all the data. Worst case scenario, if everyone is listening to a channel (coinbase~BTC-USD for example) we publish one message from the coinbase instance, the first layer (the main server) publishes it 4 more times (5 publishes in total), the second layer publishes it 16 times (21 publishes in total) and the third layer, assuming there are 100 streaming clients, publishes it 100 times (121 publishes in total). Assuming each streaming server has 400 clients (website users, API clients, etc) and all the clients are listening to coinbase~BTC-USD, the message gets published another 40,000 times (100 servers * 400 clients each) so in total, assuming all the streaming clients are interested in the coinbase~BTC-USD instrument, we end up doing 40,121 publishes. The message inflation per layer is a very important metric to stay on top, if not carefully monitored and accounted for, it can easily grow out of control and slow down the system.

Make sure you have a fallback and redundancy

We decided to shard our message publishing based on the message type. If we can’t publish to the main distribution instance that handles that type of message, we publish on the backup(Fig 1). This way if we lose the server at layer 1 then all the data flows to the secondary.

Connect edge clients to main and backup edge distributors

We have our internal clients (streamers, index calculators, etc.) connected to both the main and the backup edge distributor servers. We make sure we subscribe to the required messages on both sides, this guarantees that we receive data even if the main server is unavailable, assuming at least one of the layer one servers (the main and the backup) are up. The layer one servers are part of the same availability group so we generally have quite high guarantees from Azure that they would not be restarted at the same time. This does not cover the scenario of losing a layer two replica. The layer two replicas are part of the same availability group as well so we have high guarantees that they won’t all go down at the same time. If Azure has critical security patches to roll out then all the servers in an availability group will be restarted one by one. To handle layer 2 and layer 3 failures we are employing two approaches, both with different tradeoffs.

Approach one is to double the connections and deduplicate the messages. So instead of connecting to one layer 3 replica on the primary and secondary sides, we connect to two on each side, each corresponding to a different layer 2 replica. With this approach, we are guaranteed to receive the messages at least once if a layer one server, a layer 2, or a layer 3 server goes down. This approach unfortunately has a very high overhead as in 99.95% of the time, when all layer 2 and layer 3 servers are up, we are receiving the same message twice and have to deduplicate it. We reserve this approach for mission-critical systems that need to have as high guarantees as possible. Another benefit of this approach is that we can lose a layer 3 server and still not have data loss, we just pick another one to connect to. Having said that,these systems are generally better served by specialized queue software(RabbitMQ, Kafka, etc.) with guarantees on message deliveries.

Approach two is to listen to heartbeat messages and if a heartbeat message is missed, switch to a layer 3 group belonging to a different layer 2 replica. This way, depending on the frequency of heartbeats, we can switch quite quickly. This design has a much smaller overhead on the consumer, we never receive a message more than once. The downside is that it leads to data loss from the time we lose the layer 2 replica to the time we figure out, based on the missed heartbeat messages, that we need to move. It also means that, while losing a layer 3 replica is obvious (our connection is broken) and we can just switch to another, it still leads to data loss until we make the new connection and subscribe to the correct channels. There is another trick we use here where we open a connection to the layer 2 replica that our main connection on layer 3 belongs to, if we lose that connection, or stop getting heartbeats then that is also a clear indicator that the layer 2 replica is having issues. The difference between the layer 2 and layer 3 connection is that on layer 2 we only listen for heartbeats, on layer 3 we subscribe to all the required channels.

Designing systems for very high throughput and no data loss is hard, on a busy second our numbers of trades from all the 300 exchanges we have integrated might increase drastically from 10,000 trades per second to 200,000 trades per second and our system needs to be able to handle that peak load at any time. On the order book side, we could go from 100,000 messages per second to 2,000,000 messages per second in just a matter of moments as well.

Unlike any of the big exchanges, our systems don’t need to only handle traffic from one set of users, but from everyone trading crypto at any time in the world.

Disclaimer: Please note that the content of this blog post was created prior to our company's rebranding from CryptoCompare to CCData.

Stay Up To Date

Get our latest research, reports and event news delivered straight to your inbox.