Code Generation with OpenDDS, Part I

Code Generation with OpenDDS, Part I

By Charles Calkins, OCI Senior Software Engineer

June 2010


Introduction

Applications that use the Data Distribution Service (DDS) typically have two elements in common:

  1. Boilerplate code:
    The sequence of steps to initialize the DDS framework, and to create and destroy domain participants, is the same from project to project. Simplifying the code to write for the application's skeleton reduces development time.

  2. Knowledge of IDL and C++:
    DDS implementations such as OpenDDS [1] are written in C++ and require structures used as data samples to be described in the Object Management Group's Interface Definition Language (IDL). Allowing a developer to write code which uses DDS in their language of choice, rather than in IDL and C++, can lead to a shorter learning curve and wider use of DDS as a technology.

Parts I and II of this article explore one approach to making DDS simpler to use, and to make it accessible to a wider array of development languages. We shall use OpenDDS as our DDS implementation, and goals we shall achieve are as follows:

  1. Create a C++-based wrapper for OpenDDS to simplify written code to consist solely of publish and subscribe functions.

  2. Provide a means to automatically generate these wrappers from user-defined data types (in Part II).

  3. Provide a layer which exposes the generated code to the .NET world.

  4. Demonstrate how the .NET interface to OpenDDS can be used by code written in arbitrary .NET languages.

While this article focuses on a particular wrapping of OpenDDS functionality that is made available for software using the .NET Framework, this same technique is applicable in a wider sense — code generators could be written to encapsulate OpenDDS for other target languages and environments of choice.

OpenDDS Review

While OpenDDS is described in detail in [2] and elsewhere, a short review of the major elements is beneficial.

Given the definition of a data sample expressed in IDL, the OpenDDS IDL compiler tool chain generates several code elements. The elements of interest for this article include the following:

  1. A structure that provides a C++ representation of the member variables of the data sample.

  2. An associated type support class which provides operations such as duplication, narrowing, and registration of the type in a DomainParticipant.

  3. Specializations of DataReader and DataWriter for the data sample.

Data samples are not used in isolation — they are the data representation of a topic, associated with a DDS domain. In OpenDDS, given a numeric DomainId that represents a DDS domain, the application creates a DomainParticipant for that DomainId. The generated type support class for the data sample is used to register the type with the DomainParticipant, and, given a topic name, the create_topic() method of the DomainParticipant associates the type with the topic.

Through the factory TheTransportFactory the application creates a transport, such as SimpleTcp or multicast, used for the transmission of data samples.

If an application publishes data samples, the DomainParticipant is used to create a DDS Publisher. The Publisher is attached to a transport, and then used to create a DataWriter for a particular topic. The DataWriter is narrowed to a DataWriter specific for the data sample type (as generated by the OpenDDS IDL compiler tool chain), and its write() method is used to publish a data sample.

If an application consumes data samples, the DomainParticipant is used to create a DDS Subscriber. The Subscriber is attached to a transport, and then used to create a DataReader for a particular topic. The DataReader is narrowed to a DataReader specific for the data sample type (as generated by OpenDDS IDL compiler tool chain), and the on_data_available() method of a listener associated with the DataReader is called as data samples arrive.

Upon application termination, all entities derived from a given DomainParticipant must be deleted before the DomainParticipant itself is. When all DomainParticipants have been freed, TheTransportFactory must be released, and TheServiceParticipant shut down.

A Generic C++ Wrapper

We wish to distil the complexity described in the previous section into a simple publish and subscribe interface, both for conceptual understanding and later code generation. Features of the wrapper will be highlighted here — the full implementation is available in the Common library of the code archive that accompanies this article.

To begin the wrapper, we will create a C++ class, DDSBase, with two goals in mind. The first is to manage initialization and cleanup — these will be performed in the constructor and destructor of the class, respectively, as per the RAII programming idiom [3]. The other is to keep track of the various DDS entities — when an entity is to be created, a list is first consulted to see if the entity has been created before. If it has, it is retrieved and a reference to it returned to the caller. Otherwise, the entity is created, added to the list, and a reference returned to the caller.

In the code below, the TheParticipantFactoryWithArgs() function is called in the constructor of DDSBase to initialize OpenDDS. A std::map is used to store DomainParticipants that have been created, and the contents of this map are freed in the destructor of DDSBase. The destructor also frees TheTransportFactory and TheServiceParticipant.

// Common/DDSBase.h
class DDSBase {
    typedef std::map<::DDS::DomainId_t, DDS::DomainParticipant_var> 
        DomainParticipantMap;
    DDS::DomainParticipantFactory_var dpf_;
    DomainParticipantMap participantMap_;
 
public:
    DDSBase(int argc, ACE_TCHAR *argv[]) {
        dpf_ = TheParticipantFactoryWithArgs(argc, argv);
    }
 
