Building a Market Data Feed with Liquibook

By Jeff Schmitz, OCI Principal Software Engineer

August 2013


Introduction

The financial industry relies heavily on systems to disseminate and interpret market data. Both trading decisions and analytics are based on the current state of the market for a particular security. A typical trading exchange involves multiple parties, and in a conceptual view looks like this:

Figure 1: Conceptual View of Exchange

Building such an exchange is a complex undertaking, involving order taking, matching, serialization and compression to a feed, dissemination, deserialization, decompression, and interpretation. This paper will demonstrate how to build such a system using only open-source components, including Liquibook for limit order book matching, QuickFAST for compression and decompression, and Boost ASIO for distribution.

Liquibook Background

Liquibook is a C++ open source limit order book matching engine. It is a component library that you can integrate into your project for implementing matching algorithms. Liquibook includes components for an order book, aggregate depth tracking, BBO tracking, and a trade feed.

QuickFAST Background

The FAST protocol is a compression and encoding mechanism for streaming data. QuickFAST is a C++ implementation of the FAST protocol for encoding and decoding message streams.

Boost ASIO Background

Boost ASIO is a cross-platform C++ library for asynchronous network programming.

Example System

The example project for this paper will take on a more advanced use case: simulating an exchange that produces a 5-level depth feed, aggregated by price, and a trade feed. This example involves a number of steps:

Accept Orders

The example exchange generates orders, simulating the receipt of orders from traders. A true exchange takes orders from traders by means of an API - often via the FIX protocol. For the purposes of this paper, internally generated orders will suffice.

Maintain a Limit Order Book for Each Security

The example exchange must be ready to match unfilled orders against inbound orders. This is done by means of a limit order book. The limit order book keeps these orders sorted so that when multiple existing orders can match a new inbound order, the matching is performed in a sequence determined by established exchange rules. The example exchange uses Liquibook's DepthOrderBook class to maintain the limit order book.

Match Buy and Sell Orders

The example exchange must match new inbound orders against those in the limit order book. The exchange must detect a match, and fill the appropriate orders. Liquibook's order book classes naturally include matching capability.
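
As a rough illustration of what matching involves, the sketch below fills an inbound buy order against resting sell orders, best price first and oldest first within a price. It is not Liquibook's implementation; the names RestingOrder, Fill, Asks, and match_buy are hypothetical, and prices are assumed to be integers.

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical types for illustration; Liquibook's own classes differ.
struct RestingOrder { int id; uint32_t qty; };
struct Fill { int resting_id; uint32_t price; uint32_t qty; };

// Resting asks keyed by integer price, lowest first. A multimap keeps
// orders with equal prices in insertion (arrival) order.
typedef std::multimap<uint32_t, RestingOrder> Asks;

// Match an inbound buy limit order against the resting asks.
// Returns the fills; any unfilled quantity is left in remaining_qty.
std::vector<Fill> match_buy(Asks& asks, uint32_t limit_price,
                            uint32_t& remaining_qty) {
  std::vector<Fill> fills;
  while (remaining_qty > 0 && !asks.empty()) {
    Asks::iterator best = asks.begin();
    if (best->first > limit_price) break;  // best ask is too expensive
    uint32_t traded = std::min(remaining_qty, best->second.qty);
    Fill fill = { best->second.id, best->first, traded };
    fills.push_back(fill);
    remaining_qty -= traded;
    best->second.qty -= traded;
    if (best->second.qty == 0) asks.erase(best);  // fully filled, remove it
  }
  return fills;
}
```

A real exchange would also place any unfilled remainder of the inbound order on the book, which this sketch leaves to the caller.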

Update Trading Client Order Status

An exchange must always notify traders of changes in their order status. This includes accepting, filling, and rejecting orders, but also accepting or rejecting order cancellation and replacement requests. In the example exchange, there are no trading clients (the orders are randomly generated), so this step will be skipped. Liquibook does, however, provide an interface and make callbacks for changes in order status, which can easily be turned into notifications sent to trading clients.

Aggregate the Top 5 Price Levels into Depth

While a full order book feed is useful, some exchanges choose to send aggregated forms of data instead, known as depth. Rather than representing individual orders, depth models price levels as a whole. An exchange will also limit the number of levels distributed - top 5 and top 10 are common limits.

The example exchange goes through the extra step and complexity of building 5-level depth. Liquibook provides the DepthOrderBook class, which manages the order book and depth, and the DepthListener class for being notified of changes to the top few depth levels.
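
To make the idea of aggregation concrete, here is a minimal sketch, independent of Liquibook, that folds individual bids into price levels and returns the top few. The names DepthLevel, BidLevels, add_bid, and top_levels are hypothetical, chosen only for this illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <vector>

// One aggregated price level (hypothetical layout).
struct DepthLevel {
  uint32_t price;
  uint32_t order_count;
  uint32_t aggregate_qty;
};

// Bid levels keyed by price, highest price first.
typedef std::map<uint32_t, DepthLevel, std::greater<uint32_t> > BidLevels;

// Fold one bid into its price level, creating the level if needed.
void add_bid(BidLevels& levels, uint32_t price, uint32_t qty) {
  DepthLevel& level = levels[price];  // zero-initialized when new
  level.price = price;
  level.order_count += 1;
  level.aggregate_qty += qty;
}

// Copy out at most max_levels levels, best price first.
std::vector<DepthLevel> top_levels(const BidLevels& levels,
                                   std::size_t max_levels) {
  std::vector<DepthLevel> result;
  for (BidLevels::const_iterator it = levels.begin();
       it != levels.end() && result.size() < max_levels; ++it) {
    result.push_back(it->second);
  }
  return result;
}
```

The ask side would be the mirror image, using the default ascending order so that the lowest price comes first.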

Determine Changes to Depth in Response to an Event

It is not enough to maintain the depth. In order to produce an incremental feed, the example exchange must also be able to determine which levels of the depth have changed. A change can happen to one or more levels in the depth, on both the buy and sell sides, in response to a single order event. Liquibook has an interface for interrogating the various levels of depth and finding out whether each has changed since the last published change. This makes it straightforward to build an incremental feed - sending only changed levels to feed clients.
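
One common way to answer "has this level changed since the last publish?" is to stamp each level with a monotonically increasing change number. The sketch below shows that technique under assumed names (TrackedLevel, ChangeTracker); it is an illustration only and should not be read as Liquibook's actual interface.

```cpp
#include <cstddef>
#include <cstdint>

// A depth level stamped with the id of its most recent change.
struct TrackedLevel {
  uint32_t price;
  uint32_t order_count;
  uint32_t aggregate_qty;
  uint64_t last_change;
};

// Tracks SIZE levels on one side of the depth (hypothetical class).
template <std::size_t SIZE>
class ChangeTracker {
public:
  ChangeTracker() : current_change_(0), published_change_(0), levels_() {}

  // Overwrite a level and stamp it with a new change id.
  void update(std::size_t index, uint32_t price,
              uint32_t order_count, uint32_t aggregate_qty) {
    TrackedLevel& level = levels_[index];
    level.price = price;
    level.order_count = order_count;
    level.aggregate_qty = aggregate_qty;
    level.last_change = ++current_change_;
  }

  // True if the level changed after the last call to mark_published().
  bool changed_since_publish(std::size_t index) const {
    return levels_[index].last_change > published_change_;
  }

  // Call after the changed levels have been sent to feed clients.
  void mark_published() { published_change_ = current_change_; }

private:
  uint64_t current_change_;
  uint64_t published_change_;
  TrackedLevel levels_[SIZE];
};
```

A publisher using this scheme would walk the levels after each event, send those for which changed_since_publish() is true, and then call mark_published().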

Compress and Distribute Market Data in a Feed

To trade effectively, feed clients need to understand order flow in the exchange. Feeds in which clients are notified of all possible order detail are known as "quote" feeds or "full order book" feeds, while those which aggregate orders by price are called "depth" feeds. A "BBO" feed gives only the top-level quote for a security, and a "trade" feed reflects all trades, or matched orders. Liquibook has support for building all of these feeds.

The example exchange builds both a trade and depth feed, then compresses the trade and depth messages using QuickFAST and sends them to clients using Boost ASIO.

Handle the Feed in a Client Program

Finally, the example includes feed clients that connect to the feed, decode the updates to recreate the trade events and reproduce the market depth. The clients use QuickFAST for decoding, Boost ASIO to handle network I/O, and Liquibook to reproduce the depth.

Getting Started

If you intend to build this project, you will need to download and build:

  1. Boost version 1.53 or later
  2. Xerces version 3.1.1 or later
  3. QuickFAST version 1.5.0 or later
  4. Liquibook version 1.1.0 or later

The complete project is found in the directory examples/depth_feed_publisher of Liquibook.

The Example Exchange

The example exchange consists of two applications, a publisher and a subscriber. The figure below illustrates the flow of data in the example exchange.

Figure 2: Example Exchange Data Flow

Publisher Application

The publisher implements the exchange in the example project. The publisher generates orders and adds them to the proper order book, handles the trade and depth events through listeners, encodes these using QuickFAST, and sends them using Boost ASIO.

The Order Book

As an exchange, the publisher must implement an order book. Liquibook comes with an order book class - two actually: book::OrderBook for managing an order book only, and a derived class book::DepthOrderBook for adding depth aggregation to that order book. Since the publisher builds a depth feed, it uses book::DepthOrderBook. The relevant parts of the book::OrderBook class are:

book/order_book.h: Selected parts of OrderBook class declaration
    namespace liquibook { namespace book {

    template <class OrderPtr = Order*>
    class OrderBook {
    public:
      typedef OrderBook<OrderPtr > MyClass;
      typedef TradeListener<MyClass > TypedTradeListener;

      /// @brief set the trade listener
      void set_trade_listener(TypedTradeListener* listener);

      /// @brief add an order to book
      /// @param order the order to add
      /// @param conditions special conditions on the order
      /// @return true if the add resulted in a fill
      virtual bool add(const OrderPtr& order, OrderConditions conditions = 0);

      /// @brief perform all callbacks in the queue
      virtual void perform_callbacks();
    };

    } }

Note that the book::OrderBook class is a template class, allowing the user to define not only the order class used in the order book, but the style of pointer used. This can be a regular pointer or a smart pointer.

The first method of interest sets a listener for trades. This listener, shown later, gets notified when a trade occurs. The example exchange needs this notification to build a trade feed, and thus sets the trade listener. The book::OrderBook class also includes listeners for all order status updates, for providing updates back to the trade clients, and a listener for all changes to the order book, to build a full order book feed.

Next is a method to add an order to the order book. add() accepts an order pointer, and condition flags for special conditions, like an immediate-or-cancel order. Note that there are also methods (not used in the example project) to cancel an order and to replace an order.

Finally, there is a method to perform the callbacks on the order book. Liquibook gives the client code the responsibility to execute this method, so that it can be done in the calling thread or in a background thread. By default, this method calls perform_callback() for each callback in the queue. Naturally, perform_callback() can be overridden, but that is not necessary in the publisher. The default implementation of this method issues callbacks to the trade listener, order listener, and order book listener, if present.

The derived book::DepthOrderBook class is simpler:

book/depth_order_book.h: DepthOrderBook class declaration
    namespace liquibook { namespace book {

    /// @brief Implementation of order book child class, that incorporates
    /// aggregate depth tracking. Overrides perform_callback() method to
    /// track depth aggregated by price.
    template <class OrderPtr = Order*, int SIZE = 5>
    class DepthOrderBook : public OrderBook<OrderPtr> {
    public:
      typedef Depth<SIZE> DepthTracker;
      typedef BboListener<DepthOrderBook > TypedBboListener;
      typedef DepthListener<DepthOrderBook > TypedDepthListener;
      typedef Callback<OrderPtr> DobCallback;

      /// @brief construct
      DepthOrderBook();

      /// @brief set the BBO listener
      void set_bbo_listener(TypedBboListener* bbo_listener);

      /// @brief set the depth listener
      void set_depth_listener(TypedDepthListener* depth_listener);

      /// @brief handle a single callback
      virtual void perform_callback(DobCallback& cb);

      /// @brief access the depth tracker
      DepthTracker& depth();

      /// @brief access the depth tracker
      const DepthTracker& depth() const;

    private:
      DepthTracker depth_;
      TypedBboListener* bbo_listener_;
      TypedDepthListener* depth_listener_;
    };

book::DepthOrderBook overrides perform_callback() to update its depth, which is accessible through the depth() accessor methods. In addition, book::DepthOrderBook adds two new listeners - a BBO listener, for tracking only changes to the best bid and best offer, and a depth listener, for tracking all depth changes.

To build the exchange, the publisher must create a book::DepthOrderBook and set the trade listener and depth listener on it. While handling these callbacks, the publisher will need to update the feed clients with trade and depth update messages.

Note that in some of the callbacks, a pointer to the order book is provided, while a pointer to the order is provided in others. This gives the publisher two opportunities to provide additional fields and logic through inheritance. The publisher takes advantage of this, providing access to the symbol of the order book's security:

example_order_book.h: ExampleOrderBook class declaration
    namespace liquibook { namespace examples {

    typedef boost::shared_ptr<Order> OrderPtr;

    class ExampleOrderBook : public book::DepthOrderBook<OrderPtr> {
    public:
      ExampleOrderBook(const std::string& symbol);
      const std::string& symbol() const;

    private:
      std::string symbol_;
    };

    } } // End namespace

Here the reader will note that the ExampleOrderBook class derives from book::DepthOrderBook, binding a shared pointer to Order as its parent's template argument. The Order class is shown below. The trivial implementation of ExampleOrderBook is omitted from this paper. The class diagram for order books looks like this:

Figure 3: Order Book Class Diagram

The Exchange

The ExampleOrderBook class manages the order book and depth for a single security. Since exchanges handle multiple securities, somewhere the publisher needs to create an order book for each security, and maintain a mapping from a symbol to its order book. The publisher does this in the Exchange class:

exchange.h: Exchange class declaration
    namespace liquibook { namespace examples {

    class Exchange {
    public:
      Exchange(ExampleOrderBook::TypedDepthListener* depth_listener,
               ExampleOrderBook::TypedTradeListener* trade_listener);

      // Permanently add an order book to the exchange
      void add_order_book(const std::string& symbol);

      // Handle an incoming order
      void add_order(const std::string& symbol, OrderPtr& order);
    private:
      typedef std::map<std::string, ExampleOrderBook> OrderBookMap;
      OrderBookMap order_books_;
      ExampleOrderBook::TypedDepthListener* depth_listener_;
      ExampleOrderBook::TypedTradeListener* trade_listener_;
    };

    } }

The Exchange class has only three methods: a constructor, a method to add an order book, and a method to handle an order. As expected, it maintains a map of order books. It also holds pointers to the depth listener and trade listener. The constructor simply stores these pointers, so that the listeners can be set on order books added later:

exchange.cpp: Exchange class implementation
    namespace liquibook { namespace examples {

    Exchange::Exchange(ExampleOrderBook::TypedDepthListener* depth_listener,
                       ExampleOrderBook::TypedTradeListener* trade_listener)
    : depth_listener_(depth_listener),
      trade_listener_(trade_listener)
    {
    }

The add_order_book() method creates a new order book, sets the listeners on the order book, and adds a mapping from the given symbol to the order book:

exchange.cpp: Exchange class implementation, continued
    void
    Exchange::add_order_book(const std::string& sym)
    {
      std::pair<OrderBookMap::iterator, bool> result;
      result = order_books_.insert(std::make_pair(sym, ExampleOrderBook(sym)));
      result.first->second.set_depth_listener(depth_listener_);
      result.first->second.set_trade_listener(trade_listener_);
    }

Finally, the add_order() method finds the correct order book, adds the order to the order book, and then triggers callbacks by calling the perform_callbacks() method:

exchange.cpp: Exchange class implementation, continued
    void
    Exchange::add_order(const std::string& symbol, OrderPtr& order)
    {
      OrderBookMap::iterator order_book = order_books_.find(symbol);
      if (order_book != order_books_.end()) {
        order_book->second.add(order);
        order_book->second.perform_callbacks();
      }
    }

    } } // End namespace

The call to perform_callbacks() will trigger any callbacks to the provided listeners. This way, a single thread is manipulating the order books. It is also possible to issue perform_callbacks() calls in a background thread, but the publisher does it this way for simplicity.
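
The pattern of queuing events and delivering them later, on whichever thread calls the "perform" method, can be sketched in a few lines. The example below is a simplified stand-in with hypothetical names (Event, CallbackQueue, RecordingListener); Liquibook's real callback types carry considerably more detail.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// A queued notification (hypothetical, for illustration only).
struct Event {
  std::string kind;
  std::string symbol;
};

// Queues events as they occur and delivers them only on request.
class CallbackQueue {
public:
  void enqueue(const Event& event) { pending_.push_back(event); }

  // Deliver queued events in order on the calling thread.
  // Returns the number of events delivered.
  template <class Listener>
  std::size_t perform_callbacks(Listener& listener) {
    std::size_t delivered = 0;
    while (!pending_.empty()) {
      Event event = pending_.front();
      pending_.pop_front();
      listener.on_event(event);
      ++delivered;
    }
    return delivered;
  }

private:
  std::deque<Event> pending_;
};

// A listener that records what it was told.
struct RecordingListener {
  std::vector<std::string> seen;
  void on_event(const Event& event) {
    seen.push_back(event.kind + ":" + event.symbol);
  }
};
```

Because nothing is delivered until perform_callbacks() runs, the caller decides which thread the listeners execute on. Moving delivery to a background thread would additionally require synchronizing access to the queue, which this sketch does not attempt.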

Order Class

Liquibook requires that the application define an order class to represent an order to match. This class must meet the interface defined by the Order class in src/book/order.h:

book/order.h: Order class (Liquibook) declaration
    namespace liquibook { namespace book {

    class Order {
    public:
      /// @brief is this a limit order?
      bool is_limit() const;

      /// @brief is this order a buy?
      virtual bool is_buy() const = 0;

      /// @brief get the price of this order, or 0 if a market order
      virtual Price price() const = 0;

      /// @brief get the quantity of this order
      virtual Quantity order_qty() const = 0;
    };

    } }

The publisher defines Order to implement the book::Order interface:

order.h: Order class (publisher) declaration
    namespace liquibook { namespace examples {

    class Order : public book::Order {
    public:
      Order(bool buy,
            const double& price,
            book::Quantity qty);

      virtual bool is_buy() const;
      virtual book::Quantity order_qty() const;
      virtual book::Price price() const;

      static const uint8_t precision_;
    private:
      bool is_buy_;
      double price_;
      book::Quantity qty_;
    };

    } }

Note that as a template class, Liquibook's book::OrderBook does not strictly require inheriting from the book::Order class, just that the interface be implemented.

The example's Order class implements book::Order, and adds a static field called precision_ for price conversion.

The Order constructor simply copies the arguments passed to it:

order.h: Order class implementation
    namespace liquibook { namespace examples {

    const uint8_t Order::precision_(100);

    Order::Order(bool buy, const double& price, book::Quantity qty)
    : is_buy_(buy),
      price_(price),
      qty_(qty)
    {
    }

Note the presence of the static initializer of precision_.

The is_buy() and order_qty() methods are simple accessors:

order.h: Order class implementation, continued
    bool
    Order::is_buy() const
    {
      return is_buy_;
    }

    book::Quantity
    Order::order_qty() const
    {
      return qty_;
    }

The final method is used to provide a price to the order book. For performance reasons, Liquibook requires this to be in integer format, so Order converts the price by multiplying by the precision:

order.h: Order class implementation, continued
    book::Price
    Order::price() const
    {
      return price_ * precision_;
    }

This has two implications. First, the publisher is limited to penny precision. This is suitable for purposes of the example, but may need to be increased for other use cases. Second, all securities in the publisher have the same precision, since a static variable is used to set precision. Again, in other use cases, this may not be sufficient.
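
For use cases that need a different precision per security, the conversion can take the multiplier as a parameter. The sketch below uses hypothetical helper names (to_integer_price, to_decimal_price) and assumes non-negative prices. It also rounds to the nearest integer rather than relying on the implicit double-to-integer conversion, which truncates and can lose a unit when the product lands just below a whole number.

```cpp
#include <cmath>
#include <cstdint>

// Convert a non-negative decimal price to an integer price.
// precision is the multiplier: 100 for pennies, 10000 for
// hundredths of a cent, and so on.
inline uint32_t to_integer_price(double price, uint32_t precision) {
  return static_cast<uint32_t>(std::llround(price * precision));
}

// Convert an integer price back to its decimal form.
inline double to_decimal_price(uint32_t price, uint32_t precision) {
  return static_cast<double>(price) / precision;
}
```

With these helpers, each order book could store its own multiplier and pass it in, so that one security could trade in pennies while another trades in finer increments.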

QuickFAST Templates

To transmit messages using the FAST protocol, one must first decide on the messages to transmit on the feed. This exchange will produce a trade feed and an incremental depth feed. A simplified view of a trade consists of a symbol, a quantity, and a trade price. The publisher alters this slightly by providing the trade cost (price times quantity), from which price per share can be derived.

QuickFAST uses XML files, called templates, to describe the messages in the feed protocol. The example's templates can be found in the file examples/depth_feed_publisher/depth.xml. The trade message template looks like this:

    <template name="Trade" id="1">
      <uInt16 name="MessageType" id="100">
        <constant value="22"></constant>
      </uInt16>
      <uInt32 name="SequenceNumber" id="200">
        <increment></increment>
      </uInt32>
      <uInt32 name="Timestamp" id="300">
        <copy></copy>
      </uInt32>
      <string name="Symbol" id="400">
        <copy></copy>
      </string>
      <uInt32 name="Quantity" id="604">
        <copy></copy>
      </uInt32>
      <uInt32 name="Cost" id="603">
        <copy></copy>
      </uInt32>
    </template>

Note that the tag names within the <template> tag indicate the type of the field. The reader may be surprised to see a currency field (Cost) represented as an integer (uInt32). This, however, is consistent with Liquibook's internal storage of prices as integers. Liquibook's design decision is thus carried forward to the feed protocol, requiring the clients to convert these prices back to their decimal format.
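
On the client side, recovering a decimal price per share from the integer Cost field might look like the sketch below. The function name price_per_share is hypothetical, and the sketch assumes that Cost is the integer price multiplied by the quantity and that the publisher's multiplier is 100, as in the example's precision_.

```cpp
#include <cstdint>

// Derive the decimal price per share from a trade message.
// Assumes cost = integer price * quantity, and that integer prices
// were produced by multiplying decimal prices by precision.
inline double price_per_share(uint32_t cost, uint32_t quantity,
                              uint32_t precision = 100) {
  if (quantity == 0) return 0.0;  // avoid dividing by zero
  return static_cast<double>(cost) / quantity / precision;
}
```

Under these assumptions, a trade of 100 shares with a Cost of 199900 works out to 19.99 per share. Note also that a 32-bit Cost in pennies tops out near 42.9 million dollars, so a production feed would likely want a wider field.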

In addition to the expected trade fields, the publisher provides three more: a message type, to tell the subscriber that the message is for a trade; a sequence number, so the subscriber can be confident it has received all messages and processed them in the correct order; and a timestamp, so the subscriber can be confident the message has been received in a timely manner.
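
A subscriber could use the sequence number along the following lines. This is a minimal sketch with hypothetical names (SeqStatus, SequenceChecker), and it assumes that numbering starts at 1 and increases by one per message, which the text above does not state.

```cpp
#include <cstdint>

enum SeqStatus { SEQ_OK, SEQ_GAP, SEQ_DUPLICATE_OR_OLD };

// Checks that sequence numbers arrive in order with none missing.
class SequenceChecker {
public:
  SequenceChecker() : expected_(1) {}  // assumption: first message is 1

  SeqStatus check(uint32_t seq_num) {
    if (seq_num == expected_) {
      ++expected_;
      return SEQ_OK;
    }
    if (seq_num > expected_) {
      // One or more messages were missed; resynchronize past the gap.
      expected_ = seq_num + 1;
      return SEQ_GAP;
    }
    return SEQ_DUPLICATE_OR_OLD;  // already seen, or arrived late
  }

private:
  uint32_t expected_;
};
```

What a client does on SEQ_GAP is a policy decision; for a depth feed, one reasonable response is to discard its depth state and wait for a full update.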

The depth update template is more complicated:

    <template name="Depth" id="2">
      <uInt16 name="MessageType" id="100">
        <constant value="11"></constant>
      </uInt16>
      <uInt32 name="SequenceNumber" id="200">
        <increment></increment>
      </uInt32>
      <uInt32 name="Timestamp" id="300">
        <copy></copy>
      </uInt32>
      <string name="Symbol" id="400">
        <copy></copy>
      </string>
      <sequence name="Bids" id="500">
        <uInt8 name="LevelNum" id="501">
          <copy></copy>
        </uInt8>
        <uInt32 name="OrderCount" id="502">
          <copy></copy>
        </uInt32>
        <uInt32 name="Price" id="503">
          <copy></copy>
        </uInt32>
        <uInt32 name="AggregateQty" id="504">
          <copy></copy>
        </uInt32>
      </sequence>
      <sequence name="Asks" id="600">
        <uInt8 name="LevelNum" id="601">
          <copy></copy>
        </uInt8>
        <uInt32 name="OrderCount" id="602">
          <copy></copy>
        </uInt32>
        <uInt32 name="Price" id="603">
          <copy></copy>
        </uInt32>
        <uInt32 name="AggregateQty" id="604">
          <copy></copy>
        </uInt32>
      </sequence>
    </template>

The depth message begins with the same 4 fields as the trade message: message type, sequence number, timestamp, and symbol. The depth message also includes two sequences, or variable-length lists. These sequences represent the changed bid and ask levels of the depth for the message security. The sequences themselves contain a set of fields, although in this case both sequences are of the same type.

Each changed depth level begins with a level number, indicating which level has changed. If the exchange were to produce all depth levels on every change, this would not be necessary. Because in an incremental feed the sequence element could represent any of the 5 levels, it is required.

Order count indicates the number of orders which were aggregated at this level. If the order count is 0, it indicates a deleted level. Price is the price common to all the aggregated orders at this level. As in the trade message, it is represented by an integer. Finally, aggregate quantity shows the sum of all the order quantities at this level.
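
The way a client might apply such a list of changed levels to its own copy of the depth can be sketched as follows. The types LevelUpdate and ClientLevel and the function apply_updates are hypothetical, and the sketch assumes that level numbers are zero-based, which the template itself does not specify.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// One element of the Bids or Asks sequence (hypothetical struct).
struct LevelUpdate {
  uint8_t level_num;
  uint32_t order_count;
  uint32_t price;
  uint32_t aggregate_qty;
};

// The client's stored view of one depth level.
struct ClientLevel {
  uint32_t order_count;
  uint32_t price;
  uint32_t aggregate_qty;
};

const std::size_t DEPTH_SIZE = 5;

// Apply changed levels to one side of the client's depth.
// Returns the number of updates applied.
std::size_t apply_updates(ClientLevel (&side)[DEPTH_SIZE],
                          const std::vector<LevelUpdate>& updates) {
  std::size_t applied = 0;
  for (std::size_t i = 0; i < updates.size(); ++i) {
    const LevelUpdate& update = updates[i];
    if (update.level_num >= DEPTH_SIZE) continue;  // ignore bad level
    ClientLevel& level = side[update.level_num];
    if (update.order_count == 0) {
      level = ClientLevel();  // order count 0 means the level was deleted
    } else {
      level.order_count = update.order_count;
      level.price = update.price;
      level.aggregate_qty = update.aggregate_qty;
    }
    ++applied;
  }
  return applied;
}
```

Levels that do not appear in the message are left untouched, which is exactly what makes the feed incremental.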

Using QuickFAST Templates

With the templates in place, the publisher and subscriber must use QuickFAST to put them to use for encoding and decoding, respectively. Both do this by means of the TemplateConsumer class:

template_consumer.h: TemplateConsumer class declaration
    namespace liquibook { namespace examples {

    class TemplateConsumer {
    public:
      static QuickFAST::Codecs::TemplateRegistryPtr
      parse_templates(const std::string& template_filename);

      // Trade field identities
      static const QuickFAST::Messages::FieldIdentity id_qty_;
      static const QuickFAST::Messages::FieldIdentity id_cost_;

      // Common field identities
      static const QuickFAST::Messages::FieldIdentity id_seq_num_;
      static const QuickFAST::Messages::FieldIdentity id_msg_type_;
      static const QuickFAST::Messages::FieldIdentity id_timestamp_;
      static const QuickFAST::Messages::FieldIdentity id_symbol_;

      // Depth field identities
      static const QuickFAST::Messages::FieldIdentity id_bids_length_;
      static const QuickFAST::Messages::FieldIdentity id_bids_;
      static const QuickFAST::Messages::FieldIdentity id_asks_length_;
      static const QuickFAST::Messages::FieldIdentity id_asks_;

      static const QuickFAST::Messages::FieldIdentity id_level_num_;
      static const QuickFAST::Messages::FieldIdentity id_order_count_;
      static const QuickFAST::Messages::FieldIdentity id_price_;
      static const QuickFAST::Messages::FieldIdentity id_size_;
    };

    } }

The TemplateConsumer class has a static parse_templates() method for parsing templates. It also includes several static members of type const QuickFAST::Messages::FieldIdentity. These members are used by QuickFAST to establish the identities of fields within the various messages. The reader will notice that the field identifiers correspond to fields in the template definition, with two exceptions: id_bids_length_ and id_asks_length_. These exist because the FAST protocol requires a length for each sequence, although a template may declare that length implicitly.

These members are initialized in a manner consistent with the template definition:

template_consumer.cpp: TemplateConsumer class implementation
    namespace liquibook { namespace examples {

    using namespace QuickFAST::Messages;

    const FieldIdentity TemplateConsumer::id_seq_num_("SequenceNumber");

    const FieldIdentity TemplateConsumer::id_msg_type_("MessageType");

    const FieldIdentity TemplateConsumer::id_timestamp_("Timestamp");

    // more like this...

The remaining members are initialized in a similar manner, and thus are not included in this paper. Finally, there is the parse_templates() method which parses the QuickFAST templates:

template_consumer.cpp: TemplateConsumer class implementation, continued
    QuickFAST::Codecs::TemplateRegistryPtr
    TemplateConsumer::parse_templates(const std::string& template_filename)
    {
      std::ifstream template_stream(template_filename.c_str());
      QuickFAST::Codecs::XMLTemplateParser parser;
      return parser.parse(template_stream);
    }

    } }

The parse_templates() method opens an input file stream using the filename passed to it. Then it creates a QuickFAST::Codecs::XMLTemplateParser and calls the parse() method on it, passing it the input file stream. This parses the template file, and produces the QuickFAST::Codecs::TemplateRegistryPtr which preserves the parsed form of the template file.

Connecting the Publisher and Subscriber

The publisher and subscriber communicate over TCP/IP using Boost ASIO. The communication between publisher and subscriber is one way - the publisher only sends messages, and the subscriber only receives them. The communication is also one-to-many, as multiple subscribers connect to a single publisher. Since QuickFAST encodes a stream of data, compressed uniquely for each recipient, the publisher must maintain separate QuickFAST encoder state for each subscriber. This is done through a class called DepthFeedSession.

The publisher also uses the DepthFeedConnection to listen for connections from subscribers. The DepthFeedConnection will establish many DepthFeedSession instances.
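
To see why encoder state cannot be shared between subscribers, consider FAST's copy operator, which lets the sender omit a field whose value equals the one previously sent on the same stream. The toy model below illustrates the idea for a single string field. CopyFieldEncoder is a hypothetical class written for this paper's discussion and is not part of QuickFAST's API.

```cpp
#include <string>

// Toy model of the copy operator for one string field.
// Each subscriber's stream needs its own instance.
class CopyFieldEncoder {
public:
  CopyFieldEncoder() : has_previous_(false) {}

  // Returns true if the value must be written to the stream, or
  // false if the receiver can reuse the previous value it was sent.
  bool must_send(const std::string& value) {
    if (has_previous_ && value == previous_) {
      return false;
    }
    previous_ = value;
    has_previous_ = true;
    return true;
  }

private:
  bool has_previous_;
  std::string previous_;
};
```

A subscriber that connects late has never seen the earlier value, so its encoder must send the field in full even though an older session may skip it. Sharing one encoder across sessions would leave the late subscriber unable to decode the stream.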

depth_feed_connection.h: DepthFeedSession class declaration
    namespace liquibook { namespace examples {
    typedef boost::shared_ptr<QuickFAST::WorkingBuffer> WorkingBufferPtr;
    typedef std::deque<WorkingBufferPtr> WorkingBuffers;
    typedef boost::array<unsigned char, 128> Buffer;
    typedef boost::shared_ptr<Buffer> BufferPtr;
    typedef boost::function<bool (BufferPtr, size_t)> MessageHandler;
    typedef boost::function<void ()> ResetHandler;
    typedef boost::function<void (const boost::system::error_code& error,
                                  std::size_t bytes_transferred)> SendHandler;
    typedef boost::function<void (const boost::system::error_code& error,
                                  std::size_t bytes_transferred)> RecvHandler;

    class DepthFeedConnection;

    // Session between a publisher and one subscriber
    class DepthFeedSession : boost::noncopyable {
    public:
      DepthFeedSession(boost::asio::io_service& ios,
                       DepthFeedConnection* connection,
                       QuickFAST::Codecs::TemplateRegistryPtr& templates);

In addition to declaring some typedefs and a forward declaration, this snippet of code declares DepthFeedSession's constructor. This constructor accepts (1) an IO Service object through which it can create a TCP socket, (2) the DepthFeedConnection, and (3) the parsed QuickFAST templates, so it can encode outgoing messages.

Next are an accessor and setter for the session's connection status, and an accessor for the session's TCP socket:

depth_feed_connection.h: DepthFeedSession class declaration, continued
      // Is this session connected?
      bool connected() const { return connected_; }

      // Mark this session as connected
      void set_connected() { connected_ = true; }

      // Get the socket for this session
      boost::asio::ip::tcp::socket& socket() { return socket_; }

The real meat of the class is next, where messages are sent to the subscriber, in send_trade() for trades, and both send_incr_update() and send_full_update() for depth updates.

depth_feed_connection.h: DepthFeedSession class declaration, continued
      // Send a trade message to all clients
      void send_trade(QuickFAST::Messages::FieldSet& message);

      // Send an incremental update - if this client has handled this symbol
      // return true if handled
      bool send_incr_update(const std::string& symbol,
                            QuickFAST::Messages::FieldSet& message);

      // Send a full update - if the client has not yet received for this symbol
      void send_full_update(const std::string& symbol,
                            QuickFAST::Messages::FieldSet& message);

The reader may wonder why there are two methods for sending depth updates. This is because the publisher produces an incremental feed for depth updates, allowing only the changed depth levels to be published to the subscriber, rather than all 5 levels of depth. This is particularly effective because of the nature of this data - most changes to depth are at the top one or two levels of the limit order book.

Note that the trade feed is not an incremental feed - each trade stands on its own. This could be different if the trade carried accumulating values, such as volume for the day, or average traded price. In this case, an incremental feed may be suitable for trades as well.

For the depth feed, two update methods are provided, for sending an incremental update, and for sending a full update. Even though the depth feed is an incremental feed, it is necessary to send full updates on occasion. A full update is required, for example, when the subscriber does not yet have the depth state for a security, such as when it first connects to the publisher.
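
The choice between a full and an incremental update comes down to remembering, per session, which symbols the subscriber has already received. A minimal sketch of that bookkeeping follows. The names UpdateKind and SessionSymbols are hypothetical, and this is not the publisher's actual code.

```cpp
#include <set>
#include <string>

enum UpdateKind { FULL_UPDATE, INCREMENTAL_UPDATE };

// Remembers which symbols one subscriber has received depth for.
class SessionSymbols {
public:
  // Decide which kind of update to send next for this symbol,
  // and record that the symbol has now been sent.
  UpdateKind next_update_kind(const std::string& symbol) {
    // insert() reports whether the symbol was new to this session.
    bool first_time = sent_symbols_.insert(symbol).second;
    return first_time ? FULL_UPDATE : INCREMENTAL_UPDATE;
  }

private:
  std::set<std::string> sent_symbols_;
};
```

Because the set belongs to the session, a subscriber that connects late starts with an empty set and therefore receives a full update for every symbol the first time that symbol changes.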

The DepthFeedSession class includes a number of private members:

depth_feed_connection.h: DepthFeedSession class declaration, continued
    private:
      bool connected_;
      uint64_t seq_num_;

      boost::asio::io_service& ios_;
      boost::asio::ip::tcp::socket socket_;

The first member indicates the connection status of the session. Next is a sequence number counter for the session. This is followed by two low-level ASIO members: a reference to the IO service, and a socket for the session.

This is followed by a pointer to the one DepthFeedConnection of the publisher, and the session's QuickFAST encoder:

depth_feed_connection.h: DepthFeedSession class declaration, continued
      DepthFeedConnection* connection_;
      QuickFAST::Codecs::Encoder encoder_;

Next is a set recording which securities have had depth information published, so that the session knows which securities still need a full update:

depth_feed_connection.h: DepthFeedSession class declaration, continued
      typedef std::set<std::string> StringSet;
      StringSet sent_symbols_;

Next are two static members that store the template IDs for the trade and depth messages:

depth_feed_connection.h: DepthFeedSession class declaration, continued
      static QuickFAST::template_id_t TID_TRADE_MESSAGE;
      static QuickFAST::template_id_t TID_DEPTH_MESSAGE;

Finally, there are two private methods, to set the sequence number on a message, and to handle a sent message:

depth_feed_connection.h: DepthFeedSession class declaration, continued
      void set_sequence_num(QuickFAST::Messages::FieldSet& message);

      void on_send(WorkingBufferPtr wb,
                   const boost::system::error_code& error,
                   std::size_t bytes_transferred);
    };

The DepthFeedSession implementation begins with the definitions of its two static members, TID_TRADE_MESSAGE and TID_DEPTH_MESSAGE:

depth_feed_connection.cpp: DepthFeedSession class implementation
  using namespace boost::asio::ip;

  namespace liquibook { namespace examples {

  QuickFAST::template_id_t DepthFeedSession::TID_TRADE_MESSAGE(1);
  QuickFAST::template_id_t DepthFeedSession::TID_DEPTH_MESSAGE(2);

Next is the class constructor, which sets the connected status to false, sets the session sequence number to 0, initializes the socket, saves the IO service and connection, and provides the templates to the QuickFAST encoder:

depth_feed_connection.cpp: DepthFeedSession class implementation, continued
  DepthFeedSession::DepthFeedSession(
      boost::asio::io_service& ios,
      DepthFeedConnection* connection,
      QuickFAST::Codecs::TemplateRegistryPtr& templates)
  : connected_(false),
    seq_num_(0),
    ios_(ios),
    socket_(ios),
    connection_(connection),
    encoder_(templates)
  {
  }

The next method sends a trade to the subscriber:

depth_feed_connection.cpp: DepthFeedSession class implementation, continued
  void
  DepthFeedSession::send_trade(QuickFAST::Messages::FieldSet& message)
  {
    // Add or update sequence number in message
    set_sequence_num(message);

    std::cout << "sending trade message with " << message.size() << " fields" << std::endl;

    QuickFAST::Codecs::DataDestination dest;
    encoder_.encodeMessage(dest, TID_TRADE_MESSAGE, message);
    WorkingBufferPtr wb = connection_->reserve_send_buffer();
    dest.toWorkingBuffer(*wb);

    // Perform the send
    SendHandler send_handler = boost::bind(&DepthFeedSession::on_send,
                                           this, wb, _1, _2);
    boost::asio::const_buffers_1 buffer(
        boost::asio::buffer(wb->begin(), wb->size()));
    socket_.async_send(buffer, 0, send_handler);
  }

The send_trade() method accepts one argument, a reference to a QuickFAST::Messages::FieldSet, which contains the message in a protocol-neutral format. The method starts by setting the sequence number of the message by passing the field set to set_sequence_num(). The caller is responsible for filling out the other fields of the message, while the session sets the sequence number.

Next, the method declares a data destination variable, which holds an encoded message. This message is then encoded into the destination by the QuickFAST encoder, using the template ID for trade messages.

Next a QuickFAST working buffer is reserved for sending the message, and the encoded message is serialized into the buffer. To send the message, the method first creates a send handler for the buffer, so that DepthFeedSession::on_send is called after the send. Next, it sets up an ASIO buffer to send the working buffer's contents, and calls the socket's async_send() method.

The send_incr_update method is similar, with some key differences:

depth_feed_connection.cpp: DepthFeedSession class implementation, continued
  bool
  DepthFeedSession::send_incr_update(const std::string& symbol,
                                     QuickFAST::Messages::FieldSet& message)
  {
    bool sent = false;
    // If the session has been started for this symbol
    if (sent_symbols_.find(symbol) != sent_symbols_.end()) {
      QuickFAST::Codecs::DataDestination dest;
      // Add or update sequence number in message
      set_sequence_num(message);
      encoder_.encodeMessage(dest, TID_DEPTH_MESSAGE, message);
      WorkingBufferPtr wb = connection_->reserve_send_buffer();
      dest.toWorkingBuffer(*wb);
      SendHandler send_handler = boost::bind(&DepthFeedSession::on_send,
                                             this, wb, _1, _2);
      boost::asio::const_buffers_1 buffer(
          boost::asio::buffer(wb->begin(), wb->size()));
      socket_.async_send(buffer, 0, send_handler);
      sent = true;
    }
    return sent;
  }

First, the reader will notice that send_incr_update() returns a bool value. This value indicates whether the message was sent or not. If not, the caller is responsible for calling send_full_update() with the message.

The method also checks sent_symbols_ for the symbol of the message (provided as an argument) to ensure the security has been sent to the subscriber previously. Should this be true, execution is similar to that of send_trade(): the sequence number is set, the message is encoded to a data destination, a working buffer is reserved, the destination is serialized to the working buffer, and the working buffer contents are sent.

The send_full_update() method is similar to send_incr_update(), but has no return value. In this case, the method inserts the symbol into the sent_symbols_ set. As a form of protection, it does not send the full update if the set already contained that security.

depth_feed_connection.cpp: DepthFeedSession class implementation, continued

  void
  DepthFeedSession::send_full_update(const std::string& symbol,
                                     QuickFAST::Messages::FieldSet& message)
  {
    // Mark this symbol as sent
    std::pair<StringSet::iterator, bool> result = sent_symbols_.insert(symbol);

    // If this symbol is new for the session
    if (result.second) {
      QuickFAST::Codecs::DataDestination dest;
      // Add or update sequence number in message
      set_sequence_num(message);
      encoder_.encodeMessage(dest, TID_DEPTH_MESSAGE, message);
      WorkingBufferPtr wb = connection_->reserve_send_buffer();
      dest.toWorkingBuffer(*wb);

      // Perform the send
      SendHandler send_handler = boost::bind(&DepthFeedSession::on_send,
                                             this, wb, _1, _2);
      boost::asio::const_buffers_1 buffer(
          boost::asio::buffer(wb->begin(), wb->size()));
      socket_.async_send(buffer, 0, send_handler);
    }
  }
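
The interplay between send_incr_update() and send_full_update() can be illustrated without ASIO or QuickFAST. The following is a minimal sketch under stated assumptions, not the example's code: SessionSketch, publish_depth() and the log of strings are hypothetical stand-ins, and "sending" is reduced to appending a string to a vector.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for DepthFeedSession: tracks which symbols have had
// a full update, and records what would have been sent.
class SessionSketch {
public:
  // Returns false when the subscriber has no depth state for the symbol yet;
  // the caller is then expected to follow up with a full update.
  bool send_incr_update(const std::string& symbol) {
    if (sent_symbols_.find(symbol) == sent_symbols_.end()) {
      return false;
    }
    log_.push_back("incr:" + symbol);
    return true;
  }

  // Sends a full update only the first time a symbol is seen.
  void send_full_update(const std::string& symbol) {
    // std::set::insert reports whether the element was newly inserted
    std::pair<std::set<std::string>::iterator, bool> result =
        sent_symbols_.insert(symbol);
    if (result.second) {
      log_.push_back("full:" + symbol);
    }
  }

  const std::vector<std::string>& log() const { return log_; }

private:
  std::set<std::string> sent_symbols_;
  std::vector<std::string> log_;
};

// Mirrors the caller's side: try incremental first, fall back to full.
inline void publish_depth(SessionSketch& session, const std::string& symbol) {
  if (!session.send_incr_update(symbol)) {
    session.send_full_update(symbol);
  }
}

// Usage: the first change for a symbol goes out as a full update,
// later changes as incremental updates.
inline void example_usage() {
  SessionSketch session;
  publish_depth(session, "IBM");
  publish_depth(session, "IBM");
  assert(session.log().size() == 2);
  assert(session.log()[0] == "full:IBM");
  assert(session.log()[1] == "incr:IBM");
}
```

In this sketch the set plays the same role as sent_symbols_: membership means the subscriber already holds depth state for that security.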

The first private method, set_sequence_num(), is used to set the sequence number field in the outgoing message. The sequence number is unique to each session, so it must be encoded for each copy of the outbound message:

depth_feed_connection.cpp: DepthFeedSession class implementation, continued
  void
  DepthFeedSession::set_sequence_num(QuickFAST::Messages::FieldSet& message)
  {
    // Create the field
    QuickFAST::Messages::FieldCPtr value =
        QuickFAST::Messages::FieldUInt32::create(++seq_num_);
    // Update the sequence number
    if (!message.replaceField(TemplateConsumer::id_seq_num_, value)) {
      // Not found, add the sequence number
      message.addField(TemplateConsumer::id_seq_num_, value);
    }
  }

The method begins by creating a QuickFAST field for the incremented sequence number. Since this code resides within a session, there is a copy of the member variable seq_num_ for each subscriber. The first message will have a sequence number of 1.

The method attempts to update the sequence number field in the message, and if the sequence number is not yet present in the message, the field is added. Using this technique, the message can be built once, and then have the sequence number customized for each subscriber, without having to build the message from scratch for each subscriber.
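
The replace-or-add technique can be sketched with standard containers. This is an illustration only, not the QuickFAST API: FieldsSketch, replace_field(), SeqSessionSketch and the field name "SeqNum" are hypothetical stand-ins for the field set, FieldSet::replaceField(), the session, and the sequence number field identity.

```cpp
#include <cassert>
#include <stdint.h>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a field set: an ordered list of (name, value)
// pairs. Not the QuickFAST API.
typedef std::vector<std::pair<std::string, uint32_t> > FieldsSketch;

// Replace the value of an existing field; returns false if it is absent.
inline bool replace_field(FieldsSketch& fields, const std::string& name,
                          uint32_t value) {
  for (FieldsSketch::iterator it = fields.begin(); it != fields.end(); ++it) {
    if (it->first == name) {
      it->second = value;
      return true;
    }
  }
  return false;
}

// Hypothetical stand-in for a session: owns its own sequence counter.
struct SeqSessionSketch {
  uint32_t seq_num;
  SeqSessionSketch() : seq_num(0) {}

  // Stamp the shared message with this session's next sequence number.
  void set_sequence_num(FieldsSketch& message) {
    const uint32_t value = ++seq_num;
    if (!replace_field(message, "SeqNum", value)) {
      // Not found, add the sequence number
      message.push_back(std::make_pair(std::string("SeqNum"), value));
    }
  }
};

// Usage: one message, built once, stamped for two different sessions.
inline void example_usage() {
  FieldsSketch message;
  message.push_back(std::make_pair(std::string("Qty"), 100u));
  SeqSessionSketch first, second;
  first.set_sequence_num(message);   // field added with value 1
  first.set_sequence_num(message);   // field replaced with value 2
  assert(message.size() == 2 && message[1].second == 2);
  second.set_sequence_num(message);  // second session counts independently
  assert(message.size() == 2 && message[1].second == 1);
}
```

Because the field is replaced in place, the message never grows beyond one sequence number field, however many sessions stamp it.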

The other private method, on_send(), handles a sent message by logging any error code and then calling the DepthFeedConnection version of the method:

depth_feed_connection.cpp: DepthFeedSession class implementation, continued
  void
  DepthFeedSession::on_send(WorkingBufferPtr wb,
                            const boost::system::error_code& error,
                            std::size_t bytes_transferred)
  {
    if (error) {
      std::cout << "Error " << error << " sending message" << std::endl;
      connected_ = false;
    }

    // Keep buffer for later
    connection_->on_send(wb, error, bytes_transferred);
  }

The DepthFeedConnection class is used by both the publisher and the subscriber. It has a constructor that accepts the command line arguments:

depth_feed_connection.h: DepthFeedConnection class declaration
  typedef boost::shared_ptr<DepthFeedSession> SessionPtr;

  class DepthFeedConnection : boost::noncopyable {
  public:
    DepthFeedConnection(int argc, const char* argv[]);

Next are an accessor for the templates parsed by the connection, and two methods for connecting the publisher and subscribers. The publisher calls accept() and the subscriber calls connect():

depth_feed_connection.h: DepthFeedConnection class declaration, continued
    // Get the template registry
    const QuickFAST::Codecs::TemplateRegistryPtr&
    get_templates() { return templates_; }

    // Connect to publisher
    void connect();

    // Accept connection from subscriber
    void accept();

The next method, run(), issues all the ASIO callbacks. Both the publisher and subscriber must call run().

depth_feed_connection.h: DepthFeedConnection class declaration, continued
    // Let the IO service run
    void run();

The next two methods set up handlers for events:

depth_feed_connection.h: DepthFeedConnection class declaration, continued
    // Set a callback to handle a message
    void set_message_handler(MessageHandler msg_handler);

    // Set a callback to handle a reset connection
    void set_reset_handler(ResetHandler reset_handler);

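The first, set_message_handler(), registers the callback that handles incoming messages in the subscriber. The second, set_reset_handler(), registers the callback that handles a reset of the subscriber's connection to the publisher; on_connect() invokes it each time the subscriber connects.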

The next two methods are for buffer management:

depth_feed_connection.h: DepthFeedConnection class declaration, continued
    // Reserve a buffer for receiving a message
    BufferPtr reserve_recv_buffer();

    // Reserve a buffer for sending a message
    WorkingBufferPtr reserve_send_buffer();

Rather than constantly allocating and deallocating buffers, the publisher and subscriber have a pool of buffers ready to be used again and again. The publisher and subscriber will reserve these buffers from DepthFeedConnection, bind them to an ASIO asynchronous callback, and then restore the buffers back to the DepthFeedConnection.

The first, reserve_recv_buffer(), is called by the subscriber to obtain a buffer for receiving a message. Likewise, reserve_send_buffer() reserves a buffer into which an encoded QuickFAST message is serialized, for the publisher to send to the subscriber.

Next are three methods for the publisher to send a message to each subscriber:

depth_feed_connection.h: DepthFeedConnection class declaration, continued
    // Send a trade message to all clients
    void send_trade(QuickFAST::Messages::FieldSet& message);

    // Send an incremental update
    // return true if all sessions could handle an incremental update
    bool send_incr_update(const std::string& symbol,
                          QuickFAST::Messages::FieldSet& message);

    // Send a full update to those which have not yet received for this symbol
    void send_full_update(const std::string& symbol,
                          QuickFAST::Messages::FieldSet& message);

These three methods call the corresponding method on DepthFeedSession for each subscriber. The public interface ends with a number of event handlers:

depth_feed_connection.h: DepthFeedConnection class declaration, continued
    // Handle a connection
    void on_connect(const boost::system::error_code& error);

    // Handle an accepted connection
    void on_accept(SessionPtr session,
                   const boost::system::error_code& error);

    // Handle a received message
    void on_receive(BufferPtr bp,
                    const boost::system::error_code& error,
                    std::size_t bytes_transferred);
    // Handle a sent message
    void on_send(WorkingBufferPtr wb,
                 const boost::system::error_code& error,
                 std::size_t bytes_transferred);

These methods handle, respectively, the result of a subscriber connecting to the publisher, a publisher's accepted connection from a subscriber, a message received from the publisher, and a message sent to the subscriber.

The private members of DepthFeedConnection start with some configuration data: the name of the file containing the FAST templates, the host for the subscriber to connect to, and the connection port. These are followed by the event handlers, the parsed QuickFAST templates, buffer pools, the DepthFeedSession instances, and some Boost ASIO helpers.

The private section ends with methods to issue a read and to parse the command line arguments:

depth_feed_connection.h: DepthFeedConnection class declaration, continued
  private:
    typedef std::deque<BufferPtr> Buffers;
    typedef std::vector<SessionPtr> Sessions;
    const char* template_filename_;
    const char* host_;
    int port_;
    MessageHandler msg_handler_;
    ResetHandler reset_handler_;
    QuickFAST::Codecs::TemplateRegistryPtr templates_;

    Buffers unused_recv_buffers_;
    WorkingBuffers unused_send_buffers_;
    Sessions sessions_;
    boost::shared_ptr<boost::asio::ip::tcp::acceptor> acceptor_;
    boost::asio::io_service ios_;
    boost::asio::ip::tcp::socket socket_;
    boost::shared_ptr<boost::asio::io_service::work> work_ptr_;

    void issue_read();
    static const char* template_file_from_args(int argc, const char* argv[]);
    static const char* host_from_args(int argc, const char* argv[]);
    static int port_from_args(int argc, const char* argv[]);
  };
  } } // End namespace

The DepthFeedConnection constructor derives the configuration variables by passing the command-line arguments to the *_from_args() methods:

depth_feed_connection.cpp: DepthFeedConnection class implementation
  DepthFeedConnection::DepthFeedConnection(int argc, const char* argv[])
  : template_filename_(template_file_from_args(argc, argv)),
    host_(host_from_args(argc, argv)),
    port_(port_from_args(argc, argv)),
    templates_(TemplateConsumer::parse_templates(template_filename_)),
    socket_(ios_)
  {
  }

It also invokes parse_templates() to parse the QuickFAST templates, and initializes the subscriber socket. Next, connect() does a standard ASIO asynchronous connection, using on_connect() as its callback:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::connect()
  {
    std::cout << "Connecting to feed" << std::endl;
    tcp::endpoint endpoint(address::from_string(host_), port_);
    socket_.async_connect(endpoint, boost::bind(&DepthFeedConnection::on_connect,
                                                this, _1));
  }

The accept() method implements the publisher's side of ASIO connectivity. The first time accept() is called, a TCP acceptor is created and initialized. For each call, a DepthFeedSession is created, and an asynchronous accept is started, using on_accept() as its callback:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::accept()
  {
    if (!acceptor_) {
      acceptor_.reset(new tcp::acceptor(ios_));
      tcp::endpoint endpoint(tcp::v4(), port_);
      acceptor_->open(endpoint.protocol());
      boost::system::error_code ec;
      acceptor_->set_option(boost::asio::socket_base::reuse_address(true), ec);
      acceptor_->bind(endpoint);
      acceptor_->listen();
    }
    SessionPtr session(new DepthFeedSession(ios_, this, templates_));
    acceptor_->async_accept(
        session->socket(),
        boost::bind(&DepthFeedConnection::on_accept, this, session, _1));
  }

The run() method invokes the run() method on the IO service object. Usually run() will exit when there are no more callbacks to make. Using the io_service::work object ensures run() does not exit:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::run()
  {
    std::cout << "DepthFeedConnection::run()" << std::endl;
    // Keep on running
    work_ptr_.reset(new boost::asio::io_service::work(ios_));
    ios_.run();
  }

The next two methods simply save off the message handler and the reset handler:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::set_message_handler(MessageHandler handler)
  {
    msg_handler_ = handler;
  }

  void
  DepthFeedConnection::set_reset_handler(ResetHandler handler)
  {
    reset_handler_ = handler;
  }

As stated above, the DepthFeedConnection uses buffer pools to reduce heap allocations and deallocations. The following two methods provide unused buffers to the caller:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  BufferPtr
  DepthFeedConnection::reserve_recv_buffer()
  {
    if (unused_recv_buffers_.empty()) {
      return BufferPtr(new Buffer());
    } else {
      BufferPtr bp = unused_recv_buffers_.front();
      unused_recv_buffers_.pop_front();
      return bp;
    }
  }

  WorkingBufferPtr
  DepthFeedConnection::reserve_send_buffer()
  {
    if (unused_send_buffers_.empty()) {
      return WorkingBufferPtr(new QuickFAST::WorkingBuffer());
    } else {
      WorkingBufferPtr wb = unused_send_buffers_.front();
      unused_send_buffers_.pop_front();
      return wb;
    }
  }

In both methods, an unused buffer is returned, and one is created if none are available.
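
The reserve-and-restore cycle can be sketched in isolation. The following is a minimal illustration, not the example's code: BufferPoolSketch, its restore() and unused_count() methods, and the 2048-byte buffer size are assumptions made for the sketch, and std::shared_ptr (C++11) stands in for boost::shared_ptr.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <vector>

// Hypothetical pool of reusable byte buffers.
class BufferPoolSketch {
public:
  typedef std::shared_ptr<std::vector<unsigned char> > BufferPtr;

  // Hand out an unused buffer, allocating only when the pool is empty.
  BufferPtr reserve() {
    if (unused_.empty()) {
      return BufferPtr(new std::vector<unsigned char>(buffer_size()));
    }
    BufferPtr bp = unused_.front();
    unused_.pop_front();
    return bp;
  }

  // Return a buffer to the pool once its I/O has completed.
  void restore(const BufferPtr& bp) { unused_.push_back(bp); }

  std::size_t unused_count() const { return unused_.size(); }

private:
  // Arbitrary size chosen for the sketch
  static std::size_t buffer_size() { return 2048; }

  std::deque<BufferPtr> unused_;
};

// Usage: a restored buffer is handed out again instead of a new allocation.
inline void example_usage() {
  BufferPoolSketch pool;
  BufferPoolSketch::BufferPtr first = pool.reserve();
  pool.restore(first);
  BufferPoolSketch::BufferPtr again = pool.reserve();
  assert(again == first);
  assert(pool.unused_count() == 0);
}
```

In the example system, the restore step corresponds to the push_back() calls in on_receive() and on_send().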

The send_trade(), send_incr_update(), and send_full_update() methods send a message to all subscribers by iterating through the sessions and invoking the corresponding method on each:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::send_trade(QuickFAST::Messages::FieldSet& message)
  {
    // For each session
    Sessions::iterator session;
    for (session = sessions_.begin(); session != sessions_.end(); ) {
      // If the session is connected
      if ((*session)->connected()) {
        // conditionally send on that session
        (*session)->send_trade(message);
        ++session;
      } else {
        // Remove the session
        session = sessions_.erase(session);
      }
    }
  }

  bool
  DepthFeedConnection::send_incr_update(const std::string& symbol,
                                        QuickFAST::Messages::FieldSet& message)
  {
    bool none_new = true;
    // For each session
    Sessions::iterator session;
    for (session = sessions_.begin(); session != sessions_.end(); ) {
      // If the session is connected
      if ((*session)->connected()) {
        // send on that session
        if (!(*session)->send_incr_update(symbol, message)) {
          none_new = false;
        }
        ++session;
      } else {
        // Remove the session
        session = sessions_.erase(session);
      }
    }
    return none_new;
  }

  void
  DepthFeedConnection::send_full_update(const std::string& symbol,
                                        QuickFAST::Messages::FieldSet& message)
  {
    // For each session
    Sessions::iterator session;
    for (session = sessions_.begin(); session != sessions_.end(); ) {
      // If the session is connected
      if ((*session)->connected()) {
        // conditionally send on that session
        (*session)->send_full_update(symbol, message);
        ++session;
      } else {
        // Remove the session
        session = sessions_.erase(session);
      }
    }
  }

In each case, a session that is no longer connected is removed from the list. There is one important difference in send_incr_update(): it returns true only if every connected session accepted the incremental update, and false if any session returned false.
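
The loop shape shared by these three methods, erasing stale entries while iterating and folding the per-session results into one flag, can be sketched with plain structs. PeerSketch and send_incr_to_all() are hypothetical names used only for this illustration; the has_state flag stands in for a session's sent_symbols_ lookup.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for a session: only the facts the loop needs.
struct PeerSketch {
  bool connected;
  bool has_state;  // true if the peer can apply an incremental update
  int sent;        // number of incremental updates "sent" to this peer
};

// Returns true only if every connected peer accepted the incremental update.
// Disconnected peers are erased during the walk.
inline bool send_incr_to_all(std::vector<PeerSketch>& peers) {
  bool none_new = true;
  for (std::vector<PeerSketch>::iterator it = peers.begin();
       it != peers.end(); ) {
    if (it->connected) {
      if (it->has_state) {
        ++it->sent;
      } else {
        none_new = false;
      }
      ++it;
    } else {
      // erase() returns the iterator following the removed element,
      // so the loop must not increment in this branch
      it = peers.erase(it);
    }
  }
  return none_new;
}

// Usage: one peer is up to date, one has disconnected, one is new.
inline void example_usage() {
  std::vector<PeerSketch> peers;
  PeerSketch up_to_date = {true, true, 0};
  PeerSketch gone = {false, true, 0};
  PeerSketch fresh = {true, false, 0};
  peers.push_back(up_to_date);
  peers.push_back(gone);
  peers.push_back(fresh);
  assert(!send_incr_to_all(peers));  // the new peer needs a full update
  assert(peers.size() == 2);         // the disconnected peer was removed
}
```

Incrementing the iterator only in the branch that keeps the element is what makes it safe to call erase() inside the loop.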

The next method handles a connection within the subscriber:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::on_connect(const boost::system::error_code& error)
  {
    if (!error) {
      std::cout << "connected to feed" << std::endl;
      reset_handler_();
      issue_read();
    } else {
      std::cout << "on_connect, error=" << error << std::endl;
      socket_.close();
      sleep(3);
      // Try again
      connect();
    }
  }

When on_connect() is called after a failure, the method closes the socket, waits briefly, and tries to connect again. On success, it calls the reset handler and then begins to read on the socket.

The next handler is the counterpart in the publisher:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::on_accept(SessionPtr session,
                                 const boost::system::error_code& error)
  {
    if (!error) {
      std::cout << "accepted client connection" << std::endl;
      sessions_.push_back(session);
      session->set_connected();
    } else {
      std::cout << "on_accept, error=" << error << std::endl;
      session.reset();
      sleep(2);
    }
    // accept again
    accept();
  }

In the case of on_accept(), the publisher always restarts the accept cycle - so that another subscriber may connect. The subscriber, on the other hand, only wants one connection.

In the case of a successful connection, the publisher saves off the session, and marks it as connected, so that messages may be sent to it. If there is failure, the session is deleted (by resetting the shared pointer) and the publisher waits momentarily before starting again.

The next event handler handles a message received by the subscriber:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::on_receive(BufferPtr bp,
                                  const boost::system::error_code& error,
                                  std::size_t bytes_transferred)
  {
    if (!error) {
      // Next read
      issue_read();

      // Handle the buffer
      if (!msg_handler_(bp, bytes_transferred)) {
        socket_.close();
      }
    } else {
      std::cout << "Error " << error << " receiving message" << std::endl;
      socket_.close();
      sleep(3);
      connect();
    }
    // Restore buffer
    unused_recv_buffers_.push_back(bp);
  }

When the receive is handled, if the read was successful, the subscriber first issues a new read, and then calls the message handler, which parses the message. Should this parsing fail, the socket is closed (and the next read will then fail).

Should the read fail, the socket is closed, and the connection cycle starts anew. In all cases, the buffer is restored back to the pool.

The next handler is called after a send of a message to the subscriber:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::on_send(WorkingBufferPtr wb,
                               const boost::system::error_code& error,
                               std::size_t bytes_transferred)
  {
    // Keep buffer for later
    unused_send_buffers_.push_back(wb);
  }

Recall that this method is called from within DepthFeedSession::on_send() which has already examined the error code. What remains is to restore the buffer back to the pool.

The next method starts an ASIO read on the subscriber's socket:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  void
  DepthFeedConnection::issue_read()
  {
    BufferPtr bp = reserve_recv_buffer();
    RecvHandler recv_handler = boost::bind(&DepthFeedConnection::on_receive,
                                           this, bp, _1, _2);
    boost::asio::mutable_buffers_1 buffer(
        boost::asio::buffer(*bp, bp->size()));
    socket_.async_receive(buffer, 0, recv_handler);
  }

issue_read() reserves a buffer, binds the buffer to the on_receive() method, and issues the asynchronous read.

The next three methods look through the command line arguments for configuration variables:

depth_feed_connection.cpp: DepthFeedConnection class implementation, continued
  const char*
  DepthFeedConnection::template_file_from_args(int argc, const char* argv[])
  {
    bool next_is_name = false;
    for (int i = 0; i < argc; ++i) {
      if (next_is_name) {
        return argv[i];
      } else if (strcmp(argv[i], "-t") == 0) {
        next_is_name = true;
      }
    }
    return "./templates/depth.xml";
  }

  const char*
  DepthFeedConnection::host_from_args(int argc, const char* argv[])
  {
    bool next_is_host = false;
    for (int i = 0; i < argc; ++i) {
      if (next_is_host) {
        return argv[i];
      } else if (strcmp(argv[i], "-h") == 0) {
        next_is_host = true;
      }
    }
    return "127.0.0.1";
  }

  int
  DepthFeedConnection::port_from_args(int argc, const char* argv[])
  {
    bool next_is_port = false;
    for (int i = 0; i < argc; ++i) {
      if (next_is_port) {
        return atoi(argv[i]);
      } else if (strcmp(argv[i], "-p") == 0) {
        next_is_port = true;
      }
    }
    return 10003;
  }

  } } // End namespace

The first method looks for a -t followed by a template file name. The second looks for a -h followed by a host name. The last one looks for a -p followed by a port number. All three return a default value when the flag is not present.
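
Since the three methods differ only in the flag and the default, the shared scan could be factored into one helper. The following is a sketch of that idea, not part of the example: arg_value() is a hypothetical function, and the argument values in the usage function are made up for illustration.

```cpp
#include <cassert>
#include <cstdlib>
#include <cstring>

// Hypothetical helper: returns the argument that follows `flag`, or
// `fallback` when the flag is absent or is the last argument.
inline const char* arg_value(int argc, const char* argv[],
                             const char* flag, const char* fallback) {
  for (int i = 0; i + 1 < argc; ++i) {
    if (std::strcmp(argv[i], flag) == 0) {
      return argv[i + 1];
    }
  }
  return fallback;
}

// Usage: the same helper serves string and numeric options; the defaults
// are the ones used by the example's *_from_args() methods.
inline void example_usage() {
  const char* args[] = {"publisher", "-p", "9000"};
  assert(std::atoi(arg_value(3, args, "-p", "10003")) == 9000);
  assert(std::strcmp(arg_value(3, args, "-h", "127.0.0.1"),
                     "127.0.0.1") == 0);
}
```

Passing the default as a string keeps the helper to a single return type; the port is converted with atoi() at the call site, as port_from_args() does.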

Handling Liquibook Events

At this point, the publisher is able to accept subscriber connections and route QuickFAST messages to the subscriber. What is missing is handling Liquibook events, and creating the QuickFAST messages to send. As shown in figure 2 above, the publisher needs a properly-typed TradeListener for trade events, and a properly-typed DepthListener for depth events.

The TradeListener interface has a single method to implement:

trade_listener.h: TradeListener interface declaration
  namespace liquibook { namespace book {

  /// @brief listener of trade events. Implement to build a trade feed.
  template <class OrderBook >
  class TradeListener {
  public:
    /// @brief callback for a trade
    /// @param book the order book of the fill (not defined whether this is before
    /// or after fill)
    /// @param qty the quantity of this fill
    /// @param cost the cost of this fill (qty * price)
    virtual void on_trade(const OrderBook* book,
                          Quantity qty,
                          Cost cost) = 0;
  };

  } }

Likewise, the DepthListener interface has a single method to implement:

depth_listener.h: DepthListener interface declaration
  namespace liquibook { namespace book {

  /// @brief listener of depth events. Implement to build an aggregate depth
  /// feed.
  template <class OrderBook >
  class DepthListener {
  public:
    /// @brief callback for change in tracked aggregated depth
    virtual void on_depth_change(
        const OrderBook* book,
        const typename OrderBook::DepthTracker* depth) = 0;
  };

  } }

The publisher implements these interfaces in the DepthFeedPublisher class:

depth_feed_publisher.h: DepthFeedPublisher class declaration
  namespace liquibook { namespace examples {

  class DepthFeedPublisher : public ExampleOrderBook::TypedDepthListener,
                             public ExampleOrderBook::TypedTradeListener,
                             public TemplateConsumer {

The DepthFeedPublisher class implements both the ExampleOrderBook::TypedDepthListener and the ExampleOrderBook::TypedTradeListener interfaces, which bind the book::DepthListener and the book::TradeListener templates to those used in ExampleOrderBook. DepthFeedPublisher also inherits from TemplateConsumer, but this is simply a convenience (to reduce the need for name qualifiers), as TemplateConsumer's data and methods are entirely static.

The DepthFeedPublisher has a simple constructor, for initializing its pointer.

depth_feed_publisher.h: DepthFeedPublisher class declaration, continued
  public:
    DepthFeedPublisher();

The DepthFeedPublisher keeps a pointer to a DepthFeedConnection, and receives that pointer through a setter method:

depth_feed_publisher.h: DepthFeedPublisher class declaration, continued
    void set_connection(DepthFeedConnection* connection);

Next are the two event handlers to implement the two interfaces:

depth_feed_publisher.h: DepthFeedPublisher class declaration, continued
    virtual void on_trade(
        const book::OrderBook<OrderPtr>* order_book,
        book::Quantity qty,
        book::Cost cost);

    virtual void on_depth_change(
        const book::DepthOrderBook<OrderPtr>* order_book,
        const book::DepthOrderBook<OrderPtr>::DepthTracker* tracker);

The private section has a single member - the DepthFeedConnection pointer:

depth_feed_publisher.h: DepthFeedPublisher class declaration, continued
  private:
    DepthFeedConnection* connection_;

Next are methods to build the feed messages:

depth_feed_publisher.h: DepthFeedPublisher class declaration, continued
    // Build a trade message
    void build_trade_message(
        QuickFAST::Messages::FieldSet& message,
        const std::string& symbol,
        book::Quantity qty,
        book::Cost cost);

    // Build an incremental depth message
    void build_depth_message(
        QuickFAST::Messages::FieldSet& message,
        const std::string& symbol,
        const book::DepthOrderBook<OrderPtr>::DepthTracker* tracker,
        bool full_message);
    void build_depth_level(
        QuickFAST::Messages::SequencePtr& level_seq,
        const book::DepthLevel* level,
        int level_index);

And finally a method to generate a time stamp for the messages:

depth_feed_publisher.h: DepthFeedPublisher class declaration, continued
    uint32_t time_stamp();
  };

  } } // End namespace

The DepthFeedPublisher constructor initializes its connection pointer to NULL; the pointer is assigned later through the separate setter method:

depth_feed_publisher.cpp: DepthFeedPublisher class implementation
  namespace liquibook { namespace examples {

  using namespace QuickFAST::Messages;

  DepthFeedPublisher::DepthFeedPublisher()
  : connection_(NULL)
  {
  }

  void
  DepthFeedPublisher::set_connection(DepthFeedConnection* connection)
  {
    connection_ = connection;
  }

The DepthFeedPublisher's primary responsibility is to build feed messages, in response to events. The first event handler is on_trade():

depth_feed_publisher.cpp: DepthFeedPublisher class implementation, continued
  void
  DepthFeedPublisher::on_trade(
      const book::OrderBook<OrderPtr>* order_book,
      book::Quantity qty,
      book::Cost cost)
  {
    // Publish trade
    QuickFAST::Messages::FieldSet message(20);
    const ExampleOrderBook* exob =
        dynamic_cast<const ExampleOrderBook*>(order_book);
    std::cout << "Got trade for " << exob->symbol()
              << " qty " << qty
              << " cost " << cost << std::endl;
    build_trade_message(message, exob->symbol(), qty, cost);
    connection_->send_trade(message);
  }

on_trade() starts by casting the order book to the derived type. The reader will recall that the derived order book is used to store the symbol of the order book. The method then builds the trade message, through a call to build_trade_message(), and distributes the message through the DepthFeedConnection.

The next event handler handles depth change events:

depth_feed_publisher.cpp: DepthFeedPublisher class implementation, continued
  void
  DepthFeedPublisher::on_depth_change(
      const book::DepthOrderBook<OrderPtr>* order_book,
      const book::DepthOrderBook<OrderPtr>::DepthTracker* tracker)
  {
    // Publish changed levels of order book
    QuickFAST::Messages::FieldSet message(20);
    const ExampleOrderBook* exob =
        dynamic_cast<const ExampleOrderBook*>(order_book);
    build_depth_message(message, exob->symbol(), tracker, false);
    if (!connection_->send_incr_update(exob->symbol(), message)) {
      // Publish all levels of order book
      QuickFAST::Messages::FieldSet full_message(20);
      build_depth_message(full_message, exob->symbol(), tracker, true);
      connection_->send_full_update(exob->symbol(), full_message);
    }
  }

Similar to the trade event handler, the depth change event handler casts the order book to the derived class, builds a feed message, and sends it to the clients through the DepthFeedConnection. What is different in on_depth_change() is that, since the depth feed is incremental, as noted above, each client must already hold valid depth state for the security before it can apply an incremental update.

The reader will recall that the call to DepthFeedConnection::send_incr_update() returns true if and only if all of the clients could handle the message. If not, false is returned, and on_depth_change() builds and sends a full update instead. In the steady state, only the incremental message needs to be built.
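The fallback from incremental to full update can be sketched without any networking code. This is only an illustration: the Client struct, its synced_symbols set, and the string log standing in for encoded messages are hypothetical, and the sketch assumes that a full update goes only to the clients that could not accept the incremental one.

```cpp
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for one connected subscriber: it remembers which
// symbols it already holds a full depth snapshot for.
struct Client {
  std::set<std::string> synced_symbols;
  std::vector<std::string> received;  // log of what was "sent" to this client

  bool is_synced(const std::string& symbol) const {
    return synced_symbols.count(symbol) != 0;
  }
};

// Send the incremental update only to clients that can apply it.
// Returns true if and only if every client was already in sync.
bool send_incr_update(std::vector<Client>& clients, const std::string& symbol) {
  bool all_synced = true;
  for (Client& client : clients) {
    if (client.is_synced(symbol)) {
      client.received.push_back("incr:" + symbol);
    } else {
      all_synced = false;
    }
  }
  return all_synced;
}

// Send a full snapshot to the clients that missed the incremental update
// (an assumption of this sketch), and mark them as synced for the symbol.
void send_full_update(std::vector<Client>& clients, const std::string& symbol) {
  for (Client& client : clients) {
    if (!client.is_synced(symbol)) {
      client.received.push_back("full:" + symbol);
      client.synced_symbols.insert(symbol);
    }
  }
}

// Same shape as on_depth_change(): try the incremental message first, and
// fall back to the full message only when some client needs it.
void publish_depth_change(std::vector<Client>& clients,
                          const std::string& symbol) {
  if (!send_incr_update(clients, symbol)) {
    send_full_update(clients, symbol);
  }
}
```

With two clients, one of which has already seen AAPL, the first call delivers an incremental update to the synced client and a full update to the other. The second call delivers incremental updates to both.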

The build_trade_message() method is used by on_trade() to build a trade message:

depth_feed_publisher.cpp: DepthFeedPublisher class implementation, continued
  void
  DepthFeedPublisher::build_trade_message(
      QuickFAST::Messages::FieldSet& message,
      const std::string& symbol,
      book::Quantity qty,
      book::Cost cost)
  {
    message.addField(id_timestamp_, FieldUInt32::create(time_stamp()));
    message.addField(id_symbol_, FieldString::create(symbol));
    message.addField(id_qty_, FieldUInt32::create(qty));
    message.addField(id_cost_, FieldUInt32::create(cost));
  }

Building a QuickFAST message is relatively simple. Each field is added to the message through the addField() call. The sequence number, added later, is particular to a specific client.

The next message builder, build_depth_message(), builds repeating sequences of data, and is more complex:

depth_feed_publisher.cpp: DepthFeedPublisher class implementation, continued
  void
  DepthFeedPublisher::build_depth_message(
      QuickFAST::Messages::FieldSet& message,
      const std::string& symbol,
      const book::DepthOrderBook<OrderPtr>::DepthTracker* tracker,
      bool full_message)
  {
    size_t bid_count(0), ask_count(0);

    message.addField(id_timestamp_, FieldUInt32::create(time_stamp()));
    message.addField(id_symbol_, FieldString::create(symbol));

    // Build the changed levels
    book::ChangeId last_published_change = tracker->last_published_change();

    // Build changed bids
    {
      SequencePtr bids(new Sequence(id_bids_length_, 1));
      int index = 0;
      const book::DepthLevel* bid = tracker->bids();
      // Create sequence of bids
      while (true) {
        if (full_message || bid->changed_since(last_published_change)) {
          build_depth_level(bids, bid, index);
          ++bid_count;
        }
        ++index;
        if (bid == tracker->last_bid_level()) {
          break;
        } else {
          ++bid;
        }
      }
      message.addField(id_bids_, FieldSequence::create(bids));
    }

    // Build changed asks
    {
      SequencePtr asks(new Sequence(id_asks_length_, 1));
      int index = 0;
      const book::DepthLevel* ask = tracker->asks();
      // Create sequence of asks
      while (true) {
        if (full_message || ask->changed_since(last_published_change)) {
          build_depth_level(asks, ask, index);
          ++ask_count;
        }
        ++index;
        if (ask == tracker->last_ask_level()) {
          break;
        } else {
          ++ask;
        }
      }
      message.addField(id_asks_, FieldSequence::create(asks));
    }
    std::cout << "Encoding " << (full_message ? "full" : "incr")
              << " depth message for symbol " << symbol
              << " with " << bid_count << " bids, "
              << ask_count << " asks" << std::endl;
  }

This method starts like build_trade_message(), adding the timestamp and symbol fields. Next, two sequences are built, one for the changed bid levels and one for the changed ask levels. To support an incremental feed, Liquibook tracks a change number (called ChangeId) on each depth level, along with the ID of the last published change in the depth as a whole, which is set after the callback completes. To determine which levels to publish, the publisher compares the last change of each level to the last published change of the depth.

In build_depth_message(), a bid sequence is created, and an associated length is attached. The changed bid levels are each added to the sequence through calls to build_depth_level(). The iteration logic here must be careful to not move beyond the last bid level. Finally, the sequence is added to the message. The same is done for changed ask levels.
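The selection of changed levels can be shown in isolation. The sketch below is a simplified model, not Liquibook's own types: TrackedLevel and levels_to_publish() are hypothetical names, and it assumes that a level counts as changed when its change ID is greater than the last published one.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

typedef std::uint32_t ChangeId;

// Hypothetical, simplified depth level: only the fields needed here.
struct TrackedLevel {
  std::uint32_t price;
  ChangeId last_change;  // ID of the most recent change to this level

  // Assumed rule: changed if modified after the last published change.
  bool changed_since(ChangeId last_published) const {
    return last_change > last_published;
  }
};

// Return the indexes of the levels to publish. A full message takes every
// level; an incremental message takes only the levels changed after the
// last published change.
std::vector<std::size_t> levels_to_publish(
    const std::vector<TrackedLevel>& levels,
    ChangeId last_published,
    bool full_message) {
  std::vector<std::size_t> result;
  for (std::size_t index = 0; index < levels.size(); ++index) {
    if (full_message || levels[index].changed_since(last_published)) {
      result.push_back(index);
    }
  }
  return result;
}
```

The index is recorded for every selected level because an incremental message must tell the subscriber which slot each entry belongs to. That is the role of the level number field in the feed.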

By comparison, build_depth_level() is simple:

depth_feed_publisher.cpp: DepthFeedPublisher class implementation, continued
  void
  DepthFeedPublisher::build_depth_level(
      QuickFAST::Messages::SequencePtr& level_seq,
      const book::DepthLevel* level,
      int level_index)
  {
    FieldSetPtr level_fields(new FieldSet(4));
    level_fields->addField(id_level_num_, FieldUInt8::create(level_index));
    level_fields->addField(id_order_count_,
                           FieldUInt32::create(level->order_count()));
    level_fields->addField(id_price_,
                           FieldUInt32::create(level->price()));
    level_fields->addField(id_size_,
                           FieldUInt32::create(level->aggregate_qty()));
    level_seq->addEntry(level_fields);
  }

This method shows how to add an entry to a QuickFAST sequence - by creating a FieldSet, adding fields to it, and adding the FieldSet to the sequence.

The final method is a helper, to create an integer timestamp for a message:

depth_feed_publisher.cpp: DepthFeedPublisher class implementation, continued
  uint32_t
  DepthFeedPublisher::time_stamp()
  {
    time_t now;
    time(&now);
    return now;
  }

  } } // End namespace

Publisher Initialization

At this point the publisher has all the necessary parts to publish a trade feed and an incremental depth feed for the exchange. All that remains is to create, initialize, and associate the various parts, and to generate the random orders.

The exchange is initialized in the file publisher_main.cpp. The main() function starts by establishing the securities to "trade" in the exchange - in this case the NASDAQ 100 - taken from a snapshot in April 2013. main() records the symbol and a base price in a structure:

publisher_main.cpp: SecurityInfo structure
  struct SecurityInfo {
    std::string symbol;
    double ref_price;

    SecurityInfo(const char* sym, double price)
    : symbol(sym),
      ref_price(price)
    {
    }
  };

  typedef std::vector<SecurityInfo> SecurityVector;

The base price serves as a basis for generating random prices for the example exchange. main() keeps this security information in a SecurityVector, which is populated by the create_securities() function, called later in main().

The main() function begins by creating the DepthFeedConnection, and uses Boost's thread library to create a background thread to accept connections from subscribers:

publisher_main.cpp: main() function
  int main(int argc, const char* argv[])
  {
    // Feed connection
    examples::DepthFeedConnection connection(argc, argv);

    // Open connection in background thread
    connection.accept();
    boost::function<void ()> acceptor(
        boost::bind(&examples::DepthFeedConnection::run, &connection));
    boost::thread acceptor_thread(acceptor);

Next, the DepthFeedPublisher and the Exchange are created, and the DepthFeedPublisher is made aware of the DepthFeedConnection:

publisher_main.cpp: main() function, continued
    // Create feed publisher
    examples::DepthFeedPublisher feed;
    feed.set_connection(&connection);

    // Create exchange
    examples::Exchange exchange(&feed, &feed);

Note that the exchange constructor requires a TradeListener and a DepthListener, both of which the DepthFeedPublisher implements.

Finally, the securities are created and used to populate the exchange, and orders are generated.

publisher_main.cpp: main() function, continued
    // Create securities
    SecurityVector securities;
    create_securities(securities);

    // Populate exchange with securities
    populate_exchange(exchange, securities);

    // Generate random orders
    generate_orders(exchange, securities);

    return 0;
  }

The final function call, to generate_orders(), never returns because that function loops forever, so there is no need to worry about main() returning.

The helper function create_securities() adds 100 securities to the SecurityVector:

publisher_main.cpp: main() helper functions
  void
  create_securities(SecurityVector& securities) {
    securities.push_back(SecurityInfo("AAPL", 436.36));
    securities.push_back(SecurityInfo("ADBE", 45.06));
    securities.push_back(SecurityInfo("ADI", 43.93));

    // Repeated for 100 securities...
  }

Due to the mercy of the author, the other 97 securities were omitted from this paper. The next helper function, populate_exchange(), adds these securities to the exchange:

publisher_main.cpp: main() helper functions, continued
  void
  populate_exchange(examples::Exchange& exchange, const SecurityVector& securities) {
    SecurityVector::const_iterator sec;
    for (sec = securities.begin(); sec != securities.end(); ++sec) {
      exchange.add_order_book(sec->symbol);
    }
  }

The final helper function, generate_orders(), creates random orders and adds them to the exchange:

publisher_main.cpp: main() helper functions, continued
  void
  generate_orders(examples::Exchange& exchange, const SecurityVector& securities) {
    time_t now;
    time(&now);
    std::srand(now);

    size_t num_securities = securities.size();
    while (true) {
      // which security
      size_t index = std::rand() % num_securities;
      const SecurityInfo& sec = securities[index];
      // side
      bool is_buy = std::rand() % 2;
      // price
      uint32_t price_base = sec.ref_price * 100;
      uint32_t delta_range = price_base / 50; // window is 2% of base wide
      int32_t delta = std::rand() % delta_range;
      delta -= (delta_range / 2);             // center it: about +/- 1% of base
      double price = double (price_base + delta) / 100;

      // qty
      book::Quantity qty = (std::rand() % 10 + 1) * 100;

      // order
      examples::OrderPtr order(new examples::Order(is_buy, price, qty));

      // add order
      exchange.add_order(sec.symbol, order);

      // Wait for eyes to read (POSIX sleep(), declared in <unistd.h>)
      sleep(1);
    }
  }

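generate_orders() seeds the random number generator and then loops forever. Each pass picks a random security and side, a price within about 1% of the security's reference price, and a quantity between 100 and 1000 in lots of 100, then adds the resulting order to the exchange. There is a call to sleep() inside the loop, so that the data stream is produced at a somewhat readable pace. At this point, the publisher is complete.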
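The price arithmetic in generate_orders() is easier to follow when pulled out into a function. The sketch below is an illustration rather than the example's code: random_price_cents() and random_quantity() are hypothetical names, and std::mt19937 from the standard library is swapped in for std::rand(). Dividing the base by 50 gives a window that is 2% of the base wide, and subtracting half of it centers the window, so prices land within about 1% of the reference price. The sketch also guards against a zero-width window, which the loop above does not.

```cpp
#include <cstdint>
#include <random>

// Pick a price in cents from a window centered on the reference price.
// The window is 2% of the base wide, so prices land within about 1% of it.
std::uint32_t random_price_cents(double ref_price, std::mt19937& rng) {
  const std::uint32_t price_base =
      static_cast<std::uint32_t>(ref_price * 100);
  const std::uint32_t delta_range = price_base / 50;  // 2% of the base
  if (delta_range == 0) {
    return price_base;  // price too small for a window; avoid modulo by zero
  }
  std::uniform_int_distribution<std::uint32_t> pick(0, delta_range - 1);
  // Shift the draw from [0, range) to roughly [-range/2, +range/2).
  const std::int64_t delta =
      static_cast<std::int64_t>(pick(rng)) - delta_range / 2;
  return static_cast<std::uint32_t>(price_base + delta);
}

// Pick a quantity between 100 and 1000 in round lots of 100.
std::uint32_t random_quantity(std::mt19937& rng) {
  std::uniform_int_distribution<std::uint32_t> lots(1, 10);
  return lots(rng) * 100;
}
```

Working in integer cents until the last moment keeps the generated prices on a one-cent grid, which matters because the depth feed aggregates orders by price.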

Subscriber Application

With the publisher in place, focus turns to the subscriber. The subscriber must connect to the publisher, decode the FAST feed, recreate the depth state for each security, and display the results. As noted earlier, the connection is handled by the DepthFeedConnection class. The decoding and display of messages is handled by the DepthFeedSubscriber class:

depth_feed_subscriber.h: DepthFeedSubscriber class declaration
  namespace liquibook { namespace examples {

  class DepthFeedSubscriber : public TemplateConsumer {
  public:
    DepthFeedSubscriber(
        const QuickFAST::Codecs::TemplateRegistryPtr& templates);

    // Handle a reset of the connection
    void handle_reset();

    // Handle a message
    // return false if failure
    bool handle_message(BufferPtr& bp, size_t bytes_transferred);

The DepthFeedSubscriber class has a number of private members:

depth_feed_subscriber.h: DepthFeedSubscriber class declaration, continued
  private:
    QuickFAST::Codecs::Decoder decoder_;
    typedef std::map<std::string, book::Depth<5> > DepthMap;
    DepthMap depth_map_;
    uint64_t expected_seq_;

    static const uint64_t MSG_TYPE_DEPTH;
    static const uint64_t MSG_TYPE_TRADE;

These members include a QuickFAST decoder, a depth object for each security (kept in a std::map), and the expected sequence number. The sequence number is tracked to validate the feed, so that the subscriber is sure that every message from the publisher has been handled.

Finally, there are two constants for determining the message type. The reader will note that the message type field appears in the decoded message, even though the publisher never encoded it. This is because the FAST template gives the field a constant field instruction, so its value comes from the template itself rather than from the encoded stream.

In addition, there are a few private methods, to process incoming messages:

depth_feed_subscriber.h: DepthFeedSubscriber class declaration, continued
    void log_depth(book::Depth<5>& depth);
    bool handle_trade_message(const std::string& symbol,
                              uint64_t& seq_num,
                              uint64_t& timestamp,
                              QuickFAST::Messages::Message& msg);
    bool handle_depth_message(const std::string& symbol,
                              uint64_t& seq_num,
                              uint64_t& timestamp,
                              QuickFAST::Messages::Message& msg);
  };
  } }

After initializing the static members comes the class constructor:

depth_feed_subscriber.cpp: DepthFeedSubscriber class implementation
  namespace liquibook { namespace examples {

  const uint64_t DepthFeedSubscriber::MSG_TYPE_DEPTH(11);
  const uint64_t DepthFeedSubscriber::MSG_TYPE_TRADE(22);

  using QuickFAST::ValueType;

  DepthFeedSubscriber::DepthFeedSubscriber(
      const QuickFAST::Codecs::TemplateRegistryPtr& templates)
  : decoder_(templates),
    expected_seq_(1)
  {
  }

The constructor passes the templates to the decoder, and initializes the expected sequence number. Next is the handle_reset() method, which is called when the connection to the publisher is reset:

depth_feed_subscriber.cpp: DepthFeedSubscriber class implementation, continued
  void
  DepthFeedSubscriber::handle_reset()
  {
    expected_seq_ = 1;
  }

This simple method just resets the expected sequence number to one. Next is the meaty method, handle_message():

depth_feed_subscriber.cpp: DepthFeedSubscriber class implementation, continued
  bool
  DepthFeedSubscriber::handle_message(BufferPtr& bp, size_t bytes_transferred)
  {
    // Decode the message
    QuickFAST::Codecs::DataSourceBuffer source(bp->c_array(), bytes_transferred);
    QuickFAST::Codecs::SingleMessageConsumer consumer;
    QuickFAST::Codecs::GenericMessageBuilder builder(consumer);
    decoder_.decodeMessage(source, builder);
    QuickFAST::Messages::Message& msg(consumer.message());

    // Examine message contents
    uint64_t seq_num, msg_type, timestamp;
    const QuickFAST::StringBuffer* string_buffer;
    size_t bids_length, asks_length;
    std::string symbol;
    if (!msg.getUnsignedInteger(id_seq_num_, ValueType::UINT32, seq_num)) {
      std::cout << "Could not get seq num from msg" << std::endl;
      return false;
    }
    if (seq_num != expected_seq_) {
      std::cout << "ERROR: Got Seq num " << seq_num << ", expected "
                << expected_seq_ << std::endl;
      return false;
    }
    if (!msg.getUnsignedInteger(id_msg_type_, ValueType::UINT32, msg_type)) {
      std::cout << "Could not get msg type from msg" << std::endl;
      return false;
    }
    if (!msg.getString(id_symbol_, ValueType::ASCII, string_buffer)) {
      std::cout << "Could not get symbol from msg" << std::endl;
      return false;
    }
    if (!msg.getUnsignedInteger(id_timestamp_, ValueType::UINT32, timestamp)) {
      std::cout << "Could not get timestamp from msg" << std::endl;
      return false;
    }
    bool result = false;
    symbol = (std::string)*string_buffer;
    switch (msg_type) {
    case MSG_TYPE_DEPTH:
      result = handle_depth_message(symbol, seq_num, timestamp, msg);
      break;
    case MSG_TYPE_TRADE:
      result = handle_trade_message(symbol, seq_num, timestamp, msg);
      break;
    default:
      std::cout << "ERROR: Unknown message type " << msg_type
                << " seq num " << seq_num << std::endl;
      return false;
    }
    ++expected_seq_;
    return result;
  }

This method first decodes the FAST message. This is done by stuffing the message contents into a QuickFAST buffer specific for decoding. Next, a message builder and consumer are created and associated, and the message is decoded into the builder. After this, the message is available from the consumer.

The common fields are then checked, by using the type-specific extractors on the QuickFAST::Messages::Message class, such as getUnsignedInteger(). This starts with the sequence number, which is validated against the expected sequence number. Next the message type, the symbol, and the timestamp are extracted. If any of these fail, the method exits with an error value.

Finally, since the message type is known, the proper message-type-specific handler is called.
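Stripped of the decoding, the control flow of handle_message() is a sequence check followed by a dispatch on message type. The following sketch models only that flow. SequenceChecker and its counters are hypothetical stand-ins for the subscriber and its two handlers; the type codes 11 and 22 are the ones defined above.

```cpp
#include <cstdint>

// Message-type codes, matching the constants defined for the subscriber.
const std::uint64_t MSG_TYPE_DEPTH = 11;
const std::uint64_t MSG_TYPE_TRADE = 22;

// Hypothetical model of the subscriber's sequence validation and dispatch.
class SequenceChecker {
public:
  SequenceChecker() : expected_seq_(1), depth_count_(0), trade_count_(0) {}

  // Called when the connection is reset: expect numbering to restart at 1.
  void reset() { expected_seq_ = 1; }

  // Returns false on a gap, a repeat, or an unknown message type. The
  // expected sequence number advances only when a message is accepted.
  bool handle(std::uint64_t seq_num, std::uint64_t msg_type) {
    if (seq_num != expected_seq_) {
      return false;  // a message was missed or repeated
    }
    switch (msg_type) {
      case MSG_TYPE_DEPTH:
        ++depth_count_;  // stands in for handle_depth_message()
        break;
      case MSG_TYPE_TRADE:
        ++trade_count_;  // stands in for handle_trade_message()
        break;
      default:
        return false;  // unknown type: reject without advancing
    }
    ++expected_seq_;
    return true;
  }

  std::uint64_t expected() const { return expected_seq_; }
  std::uint64_t depth_count() const { return depth_count_; }
  std::uint64_t trade_count() const { return trade_count_; }

private:
  std::uint64_t expected_seq_;
  std::uint64_t depth_count_;
  std::uint64_t trade_count_;
};
```

Because an incremental depth feed only makes sense when no update is skipped, rejecting the first out-of-sequence message is the conservative choice: the caller can then drop the connection and resynchronize from a full update.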

The first helper method logs the contents of the depth for a security:

depth_feed_subscriber.cpp: DepthFeedSubscriber class implementation, continued
  void
  DepthFeedSubscriber::log_depth(book::Depth<5>& depth)
  {
    book::DepthLevel* bid = depth.bids();
    book::DepthLevel* ask = depth.asks();
    printf("----------BID----------    ----------ASK----------\n");
    while (bid || ask) {
      if (bid && bid->order_count()) {
        printf("%8.2f %9d [%2d]",
               (double)bid->price() / Order::precision_,
               bid->aggregate_qty(), bid->order_count());
        if (bid == depth.last_bid_level()) {
          bid = NULL;
        } else {
          ++bid;
        }
      } else {
        // Blank bid column: 23 spaces, the width of one formatted level
        printf("                       ");
        bid = NULL;
      }

      if (ask && ask->order_count()) {
        printf("    %8.2f %9d [%2d]\n",
               (double)ask->price() / Order::precision_,
               ask->aggregate_qty(), ask->order_count());
        if (ask == depth.last_ask_level()) {
          ask = NULL;
        } else {
          ++ask;
        }
      } else {
        // Newline
        printf("\n");
        ask = NULL;
      }
    }
  }

This method is complex, because it logs both bid and ask on the same line. In its loop, there could be a bid and an ask, only a bid, or only an ask.
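One way to tame the side-by-side layout is to format each side as a fixed-width column and then join the two. The sketch below illustrates the idea with hypothetical names (DisplayLevel, format_side(), format_row()). It assumes prices are held in cents and that values fit the 23-character column used in the sample output.

```cpp
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical, simplified level: price in cents, total size, order count.
struct DisplayLevel {
  std::uint32_t price;
  std::uint32_t aggregate_qty;
  std::uint32_t order_count;
};

// Format one side of a row as a 23-character column. A null pointer or an
// empty level yields 23 spaces, so the other column stays aligned.
std::string format_side(const DisplayLevel* level) {
  char buf[64];
  if (level && level->order_count) {
    std::snprintf(buf, sizeof(buf), "%8.2f %9u [%2u]",
                  level->price / 100.0,
                  static_cast<unsigned>(level->aggregate_qty),
                  static_cast<unsigned>(level->order_count));
  } else {
    std::snprintf(buf, sizeof(buf), "%23s", "");
  }
  return buf;
}

// Join the bid and ask columns with a four-space gutter.
std::string format_row(const DisplayLevel* bid, const DisplayLevel* ask) {
  return format_side(bid) + "    " + format_side(ask);
}
```

With this split, the three cases in the loop (bid and ask, bid only, ask only) collapse into passing a null pointer for the missing side.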

The next helper handles a depth message:

depth_feed_subscriber.cpp: DepthFeedSubscriber class implementation, continued
  bool
  DepthFeedSubscriber::handle_depth_message(
      const std::string& symbol,
      uint64_t& seq_num,
      uint64_t& timestamp,
      QuickFAST::Messages::Message& msg)
  {
    size_t bids_length, asks_length;
    std::cout << timestamp
              << " Got depth msg " << seq_num
              << " for symbol " << symbol << std::endl;

    // Create or find depth
    std::pair<DepthMap::iterator, bool> results = depth_map_.insert(
        std::make_pair(symbol, book::Depth<5>()));
    book::Depth<5>& depth = results.first->second;

    if (msg.getSequenceLength(id_bids_, bids_length)) {
      for (size_t i = 0; i < bids_length; ++i) {
        const QuickFAST::Messages::MessageAccessor* accessor;
        if (msg.getSequenceEntry(id_bids_, i, accessor)) {
          uint64_t level_num, price, order_count, aggregate_qty;
          if (!accessor->getUnsignedInteger(id_level_num_, ValueType::UINT8,
                                            level_num)) {
            std::cout << "Could not get Bid level from depth msg" << std::endl;
            return false;
          }
          if (!accessor->getUnsignedInteger(id_price_, ValueType::UINT32,
                                            price)) {
            std::cout << "Could not get Bid price from depth msg" << std::endl;
            return false;
          }
          if (!accessor->getUnsignedInteger(id_order_count_, ValueType::UINT32,
                                            order_count)) {
            std::cout << "Could not get Bid count from depth msg" << std::endl;
            return false;
          }
          if (!accessor->getUnsignedInteger(id_size_, ValueType::UINT32,
                                            aggregate_qty)) {
            std::cout << "Could not get Bid agg qty from depth msg" << std::endl;
            return false;
          }

          book::DepthLevel& level = depth.bids()[level_num];
          level.set(price, aggregate_qty, order_count);

        } else {
          std::cout << "Failed to get bid " << i << std::endl;
          return false;
        }
        msg.endSequenceEntry(id_bids_, i, accessor);
      }
    }
    if (msg.getSequenceLength(id_asks_, asks_length)) {
      for (size_t i = 0; i < asks_length; ++i) {
        const QuickFAST::Messages::MessageAccessor* accessor;
        if (msg.getSequenceEntry(id_asks_, i, accessor)) {
          uint64_t level_num, price, order_count, aggregate_qty;
          if (!accessor->getUnsignedInteger(id_level_num_, ValueType::UINT8,
                                            level_num)) {
            std::cout << "Could not get Ask level from depth msg " << std::endl;
            return false;
          }
          if (!accessor->getUnsignedInteger(id_price_, ValueType::UINT32,
                                            price)) {
            std::cout << "Could not get Ask price from depth msg" << std::endl;
            return false;
          }
          if (!accessor->getUnsignedInteger(id_order_count_, ValueType::UINT32,
                                            order_count)) {
            std::cout << "Could not get Ask count from depth msg " << std::endl;
            return false;
          }
          if (!accessor->getUnsignedInteger(id_size_, ValueType::UINT32,
                                            aggregate_qty)) {
            std::cout << "Could not get Ask agg qty from depth msg " << std::endl;
            return false;
          }

          book::DepthLevel& level = depth.asks()[level_num];
          level.set(price, aggregate_qty, order_count);

        } else {
          std::cout << "Failed to get ask " << i << std::endl;
          return false;
        }
        msg.endSequenceEntry(id_asks_, i, accessor);
      }
    }
    log_depth(depth);
    return true;
  }

This method has the common fields passed to it, and logs the symbol of the message. It then finds the proper depth object for the security, or creates one if this is the first depth message for the security.

Next the method must iterate through the sequence of changed bids. To do this, the sequence length is first extracted, and each entry accessed through an accessor.

Each bid field is then accessed using type-specific extractors, including the level number, the price, the number of orders, and the aggregate quantity. These updated values are then used to update the depth level for that security through a call to set(). Similar logic is performed for changed ask levels, and the resulting depth is logged using log_depth().
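The core of the update is overwriting one slot of a fixed-size array, chosen by the level number carried in the message. The sketch below isolates that step using hypothetical types (DepthSlot, LevelUpdate, apply_update()). It also adds a range check on the level number, which is an extra safeguard and not something the handler above performs.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Hypothetical, simplified depth slot and a five-level side of the book.
struct DepthSlot {
  std::uint32_t price;
  std::uint32_t aggregate_qty;
  std::uint32_t order_count;
};

const std::size_t kDepthSize = 5;
typedef std::array<DepthSlot, kDepthSize> DepthSide;

// One decoded sequence entry from a depth message.
struct LevelUpdate {
  std::uint64_t level_num;
  std::uint64_t price;
  std::uint64_t order_count;
  std::uint64_t aggregate_qty;
};

// Overwrite the slot named by the update. Returns false, leaving the side
// untouched, if the level number from the wire does not fit the local depth.
bool apply_update(DepthSide& side, const LevelUpdate& update) {
  if (update.level_num >= side.size()) {
    return false;
  }
  DepthSlot& slot = side[static_cast<std::size_t>(update.level_num)];
  slot.price = static_cast<std::uint32_t>(update.price);
  slot.aggregate_qty = static_cast<std::uint32_t>(update.aggregate_qty);
  slot.order_count = static_cast<std::uint32_t>(update.order_count);
  return true;
}
```

Levels that are not mentioned in an incremental message keep their previous values, which is why a subscriber must start from a full update before applying incremental ones.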

The final helper method handles a trade message:

depth_feed_subscriber.cpp: DepthFeedSubscriber class implementation, continued
  bool
  DepthFeedSubscriber::handle_trade_message(
      const std::string& symbol,
      uint64_t& seq_num,
      uint64_t& timestamp,
      QuickFAST::Messages::Message& msg)
  {
    uint64_t qty, cost;
    // Get trade fields
    if (!msg.getUnsignedInteger(id_qty_, ValueType::UINT32, qty)) {
      std::cout << "Could not get qty from trade msg" << std::endl;
      return false;
    }
    if (!msg.getUnsignedInteger(id_cost_, ValueType::UINT32, cost)) {
      std::cout << "Could not get cost from trade msg" << std::endl;
      return false;
    }

    double price = (double) cost / (qty * Order::precision_);
    std::cout << timestamp
              << " Got trade msg " << seq_num
              << " for symbol " << symbol
              << ": " << qty << "@" << price
              << std::endl;

    return true;
  }

  } }

Like handle_depth_message(), handle_trade_message() is passed the common fields. It extracts the trade quantity and cost, from which it calculates the price of the trade (after adjusting for the precision). The result is then logged.

Subscriber Initialization

Finally, the subscriber can create, initialize, and associate its components. The main() function of the subscriber starts by creating the DepthFeedConnection and the DepthFeedSubscriber:

subscriber_main.cpp: main() function
  int main(int argc, const char* argv[])
  {
    // Create the connection
    liquibook::examples::DepthFeedConnection connection(argc, argv);

    // Create feed subscriber
    liquibook::examples::DepthFeedSubscriber feed(connection.get_templates());

    // Set up handlers
    liquibook::examples::MessageHandler msg_handler =
        boost::bind(&liquibook::examples::DepthFeedSubscriber::handle_message,
                    &feed, _1, _2);
    liquibook::examples::ResetHandler reset_handler =
        boost::bind(&liquibook::examples::DepthFeedSubscriber::handle_reset,
                    &feed);
    connection.set_message_handler(msg_handler);
    connection.set_reset_handler(reset_handler);

    // Connect to server
    connection.connect();
    connection.run();

    return 0;
  }

Next, the handlers for messages and connection resets are set up, and finally, the subscriber connects to the publisher. This completes the subscriber application.

Running the Example

By default the publisher and subscriber connect on port 10003 on localhost. To run on a different port, add the -p option to each command line. The subscriber may also specify a remote host with the -h option. To specify a different template file, or template file location, use the -t option.

For example:

  $ ./depth_feed_publisher -p 9999
  $ ./depth_feed_subscriber -p 9999

The subscriber produces output like this:

1369167639 Got depth msg 8182 for symbol GILD
----------BID----------    ----------ASK----------
   49.93       400 [ 1]       50.12       500 [ 1]
   49.66       300 [ 1]       50.13       500 [ 1]
   49.59      1000 [ 3]       50.14       500 [ 1]
   49.58       700 [ 1]       50.19       500 [ 1]
                              50.26       800 [ 1]
1369167640 Got depth msg 8183 for symbol XLNX
----------BID----------    ----------ASK----------
   37.40       400 [ 1]       37.60      1000 [ 2]
   37.36       400 [ 1]       37.61      1900 [ 3]
   37.35      1000 [ 1]       37.65       900 [ 3]
   37.28       500 [ 2]       37.70       600 [ 1]
   37.25      1700 [ 2]       37.71      1000 [ 2]
1369167641 Got depth msg 8184 for symbol GOLD
----------BID----------    ----------ASK----------
   78.30       300 [ 1]       78.43       300 [ 1]
   78.18       300 [ 1]       78.48       700 [ 1]
   78.17       900 [ 1]       78.64       600 [ 1]
   78.09       200 [ 1]       78.70       600 [ 1]
   78.06       900 [ 1]       79.12       500 [ 1]
1369167642 Got trade msg 8185 for symbol HSIC: 600@89.47
1369167642 Got trade msg 8186 for symbol HSIC: 200@89.49
1369167642 Got depth msg 8187 for symbol HSIC
----------BID----------    ----------ASK----------
   89.32       700 [ 1]       89.49       500 [ 1]
   89.09       500 [ 1]       89.52       200 [ 1]
   88.96       600 [ 1]       89.68       500 [ 1]
   88.82       200 [ 1]       89.76       900 [ 1]
   88.59       100 [ 1]       89.91       100 [ 1]

These updates show the full depth for each security updated, and not just the changed levels. The first three updates show the depth for various securities, followed by some trades and a depth update for a fourth security. The trades go hand-in-hand with a depth update. Some depth updates cause trades, and some do not.

Note that a very good test of the incremental feed is to start two subscribers at different times, so that the first subscriber has handled a good number of messages before the second subscriber is started. The two should still be perfectly in sync, and produce identical output.

Summary

Although much code has been walked through in this example, very little custom code was related to the order book. Most of the code was either general networking code using ASIO, or QuickFAST code. In addition, the complexities of order book maintenance and depth-building are contained - most of the code shown was simple, or even trivial.

Liquibook makes it very simple to implement a limit order book matching engine, whether you are building an exchange, or are a trader simulating an exchange's matching capability.
