Introduction to OpenDDS

Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.

Introduction

Distributed real-time applications are sometimes more data-centric than service-centric, meaning that the primary objective of the participants in the distributed system is the dissemination of application data rather than access to shared services. The set of suppliers and/or consumers of application data may not be known at design time, and may change through the execution lifetime of the application. Typically, the data-centric paradigm is most efficiently realized by a publish/subscribe communication model rather than a request/response model.

The OMG Data Distribution Service (DDS) for Real-Time Systems addresses the performance requirements and hard real-time requirements of distributed data-centric applications.

DDS increases the range of publish/subscribe options available to developers of distributed real-time systems. For convenience, the DDS interfaces are defined using OMG Interface Definition Language (IDL). However, most details are left to the implementation, most significantly how data transfer occurs between the publishers and subscribers.

A DDS implementer decides on the underlying communication mechanism that moves data from a publisher to a subscriber, via TCP, UDP, UDP multicast, shared memory, etc. An implementation of the DDS specification is not required to use CORBA or the IIOP protocol to transfer data from a publisher to a subscriber.

OpenDDS is an open source, C++ implementation of the OMG Data Distribution Service specification. OpenDDS includes a file-based configuration mechanism. Through a configuration file, an OpenDDS user may configure a publisher's or subscriber's transport(s), debugging output, memory allocation, the location of the DCPSInfoRepo broker process, and many other settings.

The complete set of configuration settings is described in the Configuration Chapter of the OpenDDS Developer's Guide.

In this article, you'll get an introduction to OpenDDS through the following topics:

  1. The OpenDDS Implementation of OMG DDS
  2. DDS Architecture
  3. Stock Quoter Example
    • IDL Types
    • Publisher
    • Subscriber
    • Subscriber's Listeners
    • Building the Publisher and Subscriber
    • Configuring the Stock Quoter
    • Running the Stock Quoter over a TCP Transport
    • Running the Stock Quoter over a UDP Transport

THE OPENDDS IMPLEMENTATION OF OMG DDS

OpenDDS leverages a pluggable transport architecture, enabling data delivery through transport and marshaling implementations of the application developer's choosing. Conceptually, the architecture borrows from TAO's Pluggable Protocols Framework. OpenDDS currently supports TCP and UDP point-to-point transports as well as unreliable and reliable multicast, and uses a high-performance marshaling implementation.

This pluggable transport architecture permits a DDS user to optimize a DDS installation based on the desired transport and the homogeneous or heterogeneous nature of the application's deployment. These choices can be made without affecting the application code itself.
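To make the idea concrete, here is a minimal sketch of a pluggable transport in plain standard C++. It is not the OpenDDS transport framework: the names Transport, TcpLikeTransport, UdpLikeTransport, make_transport, and publish_quote are hypothetical, and the "transports" only record what they are asked to send. The point is that the application function depends only on the abstract interface, so the concrete transport can be chosen from a configuration string.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical transport abstraction (an assumption for illustration,
// not an OpenDDS class). send() only records the payload.
class Transport {
public:
  virtual ~Transport() {}
  virtual std::string name() const = 0;
  virtual bool reliable() const = 0;
  void send(const std::string& payload) { sent_.push_back(payload); }
  const std::vector<std::string>& sent() const { return sent_; }
private:
  std::vector<std::string> sent_;
};

class TcpLikeTransport : public Transport {
public:
  std::string name() const override { return "tcp"; }
  bool reliable() const override { return true; }
};

class UdpLikeTransport : public Transport {
public:
  std::string name() const override { return "udp"; }
  bool reliable() const override { return false; }
};

// Chooses a transport from a configuration value; returns null if unknown.
std::unique_ptr<Transport> make_transport(const std::string& kind) {
  if (kind == "tcp") return std::unique_ptr<Transport>(new TcpLikeTransport());
  if (kind == "udp") return std::unique_ptr<Transport>(new UdpLikeTransport());
  return std::unique_ptr<Transport>();
}

// Application code: unaware of which transport it was given.
void publish_quote(Transport& transport, const std::string& ticker, double value) {
  assert(!ticker.empty());
  transport.send(ticker + "=" + std::to_string(value));
}
```

Switching from make_transport("tcp") to make_transport("udp") changes the delivery characteristics without touching publish_quote, which is the property the pluggable architecture is meant to provide.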

Figure 1. InfoRepo

Marshaling code is generated by a specialized OpenDDS IDL compiler. A single, separate DCPS Information Repository (DCPSInfoRepo) process acts as a central clearinghouse, associating publishers and subscribers. Under the covers, OpenDDS uses CORBA to communicate with the DCPSInfoRepo process to associate publishers and subscribers. Data transfer between publishers and subscribers is direct between the publishing and subscribing processes. OpenDDS creates its own threads for the ORB and for the non-CORBA I/O that takes place when sending or receiving DDS data.
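The clearinghouse role can be pictured with a small model in plain C++. This is only a sketch under our own assumptions, not the DCPSInfoRepo interface: Endpoint and InfoRepoModel are hypothetical names, and matching is reduced to "same domain and same topic". The repository hands each new endpoint the addresses of its peers; the data itself would then flow directly between those addresses.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical description of a publication or subscription.
struct Endpoint {
  int domain_id;
  std::string topic;
  std::string address;  // where the owning process can be reached directly
};

// Hypothetical stand-in for the repository's bookkeeping.
class InfoRepoModel {
public:
  // Records a publication and returns the addresses of matching subscriptions.
  std::vector<std::string> add_publication(const Endpoint& pub) {
    publications_.push_back(pub);
    return matches(pub, subscriptions_);
  }

  // Records a subscription and returns the addresses of matching publications.
  std::vector<std::string> add_subscription(const Endpoint& sub) {
    subscriptions_.push_back(sub);
    return matches(sub, publications_);
  }

private:
  static std::vector<std::string> matches(const Endpoint& e,
                                          const std::vector<Endpoint>& others) {
    assert(!e.topic.empty());
    std::vector<std::string> result;
    for (const Endpoint& other : others) {
      if (other.domain_id == e.domain_id && other.topic == e.topic) {
        result.push_back(other.address);
      }
    }
    return result;
  }

  std::vector<Endpoint> publications_;
  std::vector<Endpoint> subscriptions_;
};
```

Note that the model never sees a data sample: it only associates endpoints, which mirrors the division of labor described above.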

DDS ARCHITECTURE

The OMG Data Distribution Service specification divides DDS into two architectural layers. The lower layer is the Data-Centric Publish and Subscribe (DCPS) layer, containing type-safe interfaces to a publish/subscribe communication mechanism. The upper layer is the Data Local Reconstruction Layer (DLRL), which enables an application developer to construct a local object model on top of the DCPS layer, shielding the application from DCPS knowledge. Each layer has its own set of concepts and usage patterns, and thus the concepts and terminology of the two layers can be discussed separately.

DATA-CENTRIC PUBLISH AND SUBSCRIBE - DCPS

The DCPS layer is responsible for efficiently disseminating data from publishers to interested subscribers. It is implemented using the concepts of publisher and data writer on the sending side and subscriber and data reader on the receiving side. The DCPS layer consists of one or more data domains, each of which contains a set of participants (publishers and subscribers) that communicate via DDS. Each entity (i.e., publisher or subscriber) belongs to a domain. Each process has one domain participant for each data domain of which it is a member.

Within any data domain, data is identified by a topic, which is a type-specific domain segment that allows publishers and subscribers to refer to data unambiguously. Within a domain, a topic associates a unique topic name, data type, and a set of Quality of Service (QoS) policies with the data itself. Each topic is associated with only one data type, although many different topics can publish the same data type. Behavior of publishers is determined by the QoS policies associated with the publisher, data writer, and topic elements for a particular data source. Likewise, behavior of subscribers is determined by the QoS policies associated with the subscriber, data reader, and topic elements for a particular data sink.

Figure 2. Domain

For more information on DCPS terminology, please see the OpenDDS Developer's Guide.

The DDS specification defines a number of Quality of Service (QoS) policies that applications use to specify their reliability, resource usage, fault tolerance, and other requirements to the service. Participants specify the behavior that they require from the service; the service decides how to achieve these behaviors. These policies may be applied to the various DCPS entities (Topic, Data Writer, Data Reader, Publisher, Subscriber, and Domain Participant) although not all policies are valid for all types of entities.

Subscribers and publishers collaborate to specify QoS policies through an offer-request paradigm. A publisher offers a set of QoS policies to all subscribers; a subscriber requests the set of QoS policies that it requires. The DDS implementation then attempts to match the requested policies with the offered policies. If the policies are consistent, then the publication and the subscription are matched.
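The offer-request check can be sketched for two policies. The code below is a simplified model, not the OpenDDS matching logic: QosModel and is_compatible are hypothetical names, and only reliability and the liveliness lease duration are considered. It assumes the usual DDS rule that the offered reliability must be at least as strong as the requested one, and that the offered lease duration must be no longer than the requested one.

```cpp
#include <cassert>

// Ordered so that a stronger guarantee has a larger value.
enum class Reliability { BestEffort = 0, Reliable = 1 };

// Hypothetical, heavily reduced QoS description.
struct QosModel {
  Reliability reliability;
  double liveliness_lease_seconds;
};

// Returns true if what the publisher offers satisfies what the subscriber requests.
bool is_compatible(const QosModel& offered, const QosModel& requested) {
  assert(offered.liveliness_lease_seconds > 0.0);
  assert(requested.liveliness_lease_seconds > 0.0);
  const bool reliability_ok =
    static_cast<int>(offered.reliability) >= static_cast<int>(requested.reliability);
  const bool liveliness_ok =
    offered.liveliness_lease_seconds <= requested.liveliness_lease_seconds;
  return reliability_ok && liveliness_ok;
}
```

A reliable publisher therefore satisfies a best-effort subscriber, but a best-effort publisher does not satisfy a subscriber that requests reliable delivery.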

OpenDDS supports the full set of DCPS Quality-of-Service (QoS) Policies, including:

QoS Policy        Description
Liveliness        Controls liveliness checks to make sure expected entities in the system are still alive
Reliability       Determines whether the service is allowed to drop samples
History           Controls what happens to an instance whose value changes before it is communicated to all Subscribers
Resource Limits   Controls resources that the service can use to meet other QoS requirements

See the Object Management Group's Introduction to DDS Whitepaper, Appendix A, for a more complete list of Quality-of-Service policies and more detailed Quality-of-Service definitions.
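As an illustration of the History policy, the following sketch models a "keep last N" history in plain C++. It is a conceptual model only: KeepLastHistory is a hypothetical class, samples are reduced to a double, and instances are identified by a string key. When an instance changes faster than its samples are consumed, only the most recent samples for that instance, up to the configured depth, are retained.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>

// Hypothetical per-instance history buffer with a fixed depth.
class KeepLastHistory {
public:
  explicit KeepLastHistory(std::size_t depth) : depth_(depth) {
    assert(depth > 0);
  }

  // Stores a new sample and discards the oldest one if the depth is exceeded.
  void add(const std::string& instance_key, double value) {
    std::deque<double>& kept = history_[instance_key];
    kept.push_back(value);
    if (kept.size() > depth_) {
      kept.pop_front();
    }
  }

  // Returns a copy of the retained samples, oldest first.
  std::deque<double> samples(const std::string& instance_key) const {
    auto it = history_.find(instance_key);
    if (it == history_.end()) {
      return std::deque<double>();
    }
    return it->second;
  }

private:
  std::size_t depth_;
  std::map<std::string, std::deque<double> > history_;
};
```

With a depth of 2, three quotes written for "SPY" leave only the last two available, while a single quote for "MDY" is unaffected because each instance has its own history.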

DATA-LOCAL RECONSTRUCTION LAYER - DLRL

The Data-Local Reconstruction Layer (DLRL) is an object-oriented layer on top of DCPS. A DLRL object is a native-language (e.g. C++) object with one or more shared attributes. Each DLRL class is mapped to one or more DCPS topics; each shared attribute value is mapped to a field in a topic's data type, and its value is distributed across the application via DCPS. A DLRL participant communicates data to the rest of the application by modifying a DLRL object, resulting in the publication of a data sample on the associated topic. A DLRL shared attribute may be a simple value or structure, a reference to another DLRL object, or a collection (list, map) of those. DLRL supports complex object graphs and complex relationships between DLRL objects.

The developer is responsible for deciding how DCPS entities are mapped to DLRL objects. The model is specified in OMG Interface Definition Language (IDL) using IDL valuetypes. The mapping is conceptually similar to an object-relational database mapping, which maps an object model onto relational database tables. We think of each DCPS topic as analogous to a relational database table, and each sample as a row in that table. The DDS specification has a default mapping from DCPS to DLRL. Alternatively, a developer can specify a custom mapping via an XML mapping file.

OpenDDS does not currently implement the DLRL.

OPENDDS STOCK QUOTER EXAMPLE

Our example illustrates publication of and subscription to data samples through the DDS DCPS layer. The example contains two DCPS topics, both related to the stock market.

A stock quote publisher publishes stock quote samples to interested subscribers; each quote contains the ticker symbol of a security, its value, and a time stamp. Quotes are published periodically throughout the trading day, as buy and sell transactions affect the underlying value of the security. In addition, a stock exchange event publisher publishes important events relating to a stock exchange, namely when the exchange opens, closes, and when trading is suspended or resumed.

Our subscriber subscribes to both stock quotes and stock exchange events. The subscriber prints the ticker symbol and value of each quote it sees. When the subscriber receives an event indicating that the stock exchange has closed for the day, it gracefully shuts down. Thus, the receipt of a "closed" stock exchange event is the subscriber's signal to stop expecting stock quote samples.

We will demonstrate how to use the same publisher and subscriber code to communicate via the TCP and UDP transports. The transport configuration is isolated in a set of configuration files, allowing us to switch transports without making any code changes.

IDL TYPES

First, we define our published DDS data types in IDL:

    #include "orbsvcs/TimeBase.idl"

    module StockQuoter
    {
    #pragma DCPS_DATA_TYPE "StockQuoter::Quote"
    #pragma DCPS_DATA_KEY "StockQuoter::Quote ticker"
      struct Quote {
        string ticker;
        string exchange;
        string full_name;
        double value;
        TimeBase::TimeT timestamp;
      };

    #pragma DCPS_DATA_TYPE "StockQuoter::ExchangeEvent"
    #pragma DCPS_DATA_KEY "StockQuoter::ExchangeEvent exchange"

      enum ExchangeEventType { TRADING_OPENED,
                               TRADING_CLOSED,
                               TRADING_SUSPENDED,
                               TRADING_RESUMED };
      struct ExchangeEvent {
        string exchange;
        ExchangeEventType event;
        TimeBase::TimeT timestamp;
      };
    };

We publish two data types: a Quote type for each stock quote, and an ExchangeEvent type to indicate when the stock exchange is opened, closed, and when trading is suspended or resumed. The DCPS_DATA_TYPE pragma marks a type for use with DDS. The DCPS_DATA_KEY defined for each type is a unique identifier for each instance of the data type. Our Quote type's key is the ticker symbol of the stock. Throughout the day, we would expect to publish many values, or samples, for each ticker symbol. The set of published samples for each ticker symbol belongs to the same instance. In our example, we'll publish two ticker symbols, and thus two instances: SPY (S&P Depository Receipts, i.e. the S&P 500) and MDY (S&P Midcap Depository Receipts, i.e. the S&P Midcap 400).

Next, we compile the IDL with OpenDDS's opendds_idl compiler to generate type support code. The type support code consists of generated DCPS data writer and data reader C++ classes and additional IDL code. DDS uses type-safe interfaces for publication and subscription. Type-safe interfaces have several advantages: first, programming errors are more likely to be caught at compile time; second, generated marshaling code can be made very efficient when the marshaled data type is known at compile time; and third, we can avoid the use of inefficient types such as the CORBA any in data transfer.
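The benefit of type-safe interfaces can be sketched without any DDS code. In the example below, TypedWriter and the two marshal overloads are hypothetical, hand-written stand-ins for what a generator might produce; they are not the output of opendds_idl. Because the writer is parameterized on the sample type, the marshaling function is selected at compile time, and passing the wrong type is a compile error rather than a run-time failure.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified stand-ins for the IDL-defined types.
struct Quote {
  std::string ticker;
  double value;
};

struct ExchangeEvent {
  std::string exchange;
  int event;
};

// Hand-written stand-ins for generated, type-specific marshaling code.
inline std::string marshal(const Quote& q) {
  assert(!q.ticker.empty());
  return "Q|" + q.ticker + "|" + std::to_string(q.value);
}

inline std::string marshal(const ExchangeEvent& e) {
  return "E|" + e.exchange + "|" + std::to_string(e.event);
}

// A writer that accepts exactly one sample type.
template <typename T>
class TypedWriter {
public:
  void write(const T& sample) { buffer_.push_back(marshal(sample)); }
  const std::vector<std::string>& buffer() const { return buffer_; }
private:
  std::vector<std::string> buffer_;
};
```

Given a TypedWriter<Quote> named quote_writer, a call such as quote_writer.write(ExchangeEvent()) is rejected by the compiler, which is the first of the advantages listed above.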

The command to generate type support code for the Stock Quoter's IDL types is as follows:

  $DDS_ROOT/bin/opendds_idl StockQuoter.idl

This command generates the following files:

[CONTENT MISSING]

However, we don't need to run the opendds_idl compiler manually. Later, we'll use a Make Project Creator (MPC) project to automate the build steps for us.

Next, we use TAO's IDL compiler to compile both IDL files: the StockQuoter.idl file we wrote manually and the type support file generated by opendds_idl.

  tao_idl -I$DDS_ROOT -I$TAO_ROOT/orbsvcs StockQuoter.idl
  tao_idl -I$DDS_ROOT -I$TAO_ROOT/orbsvcs StockQuoterTypeSupport.idl

PUBLISHER

Next, we write a publisher to publish stock quotes and stock exchange events via DDS. First, we include the type support header file generated by the opendds_idl compiler.

    #include "StockQuoterTypeSupportImpl.h"

We also include DCPS publisher, service participant, and QoS header files.

    #include "dds/DCPS/Service_Participant.h"
    #include "dds/DCPS/Marked_Default_Qos.h"
    #include "dds/DCPS/PublisherImpl.h"
    #include "ace/streams.h"
    #include "orbsvcs/Time_Utilities.h"

The following constants are used for our domain, type names and topic names. Each type is published on a separate topic. The subscriber must use the same values for its domain, type names and topic names.

    // constants for Stock Quoter domain Id, types, and topic
    DDS::DomainId_t QUOTER_DOMAIN_ID = 1066;
    const char* QUOTER_QUOTE_TYPE = "Quote Type";
    const char* QUOTER_QUOTE_TOPIC = "Stock Quotes";
    const char* QUOTER_EXCHANGE_EVENT_TYPE =
      "Exchange Event Type";
    const char* QUOTER_EXCHANGE_EVENT_TOPIC =
      "Stock Exchange Events";

When a stock exchange event (opened, closed, suspended, or resumed) is published, we also publish the name of the stock exchange to which the event applies.

    const char* STOCK_EXCHANGE_NAME = "Test Stock Exchange";

This is a simple helper function that returns a timestamp for each published sample.

    TimeBase::TimeT get_timestamp()
    {
      TimeBase::TimeT retval;
      ACE_hrtime_t t = ACE_OS::gethrtime ();
      ORBSVCS_Time::hrtime_to_TimeT (retval, t );
      return retval;
    }

The remainder of the publisher's source code file contains its main(). We enter the publisher's main().

    int main (int argc, char *argv[])
    {
      DDS::DomainParticipantFactory_var dpf = 
        DDS::DomainParticipantFactory::_nil();
 
      DDS::DomainParticipant_var participant = 
        DDS::DomainParticipant::_nil();
 
      try 
      {

First, we create a domain participant. A DDS publisher may publish on many independent domains, but our example only publishes on one domain. We use the TheParticipantFactoryWithArgs macro to pass command-line arguments into DCPS and get the singleton domain participant factory. We create one domain participant, for the "Quote" domain, using the default Quality-of-Service policies for a domain participant. The value of QUOTER_DOMAIN_ID passed into the factory must be identical in the publisher and the subscriber.

        // Initialize, and create a DomainParticipant
 
        dpf = TheParticipantFactoryWithArgs(argc, argv);
 
        participant = dpf->create_participant(
          QUOTER_DOMAIN_ID,
          PARTICIPANT_QOS_DEFAULT,
          DDS::DomainParticipantListener::_nil());
 
        if (CORBA::is_nil (participant.in ())) 
        {
          cerr << "create_participant failed." << endl;
          ACE_OS::exit(1);
        }

Then, we create a publisher through the domain participant with default Quality-of-Service values. We can attach a PublisherListener that is called by DCPS when certain publication-related events happen. However, we don't care about those events, so we attach a nil listener.

        // Create a publisher for the two topics
        // (PUBLISHER_QOS_DEFAULT is defined in
        // Marked_Default_Qos.h)
        DDS::Publisher_var pub =
          participant->create_publisher(
            PUBLISHER_QOS_DEFAULT,
            DDS::PublisherListener::_nil());
 
        if (CORBA::is_nil (pub.in ())) 
        {
          cerr << "create_publisher failed." << endl;
          ACE_OS::exit(1);
        }
 

There are three steps involved in publishing through DCPS. First, we register each type for the published data samples. Our example publishes samples of two IDL types, Quote and ExchangeEvent. Second, we create one or more topics upon which we publish. Each topic can only be bound to one type; thus we create a topic for each of our two types. Third, we create a data writer for each topic, and publish samples through the data writer.
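The ordering of these three steps can be modeled in a few lines of plain C++. The class below is a hypothetical illustration, not the DDS::DomainParticipant interface: type registration, topic creation, and writer creation are reduced to bookkeeping on names, so that the dependencies between the steps are easy to see.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical model of the bookkeeping a participant performs.
class ParticipantModel {
public:
  // Step 1: make a type name known to the participant.
  void register_type(const std::string& type_name) {
    assert(!type_name.empty());
    registered_types_.insert(type_name);
  }

  // Step 2: bind a topic name to a registered type. Fails if the type is
  // unknown or the topic is already bound to a different type.
  bool create_topic(const std::string& topic_name, const std::string& type_name) {
    if (registered_types_.count(type_name) == 0) {
      return false;
    }
    auto it = topics_.find(topic_name);
    if (it != topics_.end()) {
      return it->second == type_name;
    }
    topics_[topic_name] = type_name;
    return true;
  }

  // Step 3: a writer is created for an existing topic and inherits its type.
  // Returns the type name the writer carries, or an empty string on failure.
  std::string create_writer(const std::string& topic_name) const {
    auto it = topics_.find(topic_name);
    if (it == topics_.end()) {
      return std::string();
    }
    return it->second;
  }

private:
  std::set<std::string> registered_types_;
  std::map<std::string, std::string> topics_;
};
```

In this model, creating a topic before its type is registered fails, and a writer never needs to be told its type because the topic already knows it.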

We first register the IDL Quote type with the domain participant, passing an instance of the generated QuoteTypeSupportImpl class for the Quote type. The name that we use for the Quote type, which is stored in the constant value QUOTER_QUOTE_TYPE, must match the name used on the subscriber. When we create a topic, we specify this type name, enabling DCPS to later create the appropriate type of data writer for the topic.

        // Register the Quote type
        StockQuoter::QuoteTypeSupport_var quote_servant
          = new StockQuoter::QuoteTypeSupportImpl();

        if (DDS::RETCODE_OK !=
              quote_servant->register_type(participant.in (),
                                           QUOTER_QUOTE_TYPE))
        {
          cerr << "register_type for " << QUOTER_QUOTE_TYPE
               << " failed." << endl;
          ACE_OS::exit(1);
        }

We then register the IDL ExchangeEvent type with the domain participant in the same way, using the generated ExchangeEventTypeSupportImpl class. Our DCPS domain participant is able to publish on topics for Quote or ExchangeEvent types.

        // Register the ExchangeEvent type
        StockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant 
          = new StockQuoter::ExchangeEventTypeSupportImpl();
 
        if (DDS::RETCODE_OK != 
              exchange_evt_servant->register_type(
                participant.in (),
                QUOTER_EXCHANGE_EVENT_TYPE)) 
        {
          cerr << "register_type for " 
               << QUOTER_EXCHANGE_EVENT_TYPE 
               << " failed." << endl;
          ACE_OS::exit(1);
        }

We create a topic for our Quote samples, indicating the topic name and the registered name of the Quote data type and using the default Quality-of-Service settings. Again, the Quote topic and type names must match on the publisher and the subscriber.

        // Get QoS to use for our two topics
        // Could also use TOPIC_QOS_DEFAULT instead
        DDS::TopicQos default_topic_qos;
        participant->get_default_topic_qos(default_topic_qos);
 
        // Create a topic for the Quote type...
        DDS::Topic_var quote_topic =
          participant->create_topic (QUOTER_QUOTE_TOPIC,
                                     QUOTER_QUOTE_TYPE,
                                     default_topic_qos,
                                     DDS::TopicListener::_nil());
 
        if (CORBA::is_nil (quote_topic.in ())) 
        {
          cerr << "create_topic for " 
               << QUOTER_QUOTE_TOPIC 
               << " failed." << endl;
          ACE_OS::exit(1);
        }

Similarly, we create a topic for our ExchangeEvent samples, indicating the topic name and the registered name of the ExchangeEvent type, and using the default Quality-of-Service settings. Again, the stock exchange event topic and type names must match on the publisher and on the subscriber.

        // ... and another topic for the Exchange Event type
        DDS::Topic_var exchange_evt_topic =
          participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,
                                     QUOTER_EXCHANGE_EVENT_TYPE,
                                     default_topic_qos,
                                     DDS::TopicListener::_nil());
 
        if (CORBA::is_nil (exchange_evt_topic.in ())) 
        {
          cerr << "create_topic for " 
               << QUOTER_EXCHANGE_EVENT_TOPIC 
               << " failed."
               << endl;
          ACE_OS::exit(1);
        }

We create two data writers, one for each topic. We pass in the topic created above; the topic knows its type. Each data writer is associated with exactly one publisher and publishes on one topic. Later, our publisher publishes on each topic by writing data samples to each data writer. The following code creates a data writer for the "Stock Quotes" topic.

        // Get QoS to use for our two DataWriters
        // Could also use DATAWRITER_QOS_DEFAULT
        DDS::DataWriterQos dw_default_qos;
        pub->get_default_datawriter_qos (dw_default_qos);
 
        // Create a DataWriter for the Quote topic
        DDS::DataWriter_var quote_base_dw =
          pub->create_datawriter(quote_topic.in (),
                                 dw_default_qos,
                                 DDS::DataWriterListener::_nil());
 
        if (CORBA::is_nil (quote_base_dw.in ())) 
        {
          cerr << "create_datawriter for " 
               << QUOTER_QUOTE_TOPIC 
               << " failed." << endl;
          ACE_OS::exit(1);
        }
 
        StockQuoter::QuoteDataWriter_var quote_dw
          = StockQuoter::QuoteDataWriter::_narrow(quote_base_dw.in());
        if (CORBA::is_nil (quote_dw.in ())) 
        {
          cerr << "QuoteDataWriter could not be narrowed"
               << endl;
          ACE_OS::exit(1);
        }

We then create a data writer for the "Stock Exchange Events" topic. Again, we pass in the topic created above, and the topic knows its type.

        // Create a DataWriter for the Exchange Event topic
        DDS::DataWriter_var exchange_evt_base_dw =
          pub->create_datawriter(exchange_evt_topic.in (),
                                 dw_default_qos,
                                 DDS::DataWriterListener::_nil());
 
        if (CORBA::is_nil (exchange_evt_base_dw.in ())) 
        {
          cerr << "create_datawriter for " 
               << QUOTER_EXCHANGE_EVENT_TOPIC 
               << " failed." << endl;
          ACE_OS::exit(1);
        }
 
        StockQuoter::ExchangeEventDataWriter_var exchange_evt_dw =
          StockQuoter::ExchangeEventDataWriter::_narrow(
            exchange_evt_base_dw.in());
 
        if (CORBA::is_nil (exchange_evt_dw.in ())) 
        {
          cerr << "ExchangeEventDataWriter could not "
               << "be narrowed"<< endl;
          ACE_OS::exit(1);
        }

We may choose to register each data instance. Registering each data instance will slightly improve latency while writing samples of that instance.

A publisher may publish many data samples on each data instance. A data instance is identified by a unique key. For the Quote type, we identified ticker as the key field in its IDL type definition. Each Quote data sample with the same key value is considered part of the same data instance. In other words, each Quote sample published on the ticker symbol "SPY" is part of the same instance.

We have two Quote instances, for ticker symbols "SPY" (S&P Depository Receipts, i.e. the S&P 500) and "MDY" (S&P Midcap Depository Receipts, i.e. the S&P Midcap 400), and one ExchangeEvent instance, for the "Test Stock Exchange". We register each instance with the appropriate data writer. The registration method is actually called _cxx_register because register is a reserved word in C++.

        // Register the Exchange Event and the two
        // Quoted securities (SPY and MDY) with the
        // appropriate data writer
 
        StockQuoter::Quote spy;
        spy.ticker = CORBA::string_dup("SPY");
        DDS::InstanceHandle_t spy_handle = 
          quote_dw->_cxx_register(spy);
 
        StockQuoter::Quote mdy;
        mdy.ticker = CORBA::string_dup("MDY");
        DDS::InstanceHandle_t mdy_handle = 
          quote_dw->_cxx_register(mdy);
 
        StockQuoter::ExchangeEvent ex_evt;
        ex_evt.exchange = STOCK_EXCHANGE_NAME;
        DDS::InstanceHandle_t exchange_handle = 
          exchange_evt_dw->_cxx_register(ex_evt);
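One way to picture what registration buys us is the following model in plain C++. It is a sketch under our own assumptions, not the OpenDDS data writer: QuoteWriterModel and InstanceHandle are hypothetical, and a sample is reduced to a double. Registration performs the key lookup once and returns a handle; later writes that supply the handle go straight to the instance without looking up the key again.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

typedef int InstanceHandle;  // hypothetical handle type; valid handles start at 1

class QuoteWriterModel {
public:
  // Returns the handle for a key, allocating one on first registration.
  InstanceHandle register_instance(const std::string& ticker) {
    auto it = handles_.find(ticker);
    if (it != handles_.end()) {
      return it->second;
    }
    samples_.push_back(std::vector<double>());
    const InstanceHandle handle = static_cast<InstanceHandle>(samples_.size());
    handles_[ticker] = handle;
    return handle;
  }

  // Writes through a handle, with no key lookup. Returns false if the
  // handle was never issued by register_instance().
  bool write(double value, InstanceHandle handle) {
    if (handle < 1 || static_cast<std::size_t>(handle) > samples_.size()) {
      return false;
    }
    samples_[handle - 1].push_back(value);
    return true;
  }

  // Number of samples written to the instance behind a valid handle.
  std::size_t sample_count(InstanceHandle handle) const {
    assert(handle >= 1 && static_cast<std::size_t>(handle) <= samples_.size());
    return samples_[handle - 1].size();
  }

private:
  std::map<std::string, InstanceHandle> handles_;
  std::vector<std::vector<double> > samples_;
};
```

Registering "SPY" twice yields the same handle, which reflects the rule that all samples with the same key value belong to the same instance.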

Finally, we publish. First, we publish a TRADING_OPENED event on the "Stock Exchange Events" topic.

        // Publish...
 
        StockQuoter::ExchangeEvent opened;
        opened.exchange = STOCK_EXCHANGE_NAME;
        opened.event = StockQuoter::TRADING_OPENED;
        opened.timestamp = get_timestamp();
 
        cout << "Publishing TRADING_OPENED" << endl;
        DDS::ReturnCode_t ret = 
          exchange_evt_dw->write(opened, exchange_handle);
 
        if (ret != DDS::RETCODE_OK) 
        {
          ACE_ERROR ((
            LM_ERROR, 
            ACE_TEXT("(%P|%t)ERROR: OPEN write returned %d.\n"),
            ret));
        }

Then, we publish several stock quote data samples for the "SPY" and "MDY" instances on the "Stock Quotes" topic. We simply loop, increasing the quoted value for each ticker symbol a bit each time to simulate active trading on a really good day.

        ACE_Time_Value quarterSecond( 0, 250000 );
 
        for ( int i = 0; i < 20; ++i ) 
        {
          //
          // SPY
          //
 
          StockQuoter::Quote spy_quote;
          spy_quote.exchange = STOCK_EXCHANGE_NAME;
          spy_quote.ticker = CORBA::string_dup("SPY");
          spy_quote.full_name = 
            CORBA::string_dup("S&P Depository Receipts");
          spy_quote.value = 1200.0 + 10.0*i;
          spy_quote.timestamp = get_timestamp();
 
          cout << "Publishing SPY Quote: " 
               << spy_quote.value << endl;
 
          ret = quote_dw->write(spy_quote, spy_handle);
 
          if (ret != DDS::RETCODE_OK)
          {
            ACE_ERROR ((
              LM_ERROR, 
              ACE_TEXT("(%P|%t)ERROR: SPY write returned %d.\n"),
              ret));
          }
 
          ACE_OS::sleep( quarterSecond );
 
          //
          // MDY
          //
 
          StockQuoter::Quote mdy_quote;
          mdy_quote.exchange = STOCK_EXCHANGE_NAME;
          mdy_quote.ticker = CORBA::string_dup("MDY");
          mdy_quote.full_name = 
            CORBA::string_dup("S&P Midcap Depository Receipts");
          mdy_quote.value = 1400.0 + 10.0*i;
          mdy_quote.timestamp = get_timestamp();
 
          cout << "Publishing MDY Quote: " 
               << mdy_quote.value << endl;
 
          ret = quote_dw->write(mdy_quote, mdy_handle);
 
          if (ret != DDS::RETCODE_OK) 
          {
            ACE_ERROR ((
              LM_ERROR, 
              ACE_TEXT("(%P|%t)ERROR: MDY write returned %d.\n"),
              ret));
          }
 
          ACE_OS::sleep( quarterSecond );
        }

Last, we publish a TRADING_CLOSED event on the "Stock Exchange Events" topic to indicate that the stock exchange is closed for the day.

        StockQuoter::ExchangeEvent closed;
        closed.exchange = STOCK_EXCHANGE_NAME;
        closed.event = StockQuoter::TRADING_CLOSED;
        closed.timestamp = get_timestamp();
 
        cout << "Publishing TRADING_CLOSED" << endl;
 
        ret = exchange_evt_dw->write(closed, exchange_handle);
 
        if (ret != DDS::RETCODE_OK) 
        {
          ACE_ERROR ((
            LM_ERROR, 
            ACE_TEXT("(%P|%t)ERROR: CLOSED write returned %d.\n"),
            ret));
        }
 
        cout << "Exiting..." << endl;
      } catch (CORBA::Exception& e) {
        cerr << "Exception caught in main.cpp:" << endl
             << e << endl;
        ACE_OS::exit(1);
      }

Finally, we clean up after ourselves before exiting.

      // Cleanup
      try {
        if (!CORBA::is_nil (participant.in ())) {
          participant->delete_contained_entities();
        }
        if (!CORBA::is_nil (dpf.in ())) {
          dpf->delete_participant(participant.in ());
        }
      } catch (CORBA::Exception& e) {
        cerr << "Exception caught in cleanup." 
             << endl 
             << e << endl;
        ACE_OS::exit(1);
      }
      TheServiceParticipant->shutdown ();
      return 0;
    }

This completes the C++ code for the publisher.

SUBSCRIBER

Our subscriber subscribes to the stock quotes and stock exchange events, receiving data samples from the publisher. We use the publisher's TRADING_CLOSED event to indicate that trading has finished for the day, triggering the subscriber's graceful shutdown.

Much of the code in the subscriber is similar to that in the publisher. We get a domain participant, register types, etc. in the same way we did in the publisher. The main difference is that the subscriber is passive, waiting to receive samples, while the publisher is active. The subscriber uses listener objects to receive samples from the publisher.
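The passive, callback-driven style of the subscriber can be sketched in plain C++ before we look at the real code. Everything here is a hypothetical stand-in: EventListener is not the DDS listener interface, and deliver() merely plays the role of the middleware by invoking the callback once per event. The listener records that a "closed" event has arrived, which the subscriber's main thread could then use as its signal to shut down.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class EventType { TradingOpened, TradingClosed, TradingSuspended, TradingResumed };

// Hypothetical listener interface; the middleware calls on_event().
class EventListener {
public:
  virtual ~EventListener() {}
  virtual void on_event(const std::string& exchange, EventType type) = 0;
};

// Remembers whether the exchange has closed.
class ShutdownOnCloseListener : public EventListener {
public:
  ShutdownOnCloseListener() : closed_(false), events_seen_(0) {}

  void on_event(const std::string& exchange, EventType type) override {
    assert(!exchange.empty());
    ++events_seen_;
    if (type == EventType::TradingClosed) {
      closed_ = true;
    }
  }

  bool is_closed() const { return closed_; }
  int events_seen() const { return events_seen_; }

private:
  bool closed_;
  int events_seen_;
};

// Stand-in for the middleware: delivers each event to the listener in order.
void deliver(EventListener& listener, const std::string& exchange,
             const std::vector<EventType>& events) {
  for (EventType event : events) {
    listener.on_event(exchange, event);
  }
}
```

The application never polls for data in this style; it only reacts when the callback is invoked, and checks is_closed() to decide when to exit.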

First, we include the type support header file generated by the opendds_idl compiler, which we also included in the publisher. In addition, we include two listener header files, one for each published type. A listener is called by DDS when a data sample is published on the associated topic.

    #include "StockQuoterTypeSupportImpl.h"
    #include "QuoteDataReaderListenerImpl.h"
    #include "ExchangeEventDataReaderListenerImpl.h"

We also include DCPS subscriber, service participant, and QoS header files.

    #include "dds/DCPS/Service_Participant.h"
    #include "dds/DCPS/Marked_Default_Qos.h"
    #include "dds/DCPS/SubscriberImpl.h"
    #include "dds/DCPS/BuiltinTopicUtils.h"
    #include "ace/streams.h"
    #include "orbsvcs/Time_Utilities.h"

The following constants are used for our domain, type names and topic names. These must match the domain, type names, and topic names used by the publisher.

    // constants for Stock Quoter domain Id, types, and topic
    // (same as publisher)
    DDS::DomainId_t QUOTER_DOMAIN_ID = 1066;
    const char* QUOTER_QUOTE_TYPE = "Quote Type";
    const char* QUOTER_QUOTE_TOPIC = "Stock Quotes";
    const char* QUOTER_EXCHANGE_EVENT_TYPE =
      "Exchange Event Type";
    const char* QUOTER_EXCHANGE_EVENT_TOPIC = 
      "Stock Exchange Events";

The remainder of the subscriber's source code file contains its main(). We enter the subscriber's main().

    int main (int argc, char *argv[])
    {
      DDS::DomainParticipantFactory_var dpf = 
        DDS::DomainParticipantFactory::_nil();
 
      DDS::DomainParticipant_var participant = 
        DDS::DomainParticipant::_nil();
 
      try {

We create a domain participant, just as we did in the publisher. The specified domain matches the publisher's domain.

        // Initialize, and create a DomainParticipant
        // (same code as publisher)
 
        dpf = TheParticipantFactoryWithArgs(argc, argv);
 
        participant = dpf->create_participant(
          QUOTER_DOMAIN_ID,
          PARTICIPANT_QOS_DEFAULT,
          DDS::DomainParticipantListener::_nil());
 
        if (CORBA::is_nil (participant.in ())) 
        {
          cerr << "create_participant failed." << endl;
          ACE_OS::exit(1);
        }

Then, we create a subscriber through the domain participant with default Quality-of-Service policy values. We can attach a SubscriberListener that is called by DCPS when certain subscription-related events happen. However, we don't care about those events, so we attach a nil listener. This is almost the same as what we did in the publisher when we called create_publisher.

        // Create a subscriber for the two topics
        // (SUBSCRIBER_QOS_DEFAULT is defined 
        // in Marked_Default_Qos.h)
        DDS::Subscriber_var sub =
          participant->create_subscriber(
            SUBSCRIBER_QOS_DEFAULT,
            DDS::SubscriberListener::_nil());
 
        if (CORBA::is_nil (sub.in ())) 
        {
          cerr << "create_subscriber failed." << endl;
          ACE_OS::exit(1);
        }

As in the publisher, we must register the IDL Quote and ExchangeEvent types with the domain participant to subscribe to topics on those types.

        // Register the Quote type
        // (same code as publisher)
        StockQuoter::QuoteTypeSupport_var quote_servant 
          = new StockQuoter::QuoteTypeSupportImpl();
 
        if (DDS::RETCODE_OK != 
              quote_servant->register_type(participant.in (),
                                           QUOTER_QUOTE_TYPE))
        {
          cerr << "register_type for " << QUOTER_QUOTE_TYPE 
               << " failed." << endl;
          ACE_OS::exit(1);
        }
 
        // Register the ExchangeEvent type
        // (same code as publisher)
        StockQuoter::ExchangeEventTypeSupport_var exchange_evt_servant  
          = new StockQuoter::ExchangeEventTypeSupportImpl();
 
        if (DDS::RETCODE_OK != 
              exchange_evt_servant->register_type(
                participant.in (),
                QUOTER_EXCHANGE_EVENT_TYPE)) 
        {
          cerr << "register_type for " 
               << QUOTER_EXCHANGE_EVENT_TYPE 
               << " failed." << endl;
          ACE_OS::exit(1);
        }

As in the publisher, we create a topic for our stock quotes, indicating the topic name and the registered name of the Quote type and using the default Quality-of-Service settings. Again, the stock quote topic name must match on the publisher and the subscriber.

        // Get QoS to use for our two topics
        // Could also use TOPIC_QOS_DEFAULT instead
        // (same code as publisher)
        DDS::TopicQos default_topic_qos;
        participant->get_default_topic_qos(default_topic_qos);
 
        // Create a topic for the Quote type...
        // (same code as publisher)
        DDS::Topic_var quote_topic =
          participant->create_topic (QUOTER_QUOTE_TOPIC,
                                     QUOTER_QUOTE_TYPE,
                                     default_topic_qos,
                                     DDS::TopicListener::_nil());
 
        if (CORBA::is_nil (quote_topic.in ())) 
        {
          cerr << "create_topic for " 
               << QUOTER_QUOTE_TOPIC 
               << " failed." << endl;
          ACE_OS::exit(1);
        }

Similarly, we create a topic for our ExchangeEvent samples, indicating the topic name and the registered name of the ExchangeEvent type, and using the default Quality-of-Service settings. Again, the stock exchange event topic name must match on the publisher and the subscriber.

 // .. and another topic for the Exchange Event type
        // (same code as publisher)
        DDS::Topic_var exchange_evt_topic =
          participant->create_topic (QUOTER_EXCHANGE_EVENT_TOPIC,
                                     QUOTER_EXCHANGE_EVENT_TYPE,
                                     default_topic_qos,
                                     DDS::TopicListener::_nil());
 
        if (CORBA::is_nil (exchange_evt_topic.in ())) 
        {
          cerr << "create_topic for " 
               << QUOTER_EXCHANGE_EVENT_TOPIC 
               << " failed." 
               << endl;
          ACE_OS::exit(1);
        }

On the publisher, we created two data writers, one for each topic. On the subscriber, we'll create two data readers, one for each topic. Each data reader has exactly one subscriber and subscribes to one topic. We also attach a listener to each data reader to receive notification of published data samples. This is where the publisher and subscriber code diverges.

The following code creates a listener for the "Stock Quotes" topic. The listener is a local CORBA object, implementing the DDS::DataReaderListener IDL interface. We use OpenDDS's convenient servant_to_reference function template to obtain a reference of the interface type.

        // Create DataReaders and DataReaderListeners for the
        // Quote and ExchangeEvent
 
        // Create a Quote listener
        QuoteDataReaderListenerImpl quote_listener_servant;    
 
        DDS::DataReaderListener_var quote_listener =
          ::OpenDDS::DCPS::servant_to_reference(&quote_listener_servant);
 
        if (CORBA::is_nil (quote_listener.in ())) 
        {
          cerr << "Quote listener is nil." << endl;
          ACE_OS::exit(1);
        }

We create a second listener for the "Stock Exchange Events" topic.

 // Create an ExchangeEvent listener
        ExchangeEventDataReaderListenerImpl exchange_evt_listener_servant;
 
        DDS::DataReaderListener_var exchange_evt_listener =
          ::OpenDDS::DCPS::servant_to_reference(&exchange_evt_listener_servant);
 
        if (CORBA::is_nil (exchange_evt_listener.in ())) 
        {
          cerr << "ExchangeEvent listener is nil." << endl;
          ACE_OS::exit(1);
        }

Finally, we create a data reader for each of the two topics. First, we create a data reader for the "Stock Quotes" topic, attaching the relevant listener we created above.

 // Create the Quote DataReader
 
        // Get the default QoS
        // Could also use DATAREADER_QOS_DEFAULT
        DDS::DataReaderQos dr_default_qos;
        sub->get_default_datareader_qos (dr_default_qos);
 
        DDS::DataReader_var quote_dr = 
          sub->create_datareader(quote_topic.in (),
                                 dr_default_qos,
                                 quote_listener.in ());

Then, we create a data reader for the "Stock Exchange Events" topic, attaching the other listener we created above.

  // Create the ExchangeEvent DataReader
 
        DDS::DataReader_var exchange_evt_dr = 
          sub->create_datareader(exchange_evt_topic.in (),
                                 dr_default_qos,
                                 exchange_evt_listener.in ());

OpenDDS spawns its own threads to handle incoming events from the publisher, so there is no event loop code in the subscriber. However, we must not allow the main thread to exit before we're ready to shut down the entire subscriber process. So we loop, processing stock quotes and stock exchange events, until the TRADING_CLOSED event is received on the "Stock Exchange Events" topic. Essentially, we want to receive published data samples until the stock exchange tells us that it has closed. The sleep call makes us check the flag once per second, which keeps the loop from consuming too much of the CPU.

        // Wait for events from the Publisher; shut
        // down when "close" received
        cout << "Subscriber: waiting for events" << endl;
        while ( ! exchange_evt_listener_servant.
                    is_exchange_closed_received() )
        {
          ACE_OS::sleep(1);
        }

When we have received the TRADING_CLOSED event, we gracefully exit the loop.

        cout << "Received CLOSED event from publisher; "
             << " exiting..." 
             << endl;
 
      } catch (CORBA::Exception& e) {
        cerr << "Exception caught in main.cpp:" << endl
             << e << endl;
        ACE_OS::exit(1);
      }

Finally, we clean up after ourselves before exiting.

  // Cleanup
      try {
        if (!CORBA::is_nil (participant.in ())) {
          participant->delete_contained_entities();
        }
        if (!CORBA::is_nil (dpf.in ())) {
          dpf->delete_participant(participant.in ());
        }
      } catch (CORBA::Exception& e) {
        cerr << "Exception caught in cleanup." 
             << endl 
             << e << endl;
        ACE_OS::exit(1);
      }
      TheServiceParticipant->shutdown ();
      return 0;
    }

SUBSCRIBER'S "STOCK QUOTE" AND "STOCK EXCHANGE EVENT" LISTENERS

The "Stock Quote" data reader and the "Stock Exchange Event" data reader each has a listener attached. These listeners are called by the DDS framework each time a data sample is received from a publisher. We have decided that each of the two data readers shall have its own listener, although we could use a single listener for both data readers if we code the listener to handle both data types.
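The single-listener alternative mentioned above can be sketched in standard C++. In a real listener, the check would be made by calling the generated _narrow operation for each data reader type and testing the result for nil; here, dynamic_cast on hypothetical stand-in classes (Reader, QuoteReader, ExchangeEventReader) plays that role so the sketch compiles without OpenDDS. None of these names come from OpenDDS.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for the generated data reader types. They are
// NOT OpenDDS classes; they only model "a reader of some concrete type".
struct Reader { virtual ~Reader() {} };
struct QuoteReader : Reader {};
struct ExchangeEventReader : Reader {};

// One listener callback shared by both readers: try each concrete type
// in turn and report which kind of sample would be taken.
inline std::string shared_on_data_available(Reader* reader)
{
  if (dynamic_cast<QuoteReader*>(reader)) {
    return "quote";            // would take a Quote sample here
  }
  if (dynamic_cast<ExchangeEventReader*>(reader)) {
    return "exchange event";   // would take an ExchangeEvent sample here
  }
  return "unknown";            // neither type matched
}
```

The cost of this design is that every callback pays for a type check, and the listener grows with each new data type, which is one reason we use a separate listener per data reader in this article.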

Each listener implements the DDS::DataReaderListener IDL interface. We used both a QuoteDataReaderListenerImpl and an ExchangeEventDataReaderListenerImpl in the subscriber code above, but we have not yet defined those classes. We will do that now.

First, we write a listener for the Quote type's data reader. This listener class implements the DDS::DataReaderListener IDL interface, overriding seven pure virtual methods from the interface. It is a CORBA local object implementation of an IDL interface, inheriting from the generated class for the IDL interface.

The listener class must override all seven methods, including methods for which the listener's implementation is empty. However, for simplicity, we'll show only the on_data_available method, which is called when a new Quote data sample is available. The other six methods have empty implementations. We'll also use a default constructor and destructor.

 #include "StockQuoterTypeSupportC.h"
    #include "StockQuoterTypeSupportImpl.h"
    #include "dds/DCPS/Service_Participant.h"
    #include "dds/DdsDcpsSubscriptionS.h"
    #include "ace/streams.h"
 
    class QuoteDataReaderListenerImpl
      : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
    {
    public:
      // DDS calls on_data_available on the listener for each
      // received Quote sample.
      virtual void on_data_available(DDS::DataReader_ptr reader)
        throw (CORBA::SystemException) 
      {
        try 
        {

We first narrow the value of the data reader parameter to the appropriate type for a Quote sample.

          StockQuoter::QuoteDataReader_var quote_dr = 
            StockQuoter::QuoteDataReader::_narrow(reader);
 
          if (CORBA::is_nil (quote_dr.in ())) 
          {
            cerr << "QuoteDataReaderListenerImpl:: "
                 << "on_data_available:" 
                 << " _narrow failed." << endl;
            ACE_OS::exit(1);
          }

Then, we take the next Quote sample from the data reader. Note the type safety of the QuoteDataReader interface.

          StockQuoter::Quote quote;
          DDS::SampleInfo si;
 
          DDS::ReturnCode_t status = 
            quote_dr->take_next_sample(quote, si) ;

Once we have received the Quote sample, we simply print out its contents.

          if (status == DDS::RETCODE_OK) {
            cout << "Quote: ticker    = " << quote.ticker.in()    
                 << endl
                 << "       exchange  = " << quote.exchange.in()  
                 << endl
                 << "       full name = " << quote.full_name.in() 
                 << endl
                 << "       value     = " << quote.value          
                 << endl
                 << "       timestamp = " << quote.timestamp      
                 << endl;
 
            cout << "SampleInfo.sample_rank = " 
                 << si.sample_rank << endl;
          }
          else if (status == DDS::RETCODE_NO_DATA) 
          {
            cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!"
                 << endl;
          }
          else 
          {
            cerr << "ERROR: read Quote: Error: " 
                 <<  status << endl;
          }

The Quote sample is a local (stack) variable, so its memory is released automatically when it goes out of scope.

  } catch (CORBA::Exception& e) {
          cerr << "Exception caught in read:" 
               << endl << e << endl;
          ACE_OS::exit(1);
        }
      }

We have not shown implementations of the other methods in the DDS::DataReaderListener interface, but we must override them as well, even if their implementations are empty.

      // must also override:
      //    on_requested_deadline_missed
      //    on_requested_incompatible_qos
      //    on_liveliness_changed
      //    on_subscription_match
      //    on_sample_rejected
      //    on_sample_lost
    };

Next, we write a listener for the ExchangeEvent type's data reader. The basic structure is the same as that of the QuoteDataReaderListenerImpl.

    #include "ExchangeEventDataReaderListenerImpl.h"
    #include "StockQuoterTypeSupportImpl.h"
    #include "dds/DCPS/Service_Participant.h"
    #include "dds/DdsDcpsSubscriptionS.h"
    #include "ace/streams.h"
    #include "ace/Synch.h"
 
    class ExchangeEventDataReaderListenerImpl
      : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
    {
    public:

We add the is_exchange_closed_received method to the listener so the subscriber's main program can find out when the TRADING_CLOSED stock exchange event has been received. This method checks a boolean value under the protection of a mutex lock. The boolean value is set by the listener's on_data_available method when a TRADING_CLOSED stock exchange event is received.

      // app-specific
      CORBA::Boolean is_exchange_closed_received()
      {
        ACE_Guard<ACE_Mutex> guard(this->lock_);
        return this->is_exchange_closed_received_;
      }

DDS calls on_data_available on the listener for each received ExchangeEvent sample.

  virtual void on_data_available(DDS::DataReader_ptr reader)
        throw (CORBA::SystemException) 
      {
        try 
        {

As in the QuoteDataReaderListenerImpl, we first narrow the value of the data reader parameter to the appropriate type, which in this case is an ExchangeEventDataReader.

 StockQuoter::ExchangeEventDataReader_var exchange_evt_dr = 
            StockQuoter::ExchangeEventDataReader::_narrow(reader);
 
          if (CORBA::is_nil (exchange_evt_dr.in ())) {
            cerr << "ExchangeEventDataReaderListenerImpl:: "
                 << "on_data_available:"
                 << " _narrow failed." 
                 << endl;
            ACE_OS::exit(1);
          }

Then, we take the next ExchangeEvent sample from the data reader. Note the type safety.

   StockQuoter::ExchangeEvent exchange_evt;
          DDS::SampleInfo si;
 
          DDS::ReturnCode_t status = 
            exchange_evt_dr->take_next_sample(exchange_evt, si) ;

Once we have received the ExchangeEvent sample, we simply print out its contents.

     if (status == DDS::RETCODE_OK) {
            cout << "ExchangeEvent: exchange  = " 
                 << exchange_evt.exchange.in() << endl;
 
            switch ( exchange_evt.event ) {
              case StockQuoter::TRADING_OPENED:
                cout << "TRADING_OPENED" << endl;
                break;

When we receive a TRADING_CLOSED event, we set a flag indicating that the stock exchange has been closed for the day.

 case StockQuoter::TRADING_CLOSED: {
                cout << "TRADING_CLOSED" << endl;
 
                ACE_Guard<ACE_Mutex> guard(this->lock_);
                this->is_exchange_closed_received_ = 1;
                break;
              }
              case StockQuoter::TRADING_SUSPENDED:
                cout << "TRADING_SUSPENDED" << endl;
                break;
              case StockQuoter::TRADING_RESUMED:
                cout << "TRADING_RESUMED" << endl;
                break;
              default:
                cerr << "ERROR: reader received unknown "
                     << "ExchangeEvent: "
                     << exchange_evt.event 
                     << endl;
            }
 
            cout << "timestamp = " 
                 << exchange_evt.timestamp
                 << endl;
 
            cout << "SampleInfo.sample_rank = " 
                 << si.sample_rank 
                 << endl;
          }
          else if (status == DDS::RETCODE_NO_DATA) 
          {
            cerr << "ERROR: reader received "
                 << "DDS::RETCODE_NO_DATA!" 
                 << endl;
          }
          else 
          {
            cerr << "ERROR: read ExchangeEvent: Error: " 
                 <<  status 
                 << endl;
          }

The ExchangeEvent sample is a local (stack) variable, so its memory is released automatically when it goes out of scope.

        } catch (CORBA::Exception& e) {
          cerr << "Exception caught in read:" << endl 
               << e << endl;
          ACE_OS::exit(1);
        }
      }
 
      // must also override:
      //    on_requested_deadline_missed
      //    on_requested_incompatible_qos
      //    on_liveliness_changed
      //    on_subscription_match
      //    on_sample_rejected
      //    on_sample_lost

We have added two private class attributes to keep track of the TRADING_CLOSED event and protect that value with a lock.

  private:
      CORBA::Boolean is_exchange_closed_received_;
      ACE_Mutex lock_;
    };
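The flag-under-a-lock pattern used by this listener can be tried out on its own. The following is a minimal sketch in standard C++, with std::mutex and std::lock_guard standing in for ACE_Mutex and ACE_Guard; the class name ClosedFlag and the function wait_until_closed are hypothetical and are not part of the Stock Quoter example. Unlike the class above, the sketch initializes the flag explicitly, which a real listener should also do in its constructor.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the listener's flag. std::mutex and
// std::lock_guard play the roles of ACE_Mutex and ACE_Guard.
class ClosedFlag
{
public:
  // Called from the middleware's thread when TRADING_CLOSED arrives.
  void set()
  {
    std::lock_guard<std::mutex> guard(lock_);
    closed_ = true;
  }

  // Called from the main thread to poll the flag.
  bool is_set() const
  {
    std::lock_guard<std::mutex> guard(lock_);
    return closed_;
  }

private:
  mutable std::mutex lock_;  // mutable so is_set() can be const
  bool closed_ = false;      // explicitly initialized
};

// Poll the flag, sleeping between checks, and return how many checks
// were made. The subscriber's loop sleeps one second per check; the
// interval is a parameter here so the sketch can run quickly.
inline int wait_until_closed(const ClosedFlag& flag,
                             std::chrono::milliseconds interval)
{
  int checks = 1;
  while (!flag.is_set()) {
    std::this_thread::sleep_for(interval);
    ++checks;
  }
  return checks;
}
```

Polling keeps the main program simple at the cost of up to one interval of shutdown latency; a condition variable would remove that latency but is more code than this example needs.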

This completes the C++ code for the subscriber.

BUILDING THE PUBLISHER AND SUBSCRIBER

We use MPC, the Make Project Creator, to generate build files for the publisher and subscriber. MPC provides a simple syntax and is capable of generating build files for GNU Make, Visual C++, and many other build systems. For more information on MPC, please see OCI's MPC page at https://objectcomputing.com/products/mpc.

We create two files to build our Stock Quoter, a workspace file and a project file. Our workspace file simply tells MPC where to find the MPC dcps and dcpsexe base project files that we'll use later.

 //
    // file StockQuoter.mwc
    //
 
    workspace {
      cmdline += -relative DDS_ROOT=$DDS_ROOT
    }

Next, we create an MPC file containing three projects - a Common project containing IDL and TypeSupport files, a Publisher, and a Subscriber. Each of the three projects inherits from either the dcps or dcpsexe base project, which are located in $DDS_ROOT. First, we create a library called StockQuoterCommon to hold the code generated by the TAO IDL and opendds_idl compilers.

    //
    // file StockQuoter.mpc
    //
 
    project(*Common) : dcps {
      sharedname = StockQuoterCommon
      libout = .
      includes += $(TAO_ROOT)/orbsvcs
      idlflags += -I$(TAO_ROOT)/orbsvcs
      idlflags += -Wb,export_macro=StockQuoterCommon_Export
      idlflags += -Wb,export_include=StockQuoterCommon_Export.h
      dcps_ts_flags += --export=StockQuoterCommon_Export
      dynamicflags = STOCKQUOTERCOMMON_BUILD_DLL

The dcps base project adds a new section, TypeSupport_Files. This section runs the opendds_idl compiler to generate TypeSupport files from our DDS data types. Here, we list the IDL file that contains our DDS data types; the TypeSupport files are generated from it.

 TypeSupport_Files {
        StockQuoter.idl
      }

Our IDL_Files section contains our original IDL files plus the TypeSupport IDL files generated by the previous section.

      IDL_Files {
        StockQuoterTypeSupport.idl
        StockQuoter.idl
      }

The Header_Files and Source_Files sections contain the opendds_idl-generated TypeSupport implementation files. MPC automatically adds the generated IDL stubs and skeletons, so we don't need to add those manually.

   Header_Files {
        StockQuoterTypeSupportImpl.h
      }
 
      Source_Files {
        StockQuoterTypeSupportImpl.cpp
      }
    } 

Our publisher uses the StockQuoterCommon library from above, and adds a publisher.cpp source file containing the publisher's main().

    project(*Publisher) : dcpsexe, svc_utils {
      after += *Common
      exename   = publisher
 
      includes += $(TAO_ROOT)/orbsvcs
      libs += StockQuoterCommon
      dynamicflags = STOCKQUOTERCOMMON_HAS_DLL
 
      TypeSupport_Files {
      }
 
      IDL_Files {
      }
 
      Header_Files {
      }
 
      Source_Files {
        publisher.cpp
      }
 
      Documentation_Files {
        README.txt
        domain_ids
      }
    }

Our subscriber also uses the StockQuoterCommon library, and adds a subscriber.cpp source file containing the subscriber's main(), along with the source files for its two listeners.

  project(*Subscriber) : dcpsexe {
      after += *Common
      exename   = subscriber
 
      includes += $(TAO_ROOT)/orbsvcs
      libs += StockQuoterCommon
      dynamicflags = STOCKQUOTERCOMMON_HAS_DLL
 
      TypeSupport_Files {
      }
 
      IDL_Files {
      }
 
      Header_Files {
        ExchangeEventDataReaderListenerImpl.h
        QuoteDataReaderListenerImpl.h
      }
 
      Source_Files {
        ExchangeEventDataReaderListenerImpl.cpp
        QuoteDataReaderListenerImpl.cpp
        subscriber.cpp
      }
 
      Documentation_Files {
        README.txt
        domain_ids
      }
    }

We use this MPC file to generate build files for our build system. For example, to generate GNU Makefiles, we execute

 $ACE_ROOT/bin/mwc.pl -type gnuace StockQuoter.mwc

To generate Visual C++ 7.1 solution files, we execute

 perl %ACE_ROOT%/bin/mwc.pl -type vc71 StockQuoter.mwc

We then build the project.

CONFIGURING THE STOCK QUOTER

OpenDDS includes a file-based configuration mechanism. With it, an OpenDDS user may configure a publisher's or subscriber's transport, the location of the DCPSInfoRepo process, and many other settings. The syntax of the configuration file is similar to the syntax of a Windows INI file. It contains several sections, which in turn contain property-like entries. The basic syntax is as follows:

   [section1-name]
    Attribute1=value1
    Attribute2=value2
 
    [section2-name]
    Attribute1=value1
    Attribute2=value2
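To make the section and attribute structure concrete, here is a minimal sketch in standard C++ that reads text in this syntax into a two-level map. It is not OpenDDS's parser, which is internal to the library; the names Section, IniFile, trim, and parse_ini, as well as the treatment of '#' comment lines and surrounding whitespace, are assumptions made for illustration.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Hypothetical helper types; not part of OpenDDS.
using Section = std::map<std::string, std::string>;
using IniFile = std::map<std::string, Section>;

// Remove leading and trailing spaces, tabs, and carriage returns.
inline std::string trim(const std::string& s)
{
  const char* ws = " \t\r";
  const std::string::size_type first = s.find_first_not_of(ws);
  if (first == std::string::npos) return "";
  const std::string::size_type last = s.find_last_not_of(ws);
  return s.substr(first, last - first + 1);
}

// Parse INI-style text: "[name]" opens a section, "key=value" adds an
// entry to the current section, and lines starting with '#' are skipped.
inline IniFile parse_ini(const std::string& text)
{
  IniFile result;
  std::string current;  // entries before any header go under ""
  std::istringstream in(text);
  std::string line;
  while (std::getline(in, line)) {
    line = trim(line);
    if (line.empty() || line[0] == '#') continue;
    if (line.front() == '[' && line.back() == ']') {
      current = trim(line.substr(1, line.size() - 2));
      result[current];  // record the section even if it stays empty
      continue;
    }
    const std::string::size_type eq = line.find('=');
    if (eq == std::string::npos) continue;  // ignore malformed lines
    result[current][trim(line.substr(0, eq))] = trim(line.substr(eq + 1));
  }
  return result;
}
```

With the basic syntax shown above as input, parse_ini yields two sections, and looking up Attribute2 in section1-name returns value2.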

The complete set of configuration settings is described in the Configuration Chapter of the OpenDDS Developer's Guide.

Our TCP-based example uses one configuration file, dds_tcp_conf.ini, for both the publisher and the subscriber:

#
# dds_tcp_conf.ini
#
 
[common]
 
# Debug Level
DCPSDebugLevel=0
 
# IOR of DCPSInfoRepo process.
DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
 
# Sets the global transport configuration (used by default in the
# process) to config1, defined below
DCPSGlobalTransportConfig=config1
 
# Transport configuration named config1, contains a single transport
# instance named tcp1 (defined below)
[config/config1]
transports=tcp1
 
# Transport instance named tcp1, of type "tcp".  Uses defaults for
# all configuration parameters.
[transport/tcp1]
transport_type=tcp

Note that there are three sections: [common], [config/config1], and [transport/tcp1]. The [common] section contains configuration values that apply to the entire process; in this configuration file, we specify a debug level, an object reference for the DCPSInfoRepo process, and a global transport configuration. Here, our DCPSInfoRepo process is listening on the loopback (127.0.0.1) interface, which means we have configured it to be available only to DDS processes running on the same host. To make it available across the network, use an IP address or a network hostname instead of localhost. We have specified config1 as our global transport configuration, meaning that the transport configuration with that name is used by all readers and writers in our process that do not explicitly specify another transport configuration.

The [config/config1] section defines a transport configuration with the name config1. The transports option specifies tcp1 as the only transport instance included in this configuration.

The [transport/tcp1] section defines a transport instance named tcp1 and specifies its transport type as tcp. This section can also be used to configure the transport with a number of configuration options as described in the OpenDDS documentation.
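The chain of names across the three sections can be traced mechanically. The sketch below, in standard C++, follows [common] to [config/config1] to [transport/tcp1] over an in-memory map and returns the transport types it finds. It only illustrates how the names refer to one another and is not how OpenDDS resolves its configuration internally; the function names are hypothetical, and the sketch assumes the transports value is a comma-separated list with no surrounding whitespace.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper types; not part of OpenDDS.
using Section = std::map<std::string, std::string>;
using IniFile = std::map<std::string, Section>;

// Follow [common] -> [config/<name>] -> [transport/<instance>] and
// return the transport_type of each instance, in listed order.
// std::map::at throws std::out_of_range if a section or key is missing.
inline std::vector<std::string> global_transport_types(const IniFile& ini)
{
  const std::string config_name =
    ini.at("common").at("DCPSGlobalTransportConfig");
  const std::string instances =
    ini.at("config/" + config_name).at("transports");

  std::vector<std::string> types;
  std::istringstream in(instances);
  std::string instance;
  while (std::getline(in, instance, ',')) {
    types.push_back(ini.at("transport/" + instance).at("transport_type"));
  }
  return types;
}

// The relevant entries of dds_tcp_conf.ini, written as an in-memory map.
inline IniFile tcp_example()
{
  IniFile ini;
  ini["common"]["DCPSGlobalTransportConfig"] = "config1";
  ini["config/config1"]["transports"] = "tcp1";
  ini["transport/tcp1"]["transport_type"] = "tcp";
  return ini;
}
```

A misspelled instance name in the transports entry shows up here as an exception from at(), which mirrors the kind of mistake worth checking for first when a configuration file does not behave as expected.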

RUNNING THE STOCK QUOTER OVER A TCP TRANSPORT

To run the example, we must start a DCPSInfoRepo process, and start at least one publisher and one subscriber. To start the DCPSInfoRepo, we use the following command line:

  $DDS_ROOT/bin/DCPSInfoRepo -ORBListenEndpoints iiop://localhost:12345

Our DCPSInfoRepo process listens on port 12345. That port matches the port that we specified in the DCPSInfoRepo object reference in the transport configuration file above. This DCPSInfoRepo process is listening on the loopback (127.0.0.1) interface, which means we have configured it to only be available to DDS processes running on the same host. Again, to make it available across the network, use an IP address or a network hostname instead of localhost.
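Because the host and port must agree in two places, a small check can catch mismatches before anything is started. The following sketch in standard C++ compares the DCPSInfoRepo value from the configuration file with the -ORBListenEndpoints argument. It handles only the two simple forms used in this article ("corbaloc::host:port/key" and "iiop://host:port"), not the full corbaloc or endpoint grammars, and all of its names are hypothetical.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper type; not part of OpenDDS or TAO.
struct Endpoint {
  std::string host;
  int port = 0;
};

// Parse "host:port"; returns false if the text is not in that form.
inline bool parse_host_port(const std::string& text, Endpoint& out)
{
  const std::string::size_type colon = text.rfind(':');
  if (colon == std::string::npos || colon == 0 || colon + 1 >= text.size())
    return false;
  const std::string digits = text.substr(colon + 1);
  if (digits.find_first_not_of("0123456789") != std::string::npos)
    return false;
  out.host = text.substr(0, colon);
  out.port = std::stoi(digits);  // assumes the number fits in an int
  return true;
}

// Handles only "corbaloc::host:port/key", the form used in this article.
inline bool parse_corbaloc(const std::string& ref, Endpoint& out)
{
  const std::string prefix = "corbaloc::";
  if (ref.compare(0, prefix.size(), prefix) != 0) return false;
  const std::string::size_type slash = ref.find('/', prefix.size());
  if (slash == std::string::npos) return false;
  return parse_host_port(
    ref.substr(prefix.size(), slash - prefix.size()), out);
}

// Handles only "iiop://host:port", the form used in this article.
inline bool parse_iiop_endpoint(const std::string& ep, Endpoint& out)
{
  const std::string prefix = "iiop://";
  if (ep.compare(0, prefix.size(), prefix) != 0) return false;
  return parse_host_port(ep.substr(prefix.size()), out);
}

// True when both strings parse and name the same host and port.
inline bool same_endpoint(const std::string& corbaloc,
                          const std::string& listen)
{
  Endpoint a, b;
  return parse_corbaloc(corbaloc, a) && parse_iiop_endpoint(listen, b) &&
         a.host == b.host && a.port == b.port;
}
```

The comparison is textual, so localhost and 127.0.0.1 are treated as different hosts; the sketch makes no attempt at name resolution.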

We run two subscribers and one publisher:

  subscriber -DCPSConfigFile dds_tcp_conf.ini
 
    subscriber -DCPSConfigFile dds_tcp_conf.ini
 
    publisher -DCPSConfigFile dds_tcp_conf.ini

We use the -DCPSConfigFile command-line argument to indicate the name of the configuration file we created above. Note that each subscriber and publisher uses the same transport configuration file.

The above command lines run the DCPSInfoRepo, publisher, and subscriber with built-in topics on, which is the default. We can also run these processes with built-in topics off. The -NOBITS option turns off built-in topics in DCPSInfoRepo, and "-DCPSBit 0" does the same in other DDS applications. The command lines are as follows:

   $DDS_ROOT/bin/DCPSInfoRepo \
      -NOBITS -ORBListenEndpoints iiop://localhost:12345
 
    subscriber -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini
 
    subscriber -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini
 
    publisher -DCPSBit 0 -DCPSConfigFile dds_tcp_conf.ini

The publisher publishes 20 stock quotes for each of the SPY and MDY ticker symbols, and each subscriber receives them. When the publisher is finished, it publishes a "TRADING_CLOSED" message, which causes the subscribers to exit.

RUNNING THE STOCK QUOTER OVER A UDP TRANSPORT

We can use the same code base to run the example over a UDP transport by simply running with a configuration file that defines a global transport configuration that specifies a UDP transport instance.

Here is the dds_udp_conf.ini file. Compared with the TCP file, only the transport instance's name (udp1) and type (udp) have changed:

#
# dds_udp_conf.ini
#
 
[common]
 
# Debug Level
DCPSDebugLevel=0
 
# IOR of DCPSInfoRepo process.
DCPSInfoRepo=corbaloc::localhost:12345/DCPSInfoRepo
 
# Sets the global transport configuration (used by default in the
# process) to config1, defined below
DCPSGlobalTransportConfig=config1
 
# Transport configuration named config1, contains a single transport
# instance named udp1 (defined below)
[config/config1]
transports=udp1
 
# Transport instance named udp1, of type "udp".  Uses defaults for
# all configuration parameters.
[transport/udp1]
transport_type=udp
 

We then start the DCPSInfoRepo process as before:

 $DDS_ROOT/bin/DCPSInfoRepo -ORBListenEndpoints iiop://localhost:12345

We start the two subscribers and the publisher, using our new transport configuration file:

  subscriber -DCPSConfigFile dds_udp_conf.ini 
 
    subscriber -DCPSConfigFile dds_udp_conf.ini 
 
    publisher -DCPSConfigFile dds_udp_conf.ini

We can also run each process with the built-in-topic off. The command-lines are as follows:

    $DDS_ROOT/bin/DCPSInfoRepo -NOBITS \
      -ORBListenEndpoints iiop://localhost:12345
 
    subscriber -DCPSBit 0 -DCPSConfigFile dds_udp_conf.ini 
 
    subscriber -DCPSBit 0 -DCPSConfigFile dds_udp_conf.ini 
 
    publisher -DCPSBit 0 -DCPSConfigFile dds_udp_conf.ini

As before, the publisher publishes 20 stock quotes for each of the SPY and MDY ticker symbols, and each subscriber receives them. When the publisher is finished, it again publishes a "TRADING_CLOSED" message, which causes the subscribers to exit. The only difference is that we have substituted a UDP transport for a TCP transport; the change in transport required no code changes.

SUMMARY

The OMG Data Distribution Service (DDS) for Real-Time Systems is a specification for a high-performance, type-safe, publish-and-subscribe communication middleware. DDS addresses data-centric applications, i.e. those for which dissemination of application data is a significant requirement.

OpenDDS is an open source implementation of the OMG Data Distribution Service specification, providing users with an efficient publish-and-subscribe framework with the advantages of the open-source software development model.

OpenDDS includes a file-based configuration mechanism. Through a configuration file, an OpenDDS user may configure a publisher's or subscriber's transport(s), debugging output, memory allocation, the location of the DCPSInfoRepo broker process, and many other settings. We have shown in our example that an OpenDDS application's underlying transport can be swapped out without making any code changes.