    ~DDSBase() {
    	for (DomainParticipantMap::iterator it=participantMap_.begin(); 
    	    it!=participantMap_.end(); it++) {
            it->second->delete_contained_entities();
            dpf_->delete_participant(it->second.in());
        }
 
        TheTransportFactory->release();
        TheServiceParticipant->shutdown();
    }

Next, we provide a method to retrieve a DomainParticipant, given a DomainId. As per the pattern described previously, if a matching participant already exists in the participant map, it is returned, else a new one is created. The DomainParticipant is returned as a _ptr instead of a _var because the lifetime of the participant is managed by this class.

We will assume that the default quality of service (QoS) for each DDS entity, such as the DomainParticipant here, is sufficient for our purposes. Adding QoS policies would be straightforward (i.e., provide a list of QoS policies to the method to apply, where the list becomes part of the map's key to distinguish otherwise identical entities), but the exercise is left for the reader.

protected:
    DDS::DomainParticipant_ptr GetParticipant(::DDS::DomainId_t domainId) {
        DomainParticipantMap::iterator it=participantMap_.find(domainId);
        if (it==participantMap_.end()) {
            ::DDS::DomainParticipant_ptr participant = dpf_->create_participant(
                domainId,
                PARTICIPANT_QOS_DEFAULT,
                0,
                OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
            if (0 == participant)
                throw DDSException("DDSBase::GetParticipant(): create_participant failed");
 
            participantMap_[domainId] = participant;
        }
 
        return ::DDS::DomainParticipant::_duplicate(participantMap_[domainId].in());
    }

This method does both the search for a previously-created DomainParticipant, and creating it if not found, as described above. The methods for the creation of DataReaders and DataWriters, however, will be performed in two stages. The CreateReader() and CreateWriter() methods represent the "create if missing" aspect, while the search for a previously created one will be performed elsewhere.

We also define a class, DDSException, a subclass of std::exception, to represent errors in the process.

// Common/DDSBase.h
class DDSException : public std::exception {
public:
    DDSException(const char *const& _What) : std::exception(_What) {}
};

Continuing with class DDSBase, we desire methods to be able to create any DataReader and DataWriter, so must be templatized. As many unrelated types are needed, they can be collected into a TypeTraits class, identified by a single template parameter that is representative of the collection. This technique is also demonstrated in the code of an earlier Middleware News Brief. [4]

In the file TypeTraits.h we define a templated struct, TypeTraits, to reference the various DDS entities associated with a given data sample structure. Unlike in the aforementioned MNB, however, we define this template via a macro to reduce the amount of code that needs to be replicated later in the application.

// TypeTraits.h
template <typename DDS_STRUCT_T>
struct TypeTraits
{
};
 
#define DEFINE_TYPETRAITS(TYPE) \
template <> \
struct TypeTraits<TYPE> \
{ \
    typedef TYPE##TypeSupport TypeSupport; \
    typedef TYPE##TypeSupportImpl TypeSupportImpl; \
    typedef TYPE##DataWriter Writer; \
    typedef TYPE##DataReader Reader; \
};

Returning to DDSBase, we can now define CreateWriter() and CreateReader() methods with just one template parameter which then provides access to the additional types. The return from CreateWriter() is a wrapper around a type-specific DataWriter for the given domain and topic.

    template <typename TStructure>
    Writer<TStructure> CreateWriter(::DDS::DomainId_t domainId, 
        const char *topic_name) {

We obtain the DomainParticipant for the domain, creating it if needed.

DDS::DomainParticipant_var participant = GetParticipant(domainId);

Next, we use the type support class to register the type in the domain, and to provide the type name for topic creation. Upon failure, an exception will be thrown.

        // Create Topic
        typename TypeTraits<TStructure>::TypeSupport::_var_type ts = 
            new TypeTraits<TStructure>::TypeSupportImpl();
 
        if (ts->register_type(participant, "") != DDS::RETCODE_OK) 
            throw DDSException("DDSBase::CreateWriter(): register_type failed");
 
        CORBA::String_var type_name = ts->get_type_name();
        DDS::Topic_var topic =
            participant->create_topic(topic_name,
            type_name.in(),
            TOPIC_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == topic)
            throw DDSException("DDSBase::CreateWriter(): create_topic failed");

The Publisher is then created, with a default quality of service.

        // Create Publisher
        DDS::Publisher_var publisher =
            participant->create_publisher(PUBLISHER_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == publisher)
            throw DDSException("DDSBase::CreateWriter(): create_publisher failed");

We now create a transport that uses the SimpleTcp protocol, and attach it to the Publisher — hard-coding a particular transport is a way to simplify the wrapper's interface, but could be parameterized if needed. As SimpleTcp provides reliable sample delivery, it is an appropriate choice.

        // Initialize and attach Transport
        OpenDDS::DCPS::TransportImpl_rch transport_impl =
            TheTransportFactory->create_transport_impl(
            OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
            OpenDDS::DCPS::AUTO_CONFIG);
 
        if (transport_impl->attach(publisher.in()) != OpenDDS::DCPS::ATTACH_OK)
            throw DDSException("DDSBase::CreateWriter(): transport_impl->attach failed");

Finally, we create the type-specific DataWriter by creating a DataWriter which is narrowed to the desired type. The DataWriter is then returned to the caller.

        // Create DataWriter
        DDS::DataWriter_var writer =
            publisher->create_datawriter(topic.in(),
            DATAWRITER_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == writer)
            throw DDSException("DDSBase::CreateWriter(): create_datawriter failed");
 
        return Writer<TStructure>(writer.in());
    }

The CreateReader() method will complete the DDSBase class. As with CreateWriter(), we want this function to be generic as well. Two template parameters are needed — the key for the TypeTraits, and an additional type, representing a listener for the DataReader is provided, which will be described later in this article.

    template <typename TStructure, typename TListener>
    Reader<TStructure, TListener> CreateReader(
        ::DDS::DomainId_t domainId, const char *topic_name, 
        TListener *listener) {

Code to obtain a DomainParticipant and to register the type are the same as in CreateWriter().

        DDS::DomainParticipant_var participant = GetParticipant(domainId);
 
        // Create Topic
        typename TypeTraits<TStructure>::TypeSupport::_var_type ts = 
            new TypeTraits<TStructure>::TypeSupportImpl();
 
        if (ts->register_type(participant, "") != DDS::RETCODE_OK)
            throw DDSException("DDSBase::CreateReader(): register_type failed");
 
        CORBA::String_var type_name = ts->get_type_name();
        DDS::Topic_var topic =
            participant->create_topic(topic_name,
            type_name.in(),
            TOPIC_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == topic)
            throw DDSException("DDSBase::CreateReader(): create_topic failed");

We now perform actions in parallel to those in CreateWriter(), but as a subscriber — create a Subscriber, and create and attach the SimpleTcp transport.

        // Create Subscriber
        DDS::Subscriber_var subscriber =
            participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
            0,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == subscriber)
            throw DDSException("DDSBase::CreateReader(): create_subscriber failed");
 
        // Initialize and attach Transport
        OpenDDS::DCPS::TransportImpl_rch transport_impl =
            TheTransportFactory->create_transport_impl(
            OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
            OpenDDS::DCPS::AUTO_CONFIG);
 
        OpenDDS::DCPS::AttachStatus status = transport_impl->attach(subscriber.in());
 
        if (status != OpenDDS::DCPS::ATTACH_OK)
            throw DDSException("DDSBase::CreateReader(): transport_impl->attach failed");

We create a DataReader, specifying the listener, and a non-default quality of service. The RELIABILITY QoS policy is set to RELIABLE, instead of the default of BEST_EFFORT. Later, we will use wait_for_acknowledgements() for the publisher to ensure that all published data has been received, and this operation requires that a DataReader must be RELIABLE. As of this writing (OpenDDS v2.1.3), the OpenDDS Developer's Guide shows the Messenger application using wait_for_acknowledgements() with a BEST_EFFORT DataReader, which is not correct.

        // Create DataReader - set RELIABLE reliability
        DDS::DataReaderQos dr_qos;
        if (subscriber->get_default_datareader_qos(dr_qos) != ::DDS::RETCODE_OK)
            throw DDSException("DDSBase::CreateReader(): get_default_datareader_qos failed");
 
        dr_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
 
        DDS::DataReader_var reader =
            subscriber->create_datareader(topic.in(),
            dr_qos,
            listener,
            OpenDDS::DCPS::DEFAULT_STATUS_MASK);
 
        if (0 == reader)
            throw DDSException("DDSBase::CreateReader(): create_datareader failed");

Finally, we return a wrapped, narrowed DataReader to the caller.

         return Reader<TStructure, TListener>(reader, listener);
    }   
};

As with CreateReader() and CreateWriter(), at this level we also want the listener for the DataReader to be generic, so it too is templatized and has a data processing method which must be overridden in a subclass. The DataReaderListenerImplBase class also provides empty implementations for various listener methods that are not needed by the wrapper.

// Common/DataReaderListenerImplBase.h
template <typename TDataReader, typename TStructure>
class DataReaderListenerImplBase
    : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
public:
    virtual void on_requested_deadline_missed(
        DDS::DataReader_ptr,
        const DDS::RequestedDeadlineMissedStatus&) {}
 
