
Exchange Discovery Service (EDS) — Exchange Integration Sharding

This blog provides insight into our system's functionality and logic. We will explain how we distribute exchanges and instruments across various instances.

  • December 14, 2021
  • Vlad Cealicu

One of the biggest bottlenecks in our old infrastructure was that we could not have more than one instance per integration. Each exchange got its own virtual machine/container, but as we found out, not all exchanges are equal. Some have a lot more trade/order book messages than others, and some end up not doing much work. While the Proxy Swarm helped us share some of the load, we needed to take even more load off each individual integration instance.

In our new version, we decided to make it possible to shard an exchange across as many virtual machines/containers as needed. To do this, we needed a way to control how each instrument is assigned to at least one instance of an exchange integration. This is where our Exchange Discovery Service comes in.

While in theory this can easily be done with a cluster that controls the distribution, we still needed to stay true to our main Reliability Motto — if everything fails, an integration that is consuming data will continue to consume data. This means that even with a high availability cluster set up, we still need to make sure that a single point of orchestration/failure does not bring all our integrations down at once. Using software that allows clustering and high availability does not guarantee 100% uptime, and data loss and recovery is a very expensive and time-consuming task. The only tradeoff we planned for is that when an exchange integration is first set up, the Exchange Discovery Service database needs to be available.

Fig 1. Exchange Discovery Service Diagram

Our Exchange Discovery Service is made up of 6 components:

  • The instrument integration service
  • The state sync service
  • The exchange discovery service distributor
  • The exchange discovery service database
  • The exchange discovery service API
  • The exchange discovery service dashboard

The instrument integration service

Historically, this was part of our trades polling or WebSocket integrations. We decided to split it into its own service as soon as we noticed that polling or connecting to data would need to be sharded across instances.

By having a specific service in charge of instrument discovery and metadata, we are able to control the flow of data more easily. It is one of our templated services, and for spot exchanges we generally only have to call an endpoint (using our Proxy Swarm) and then map the response of the endpoint to specific fields. All data generated by this service is written to the exchange discovery service database. With Coinbase, for example, all we have to do is map their instrument id field to the right place. Overall, with all the dependencies, logging, and monitoring, the Coinbase instruments integration is under 150 lines of code.

Fig 2. Coinbase Polling Instrument Integration
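To give a rough idea of what this mapping looks like, below is a minimal sketch (not our production code) of a Coinbase-style instrument integration. The endpoint path, the response field names, and the proxyRequest helper are illustrative assumptions.

    // Minimal sketch of a Coinbase-style instrument mapping (illustrative only).
    // proxyRequest stands in for a request routed through our Proxy Swarm.
    exchangeFilter.getInstruments = async function () {
        const products = await proxyRequest('GET', 'https://api.exchange.coinbase.com/products');
        return products.map(function (product) {
            return {
                // The exchange instrument id (e.g. "BTC-USD"), sanitized so it is
                // safe to use in CSV files, filenames and API queries.
                instrumentId: exchangeFilter.cleanInstrumentId(product.id),
                baseAsset: product.base_currency,
                quoteAsset: product.quote_currency,
                status: product.status === 'online' ? 'ACTIVE' : 'PAUSED'
            };
        });
    };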

One of the interesting functions here is cleanInstrumentId. Even though we keep the data as close to the format we receive from the integration as we can, we have to sanitize some of it.

    exchangeFilter.cleanInstrumentId = function (instrumentId) {
        /*
            Removing commas because instrument IDs are part of .CSV files and would break the CSV format.
            Removing forward slashes because instrument IDs are also filenames, and forward slashes in file names create folders.
            Removing & because it breaks querying on any API.
            Removing ? because it breaks querying on any API.
        */
        return instrumentId.replace(/[,/&?]/g, '');
    };
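For example (the instrument ids below are made up purely for illustration):

    exchangeFilter.cleanInstrumentId('BTC/USD');   // 'BTCUSD'  - a slash would create a folder in a filename
    exchangeFilter.cleanInstrumentId('ETH-USD,1'); // 'ETH-USD1' - a comma would break our CSV files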

Other exchanges are a lot more involved when it comes to the instrument service. With Binance, an exchange that serves spot trades, futures trades, index, funding rate, and open interest data, the overall size of the instrument service is around 500 lines of code. It is still manageable by one developer, but a lot more complicated to follow.

The instrument service does not just tell us how to connect and what to stream or poll for; it also adds a lot of metadata for each instrument. Fields like: first and last seen on the different instrument integration types (polling, streaming, hardcoded, import), the streaming subscription, the REST URL and URI, as well as the instrument status (ACTIVE, RETIRED, MIGRATING, PAUSED, etc.). On derivatives, there are a lot more fields depending on the derivative type (future, option, etc.). The other integrations write data into the instrument metadata fields as well; for example, the trades spot integration writes the first and last spot trade in internal and external format, the last request, last response, and next request timestamps, the last proxied REST request, and many other metadata fields.
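To make the shape of this metadata more concrete, here is an illustrative (and deliberately incomplete) example of the kind of record that builds up for a single spot instrument. The field names are simplified assumptions, not our exact schema.

    // Illustrative instrument metadata record (field names simplified, not exhaustive).
    const instrumentMetadata = {
        instrumentId: 'BTC-USD',
        status: 'ACTIVE',                        // ACTIVE, RETIRED, MIGRATING, PAUSED, ...
        // Written by the instrument integration:
        firstSeenOnPollingTs: 1639440000,
        lastSeenOnPollingTs: 1639483200,
        firstSeenOnStreamingTs: 1639440030,
        lastSeenOnStreamingTs: 1639483230,
        streamingSubscription: { channel: 'matches', product_id: 'BTC-USD' },
        restUrl: 'https://api.exchange.coinbase.com',
        restUri: '/products/BTC-USD/trades',
        // Written back by the trades spot integration:
        firstTradeSpotTs: 1639440100,
        lastTradeSpotTs: 1639483150,
        lastRequestTs: 1639483200,
        lastResponseTs: 1639483201,
        nextRequestTs: 1639483260,
        lastProxiedRestRequest: 'https://api.exchange.coinbase.com/products/BTC-USD/trades?limit=100'
    };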

The state sync service

This service is in charge of making sure the exchange has enough data to run in isolation. It syncs local state data that is needed globally, like the first and last seen on polling fields, to the Exchange Discovery Service database. It also syncs global data, like instrument status and sharding assignment, to the local data store. It acts as the bridge between the global data used by our ops people and our clients and the local data the services need in order to run. It is also in charge of moving instruments and changing sharding assignments from one instance to another.
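A minimal sketch of the two directions of this sync is shown below, assuming the EDS database is reached through an ioredis-style client (edsDb) and the local data store through a hypothetical localStore client; the key names are also illustrative assumptions.

    // Sketch of one sync pass for a single instrument (illustrative only).
    async function syncInstrumentState(exchangeId, instrumentId) {
        // 1. Push locally produced state that is needed globally.
        const localState = await localStore.get(`${exchangeId}:${instrumentId}`);
        await edsDb.hset(`eds:${exchangeId}:${instrumentId}`, {
            firstSeenOnPollingTs: localState.firstSeenOnPollingTs,
            lastSeenOnPollingTs: localState.lastSeenOnPollingTs
        });

        // 2. Pull globally managed state that the local integration needs to run.
        const status = await edsDb.hget(`eds:${exchangeId}:${instrumentId}`, 'status');
        const shardAssignment = await edsDb.hget(`eds:${exchangeId}:sharding`, instrumentId);
        await localStore.set(`${exchangeId}:${instrumentId}`, {
            ...localState,
            status: status,                   // e.g. ACTIVE / PAUSED / RETIRED
            shardAssignment: shardAssignment  // which instance should handle this instrument
        });
    }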

The exchange discovery service distributor

This middle layer is in charge of streaming actions users take on the dashboard back to the state sync services. It is a set of Redis instances dedicated specifically to this purpose. We subscribe to messages on the state sync service and we publish messages from the API, making communication between the API and the backend much quicker.
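A minimal sketch of that flow is below, assuming an ioredis-style client; the channel names, message shape, hostname, and the applyShardingAssignment helper are illustrative assumptions.

    const Redis = require('ioredis');

    // On the API side: publish a dashboard action to the distributor.
    const publisher = new Redis({ host: 'eds-distributor' }); // hostname is an assumption
    function publishShardingChange(exchangeId, instrumentId, targetInstance) {
        publisher.publish(`eds:${exchangeId}:sharding-updates`, JSON.stringify({
            instrumentId: instrumentId,
            targetInstance: targetInstance
        }));
    }

    // On the state sync service side: subscribe and apply the change locally.
    const subscriber = new Redis({ host: 'eds-distributor' });
    subscriber.subscribe('eds:coinbase:sharding-updates');
    subscriber.on('message', function (channel, message) {
        const change = JSON.parse(message);
        // applyShardingAssignment is a hypothetical helper that updates the
        // local store and the running integration.
        applyShardingAssignment(change.instrumentId, change.targetInstance);
    });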

The exchange discovery service database

This component is a Redis Cluster in charge of storing all instrument metadata. It sits between the API and the state sync service and is the authoritative source of truth for the rest of our infrastructure. The state sync service can always fall back on it if it misses data from the exchange discovery service distributor, and it checks this data for inconsistencies every minute. The most important data in the cluster is the exchange instrument sharding data. An instrument is either assigned specifically to an instance or it sits on the catch-all instance by default. Newly listed instruments are assigned to the catch-all instance and are then migrated, when needed, to individual instances.
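As a sketch, an integration instance could resolve which instruments it owns along these lines, assuming the sharding assignments live in a Redis hash; the key names and the catch-all constant are illustrative assumptions.

    const CATCH_ALL_INSTANCE = 'catch-all';

    // Look up the instance an instrument is assigned to; instruments with no
    // explicit assignment default to the catch-all instance.
    async function getAssignedInstance(edsDb, exchangeId, instrumentId) {
        const assigned = await edsDb.hget(`eds:${exchangeId}:sharding`, instrumentId);
        return assigned || CATCH_ALL_INSTANCE;
    }

    // Decide whether this running instance should process a given instrument.
    async function shouldProcess(edsDb, exchangeId, instrumentId, myInstanceId) {
        const assigned = await getAssignedInstance(edsDb, exchangeId, instrumentId);
        return assigned === myInstanceId;
    }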

You might ask why we need one set of Redis instances for the distributor and another for the database: why not just use the cluster for the distribution as well?

  • Scaling: it's much easier to scale if you separate the two responsibilities, as we might have a lot more reads of the data than updates through the streaming
  • Different systems will need access to state but not streaming, or streaming but not state. We can enforce that at the firewall level if the two are separate instances
  • Redis pub-sub behaves differently in a cluster compared to key distribution
  • Having the distribution and the state as separate systems means we can move the distribution to RabbitMQ or another queue system without impacting the state data
  • Having the distribution and the state as separate systems means we can move the state to PostgreSQL or another database without impacting the distribution data.

The exchange discovery service API

This service is quite straightforward: it has logic around reading the exchange discovery service database and figuring out what's new in terms of instruments and exchanges. It has logic around sharding, and it is the only way to impact sharding and exchange instrument distribution. It connects to both the EDS database and the EDS distributor. It is used by the EDS dashboard to show our ops people any new instruments and when we last did something with an instrument, and it can also access instrument metadata and stats directly. It's the glue that makes the EDS dashboard talk to the rest of the services. It is written in Node.js, using our own framework, and it handles authentication, rate limiting, and access control. It has Nginx in front of it and runs on at least two instances behind a load balancer.
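As a rough sketch of what the sharding part of the API could look like, here is an Express-style endpoint; Express is used purely for illustration (the real service uses our own framework), and the route, payload, and the edsDb/distributor clients are assumptions.

    const express = require('express');
    const app = express();
    app.use(express.json());

    // Move an instrument to a specific instance: update the authoritative state
    // in the EDS database, then stream the change through the distributor so the
    // state sync services pick it up quickly.
    app.post('/exchanges/:exchangeId/instruments/:instrumentId/shard', async (req, res) => {
        const { exchangeId, instrumentId } = req.params;
        const { targetInstance } = req.body;
        await edsDb.hset(`eds:${exchangeId}:sharding`, instrumentId, targetInstance);
        await distributor.publish(`eds:${exchangeId}:sharding-updates`,
            JSON.stringify({ instrumentId, targetInstance }));
        res.json({ exchangeId, instrumentId, targetInstance });
    });

    app.listen(8080);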

The exchange discovery service dashboard

This dashboard, just like our other internal tools dashboards, is written in Vue.js and deployed as a static HTML package. It relies on the EDS API. Using the dashboard, our ops team can pre-shard instruments by instance, and they can check and rebalance the shards when needed. We'll expand at a later date on how we move an instrument from one instance to another.
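As a sketch, a dashboard action could trigger a re-shard through the EDS API along these lines; the route and payload mirror the illustrative endpoint above and are assumptions, not the real API.

    // Illustrative dashboard helper that asks the EDS API to move an instrument.
    async function moveInstrument(exchangeId, instrumentId, targetInstance) {
        const response = await fetch(
            `/exchanges/${exchangeId}/instruments/${instrumentId}/shard`, {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ targetInstance })
            });
        return response.json();
    }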

Disclaimer: Please note that the content of this blog post was created prior to our company's rebranding from CryptoCompare to CCData.
