Message Aggregation Service

This blog post explores the functionality and reasoning behind our system. We will detail how we consolidated multiple services from our initial architecture into a single service responsible for publishing, archiving, and distributing messages.

  • December 18, 2021
  • Vlad Cealicu

When we started the company in 2014, we had limited experience in organising message aggregation services. Our initial assumption was that a separate microservice per message output type (streaming, archiving, historical minute/hour/day aggregation, and tick data aggregation) was the optimal approach. They all worked by tailing a daily trade file, with each trade stored in our internal packed format. You could restart and recover each of them individually, and they dropped any trades that were unmapped. While this approach worked well for trade messages, when we started looking at other types of messages (funding rate, open interest, index updates, futures trades, etc.) we realised that continuing down that path would lead to quite a lot of services, in the double digits, just to consume and propagate each message type. Another thing became obvious: even though each service initially had its own modules, over time they slowly converged into one shared module across all of these services.

Over the years, we've made quite a few big changes to our message aggregation services. In this blog post I'll try to go through them one by one: what our original solution was, what problems it led to, and how we addressed them to make the aggregation service more resilient and scalable.

Moving from files written by the input and read as a queue by the output to a Redis list

One of our mottos in 2014 was that files are golden: no matter what happens, you can always recover from a file. As long as the instance is up and has enough storage, it will always be able to write to a file. When we started, one of our main goals was to be able to process trade data from each exchange as easily as possible. We decided that a daily trades file for each exchange integration was more than enough for building the data we show on our website.

In 2017 we started getting enquiries for individual instrument trade data and we realized that a big file with all the daily trades of an exchange integration was too broad. We added another service to split the daily trades file into a daily trades file per instrument. This system worked well for a small number of exchanges. With time, our requirements became more complicated and, with over 300 integrations running at the same time, maintaining a lot of services that saved their state in files, a lot of trade files, and a lot of other state files on each of the instances became quite time-consuming and cumbersome.

While files offer a lot of resilience, they are not the fastest option. It is much slower to read from and write to files than it is to work directly with RAM. We also ran into a few scenarios where our state files would get corrupted and we sometimes had to edit them manually on the integration servers.

When exchanges were small, rebuilding a day of data from a file that was 80–90 MB in size and contained around 1,000,000 trades was quite quick. As exchanges became more performant, the number of trades an exchange could process in a day grew as well: Binance went from a 20 MB daily trade file in 2017 to a 6–10 GB daily trade file in 2021. These files contain messages (trades) that are quite small; each trade is packed in our own format and is 60–80 bytes in size.

By the time we were hitting all of these issues, we had been using Redis in production for a long time; the instance that holds all our tick data had been up for 7 years, serving and updating the price of each instrument for every trade we have ever processed. We decided to switch from files to Redis as the method of communication from input to output: not just for trades, using a Redis list, but also for instrument discovery and for storing the state of each instrument.

Fig 1. Message Aggregation Service Outputs
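To make the new data flow concrete, here is a minimal sketch, using TypeScript and ioredis, of what the input side of an integration might do: push each trade onto a per-exchange Redis list for the output to consume, and record per-instrument state for instrument discovery. The key names and message shape are illustrative, not our production schema.

```typescript
import Redis from "ioredis";

const redis = new Redis();

// Illustrative message shape; the real messages carry more fields.
interface TradeMessage {
  exchange: string;
  instrument: string;
  price: number;
  quantity: number;
  side: "buy" | "sell";
  timestamp: number;
}

// Push the trade onto the list the output consumes, and record the instrument
// in a hash so new instruments are discovered without touching files on disk.
async function publishTrade(trade: TradeMessage): Promise<void> {
  await redis
    .multi()
    .lpush(`trades~${trade.exchange}`, JSON.stringify(trade))
    .hset(`instruments~${trade.exchange}`, trade.instrument, trade.timestamp)
    .exec();
}
```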

Moving mapping from the integration servers to the API, WebSockets and Index servers

Around 2016 we started seeing a lot of instruments listed on multiple exchanges under the same id, or one instrument listed under different ids on different exchanges. Initially we hard-coded the mapping in the integration, but after some time this approach started causing issues and we decided to use files (files are golden) to store local data from our mapping interface.

With some of the exchanges under higher load and only a small team looking after 300 integrations, we were having issues with instance stability, and that sometimes led to corrupted mapping files. At the time, we were dropping trades for instruments that had not been mapped.

Due to the nature of the crypto industry, where instruments get mapped at different times and markets run 24/7, we soon realized we were spending a lot of time trying to recover trades for instruments that had not been mapped in a timely fashion.

We decided to move mapping to higher levels and not use it at all on the integration server. This way we get all the benefits of mapping on the API and Index/Benchmark services, without the downside of discarding trades.

Each integration handles a subset of instruments on an exchange and stores tick and trade data in an unmapped format. We apply a bit of logic to the instrument names at the integration level to make sure they are valid inside a CSV and when included in a URL.
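As a rough sketch of the kind of check we mean (the production rules are more involved):

```typescript
// Keep the exchange's own instrument id, but make sure it cannot break a CSV
// cell or a URL path segment. Purely illustrative; production rules differ.
function safeInstrumentId(raw: string): string {
  const csvSafe = raw.replace(/[",\r\n]/g, "_"); // no quotes, commas, newlines
  return encodeURIComponent(csvSafe);            // safe to embed in a URL
}
```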

The new mapping at higher levels also means we can give users access to data by individual market instrument id. You can either query for base=BTC and quote=USD on market=coinbase, or for instrument=BTC-USD on market=coinbase.
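In other words, the same minute history can be addressed in two ways; the endpoint path below is just a placeholder, only the parameters come from the example above:

```typescript
// Hypothetical endpoint path; the parameters match the example in the text.
const byPair = "/v1/historical/minutes?market=coinbase&base=BTC&quote=USD";
const byInstrument = "/v1/historical/minutes?market=coinbase&instrument=BTC-USD";
```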

Moving from our own trade packing and unpacking format to JSON pack and unpack

In our old integrations, I was feeling clever and wrote our own data format with its own pack/unpack functions. While this was more efficient in bandwidth and storage than JSON, it was also harder to work with, so we decided to switch everything to JSON in the new version of the message aggregation service. This was a fairly easy choice: most of our integration machines are CPU starved, not bandwidth starved. So we replaced our internal message format with JSON messages passed everywhere in our system. We initially updated our public streamers to support the JSON format, and everyone was very happy with the change.

The only part of our system that does not save/send internal messages in JSON format is the archiving of trades and messages inside blob files, and that is because most of our clients prefer CSV. The less processing we need to do before sending files to clients, the cheaper and easier it is for us.

Moving from one output service per aggregation type to one output service per instrument type

Initially, we had all our output services tailing the same trades file, waiting for new trades to be written: one service for historical candlestick data, one for tick data, one for streaming messages, and one for archiving unmapped trades. Multiple services tailing the same file is resource-consuming and not ideal. A few years ago we updated the code on all of these services and realized they shared a lot of common logic. Even though they all ran as separate services, they ended up sharing the same modules: we had all the downsides of running multiple services with none of the benefits.

In our new output version, we decided to split based on instrument type instead of service type. We now have an output for spot instruments to process spot trade messages, one for derivative futures instruments to process futures trades, funding rate updates, and open interest updates, and one for index instruments to process index update messages.

Depending on the types of instruments an exchange supports, some integrations will run multiple outputs.

Each output is in charge of creating historical candlestick data, creating current tick data, saving the messages in a CSV file, and sending all the data through our internal routing/distribution.
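To illustrate, here is a minimal sketch of one output's loop body, assuming each of those four responsibilities lives behind its own module; every name below is a placeholder rather than our real code.

```typescript
// Illustrative message shape for a spot trade.
interface SpotTrade {
  instrument: string;
  price: number;
  quantity: number;
  timestamp: number;
}

// Placeholder interfaces for the four responsibilities of an output.
interface OutputSteps {
  updateCandlesticks(trade: SpotTrade): Promise<void>; // minute/hour/day OHLCV buckets
  updateTickData(trade: SpotTrade): Promise<void>;     // latest tick state
  appendToCsvArchive(trade: SpotTrade): Promise<void>; // blob-bound CSV file
  routeInternally(trade: SpotTrade): Promise<void>;    // streaming / distribution fan-out
}

// Every message an output consumes goes through the same four steps.
async function handleSpotTrade(steps: OutputSteps, trade: SpotTrade): Promise<void> {
  await steps.updateCandlesticks(trade);
  await steps.updateTickData(trade);
  await steps.appendToCsvArchive(trade);
  await steps.routeInternally(trade);
}
```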

Moving the saving of raw trade data from the input to the output

Since the initial version relied entirely on trade files and we had moved everything to Redis, we were also able to move the writing of historical trades into the message aggregation service.

To ensure we don't lose trades, we use a combination of the brpoplpush and lpush Redis commands. When we first subscribe to a queue, we check whether the in-process list of that queue is empty. If it is not, we move all the entries from the in-process list back to the queue we are subscribing to and start processing them. After processing a message, we lrem the message from the in-process list. This way, if our message aggregation service happens to stop in the middle of processing a message, the next time it starts it will try to process that message again. If we redeploy the service, it simply pauses the queue, makes sure it finishes all the messages, saves its state, dumps all trades it has in memory to a file and gracefully shuts down. The trades are saved in CSV format and a message-to-blob service ships them to Azure Blob Storage, AWS Elastic Block Storage, and Google Cloud Storage.
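As a rough sketch of that pattern (with ioredis, placeholder queue names, and error handling left out):

```typescript
import Redis from "ioredis";

const redis = new Redis();
const QUEUE = "trades~binance";            // placeholder queue name
const IN_PROCESS = `${QUEUE}~in-process`;  // placeholder in-process list

// On startup, move anything a previous run left in the in-process list back
// onto the main queue so it gets processed again.
async function recover(): Promise<void> {
  while ((await redis.rpoplpush(IN_PROCESS, QUEUE)) !== null) {
    // keep draining until the in-process list is empty
  }
}

// Atomically move a message onto the in-process list, handle it, then LREM it.
// A crash between the move and the LREM leaves the message in the in-process
// list, so it is retried on the next start.
async function consume(handle: (msg: string) => Promise<void>): Promise<void> {
  await recover();
  for (;;) {
    const msg = await redis.brpoplpush(QUEUE, IN_PROCESS, 0); // block until a message arrives
    if (msg === null) continue;
    await handle(msg);
    await redis.lrem(IN_PROCESS, 1, msg);
  }
}
```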

Moving from a single Redis instance that stored our tick and recent historical data to a Redis Cluster

We've had all the current tick data stored in a single Redis instance on a 2 CPU, 4 GB RAM virtual machine for the past 7 years, and that machine has only gone down once. The Redis instance itself has never failed, even though we've hit it with a lot of calls.

While this is fine as long as everything works, in a disaster that VM and Redis instance are a very hot single point of failure. We've been meaning to replace it with a Redis Cluster for over 4 years, ever since we had a lot of success storing the number of calls in a Redis Cluster and using a Redis Cluster as our main rate-limit storage.

As part of our upgrade to the aggregation services, we decided to shard instruments by exchange and instrument id. Redis Cluster works by applying a hash function to the key name and getting back an integer from 0 to 16383 (16,384 in total), also known as a key slot; you can assign as many key slots as you want to any node in the cluster. To keep big exchanges as distributed as possible, for unmapped instrument market data we went with the Redis key structure 706~{[exchange_internal_name]~[instrument_id]}, and for the latest historical data (the most-used historical data) we decided on the key structure 707~{[exchange_internal_name]~[instrument_id]}~[unitName]~[bucketName]. Redis Cluster only hashes what is found between the curly braces (the hash tag), so both the historical and the latest data for an instrument end up on the same Redis instance and we can request them easily with a multi command.
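A short sketch of that key scheme, and of fetching the tick and latest historical data for one instrument in a single multi; the unit and bucket names are placeholders, and the hash data type is an assumption:

```typescript
import Redis from "ioredis";

// Everything inside {...} is the Redis Cluster hash tag: only that part is
// hashed into a key slot, so both keys below land on the same node.
const tickKey = (exchange: string, instrument: string) =>
  `706~{${exchange}~${instrument}}`;
const latestKey = (exchange: string, instrument: string, unit: string, bucket: string) =>
  `707~{${exchange}~${instrument}}~${unit}~${bucket}`;

const cluster = new Redis.Cluster([{ host: "127.0.0.1", port: 7000 }]);

async function fetchInstrument(exchange: string, instrument: string) {
  // Both keys share a hash tag, so the MULTI is served by a single cluster node.
  return cluster
    .multi()
    .hgetall(tickKey(exchange, instrument))
    .hgetall(latestKey(exchange, instrument, "MINUTE", "LATEST")) // placeholder unit/bucket
    .exec();
}
```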

Moving from standard Postgres tables to partitioned tables

For historical data, we kept our Postgres sharding system, where each integration has three tables (minute, hour, day) and each instrument on that integration is assigned to a particular Postgres main database. We have six Postgres main databases, cc-md-db-pg-collect-01 to cc-md-db-pg-collect-06, each with a few dozen replicas. The APIs use the replicas and the aggregation services write to the main servers.
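The assignment function itself isn't covered in this post, so the sketch below uses a simple stable hash of exchange and instrument id purely for illustration:

```typescript
import { createHash } from "node:crypto";

// The six main databases named above; replicas are resolved separately.
const MAIN_DBS = [1, 2, 3, 4, 5, 6].map((n) => `cc-md-db-pg-collect-0${n}`);

// Illustrative assignment only: any stable hash works, as long as the same
// instrument always maps to the same main database.
function mainDbFor(exchange: string, instrumentId: string): string {
  const digest = createHash("md5").update(`${exchange}~${instrumentId}`).digest();
  return MAIN_DBS[digest.readUInt32BE(0) % MAIN_DBS.length];
}
```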

We decided to use partitioned tables to make it easier to migrate data into the archive at later dates and to make queries faster for the latest data. A typical exchange integration with 1000 active instruments would hold:

  • 10,080,000 entries in the minute table per partition, each partition holding one week of data, from Monday to Sunday
  • 8,760,000 entries in the hourly table per partition, each partition holding one year of data, from the 1st of January to the 31st of December
  • 365,000 entries in the daily table per partition, each partition holding one year of data, from the 1st of January to the 31st of December

We noticed that with partitions our queries were over 3x faster, and since in the worst-case scenario we select data from at most 2 partitions, Postgres CPU and RAM usage actually went down.
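For 1,000 active instruments, one week of minute data is 1,000 × 7 × 1,440 = 10,080,000 rows, which is where the first figure in the list above comes from. As a rough sketch of the layout, here is what creating a minute table and one weekly partition could look like with node-postgres; the table and column names are placeholders, not our production schema.

```typescript
import { Client } from "pg";

// The integration name comes from internal configuration, never from user
// input, so plain interpolation is acceptable in this sketch.
async function createMinuteTable(client: Client, integration: string): Promise<void> {
  // Parent table, range-partitioned on the candle timestamp.
  await client.query(`
    CREATE TABLE IF NOT EXISTS ${integration}_minute (
      instrument_id TEXT        NOT NULL,
      ts            TIMESTAMPTZ NOT NULL,
      open NUMERIC, high NUMERIC, low NUMERIC, close NUMERIC, volume NUMERIC,
      PRIMARY KEY (instrument_id, ts)
    ) PARTITION BY RANGE (ts);
  `);

  // One partition per week, Monday to Sunday (here, the week of 2021-12-20).
  await client.query(`
    CREATE TABLE IF NOT EXISTS ${integration}_minute_2021_12_20
      PARTITION OF ${integration}_minute
      FOR VALUES FROM ('2021-12-20') TO ('2021-12-27');
  `);
}
```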

All of this is just a subset of the improvements we made in our new aggregation services; overall they are much faster at processing data, more resilient, and quicker to start.

Monitoring was also an important focus for us and we’re now not just monitoring the state of the service (0 — Nominal, 1 — Warning, 2 — Critical) but we’re also keeping an eye on the size of the message queue, the state of the instance and a dozen other metrics.
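As a sketch, the health report each service emits looks something like the following; apart from the three states, the field names are assumptions:

```typescript
// The three service states mentioned above, plus a few of the extra metrics
// we now track; the field names are illustrative.
enum ServiceState {
  Nominal = 0,
  Warning = 1,
  Critical = 2,
}

interface HealthReport {
  state: ServiceState;
  queueLength: number;      // size of the Redis message queue
  lastMessageAgeMs: number; // how stale the newest processed message is
  cpuLoad: number;          // instance-level metrics
  memoryUsedBytes: number;
}
```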

Disclaimer: Please note that the content of this blog post was created prior to our company's rebranding from CryptoCompare to CCData.