    virtual void on_requested_incompatible_qos(
        DDS::DataReader_ptr,
        const DDS::RequestedIncompatibleQosStatus&) {}
 
    virtual void on_sample_rejected(
        DDS::DataReader_ptr,
        const DDS::SampleRejectedStatus&) {}
 
    virtual void on_liveliness_changed(
        DDS::DataReader_ptr,
        const DDS::LivelinessChangedStatus&) {}
 
    virtual void on_subscription_matched(
        DDS::DataReader_ptr,
        const DDS::SubscriptionMatchedStatus&) {}
 
    virtual void on_sample_lost(
        DDS::DataReader_ptr,
        const DDS::SampleLostStatus&) {}

The data sample processing is handled by on_data_available(). If a data sample is successfully received from OpenDDS, and valid_data is true, it is passed to Process() to be consumed by the application. The valid_data flag is true only when data is associated with the sample, as the sample that was taken may only represent a change of instance state, and not actual data.

The sample processing proceeds until take_next_sample() returns DDS::RETCODE_NO_DATA, or an error has occurred. Only samples containing valid data will be processed — for instance, since valid_data is false for a disposed sample, disposed samples will be ignored. A while loop is used as performance is improved if all samples currently available are taken, rather than taking just one sample on each invocation of on_data_available.

