January 15, 2010 - By Don Busch
Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.
INTRODUCTION
For securities traders, having up-to-date and accurate information has always been critical. The financial industry was an early adopter of electronic communication to distribute market data, going back to the days of the stock ticker, and continues to push the limits of the technology today.
In recent years, the volume of market data generated by exchanges around the world has skyrocketed. Not only is the amount of data increasing, but so are the types of data being tracked and published. The global nature of the financial world means that traders now have to watch multiple exchanges, where a few years ago they might have monitored only one or two. Trading firms and exchanges are finding a need to process messages at increasing rates, increases that are not matched by improvements in processor speed or network throughput. This requires innovative ways to cope with the growth.
The exchanges have helped by adopting a standard way to compress market data, called FAST (FIX Adapted for STreaming). Dedicated lines from vendors ensure high bandwidth and reliability. However, at the receiving end, the market data still has to be decoded, filtered, distributed for analysis, and sent to the trader or trading system before it can result in a trade. This time lapse from the feed to the trade is called latency. It cannot be avoided, but shrewd trading companies try to minimize it to achieve a timing advantage in the market. A trading company's market data infrastructure needs to handle a high volume of data from multiple sources in a cost-effective, reliable manner with the lowest possible latency. The market data infrastructure must optimize its use of available bandwidth and disseminate data with a minimum of overhead.
Open-source tools such as QuickFAST and OpenDDS can make low-latency, high-volume data distribution a reality. QuickFAST is an open-source implementation of the FAST protocol, an efficient data compression protocol created by the FIX Protocol Limited organization. OpenDDS is an implementation of the Object Management Group's Data Distribution Service for Real-Time Systems (DDS) specification for low-latency publish/subscribe data dissemination. By using open-source software on the critical path, trading companies can gain continuous insight into their latency and make plans to reduce it.
This paper discusses the testing that OCI did to explore the combination of an open source decoder (QuickFAST) with an open source publish and subscribe distribution service (OpenDDS) using commodity hardware. OCI has put together a typical system that uses these open source tools to accept incoming market data and distribute it throughout the local office to the locations where it will be most useful.
PERFORMANCE
We tested this system on two SGI Intel Xeon W5590 3.33GHz servers connected via a Voltaire QDR Infiniband switch and running SUSE Linux Enterprise Server 11. These servers were provided by SGI. The test results depended greatly on how complex the messages were and how much processing we did on each message. For simple messages, similar to the ARCA messages published by the New York Stock Exchange, we published each FAST message as one OpenDDS sample. For complex messages, similar to the messages published by the Chicago Mercantile Exchange (CME), we split up the message's MarketDataEntries sequence and published each security's data on a separate OpenDDS sample.
For our simple, ARCA-like message, we have observed QuickFAST decode times under 500 nanoseconds and latencies of about 40 microseconds when publishing across OpenDDS between hosts. For complex, CME-like messages, we have observed QuickFAST decode times under 2.5 microseconds and latencies of about 65 microseconds when publishing across OpenDDS between hosts.
As always, your mileage may vary. The attached source code should enable you to run these examples, and others, in your own environment to compare results.
QUICKFAST
QuickFAST is an open-source native C++ implementation of the FAST protocol. FAST (FIX Adapted for STreaming) was developed by FIX Protocol Limited as a way to reduce the bandwidth and network latency required to distribute market data without incurring excessive CPU costs. It is being widely adopted in the financial industry. FAST applications exchange compression instructions out-of-band to take advantage of the repetitive nature of financial information.
FAST participants exchange out-of-band templates that describe how messages are compressed across the wire. A FAST template for a message defines the fields contained in the message, field instructions for each field, and field operators for each field instruction. For example, a field operator may define the field as a constant-valued field, which means that the field's contents are not even sent across the wire since the field's value is encoded in the template. Or, a field operator may define the field as a copied field, meaning that if the field's value is not present then the value is assumed to be the same as the previous message's value. Examples of FAST templates appear later in this article. These are simple examples, and the FAST protocol enables a great deal more control over the compression behavior than what we show here.
A QuickFAST application implements the QuickFAST ValueMessageBuilder interface to receive a callback from QuickFAST for each FAST message and each FAST message field it receives, as the diagram illustrates. An application may, if it wishes, act on the contents of a message field before the entire FAST message has been decoded.
OPENDDS
OpenDDS is an open-source publish-and-subscribe data dissemination framework that implements the OMG Data Distribution Service (DDS) specification. DDS is appropriate for distributed computing systems in which the primary objective of the participants is the quick and efficient dissemination of application data rather than access to shared services. The set of suppliers and/or consumers of application data may not be known at design time, and may change through the execution lifetime of the application. Dissemination of financial market data is a good match for OpenDDS's capabilities. The following diagram illustrates basic OpenDDS communication on three topics:
OpenDDS applications use type-safe interfaces to communicate application-defined structs directly between publishing and subscribing processes. On the publishing side, the application creates a type-specific DataWriter to write data on a topic; on the subscribing side, the application creates a DataReader for the same type, subscribing on the same topic. Data samples are written from a DataWriter directly to a DataReader; there is no intervening broker for each sample written. The DataReader only receives data published on its requested topics. OpenDDS also supports several Quality-of-Service (QoS) policies to configure attributes such as reliability, persistence, and the behavior of late-joining subscribers.
For more information on OpenDDS, see the OpenDDS web site and this introductory article.
EXAMPLE - "SIMPLE" MESSAGES
In our example, we will simulate a market data feed, starting from the exchange and ending at a final data consumer which may be a trader's workstation, an algorithmic trading engine, a history database, or something else. We will send FAST data from our simulated exchange via UDP multicast, use QuickFAST to convert each FAST message into a C++ application struct understood by OpenDDS, and publish the resulting OpenDDS sample to any interested subscribers.
We will run two examples. First, we will demonstrate with a simple data set that is somewhat similar to ARCA data. Then, we will show a more complex data set that is somewhat similar to CME data. Because actual market data is copyrighted, we cannot provide actual ARCA or CME data. So we will simulate it. The diagram illustrates the example's topology:
FAST TEMPLATE -- "SIMPLE" MESSAGES
First, we define the format of our simple FAST messages with a FAST template definition. In the real world, such a template is usually defined by the exchange and made available to the exchange's clients. The template defines how data is compressed across FAST, and its definition depends on domain knowledge of the data being exchanged. The FAST protocol gains its efficiency by this out-of-band exchange of the compression template.
In this template file, we will define a total of four messages -- "Quote", "New", "Change", and "Remove". In addition, for our own example, we will define a "Done" message to indicate to the QuickFAST consumer when the message stream has stopped. We will send the FAST messages from a multicasting process to simulate the topology that a trading application would likely encounter -- the "Done" message tells the multicast receiver when the message stream has completed. The "Quote" template follows:
<?xml version="1.0"?>
<templates xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">

  <template name="Quote" id="1" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
    <uInt16 name="MessageType" id="100">
      <constant value="11"></constant>
    </uInt16>
    <uInt32 name="SequenceNumber" id="200">
      <increment></increment>
    </uInt32>
    <uInt32 name="Timestamp" id="300">
      <copy></copy>
    </uInt32>
    <uInt32 name="OrderId" id="400">
      <copy></copy>
    </uInt32>
    <uInt32 name="Volume" id="500">
      <copy></copy>
    </uInt32>
    <uInt32 name="Price" id="600">
      <copy></copy>
    </uInt32>
    <uInt8 name="ExchangeId" id="700">
      <copy></copy>
    </uInt8>
    <uInt16 name="SecurityId" id="800">
      <copy></copy>
    </uInt16>
    <uInt8 name="SecurityType" id="900">
      <copy></copy>
    </uInt8>
    <uInt8 name="SessionId" id="1000">
      <copy></copy>
    </uInt8>
  </template>
You can see that the template has an identifier and ten field instructions. The first field instruction is for a MessageType field, which is an unsigned 16-bit integer with a field operator that indicates that the field has a constant value of 11. Since it has a constant value, the MessageType field is never actually sent across the wire because its value can be determined by knowing the template's identifier. This is one of the ways in which FAST uses out-of-band knowledge of the data to minimize the amount of data sent over the wire.
The second field instruction is an unsigned 32-bit integer for the field named SequenceNumber. SequenceNumber has an "increment" field operator, which is another way of conserving bandwidth. When several Quote messages are encoded in the same UDP packet, the SequenceNumber value is only provided for the first message in the packet. On the receiving side, QuickFAST deduces the SequenceNumber for subsequent messages by incrementing its last known SequenceNumber.
The third field instruction is an unsigned 32-bit integer for the field named Timestamp. Timestamp has a "copy" field operator, which means that if the Timestamp field is not in a message, the value from the previously processed message should be copied. Thus, if several consecutive messages have the same Timestamp value, the Timestamp value is only sent across the wire once.
As you can see, one of the primary FAST compression techniques is to remember a value sent in a previous message instead of sending it across the wire. Those values are stored in what the FAST specification refers to as a Dictionary. By default, there is one global dictionary, and that dictionary contains a map of values using the field instruction name as a key. For example, the map has an entry for the Timestamp field's last value, and any message that has a Timestamp field can use that value. However, FAST template authors may take control over the scope of the dictionary by specifying a dictionary per template, per application type, or a user-defined dictionary. Template authors have a great deal of discretion over the behavior of the dictionary.
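To make the "copy" and "increment" operators more concrete, here is a minimal C++ sketch of how a decoder might consult a dictionary when a field is absent from the wire. This is not QuickFAST code: the Dictionary alias and the applyCopy, applyIncrement, and demoOperators names are hypothetical, a null pointer stands in for "field not present", and the sketch ignores presence maps, initial values, and nullable fields that the real protocol defines.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a FAST dictionary: last known value per field name.
using Dictionary = std::map<std::string, std::uint32_t>;

// "copy" operator: if a value arrived on the wire, use it and remember it;
// otherwise reuse the previous value stored in the dictionary.
std::uint32_t applyCopy(Dictionary& dict, const std::string& field,
                        const std::uint32_t* onWire)
{
    if (onWire != nullptr) {
        dict[field] = *onWire;
        return *onWire;
    }
    auto it = dict.find(field);
    if (it == dict.end()) {
        throw std::runtime_error("no previous value for field " + field);
    }
    return it->second;
}

// "increment" operator: like copy, except that an absent value means
// "previous value plus one", and the dictionary is updated to match.
std::uint32_t applyIncrement(Dictionary& dict, const std::string& field,
                             const std::uint32_t* onWire)
{
    if (onWire != nullptr) {
        dict[field] = *onWire;
        return *onWire;
    }
    auto it = dict.find(field);
    if (it == dict.end()) {
        throw std::runtime_error("no previous value for field " + field);
    }
    return ++(it->second);
}

// Usage: two consecutive messages in one packet.
void demoOperators()
{
    Dictionary dict;  // a new packet starts with an empty dictionary
    const std::uint32_t firstSeq = 100;
    const std::uint32_t firstTime = 5000;

    // The first message carries explicit values for both fields.
    assert(applyIncrement(dict, "SequenceNumber", &firstSeq) == 100);
    assert(applyCopy(dict, "Timestamp", &firstTime) == 5000);

    // The second message omits both fields on the wire.
    assert(applyIncrement(dict, "SequenceNumber", nullptr) == 101);
    assert(applyCopy(dict, "Timestamp", nullptr) == 5000);
}
```

Keying the map by field name mirrors the default global dictionary described above; a per-template dictionary could be modeled by keeping one such map per template identifier.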
In the rest of the template file, we will show definitions for the "New", "Change", "Remove", and "Done" messages:
  <template name="New" id="2" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
    <uInt16 name="MessageType" id="100">
      <constant value="12"></constant>
    </uInt16>
    <uInt32 name="SequenceNumber" id="200">
      <increment></increment>
    </uInt32>
    <uInt32 name="Timestamp" id="300">
      <copy></copy>
    </uInt32>
    <uInt32 name="OrderId" id="400">
      <copy></copy>
    </uInt32>
    <uInt32 name="Volume" id="500">
      <copy></copy>
    </uInt32>
    <uInt32 name="Price" id="600">
      <copy></copy>
    </uInt32>
    <uInt8 name="ExchangeId" id="700">
      <copy></copy>
    </uInt8>
    <uInt16 name="SecurityId" id="800">
      <copy></copy>
    </uInt16>
    <uInt8 name="SecurityType" id="900">
      <copy></copy>
    </uInt8>
    <uInt8 name="SessionId" id="1000">
      <copy></copy>
    </uInt8>
    <uInt8 name="Side" id="1100">
      <copy></copy>
    </uInt8>
  </template>

  <template name="Change" id="3" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
    <uInt16 name="MessageType" id="100">
      <constant value="13"></constant>
    </uInt16>
    <uInt32 name="SequenceNumber" id="200">
      <increment></increment>
    </uInt32>
    <uInt32 name="Timestamp" id="300">
      <copy></copy>
    </uInt32>
    <uInt32 name="OrderId" id="400">
      <copy></copy>
    </uInt32>
    <uInt32 name="Volume" id="500">
      <copy></copy>
    </uInt32>
    <uInt32 name="Price" id="600">
      <copy></copy>
    </uInt32>
    <uInt8 name="ExchangeId" id="700">
      <copy></copy>
    </uInt8>
    <uInt16 name="SecurityId" id="800">
      <copy></copy>
    </uInt16>
    <uInt8 name="SecurityType" id="900">
      <copy></copy>
    </uInt8>
    <uInt8 name="SessionId" id="1000">
      <copy></copy>
    </uInt8>
    <uInt8 name="Side" id="1100">
      <copy></copy>
    </uInt8>
  </template>

  <template name="Remove" id="4" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
    <uInt16 name="MessageType" id="100">
      <constant value="14"></constant>
    </uInt16>
    <uInt32 name="SequenceNumber" id="200">
      <increment></increment>
    </uInt32>
    <uInt32 name="Timestamp" id="300">
      <copy></copy>
    </uInt32>
    <uInt32 name="OrderId" id="400">
      <copy></copy>
    </uInt32>
    <uInt8 name="ExchangeId" id="700">
      <copy></copy>
    </uInt8>
    <uInt16 name="SecurityId" id="800">
      <copy></copy>
    </uInt16>
    <uInt8 name="SecurityType" id="900">
      <copy></copy>
    </uInt8>
    <uInt8 name="SessionId" id="1000">
      <copy></copy>
    </uInt8>
    <uInt8 name="Side" id="1100">
      <copy></copy>
    </uInt8>
  </template>

  <template name="Done" id="99" xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
    <uInt16 name="MessageType" id="100">
      <constant value="99"></constant>
    </uInt16>
  </template>

</templates>
FAST MESSAGE ENCODER
Next, we need to generate test data to send from our simulated exchange. We will use the QuickFAST Encoder to do that. We have created an encoder executable for the SimpleTemplates.xml file called SimpleGenerator. The source code is in the src/Generators directory.
The basic steps to encode FAST messages with QuickFAST are in the following code snippet. The code in the src/Generators directory is structured a bit differently; there is a layer of abstraction added to avoid code duplication between the Simple and Complex examples, and each example has a MessagePopulator class that generates test messages containing simulated data. The simplified version is here:
QuickFAST::Codecs::XMLTemplateParser parser;
QuickFAST::Codecs::TemplateRegistryPtr templateRegistry =
    parser.parse("./data/SimpleTemplates.xml");

QuickFAST::Codecs::Encoder encoder(templateRegistry);

// For each message...
QuickFAST::template_id_t templateId = // determine the message's template id
QuickFAST::Messages::FieldSet message(20); // allocate space for 20 message fields
// (populating the message's fields is omitted in this simplified version)

// Destination for the encoded message; this destination
// encodes the message into a string
QuickFAST::Codecs::DataDestinationString encodingDestination;

encoder.encodeMessage(encodingDestination,
                      templateId,
                      message);

// Repeat for each message
To generate FAST messages based on the SimpleTemplates.xml template file, execute the SimpleGenerator as follows:
./bin/SimpleGenerator -t ./data/SimpleTemplates.xml -o ./data/simple30000.dat -n 30000
where ./data/simple30000.dat is the generated FAST data file, and 30000 is the number of FAST messages created in that data file. You can list the command-line arguments of the SimpleGenerator via
./bin/SimpleGenerator -?
By default, the SimpleGenerator puts no more than 1024 bytes in each message packet. That packet size is configurable with the "-p" command-line option. A message packet marks a group of messages that will be multicast in the same UDP multicast packet, which means that the Encoder's dictionary does not need to be reset until the packet is full. Since packets can be lost over UDP multicast, each new packet starts the decoder off with an empty dictionary. The dictionary plays a significant role in the FAST compression, so we do want to put as many messages into each packet as we can before we reset the dictionary.
To indicate the start and end of a packet, the SimpleGenerator writes the length of the packet as a header preceding each set of FAST messages. The Multicaster process below uses that header to determine how many bytes are contained in the packet. Aside from reading the packet length, the Multicaster does not perform any processing on the FAST messages; it merely removes the length header and forwards the raw bytes to the Publishing process. The message length header is simply used to frame each multicast packet (we have to know when one packet ends and another begins), but it does not come into play in the decoding of the message.
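The framing idea can be sketched in a few lines of C++. The article does not state the header layout, so this sketch assumes a 4-byte, big-endian length prefix; that layout, and the frame, unframe, and demoFraming names, are illustrative assumptions rather than the SimpleGenerator's actual format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// ASSUMPTION: each packet is preceded by a 4-byte, big-endian length header.
// The real SimpleGenerator header layout may differ.
const std::size_t kHeaderSize = 4;

// Prepend the assumed length header to one packet's payload.
std::string frame(const std::string& payload)
{
    const std::uint32_t n = static_cast<std::uint32_t>(payload.size());
    std::string out;
    out.push_back(static_cast<char>((n >> 24) & 0xFF));
    out.push_back(static_cast<char>((n >> 16) & 0xFF));
    out.push_back(static_cast<char>((n >> 8) & 0xFF));
    out.push_back(static_cast<char>(n & 0xFF));
    out += payload;
    return out;
}

// Split a framed byte stream back into payloads, dropping the headers.
// The payload bytes themselves are never inspected.
std::vector<std::string> unframe(const std::string& stream)
{
    std::vector<std::string> packets;
    std::size_t pos = 0;
    while (pos < stream.size()) {
        if (stream.size() - pos < kHeaderSize) {
            throw std::runtime_error("truncated length header");
        }
        std::uint32_t n = 0;
        for (std::size_t i = 0; i < kHeaderSize; ++i) {
            n = (n << 8) | static_cast<unsigned char>(stream[pos + i]);
        }
        pos += kHeaderSize;
        if (stream.size() - pos < n) {
            throw std::runtime_error("truncated packet body");
        }
        packets.push_back(stream.substr(pos, n));
        pos += n;
    }
    return packets;
}

// Usage: three packets written back to back, then recovered.
void demoFraming()
{
    const std::string stream = frame("abc") + frame("") + frame("hello");
    const std::vector<std::string> packets = unframe(stream);
    assert(packets.size() == 3);
    assert(packets[0] == "abc");
    assert(packets[1].empty());
    assert(packets[2] == "hello");
}
```

As in the description above, the length header only delimits packets; it is stripped before the bytes are forwarded and plays no part in FAST decoding.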
MULTICASTER
The Multicaster is a simple process that reads in our FAST-generated data file, line by line, and multicasts each line of the FAST file using Boost Asio. The Multicaster can be executed as follows. However, you will want to start the OpenDDS publisher and subscriber first (see below) so the multicast FAST messages actually have somewhere to go.
./bin/Multicaster -f ./data/simple30000.dat -a 224.1.2.133 -p 13014 -s 1000
The "-f ./data/simple30000.dat" arguments indicate the source FAST data file, which is the data file we generated above. The "-a 224.1.2.133" and "-p 13014" arguments indicate the multicast address and port, and are optional.
The "-s 1000" argument is a number of microseconds to sleep between multicast sends. Since UDP multicast is unreliable, it's not difficult to throw enough messages at it to cause messages to be dropped. I have noticed dropped packets on data sets larger than 8000 messages. A small sleep slows the send down enough to enable all of the messages to reach their destinations successfully. In an exchange, a typical configuration is to multicast the same data feed on two multicast addresses and let the decoding side arbitrate between the two feeds to ensure that it does not miss any messages. Even with that setup, it's valuable to test using realistic data rates to ensure that the packet drop rate on each multicast feed is not problematic.
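The two-feed arbitration mentioned above is not part of this example's code, but the core idea can be sketched as follows. The FeedArbitrator class and demoArbitration function are hypothetical, and the sketch is deliberately simplified: it accepts a message only if its sequence number is newer than anything already delivered, so it neither buffers out-of-order messages nor handles sequence number wraparound, both of which a production arbitrator would need to consider.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical arbitrator for two redundant feeds carrying the same messages.
// A message is accepted only if its sequence number is newer than anything
// already delivered from either feed.
class FeedArbitrator
{
public:
    FeedArbitrator() : haveLast_(false), last_(0) {}

    // Returns true if the message should be processed, false if it is a
    // duplicate (or an older message) already covered by the other feed.
    bool accept(std::uint32_t sequenceNumber)
    {
        if (haveLast_ && sequenceNumber <= last_) {
            return false;
        }
        haveLast_ = true;
        last_ = sequenceNumber;
        return true;
    }

private:
    bool haveLast_;
    std::uint32_t last_;
};

// Usage: feed A drops message 3, and feed B fills the gap.
void demoArbitration()
{
    FeedArbitrator arbitrator;
    assert(arbitrator.accept(1));   // feed A
    assert(!arbitrator.accept(1));  // feed B, duplicate
    assert(arbitrator.accept(2));   // feed A
    assert(!arbitrator.accept(2));  // feed B, duplicate
    assert(arbitrator.accept(3));   // feed B (lost on feed A)
    assert(arbitrator.accept(4));   // feed A
    assert(!arbitrator.accept(4));  // feed B, duplicate
}
```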
You can list the command-line arguments of the Multicaster via
./bin/Multicaster -?
The Multicaster code is in src/Multicaster/main.cpp.
QUICKFAST DECODER AND OPENDDS PUBLISHER
The Decoder/Publisher process receives multicast FAST message packets, decodes them into QuickFAST messages, converts each decoded message into a C++ struct that can be published by OpenDDS, and publishes to interested subscribers. The publisher's source code is in src/Publishers/SimplePublisher.cpp.
For the OpenDDS publish/subscribe portion of the application, we define an IDL file with OpenDDS data types that map to the FAST message types defined in the FAST template. That IDL file is located in idl/Simple.idl. We show the "Quote" data type below as an example. You can see how the "Quote" data type's fields map to the FAST message fields from the FAST template above. There are analogous IDL structs for the "New", "Change", and "Remove" data types.
module MiddlewareNewsBrief
{
#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::Quote"
  struct Quote
  {
    unsigned short MessageType;
    unsigned long  SequenceNumber;
    unsigned long  Timestamp;
    unsigned long  OrderId;
    unsigned long  Volume;
    unsigned long  Price;
    unsigned short SecurityId;
    char           ExchangeId;
    char           SecurityType;
    char           SessionId;
  };
}; // module MiddlewareNewsBrief
There are three parts to the Decoder/Publisher. First, it receives multicast FAST messages and decodes them. To do that, we must give the Decoder the same FAST template file as the encoding process so each side understands what to provide and what to expect. We also provide a MessageBuilder to the decoder to process the decoded message. The MessageBuilder is application-specific; our example's MessageBuilder is a C++ template-based builder that takes each FAST message field and uses it to populate a C++ struct for publication across OpenDDS.
The code below illustrates how to set up the QuickFAST decoder to decode FAST messages and communicate with a MessageBuilder. Our MessageBuilder's details have not yet been presented.
std::string multicastAddress = // get multicast address from the command-line
size_t multicastPort =         // get multicast port from the command-line

QuickFAST::Codecs::XMLTemplateParser parser;
QuickFAST::Codecs::TemplateRegistryPtr templateRegistry =
    parser.parse("./data/SimpleTemplates.xml");

// The multicast decoder listens on the multicast address:port
QuickFAST::Codecs::MulticastDecoder decoder(templateRegistry,
                                            multicastAddress,
                                            "0.0.0.0",
                                            multicastPort);

// MessageBuilder created for this example; will give more detail later
MiddlewareNewsBrief::CompositeMessageBuilder<QuickFAST::uint16> builder("MessageType");

// Hand the MessageBuilder to the Decoder
decoder.start(builder);

// Run the MulticastDecoder in one separate thread
decoder.run(1, false);

// Example builder will receive a "Done" message when the
// FAST stream is completely received
while (!builder.isDone()) {
    boost::this_thread::sleep(boost::posix_time::seconds(1));
}

decoder.stop();
decoder.joinThreads();
Our CompositeMessageBuilder uses the "MessageType" field of each FAST message to determine what kind of C++ struct to create from each message, and forwards each message field to a specialized builder that populates a matching C++ struct. For example, a FAST "Quote" message causes a "MiddlewareNewsBrief::Quote" struct to be created and populated, as illustrated by the diagram. Our MessageBuilder then sends that C++ struct to an OpenDDS DataWriter that writes the struct across OpenDDS.
The following code shows how the Quote message's fields are wired into the Quote message builder and attached to the CompositeMessageBuilder. The other message types are configured similarly.
// MessageBuilder created for this example; decides what type
// of C++ struct to populate based on the MessageType field
MiddlewareNewsBrief::CompositeMessageBuilder<QuickFAST::uint16>
    builder("MessageType");

// Create a StructMessageBuilder for the Quote message type;
// The quoteDataWriter enables the builder to publish its Quote structs
// over OpenDDS; we will describe it later
MiddlewareNewsBrief::StructMessageBuilder<Quote>* quoteBuilder =
    new MiddlewareNewsBrief::StructMessageBuilder<Quote>
        (*(quoteDataWriter.get()));
boost::shared_ptr<MiddlewareNewsBrief::MessageBuilderBase>
    quoteBuilderPtr(quoteBuilder);

// Add the Quote builder to the Composite; "11" is the value of the MessageType
// field for a FAST "Quote" message.
builder.addBuilder(11, quoteBuilderPtr);

// Also have StructMessageBuilders for New, Remove, Change messages

// Wire in all of the fields of the Quote struct, mapping each FAST field
// (by name and id) to a struct field
quoteBuilder->addFieldPopulator("MessageType", 100,
    &MiddlewareNewsBrief::Quote::MessageType,
    MiddlewareNewsBrief::UINT16());
quoteBuilder->addFieldPopulator("SequenceNumber", 200,
    &MiddlewareNewsBrief::Quote::SequenceNumber,
    MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator("Timestamp", 300,
    &MiddlewareNewsBrief::Quote::Timestamp,
    MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator("OrderId", 400,
    &MiddlewareNewsBrief::Quote::OrderId,
    MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator("Volume", 500,
    &MiddlewareNewsBrief::Quote::Volume,
    MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator("Price", 600,
    &MiddlewareNewsBrief::Quote::Price,
    MiddlewareNewsBrief::UINT32());
quoteBuilder->addFieldPopulator("ExchangeId", 700,
    &MiddlewareNewsBrief::Quote::ExchangeId,
    MiddlewareNewsBrief::UINT8());
quoteBuilder->addFieldPopulator("SecurityId", 800,
    &MiddlewareNewsBrief::Quote::SecurityId,
    MiddlewareNewsBrief::UINT16());
quoteBuilder->addFieldPopulator("SecurityType", 900,
    &MiddlewareNewsBrief::Quote::SecurityType,
    MiddlewareNewsBrief::UINT8());
quoteBuilder->addFieldPopulator("SessionId", 1000,
    &MiddlewareNewsBrief::Quote::SessionId,
    MiddlewareNewsBrief::UINT8());
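For readers curious how a call like addFieldPopulator can route a decoded FAST field into a struct member, here is a minimal, self-contained sketch of the pointer-to-member technique. It is not the example's actual StructMessageBuilder: the SketchQuote and SketchStructBuilder names are hypothetical, and the sketch drops the field id, the type tag, and the OpenDDS DataWriter that the real builder works with.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>

// Hypothetical struct standing in for the IDL-generated Quote type.
struct SketchQuote
{
    std::uint32_t SequenceNumber;
    std::uint32_t Price;
    SketchQuote() : SequenceNumber(0), Price(0) {}
};

// Hypothetical builder: maps a FAST field name to a setter that writes the
// decoded value into one member of the struct being populated.
template <typename Struct>
class SketchStructBuilder
{
public:
    // Register a field by name together with a pointer to the struct member.
    template <typename Member>
    void addFieldPopulator(const std::string& name, Member Struct::* member)
    {
        setters_[name] = [member](Struct& target, std::uint64_t value) {
            target.*member = static_cast<Member>(value);
        };
    }

    // Called once per decoded field; unknown field names are ignored.
    void addValue(const std::string& name, std::uint64_t value)
    {
        auto it = setters_.find(name);
        if (it != setters_.end()) {
            it->second(current_, value);
        }
    }

    // Called at the end of a message; returns the populated struct and
    // resets the builder for the next message.
    Struct endMessage()
    {
        Struct done = current_;
        current_ = Struct();
        return done;
    }

private:
    std::map<std::string, std::function<void(Struct&, std::uint64_t)> > setters_;
    Struct current_;
};

// Usage: wire two fields, feed decoded values, and collect the struct.
void demoBuilder()
{
    SketchStructBuilder<SketchQuote> builder;
    builder.addFieldPopulator("SequenceNumber", &SketchQuote::SequenceNumber);
    builder.addFieldPopulator("Price", &SketchQuote::Price);

    builder.addValue("SequenceNumber", 7);
    builder.addValue("Price", 1250);
    builder.addValue("Unknown", 1);  // no populator registered; ignored

    const SketchQuote quote = builder.endMessage();
    assert(quote.SequenceNumber == 7);
    assert(quote.Price == 1250);
}
```

In the real example, the end of a message is where the populated struct would be handed to the DataWriter for publication instead of being returned to the caller.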
The final configuration step of the Decoder/Publisher is the OpenDDS initialization and configuration. For each FAST message received, we publish a C++ struct over OpenDDS to any interested subscriber. The subscriber then echoes the C++ struct sample back to the original publishing process for accurate performance measurement. When we run the example, we will use the UDP unicast transport to avoid TCP backpressure issues, which affect the latency measurements. Note that this is where we define the quoteDataWriter object that we passed to the StructMessageBuilder above.
const int DOMAIN_ID = 8675309;

// Initialize, and create a DomainParticipantFactory
DDS::DomainParticipantFactory_var factory =
    TheParticipantFactoryWithArgs(argc, argv);

// Create the DomainParticipant
DDS::DomainParticipant_var participant =
    factory->create_participant(DOMAIN_ID,
                                PARTICIPANT_QOS_DEFAULT,
                                0,
                                0);
if (participant == 0)
{
    std::cerr << "create_participant failed." << std::endl;
    return -1;
}

// Create a publisher for the topics
const int TRANSPORT_IMPL_ID = 1;
DDS::Publisher_var publisher =
    MiddlewareNewsBrief::DDSUtil::create_publisher(participant.in(),
                                                   TRANSPORT_IMPL_ID,
                                                   "Primary");

// Create a subscriber for the Echo of the two topics
const int TRANSPORT_IMPL_ID_2 = 2;
DDS::Subscriber_var subscriber =
    MiddlewareNewsBrief::DDSUtil::create_subscriber(participant.in(),
                                                    TRANSPORT_IMPL_ID_2,
                                                    "Secondary");

// Initialize the DoneToken manager, which publishes a "done" token
MiddlewareNewsBrief::DoneTokenManager doneToken;
doneToken.initWriter(participant, publisher, subscriber);

//
// DataWriters, one per message type
//
typedef MiddlewareNewsBrief::Quote Quote;
boost::shared_ptr<MiddlewareNewsBrief::DataWriter<Quote> >
    quoteDataWriter(new MiddlewareNewsBrief::DataWriter<Quote>());
quoteDataWriter->init(participant,
                      publisher,
                      subscriber,
                      &MiddlewareNewsBrief::Quote::Timestamp);

// Similar code for New, Change, Remove messages
OPENDDS SUBSCRIBER
The OpenDDS subscribing process subscribes to the four topics published by the OpenDDS publishing process. One topic publishes all Quote messages, one topic publishes all New messages, one publishes all Change messages, and one publishes all Remove messages. It is not necessary to map one topic to one DDS type, but that is what we have chosen to do in this simple example. Later, we will look at a more complex example where we use a topic to represent each security being published. The diagram illustrates how OpenDDS disseminates data on multiple topics:
The SimpleSubscriber code is located in src/Subscribers/SimpleSubscriber.cpp. The core of the SimpleSubscriber code is as follows:
const int DOMAIN_ID = 8675309;

//
// OpenDDS Init
//

// Initialize, and create a DomainParticipantFactory
DDS::DomainParticipantFactory_var factory =
    TheParticipantFactoryWithArgs(argc, argv);

// Create the DomainParticipant
DDS::DomainParticipant_var participant =
    factory->create_participant(DOMAIN_ID,
                                PARTICIPANT_QOS_DEFAULT,
                                0,
                                0);
if (participant == 0)
{
    std::cerr << "create_participant failed." << std::endl;
    return -1;
}

// Create a subscriber for the topics
const int TRANSPORT_IMPL_ID = 1;
DDS::Subscriber_var subscriber =
    MiddlewareNewsBrief::DDSUtil::create_subscriber(participant.in(),
                                                    TRANSPORT_IMPL_ID,
                                                    "Primary");

// Create a publisher for the echo of the topics
const int TRANSPORT_IMPL_ID_2 = 2;
DDS::Publisher_var publisher =
    MiddlewareNewsBrief::DDSUtil::create_publisher(participant.in(),
                                                   TRANSPORT_IMPL_ID_2,
                                                   "Secondary");

// Initialize the DoneToken manager, which publishes a "done" token
MiddlewareNewsBrief::DoneTokenManager doneToken;
doneToken.initReader(participant, publisher, subscriber);

//
// DataReaders, one per message type
//
MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::Quote> quoteDR;
quoteDR.init(participant, publisher, subscriber,
             &MiddlewareNewsBrief::Quote::Timestamp);

MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::New> newDR;
newDR.init(participant, publisher, subscriber,
           &MiddlewareNewsBrief::New::Timestamp);

MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::Change> changeDR;
changeDR.init(participant, publisher, subscriber,
              &MiddlewareNewsBrief::Change::Timestamp);

MiddlewareNewsBrief::DataReader<MiddlewareNewsBrief::Remove> removeDR;
removeDR.init(participant, publisher, subscriber,
              &MiddlewareNewsBrief::Remove::Timestamp);

std::cout << "Finished subscribing; entering event loop" << std::endl;

//
// Process events
//
do
{
    boost::this_thread::sleep(boost::posix_time::seconds(1));
}
while (!doneToken.isDone());
RUNNING THE EXAMPLE
We run the example in four steps. First, we start an OpenDDS DCPSInfoRepo repository process. This process connects DataWriters and DataReaders together, but does not come into play on each write of a data sample:
export INFOREPO_HOST=
./bin/run_inforepo
Next, we start a subscribing process:
export INFOREPO_HOST=
./bin/run_dcps_exe SimpleSubscriber -tr udp
The run_dcps_exe script is a utility script that configures several OpenDDS command-line arguments. The "-tr udp" arguments tell that script to use the OpenDDS UDP unicast transport. Other valid values are "tcp" and "mcast".
As with all of the example processes, passing a "-?" argument on the command line prints the usage message.
Next, we start the decoding/publishing process:
export INFOREPO_HOST=
./bin/run_dcps_exe SimplePublisher -t ./data/SimpleTemplates.xml -tr udp
The "-t ./data/SimpleTemplates.xml" arguments provide the FAST template file for the QuickFAST decoder. Both the QuickFAST encoder and decoder must have the same FAST template file. The SimplePublisher, by default, listens on the multicast address and port of 224.1.2.133:13014.
Finally, we start a multicasting process to send encoded FAST messages to the SimplePublisher. Recall that we generated a FAST data file above with the SimpleGenerator. We will read that file now to send FAST message packets to the SimplePublisher. The Multicaster multicasts on the address and port of 224.1.2.133:13014 by default.
./bin/Multicaster -f ./data/simple30000.dat -s 1000
The SimplePublisher process should quickly print the message
Writing Done token
which indicates that all FAST messages have been received from the Multicaster, decoded, converted to C++ structs, and published through OpenDDS. The subscriber then echoes each sample back to the publisher, but the subscriber sleeps a bit between each echoed write so the measurement of round trip latency is not CPU bound. When the example completes, the publisher's output should look like this:
Elapsed time from first start time to last end time: 2849393
name                             count  sum     mean    std_dev  recursions  rsum  rmean  rstd_dev
Echo::on_data_availa()           7500   577690  77.025  15.790   0
Echo::on_data_availa()           7500   595248  79.366  14.779   0
Echo::on_data_availa()           7500   619719  82.629  15.468   0
Echo::on_data_availa()           7500   629871  83.983  16.630   0
::endMessage()                   7500   15594   2.079   0.281    0
::endMessage()                   7500   16423   2.190   0.398    0
::endMessage()                   7500   16273   2.170   0.382    0
::endMessage()                   7500   16370   2.183   0.457    0
decoder.decode(source, builder)  1      440779
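The Echo rows in this output report round-trip times in microseconds, so a one-way latency estimate can be derived by averaging the four reported means and halving the result. The short C++ sketch below works through that arithmetic; the function and constant names are hypothetical, and halving assumes that the outbound and return paths take roughly equal time.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>

// Mean round-trip times (microseconds) copied from the four Echo rows.
const double kRoundTripMeans[] = {77.025, 79.366, 82.629, 83.983};

// Estimate one-way latency as half of the average round-trip time.
// Each row covers the same number of samples (7500), so a plain
// average of the means equals the overall mean.
double oneWayLatencyMicros(const double* means, std::size_t count)
{
    double sum = 0.0;
    for (std::size_t i = 0; i < count; ++i) {
        sum += means[i];
    }
    return (sum / static_cast<double>(count)) / 2.0;
}

// Usage: (77.025 + 79.366 + 82.629 + 83.983) / 4 / 2 is about 40.375.
void demoLatency()
{
    const double oneWay = oneWayLatencyMicros(kRoundTripMeans, 4);
    assert(std::fabs(oneWay - 40.375) < 0.001);
}
```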
As mentioned above, this example was run across two SGI Intel Xeon W5590 3.33GHz systems running SUSE Linux Enterprise Server 11. These results indicate that 30,000 FAST messages (7,500 each of Quote, New, Change, Remove) were decoded and published across OpenDDS in 440,779 microseconds. The "*endMessage()" entries show the time to both decode a QuickFAST message and convert it to a struct for publication across OpenDDS; those average about 2.1 microseconds. The "*on_data_availa()" entries show the round-trip latency to publish a message across OpenDDS and then echo the sample back to the publisher. We divide those round-trip numbers in half to get a measurement for OpenDDS latency, which is approximately 40 microseconds.
There are also simpler ways to run the publisher and subscriber. First, you can read the FAST data file directly into the publisher, skipping the Multicaster step:
./bin/run_dcps_exe SimpleSubscriber -tr udp
./bin/run_dcps_exe SimplePublisher -t ./data/SimpleTemplates.xml -f ./data/simple30000.dat -tr udp
Also, you may use the "-noecho" argument to disable the echo of the sample back to the publisher. The example runs a lot faster, but the latency measurement will not be accurate if the two processes are on different hosts:
./bin/run_dcps_exe SimpleSubscriber -tr udp -noecho
./bin/run_dcps_exe SimplePublisher -t ./data/SimpleTemplates.xml -f ./data/simple30000.dat -tr udp -noecho
Finally, you can measure the performance of the QuickFAST decoding all by itself by using an empty MessageBuilder with the SimplePublisher:
./bin/run_dcps_exe SimplePublisher -t ./data/SimpleTemplates.xml -f ./data/simple30000.dat -tr udp -emptyBuilder
We have observed QuickFAST decoding times under 500 nanoseconds for ARCA-sized data:
name count sum mean std_dev recursions rsum rmean rstd_dev
EmptyBuilder::endMessage 30001 13455 0.448 0.506 0
decoder.decode(source, builder) 1 18006
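To check the sub-500-nanosecond figure against the raw numbers, the sketch below parses one full profiler row and converts its mean to nanoseconds. The function name parse_profile_row is hypothetical, and the parser assumes the column layout shown above (name, count, sum, mean, std_dev, recursions); shorter rows such as the decoder.decode line are not handled.

```python
def parse_profile_row(line):
    """Split a full profiler row into (name, count, sum_us, mean_us, std_dev_us).

    The name may contain spaces, so the five numeric columns are peeled
    off from the right-hand side of the line.
    """
    name, count, total, mean, std_dev, _recursions = line.rsplit(None, 5)
    return name, int(count), int(total), float(mean), float(std_dev)


row = parse_profile_row("EmptyBuilder::endMessage 30001 13455 0.448 0.506 0")
name, count, total_us, mean_us, std_dev_us = row

# Recompute the mean from sum/count and express it in nanoseconds.
mean_ns = total_us / count * 1000.0
print(f"{name}: {mean_ns:.0f} ns per message over {count} calls")
```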
SECOND EXAMPLE - "COMPLEX" MESSAGES
In our second example, we will simulate a market data feed using more complex FIX-based data with many more fields and repeating groups. We will call these the "Complex" examples. Our FAST template file consists of two message types: a QuoteRequest and a MarketData.
This example differs from the "Simple" example in two key ways. First, the data is more complex, which means it will take longer to decode each FAST message, convert it into a C++ struct, and publish it across OpenDDS. Second, and more importantly, we will map the FAST messages to OpenDDS topics differently. The ComplexGenerator that we will use to generate test data generates data for 100 securities in its default setting. Each of our FAST messages contains data for several different securities in its MDEntries group. In our processing, we will split those FAST messages up on a per-security basis, and publish data for each security on a different OpenDDS topic. So, we will have 100 OpenDDS MarketData topics and 100 OpenDDS QuoteRequest topics, and we will split up the FAST messages to publish each security's information on the correct topic.
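The per-security split can be pictured with a short sketch. This is a simplified illustration using plain dictionaries, not the ComplexPublisher's C++ code; the function names and the "MarketData_<id>" topic naming scheme are assumptions made for the example.

```python
from collections import defaultdict


def topic_name(message_type, security_id):
    # Hypothetical naming scheme; the example's real topic names may differ.
    return f"{message_type}_{security_id}"


def split_by_security(message_type, header, entries):
    """Group one decoded message's entries by security.

    Returns {topic name: sample}. Each sample carries the shared header
    fields plus only the entries that belong to a single security.
    """
    grouped = defaultdict(list)
    for entry in entries:
        grouped[entry["securityID"]].append(entry)

    samples = {}
    for security_id, group in grouped.items():
        sample = dict(header)
        sample["securityID"] = security_id
        sample["mdEntries"] = group
        samples[topic_name(message_type, security_id)] = sample
    return samples


# One decoded message that mentions two securities (made-up values).
header = {"msgSeqNum": 17, "senderCompID": "Test Exchange"}
entries = [
    {"securityID": 42, "mdEntryPx": 101.25},
    {"securityID": 7, "mdEntryPx": 55.0},
    {"securityID": 42, "mdEntryPx": 101.5},
]

samples = split_by_security("MarketData", header, entries)
for topic, sample in samples.items():
    print(topic, len(sample["mdEntries"]))
```

In this sketch a message that mentions two securities becomes two samples, each destined for its own topic, which is why one FAST message can turn into several writes.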
For illustration, the MarketData message is below, from the file data/ComplexTemplates.xml:
<?xml version="1.0" encoding="UTF-8"?>
<templates xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">

  <template name="MarketData" id="1" reset="Y" dictionary="1"
            xmlns="http://www.fixprotocol.org/ns/fast/td/1.1">
    <string name="ApplVerID" id="1128">
      <constant value="1.0"></constant>
    </string>
    <string name="MessageType" id="35">
      <constant value="X"></constant>
    </string>
    <string name="SenderCompID" id="49">
      <constant value="Test Exchange"></constant>
    </string>
    <uInt32 name="MsgSeqNum" id="34"></uInt32>
    <uInt32 name="SendingTime" id="52"></uInt32>
    <uInt32 name="TradeDate" id="75"></uInt32>
    <sequence name="MDEntries">
      <length name="NoMDEntries" id="268"></length>
      <uInt32 name="MDUpdateAction" id="279">
        <copy value="1"></copy>
      </uInt32>
      <uInt32 name="MDPriceLevel" id="1023" presence="optional">
        <default value="1"></default>
      </uInt32>
      <string name="MDEntryType" id="269">
        <copy value="0"></copy>
      </string>
      <uInt32 name="OpenCloseSettleFlag" id="286" presence="optional">
      </uInt32>
      <uInt32 name="SecurityIDSource" id="22">
        <constant value="9"></constant>
      </uInt32>
      <uInt32 name="SecurityID" id="48">
        <copy></copy>
      </uInt32>
      <uInt32 name="RptSeq" id="83">
        <increment></increment>
      </uInt32>
      <decimal name="MDEntryPx" id="270">
        <exponent>
          <default value="0"></default>
        </exponent>
        <mantissa>
          <delta></delta>
        </mantissa>
      </decimal>
      <uInt32 name="MDEntryTime" id="273">
        <copy></copy>
      </uInt32>
      <int32 name="MDEntrySize" id="271" presence="optional">
        <delta></delta>
      </int32>
      <uInt32 name="NumberOfOrders" id="346" presence="optional">
        <delta></delta>
      </uInt32>
      <string name="TradingSessionID" id="336" presence="optional">
        <default value="2"></default>
      </string>
      <decimal name="NetChgPrevDay" id="451" presence="optional">
        <exponent>
          <default></default>
        </exponent>
        <mantissa>
          <delta></delta>
        </mantissa>
      </decimal>
      <uInt32 name="TradeVolume" id="1020" presence="optional">
        <default></default>
      </uInt32>
      <string name="TradeCondition" id="277" presence="optional">
        <default></default>
      </string>
      <string name="TickDirection" id="274" presence="optional">
        <default></default>
      </string>
      <string name="QuoteCondition" id="276" presence="optional">
        <default></default>
      </string>
      <uInt32 name="AggressorSide" id="5797" presence="optional">
        <default></default>
      </uInt32>
      <string name="MatchEventIndicator" id="5799" presence="optional">
        <default value="1"></default>
      </string>
    </sequence>
  </template>
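The field operators in this template (constant, default, copy, increment, delta) are what let FAST omit or shrink values on the wire. The sketch below is a deliberately simplified model of how a decoder might reconstruct mandatory integer fields under each operator; it is not QuickFAST code, it ignores optional fields and presence-map handling, and the function name apply_operator and all sample values (including the exponent of -2) are made up for illustration.

```python
from decimal import Decimal


def apply_operator(op, wire, previous=None, initial=None):
    """Return the decoded value of one mandatory integer field.

    wire     -- the value carried in the stream, or None when it was omitted
    previous -- the last decoded value for this field, if any
    initial  -- the value="..." attribute given in the template, if any
    """
    if op == "constant":
        return initial  # never sent on the wire
    if op == "default":
        return wire if wire is not None else initial
    if op == "copy":
        if wire is not None:
            return wire
        return previous if previous is not None else initial
    if op == "increment":
        if wire is not None:
            return wire
        return previous + 1 if previous is not None else initial
    if op == "delta":
        base = previous if previous is not None else (initial or 0)
        return base + wire  # the wire always carries the difference
    raise ValueError(f"unknown operator: {op}")


# RptSeq uses <increment>: sent once, then implied for the entries that follow.
rpt_seq, previous = [], None
for wire in (100, None, None):
    previous = apply_operator("increment", wire, previous)
    rpt_seq.append(previous)

# The MDEntryPx mantissa uses <delta>: each entry carries only the change.
mantissas, previous = [], None
for wire in (10125, 25, -50):
    previous = apply_operator("delta", wire, previous)
    mantissas.append(previous)

# A decimal is mantissa * 10**exponent; Decimal avoids binary rounding.
prices = [Decimal(m).scaleb(-2) for m in mantissas]
print(rpt_seq, mantissas, [str(p) for p in prices])
```

Consecutive entries for the same security tend to differ only slightly, so operators like these keep most fields at a byte or less.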
The corresponding IDL for this FAST message is in the file idl/Complex.idl, as follows. You can see how each FAST field maps to a field in the IDL structs, and how the MDEntries repeating group maps to the IDL MarketDataEntries sequence:
module MiddlewareNewsBrief
{
  typedef sequence<octet> Octets;

#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::MarketDataEntry"
#pragma DCPS_DATA_KEY "MiddlewareNewsBrief::MarketDataEntry securityID"
  struct MarketDataEntry
  {
    unsigned long mdUpdateAction;
    unsigned long mdPriceLevel;
    string mdEntryType;
    unsigned long openCloseSettleFlag;
    unsigned long securityIDSource;
    unsigned long securityID;
    unsigned long rptSeq;
    double mdEntryPx;
    unsigned long mdEntryTime; // timestamp
    long mdEntrySize;
    unsigned long numberOfOrders;
    string tradingSessionID;
    double netChgPrevDay;
    unsigned long tradeVolume;
    string tradeCondition;
    string tickDirection;
    string quoteCondition;
    unsigned long aggressorSide;
    string matchEventIndicator;
  };
  typedef sequence<MarketDataEntry> MarketDataEntries;

#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::MarketData"
#pragma DCPS_DATA_KEY "MiddlewareNewsBrief::MarketData securityID"
  struct MarketData
  {
    unsigned long securityID;
    string applVersionID;
    string messageType;
    string senderCompID;
    unsigned long msgSeqNum;
    unsigned long sendingTime; // timestamp
    unsigned long tradeDate;
    MarketDataEntries mdEntries;
  };
};
The OpenDDS publisher-side code is in src/Publishers/ComplexPublisher.cpp; the subscriber-side code is in src/ComplexSubscriber.cpp.
Again, we need to generate FAST messages based on the ComplexTemplates.xml template file. We execute the ComplexGenerator as follows:
./bin/ComplexGenerator -t ./data/ComplexTemplates.xml -o ./data/complex30000.dat -n 30000
where ./data/complex30000.dat is the generated FAST data file, and 30000 is the number of FAST messages created in that data file. By default, the ComplexGenerator generates FAST messages with an average MarketDataEntries sequence length of 3, and many messages have more than one security in the MDEntries sequence.
We can take control over that behavior in a couple of ways. First, we can change the average sequence length via
./bin/ComplexGenerator -seq ...
We can also decide that each FAST message will only have information about one security in its MarketDataEntries sequence:
./bin/ComplexGenerator -nosplit ...
Both of these will affect the performance of the decoder and the latency of the OpenDDS publishing because larger messages take longer to decode and write, and using multiple securities in a FAST message maps the message to multiple OpenDDS writes instead of one in our example.
As always, you can find out all of the supported command-line arguments via
./bin/ComplexGenerator -?
We run the example in the same four steps. First, we start an OpenDDS DCPSInfoRepo repository process. This process connects DataWriters and DataReaders together, but does not come into play on each write of a data sample:
export INFOREPO_HOST=
./bin/run_inforepo
Next, we start a subscribing process using the UDP transport:
export INFOREPO_HOST=
./bin/run_dcps_exe ComplexSubscriber -tr udp
Next, we start the decoding/publishing process, also using the UDP transport:
export INFOREPO_HOST=
./bin/run_dcps_exe ComplexPublisher -t ./data/ComplexTemplates.xml -tr udp
The ComplexPublisher, by default, listens on the multicast address and port of 224.1.2.133:13014.
Finally, we start a multicasting process to send encoded FAST messages to the ComplexPublisher. Recall that the Multicaster multicasts on address:port of 224.1.2.133:13014 by default.
./bin/Multicaster -f ./data/complex30000.dat -s 1000
The ComplexPublisher process should quickly print the message
Writing Done token
which indicates that all FAST messages have been received from the Multicaster. The publisher converts each FAST message into several DDS samples and publishes them to the subscriber. The subscriber then echoes each sample back to the publisher, but the subscriber sleeps a bit between each echoed write so the measurement of round trip latency is not CPU bound. When the example completes, the publisher's output should look like this:
Elapsed time from first start time to last end time: 4496354
name count sum mean std_dev recursions rsum rmean rstd_dev
Echo::on_data_availa() 300 36772 122.573 25.739 0
Echo::on_data_availa() 41700 5419738 129.970 32.030 0
::endMessage() 41700 982462 23.560 11.711 0
::endMessage() 300 1755 5.850 1.768 0
decoder.decode(source, builder) 1 1220888
This example was also run across two SGI Intel Xeon W5590 3.33GHz systems. These results indicate that 30,000 FAST messages were decoded into 42,000 C++ structs and published across OpenDDS as 42,000 writes in 1,220,888 microseconds. The "*endMessage()" entries show the time to both decode a QuickFAST message and convert it to structs for publication across OpenDDS; as you can see, handling the MarketData structs is more time consuming due to the one-to-many mapping of FAST messages to structs. The "*on_data_availa()" entries show the round-trip latency to publish each struct across OpenDDS and then echo the sample back to the publisher. We divide those round-trip numbers in half to get a measurement for OpenDDS latency, which is approximately 65 microseconds.
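Because the two row groups in this run have very different counts, a count-weighted mean is the fairer summary. The sketch below, an illustration rather than part of the example code, recomputes the fan-out from FAST messages to writes and the halved round trip from the count and sum columns of the output above.

```python
# Figures copied from the publisher output above; all times are microseconds.
ROUND_TRIP_ROWS = [(300, 36772), (41700, 5419738)]   # Echo::on_data_availa()
END_MESSAGE_ROWS = [(41700, 982462), (300, 1755)]    # ::endMessage()
FAST_MESSAGES = 30000


def weighted_mean(rows):
    """Mean over all calls: total time divided by total count."""
    return sum(total for _, total in rows) / sum(count for count, _ in rows)


writes = sum(count for count, _ in ROUND_TRIP_ROWS)
writes_per_message = writes / FAST_MESSAGES
one_way_us = weighted_mean(ROUND_TRIP_ROWS) / 2.0
convert_us = weighted_mean(END_MESSAGE_ROWS)

print(f"writes: {writes} ({writes_per_message:.1f} per FAST message)")
print(f"estimated one-way latency: {one_way_us:.1f} us")
print(f"mean decode and convert time per struct: {convert_us:.1f} us")
```

As with the Simple example, halving the round trip assumes the two directions cost the same.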
As before, you can measure the performance of the QuickFAST decoding all by itself by using an empty MessageBuilder with the ComplexPublisher:
./bin/run_dcps_exe ComplexPublisher -t ./data/ComplexTemplates.xml -f ./data/complex30000.dat -tr udp -emptyBuilder
SUMMARY
We have demonstrated that open source products such as QuickFAST and OpenDDS can be used to decode and disseminate market data quickly and efficiently. We have measured the behavior and performance of QuickFAST and OpenDDS for a low-latency market data feed, and have shown QuickFAST decoding latencies under 500 nanoseconds and OpenDDS publishing latencies as low as 40 microseconds running on high-quality commodity hardware across UDP over InfiniBand. This suggests that native InfiniBand communication across the network offers an opportunity to lower latency further. OpenDDS has a pluggable transport framework designed for such a performance tuning situation.
Not only does the combination of QuickFAST and OpenDDS perform well, but it does so at a significant cost savings compared with proprietary software, and gives the user a level of control and visibility into the code that is not available from proprietary software. Savings realized from the use of open source software can be applied to other areas such as better hardware. QuickFAST and OpenDDS are professionally developed and commercially supported by OCI, a full-service software engineering, open source product and training company.
REFERENCES
- QuickFAST
http://code.google.com/p/quickfast/
- OpenDDS
http://www.opendds.org/
- Introduction to OpenDDS
http://www.opendds.org/Article-Intro.html
- FAST specification
http://www.fixprotocol.org/fast
- OMG Data Distribution Service for Real-Time Systems specification
http://www.omg.org/spec/DDS/1.2/PDF/
- SGI
http://www.sgi.com/