    virtual void Process(const TStructure &) = 0;
    virtual void on_data_available(DDS::DataReader_ptr reader) {
 
        typename TDataReader::_var_type reader_i = TDataReader::_narrow(reader);
 
        // if an exception is thrown here, it is consumed by the transport thread, 
        // so ignore narrowing errors at this time
        if (0 == reader_i)
            return;
 
        TStructure sample;
        DDS::SampleInfo info;
 
        DDS::ReturnCode_t status = reader_i->take_next_sample(sample, info);
 
        while (status == DDS::RETCODE_OK) {
            if (info.valid_data)
                Process(sample);
            status = reader_i->take_next_sample(sample, info);
        } 
    }
};

CreateReader() and CreateWriter() return wrappers for the corresponding DDS entities. These wrappers maintain a reference to the contained entity, and provide a minimal interface for interacting with the wrapped entity, to keep things simple.

As the Writer wrapper for a DataWriter is specific to the type of DataWriter corresponding to the data sample type being written, it must be parameterized with the appropriate TypeTraits key.

// Common/Writer.h
template <typename TStructure>
class Writer {
    typedef typename TypeTraits<TStructure>::Writer WriterType;
    typename WriterType::_var_type writer_i;
public:
    Writer() : writer_i(0) {}
 
    Writer(DDS::DataWriter_ptr writer) : 
      writer_i(WriterType::_duplicate(WriterType::_narrow(writer))) {}

For aid in testing, helper functions are used to manage publication. These methods are based on ones used in various OpenDDS tests that are part of the OpenDDS software distribution. WaitForSubscriber() does not return until at least one subscriber has been matched with this publisher — this allows a publisher to block until a subscriber is available to receive the data samples. The code waits on a "publication matched" condition, where the wait continues until at least a match count of one is achieved.

   bool WaitForSubscriber() {
        // Block until Subscriber is available
        DDS::StatusCondition_var condition = writer_i->get_statuscondition();
        condition->set_enabled_statuses(DDS::PUBLICATION_MATCHED_STATUS);
 
        DDS::WaitSet_var ws = new DDS::WaitSet;
        ws->attach_condition(condition);
 
        DDS::ConditionSeq conditions;
        DDS::PublicationMatchedStatus matches = { 0, 0, 0, 0, 0 };
        DDS::Duration_t timeout = { 30, 0 };  // 30 seconds
 
        do {
            if (ws->wait(conditions, timeout) != DDS::RETCODE_OK)
                return false;
 
            if (writer_i->get_publication_matched_status(matches) != ::DDS::RETCODE_OK)
                return false;
 
        } while (matches.current_count < 1);
 
        ws->detach_condition(condition);
 
        return true;
    }

Conversely, WaitForAcknowledgements() can be used for clean termination of a publisher to ensure that all data samples have been received by a subscriber.

    bool WaitForAcknowledgements() {
        // Wait for samples to be acknowledged
        DDS::Duration_t timeout = { 30, 0 };  // 30 seconds
        return (writer_i->wait_for_acknowledgments(timeout) == DDS::RETCODE_OK);
    }

Finally, Write() is used to publish a data sample. register_instance is used to obtain an instance handle, to improve publication performance.

   DDS::ReturnCode_t Write(const TStructure &s) {
        return writer_i->write(s, ::DDS::HANDLE_NIL);    
    }
};

The structure of the Reader wrapper for a DataReader is analagous to that of Writer. It also stores the specific DataReader type, so must be templatized.

// Common/Reader.h
template <typename TStructure, typename TListener>
class Reader {
    typedef typename TypeTraits<TStructure>::Reader ReaderType;
    typename ReaderType::_var_type reader_i;
    std::tr1::shared_ptr<TListener> listener_;
 
public:
    Reader() {}
 
    Reader(DDS::DataReader_ptr reader, TListener *listener) : 
        reader_i(ReaderType::_duplicate(ReaderType::_narrow(reader))),
        listener_(listener) {}
 
    TListener *GetListener() { return listener_.get(); }

Reader provides a helper function, WaitForPublisherToComplete(), to ensure that a publisher has disconnected — the code loops until the count of matched subscriptions is zero.

   bool WaitForPublisherToComplete() {
        // Block until Publisher completes
        DDS::StatusCondition_var condition = reader_i->get_statuscondition();
        condition->set_enabled_statuses(DDS::SUBSCRIPTION_MATCHED_STATUS);
 
        DDS::WaitSet_var ws = new DDS::WaitSet;
        ws->attach_condition(condition);
 
        DDS::ConditionSeq conditions;
        DDS::SubscriptionMatchedStatus matches = { 0, 0, 0, 0, 0 };
        DDS::Duration_t timeout = { 30, 0 }; // 30 seconds
 
        do {
            if (ws->wait(conditions, timeout) != DDS::RETCODE_OK)
                return false;
 
            if (reader_i->get_subscription_matched_status(matches) != DDS::RETCODE_OK)
                return false;
 
        } while (matches.current_count > 0);
 
        ws->detach_condition(condition);
 
        return true;
    }
};

Although Writer provided a Write() method, no corresponding Read() method is needed — data samples are retrieved by the listener that was associated with the contained DataReader.

One more generic class is part of the wrapper — a generic container for Reader and Writerobjects. The EntityMap stores entities based on their associated domain and topic. This class manages a map of maps — given a DomainId, a map is retrieved which associates topics with entities. If the desired entity is not found, a generic Create() function is called to instantiate it, before it is added to the map and returned to the caller.

// Common/EntityMap.h
template <typename TEntity>
class EntityMap {
    typedef std::map<std::string, TEntity> TTopicEntityMap;
    typedef std::map<::DDS::DomainId_t, TTopicEntityMap> TDomainEntityMap;
 
    TDomainEntityMap map_;
 
protected:
    virtual TEntity Create(DDSBase *ddsBase, ::DDS::DomainId_t domainId, 
        const char *topic_name) = 0;
 
public:
    TEntity Get(DDSBase *ddsBase,::DDS::DomainId_t domainId, const char *topic_name) {
        TDomainEntityMap::iterator itDomain = map_.find(domainId);
        if (itDomain == map_.end())  
            // domain entry not found, so create topic map and entity
            map_[domainId][topic_name] = Create(ddsBase, domainId, topic_name);
        else {
            TTopicEntityMap::iterator itTopic = map_[domainId].find(topic_name);
            if (itTopic == map_[domainId].end())  
                // domain found, but topic not found, so create entity
                map_[domainId][topic_name] = Create(ddsBase, domainId, topic_name);
        }
        return map_[domainId][topic_name];
    }
};

A Concrete Example

Now that the generic wrapper has been described, we can see how to use in a specific case — that of the Messenger application as described in the OpenDDS Developer's Guide [5].

As defined in Messenger.idl in the Messenger_IDL project, the structure representing the Message type of data sample is as follows:

// Messenger_IDL/Messenger.idl
module Messenger {
 
#pragma DCPS_DATA_TYPE "Messenger::Message"
#pragma DCPS_DATA_KEY "Messenger::Message subject_id"
 
    struct Message {
        string from;
        string subject;
        long subject_id;
        string text;
        long count;
    };
};

After compiling Messenger.idl with the OpenDDS IDL compiler tool chain, a number of files are produced that contain various generated entities. MessengerC.h contains a definition of struct Message, in the Messenger namespace, which is an IDL-to-C++ mapping of the Message IDL structure. The Message structure is as follows:

// MessengerC.h
namespace Messenger
{
...
    struct Message
    {
        typedef Message_var _var_type;
        typedef Message_out _out_type;
 
        TAO::String_Manager from;
        TAO::String_Manager subject;
        ::CORBA::Long subject_id;
        TAO::String_Manager text;
        ::CORBA::Long count;
    };
...
}

The file MessengerTypeSupportC.h provides definitions of classes MessageTypeSupportMessageDataReader, and MessageDataWriter, also in the Messenger namespace. Class MessageTypeSupport provides the type support for the Message structure, and MessageDataReader and MessageDataWriter provide Message-based specializations of the DDS entities DataReader and DataWriter respectively. It is these types that we will use with the generic wrapper to provide a simplified publish and subscribe interface for Messenger data samples.

To provide a concrete wrapper, we create the project Messenger_CPP_DDSImpLib, containing the files generated by the OpenDDS IDL compiler tool chain from Messenger.idl, plus DDSImpl.h and DDSImpl.cpp. As our objective is to provide a simple interface for applications that use the .NET Framework, we begin by defining a .NET version of the Message structure, and functions to convert from the unmanaged C++ representation to the managed, C++/CLI one. In DDSImpl.h, we define the class MessageNet, with corresponding .NET types. For clarity, we also maintain a parallel namespace/class representation.

// Messenger_CPP_DDSImpLib/DDSImpl.h
namespace MessengerNet {
    public value class MessageNet
    {
    public:
        System::String^ from;
        System::String^ subject;
        System::Int32 subject_id;
        System::String^ text;
        System::Int32 count;
    };
}

TypeTraits must be defined for the Message structure, and it is done with one line of code:

DEFINE_TYPETRAITS(Messenger::Message)

We add prototypes for the functions to convert from the Messenger to the MessengerNet types, and vice versa.

// Messenger_CPP_DDSImpLib/DDSImpl.h
Messenger::Message Convert(const gcroot<MessengerNet::MessageNet> &sampleParam);
gcroot<MessengerNet::MessageNet> Convert(const Messenger::Message &sample);

These conversion functions are implemented in DDSImpl.cpp and are simple copies from one representation to the other.

// Messenger_CPP_DDSImpLib/DDSImpl.cpp
Messenger::Message Convert(const gcroot<MessengerNet::MessageNet> &sampleNetParam) {
    MessengerNet::MessageNet sampleNet = (MessengerNet::MessageNet)sampleNetParam;
    Messenger::Message sample;
 
    sample.subject_id = sampleNet.subject_id;
    sample.from = Convert(sampleNet.from);
    sample.subject = Convert(sampleNet.subject);
    sample.text = Convert(sampleNet.text);
    sample.count = sampleNet.count;
 
    return sample;
}
 
gcroot<MessengerNet::MessageNet> Convert(const Messenger::Message &sample) {
    MessengerNet::MessageNet sampleNet;
 
    sampleNet.subject_id = sample.subject_id;
    sampleNet.from = Convert(sample.from);
    sampleNet.subject = Convert(sample.subject);
    sampleNet.text = Convert(sample.text);
    sampleNet.count = sample.count;
 
    return sampleNet;
}

The string conversion functions are available in the Common library in StringConvert.[h,cpp].

We now implement subclasses of the generic C++ wrapper classes, specialized for the Messenger type. First, we subclass DataReaderListenerImplBase such that the processing of a data sample causes it to be fired as a .NET event. For more information on this technique, or for the use of OpenDDS with .NET in general, see [6]. The Process() method is overridden from the base class to post the data sample as an event, and an AddHandler() method is provided to allow a .NET event handler to be attached to the listener. That is, when the event is posted, the handler is invoked with the event as a parameter. The event support classes are implemented in Common.cpp in the Common library.

// Messenger_CPP_DDSImpLib/DDSImpl.h
class MessengerMessageDataReaderListenerImpl : 
    public DataReaderListenerImplBase<Messenger::MessageDataReader, Messenger::Message> {
    gcroot<EventManager<MessengerNet::MessageNet>^> eventManager_;
public:
    MessengerMessageDataReaderListenerImpl(
        gcroot<EventManager<MessengerNet::MessageNet>^> eventManager) : 
            eventManager_(eventManager) {}
 
    void Process(const Messenger::Message &sample) {
        eventManager_->Process(eventManager_, 
            gcnew ProcessEventArgs<MessengerNet::MessageNet>(Convert(sample)));
    }
 
    void AddHandler(gcroot<EventManager<MessengerNet::MessageNet>::ProcessEventHandler^> handler) {
        eventManager_->Process += handler;
    }
};

Next, we implement subclasses of the EntityMap class to manage instances of MessageDataReader and MessageDataWriter. The map lookup logic is provided in the base class, but the Create() methods are overloaded to instantiate the correct type.

// Messenger_CPP_DDSImpLib/DDSImpl.h
typedef Writer<Messenger::Message> MessengerMessageDataWriter;
typedef Reader<Messenger::Message, MessengerMessageDataReaderListenerImpl> 
    MessengerMessageDataReader;
 
class MessengerMessageDataWriterMap : public EntityMap<MessengerMessageDataWriter> {
    virtual MessengerMessageDataWriter Create(DDSBase *ddsBase, 
        ::DDS::DomainId_t domainId, const char *topic_name) {
        return ddsBase->CreateWriter<Messenger::Message>(domainId, topic_name);
    }
};
 
class MessengerMessageDataReaderMap : public EntityMap<MessengerMessageDataReader> {
    virtual MessengerMessageDataReader Create(DDSBase *ddsBase, 
        ::DDS::DomainId_t domainId, const char *topic_name) {
        return ddsBase->CreateReader<Messenger::Message>(domainId, topic_name, 
            new MessengerMessageDataReaderListenerImpl(
                gcnew EventManager<MessengerNet::MessageNet>()));
    }
};

We now create a subclass of DDSBase that is specialized for the Messenger type. The implementation of each of the methods is a one-liner delegation to the appropriate contained entity.

// Messenger_CPP_DDSImpLib/DDSImpl.h
class DDSImpl : public DDSBase {
    MessengerMessageDataWriterMap mapMessengerMessageDataWriter_;
    MessengerMessageDataReaderMap mapMessengerMessageDataReader_;
 
public:
    DDSImpl(int argc, ACE_TCHAR *argv[]) : DDSBase(argc, argv) {}
 
    DDS::ReturnCode_t Publish(::DDS::DomainId_t domainId, const char *topic_name, 
        gcroot<MessengerNet::MessageNet> sample) {
        return mapMessengerMessageDataWriter_.Get(
            this, domainId, topic_name).Write(Convert(sample));
    }
 
    bool MessengerMessageWaitForSubscriber(::DDS::DomainId_t domainId, 
        const char *topic_name) { 
        return mapMessengerMessageDataWriter_.Get(
            this, domainId, topic_name).WaitForSubscriber(); 
    }
 
    bool MessengerMessageWaitForAcknowledgements(::DDS::DomainId_t domainId, 
        const char *topic_name) { 
        return mapMessengerMessageDataWriter_.Get(
            this, domainId, topic_name).WaitForAcknowledgements(); 
    }
 
    void Subscribe(::DDS::DomainId_t domainId, const char *topic_name, 
        gcroot<EventManager<MessengerNet::MessageNet>::ProcessEventHandler^> handler) {
        mapMessengerMessageDataReader_.Get(
            this, domainId, topic_name).GetListener()->AddHandler(handler);
    }
 
    bool MessengerMessageWaitForPublisherToComplete(::DDS::DomainId_t domainId, 
        const char *topic_name) {
        return mapMessengerMessageDataReader_.Get(
            this, domainId, topic_name).WaitForPublisherToComplete();
    }
};

One more level of indirection is needed. The class DDSImpl provides a simple publish/subscribe interface for C++, but the method parameters are not compatible with the .NET Framework Common Type System [7]. A .NET ref class DDSNet wraps DDSImpl and provides the required interface.

// Messenger_CPP_DDSImpLib/DDSImpl.h
public ref class DDSNet {
    DDSImpl *pDDSImpl;
 
public:
    DDSNet() {
        // convert .NET arguments to standard argc/argv
        int argc;
        char **argv;
        GetArguments(argc, argv);
        pDDSImpl = new DDSImpl(argc, argv);
    }
    ~DDSNet() { 
        delete pDDSImpl;
    }
 
    int Publish(int domainId, System::String^ topic_name, MessengerNet::MessageNet sample) {
        NET_RETHROW_EXCEPTION(return pDDSImpl->Publish(domainId, \
            ConvertToString(topic_name).c_str(), sample));
    }
 
    bool MessengerMessageWaitForSubscriber(int domainId, System::String^ topic_name) {
        NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForSubscriber(domainId, \
            ConvertToString(topic_name).c_str()));
    }
 
    bool MessengerMessageWaitForAcknowledgements(int domainId, System::String^ topic_name) { 
        NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForAcknowledgements(domainId, \
            ConvertToString(topic_name).c_str()));
    }
 
    void Subscribe(int domainId, System::String^ topic_name, 
        EventManager<MessengerNet::MessageNet>::ProcessEventHandler^ handler) {
        NET_RETHROW_EXCEPTION(pDDSImpl->Subscribe(domainId, \
            ConvertToString(topic_name).c_str(), handler));
    }
 
    bool MessengerMessageWaitForPublisherToComplete(int domainId, System::String^ topic_name) {
        NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForPublisherToComplete(domainId, \
            ConvertToString(topic_name).c_str()));
    }
};

NET_RETHROW_EXCEPTION is a macro to aid in rethrowing exceptions raised by the unmanaged code as managed, .NET exceptions, where:

// Common/NetException.h
#define NET_RETHROW_EXCEPTION(x) \
    try { \
        x; \
    } \
    catch(std::exception &e) { \
        throw gcnew DDSNetException(gcnew System::String(e.what())); \
    }\

and:

// Common/NetException.cpp
public ref class DDSNetException : public Exception {
public:
    DDSNetException(String^ what) : Exception(what) {}
};

The diagram below illustrates the relationships between the classes that were developed above. Please click on the image for a larger version.

classes

Messenger in C#

Now that the .NET interface has been established, as a test, we can replicate the behavior of the C++ Messenger sample in C#. In the Messenger_CS_Publisher project, we create the file Publisher.cs, and begin a standard console-mode C# application. Although this application does not use COM interop, Main() is marked with [STAThread] to set a single threaded apartment COM threading model, which is emitted by default by the Visual Studio project generator.

// Messenger_CS_Publisher/Publisher.cs
using System;
using System.Collections.Generic;
 
namespace Publisher
{
    static class Program
    {
        [STAThread]
        static void Main()
        {
            try
            {

We instantiate an object of the DDSNet class.

DDSNet dds = new DDSNet();

We then wait for a subscriber on the "Movie Discussion List" topic in domain 42.

 dds.MessengerMessageWaitForSubscriber(42, "Movie Discussion List");

Once a subscriber is found, we publish reviews in the same manner as the Messenger example.

                for (int i = 0; i < 10; i++) {
                    MessengerNet.MessageNet messageNet;
                    messageNet.subject_id=99;
                    messageNet.from = "Comic Book Guy";
                    messageNet.subject = "Review";
                    messageNet.text = "Worst. Movie. Ever.";
                    messageNet.count = i;
 
                    dds.Publish(42, "Movie Discussion List", messageNet);
                }

We then wait for the subscriber to receive the messages, free the DDSNet instance, and exit.

                dds.MessengerMessageWaitForAcknowledgements(42, "Movie Discussion List");
                dds.Dispose();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        }
    }
}

The publisher is thus reduced from many lines of code in the C++ example to two helper methods to align the publisher with the subscriber for test purposes, and one method to publish a data sample.

The subscriber is as straightforward. In the Messenger_CS_Subscriber project, we create the file Subscriber.cs. We implement a class with a method which is called when a data sample arrives — in this case, the contents of the sample are printed to the console.

// Messenger_CS_Subscriber/Subscriber.cs
using System;
using System.Collections.Generic;
 
namespace Subscriber
{
    public class Print {
        public void MessengerNetMessageNetEventHandler(Object sender, 
            ProcessEventArgs<MessengerNet.MessageNet> args) {
            Console.WriteLine("MessageNetEventHandler:    subject = {0}", 
                args.Sample.subject);
            Console.WriteLine("MessageNetEventHandler: subject_id = {0}", 
                args.Sample.subject_id);
            Console.WriteLine("MessageNetEventHandler:       from = {0}", 
                args.Sample.from);
            Console.WriteLine("MessageNetEventHandler:      count = {0}", 
                args.Sample.count);
            Console.WriteLine("MessageNetEventHandler:       text = {0}", 
                args.Sample.text);
        }
    };

Next, we begin the Program class as before, and instantiate an object of type DDSNet.

    static class Program
    {
        [STAThread]
        static void Main()
        {
            try
            {
                DDSNet dds = new DDSNet();

We establish the MessengerNetMessageNetEventHandler() method as the subscriber's listener, for the "Movie Discussion List" on domain 42.

           dds.Subscribe(42, "Movie Discussion List",
                    new EventManager<MessengerNet.MessageNet>.ProcessEventHandler(
                        new Print().MessengerNetMessageNetEventHandler));

We allow the subscriber to process samples until the publisher exits, free the DDSNet instance, and exit.

                dds.MessengerMessageWaitForPublisherToComplete(42, 
                    "Movie Discussion List");
                dds.Dispose();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        }
    }
}

The subscriber consists of one helper method to align the subscriber with the publisher for test purposes, and one method to establish the subscription itself.

Running run_test_Messenger.pl from the Test subdirectory will execute the DCPSInfoRepo, C# publisher and subscriber, and demonstrate that the output matches the Messenger sample in the OpenDDS Developer's Guide.

Summary

As demonstrated in the article, the OpenDDS interface can be reduced to a simple publish/subscribe. Although rote, it is burdensome to generate the concrete classes by hand for the data sample type. Part II [8] of this article will show how the concrete classes can be generated automatically based on a user-defined type used for the data sample, to make using OpenDDS with .NET even easier.

References

[1] OpenDDS
http://www.opendds.org/

[2] Introduction to OpenDDS
http://www.opendds.org/Article-Intro.html

[3] The RAII Programming Idiom
http://www.hackcraft.net/raii/

[4] Using QuickFAST and OpenDDS for a Low Latency Market Data Feed
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-201001.html

[5] OpenDDS Developer's Guide
http://downloads.ociweb.com/OpenDDS/OpenDDS-latest.pdf

[6] Using TAO and OpenDDS with .NET, Part II
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-200902.html

[7] Common Type System
http://msdn.microsoft.com/en-us/library/zcx1eb1e%28VS.90%29.aspx

[8] Code Generation with OpenDDS, Part II
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-201007.html

The Middleware News Brief is a periodic newsletter. The purpose and intent of this publication is to advance and inform on features, news, and technical information about Open Source, middleware technologies (e.g., TAO, OpenDDS, JacORB), including case studies and other technical content.

© Copyright Object Computing, Inc. 1993, 2016. All rights reserved

Subscribe

secret