June 02, 2010 - By Charles Calkins, OCI Senior Software Engineer
Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.
INTRODUCTION
Applications that use the Data Distribution Service (DDS) typically have two elements in common:
-
Boilerplate code. The sequence of steps to initialize the DDS framework, and to create and destroy domain participants, is the same from project to project. Simplifying the code to write for the application's skeleton reduces development time.
-
Knowledge of IDL and C++. DDS implementations such as OpenDDS [1] are written in C++ and require structures used as data samples to be described in the Object Management Group's Interface Definition Language (IDL). Allowing a developer to write code which uses DDS in their language of choice, rather than in IDL and C++, can lead to a shorter learning curve and wider use of DDS as a technology.
Parts I and II of this article explore one approach to making DDS simpler to use, and to make it accessible to a wider array of development languages. We shall use OpenDDS as our DDS implementation, and goals we shall achieve are as follows:
-
Create a C++-based wrapper for OpenDDS to simplify written code to consist solely of publish and subscribe functions.
-
Provide a means to automatically generate these wrappers from user-defined data types (in Part II).
-
Provide a layer which exposes the generated code to the .NET world.
-
Demonstrate how the .NET interface to OpenDDS can be used by code written in arbitrary .NET languages.
While this article focuses on a particular wrapping of OpenDDS functionality that is made available for software using the .NET Framework, this same technique is applicable in a wider sense — code generators could be written to encapsulate OpenDDS for other target languages and environments of choice.
OPENDDS REVIEW
While OpenDDS is described in detail in [2] and elsewhere, a short review of the major elements is beneficial.
Given the definition of a data sample expressed in IDL, the OpenDDS IDL compiler tool chain generates several code elements. The elements of interest for this article include the following:
-
A structure that provides a C++ representation of the member variables of the data sample.
-
An associated type support class which provides operations such as duplication, narrowing, and registration of the type in a
DomainParticipant
. -
Specializations of
DataReader
andDataWriter
for the data sample.
Data samples are not used in isolation — they are the data representation of a topic, associated with a DDS domain. In OpenDDS, given a numeric DomainId
that represents a DDS domain, the application creates a DomainParticipant
for that DomainId
. The generated type support class for the data sample is used to register the type with the DomainParticipant
, and, given a topic name, the create_topic()
method of the DomainParticipant
associates the type with the topic.
Through the factory TheTransportFactory
the application creates a transport, such as SimpleTcp
or multicast
, used for the transmission of data samples.
If an application publishes data samples, the DomainParticipant
is used to create a DDS Publisher
. The Publisher
is attached to a transport, and then used to create a DataWriter
for a particular topic. The DataWriter
is narrowed to a DataWriter
specific for the data sample type (as generated by the OpenDDS IDL compiler tool chain), and its write()
method is used to publish a data sample.
If an application consumes data samples, the DomainParticipant
is used to create a DDS Subscriber
. The Subscriber
is attached to a transport, and then used to create a DataReader
for a particular topic. The DataReader
is narrowed to a DataReader
specific for the data sample type (as generated by OpenDDS IDL compiler tool chain), and the on_data_available()
method of a listener associated with the DataReader
is called as data samples arrive.
Upon application termination, all entities derived from a given DomainParticipant
must be deleted before the DomainParticipant
itself is. When all DomainParticipant
s have been freed, TheTransportFactory
must be released, and TheServiceParticipant
shut down.
A GENERIC C++ WRAPPER
We wish to distil the complexity described in the previous section into a simple publish and subscribe interface, both for conceptual understanding and later code generation. Features of the wrapper will be highlighted here — the full implementation is available in the Common
library of the code archive that accompanies this article.
To begin the wrapper, we will create a C++ class, DDSBase
, with two goals in mind. The first is to manage initialization and cleanup — these will be performed in the constructor and destructor of the class, respectively, as per the RAII programming idiom [3]. The other is to keep track of the various DDS entities — when an entity is to be created, a list is first consulted to see if the entity has been created before. If it has, it is retrieved and a reference to it returned to the caller. Otherwise, the entity is created, added to the list, and a reference returned to the caller.
In the code below, the TheParticipantFactoryWithArgs()
function is called in the constructor of DDSBase
to initialize OpenDDS. A std::map
is used to store DomainParticipant
s that have been created, and the contents of this map are freed in the destructor of DDSBase
. The destructor also frees TheTransportFactory
and TheServiceParticipant
.
// Common/DDSBase.h
class DDSBase {
typedef std::map<::DDS::DomainId_t, DDS::DomainParticipant_var>
DomainParticipantMap;
DDS::DomainParticipantFactory_var dpf_;
DomainParticipantMap participantMap_;
public:
DDSBase(int argc, ACE_TCHAR *argv[]) {
dpf_ = TheParticipantFactoryWithArgs(argc, argv);
}
~DDSBase() {
for (DomainParticipantMap::iterator it=participantMap_.begin();
it!=participantMap_.end(); it++) {
it->second->delete_contained_entities();
dpf_->delete_participant(it->second.in());
}
TheTransportFactory->release();
TheServiceParticipant->shutdown();
}
Next, we provide a method to retrieve a DomainParticipant
, given a DomainId
. As per the pattern described previously, if a matching participant already exists in the participant map, it is returned, else a new one is created. The DomainParticipant
is returned as a _ptr
instead of a _var
because the lifetime of the participant is managed by this class.
We will assume that the default quality of service (QoS) for each DDS entity, such as the DomainParticipant
here, is sufficient for our purposes. Adding QoS policies would be straightforward (i.e., provide a list of QoS policies to the method to apply, where the list becomes part of the map's key to distinguish otherwise identical entities), but the exercise is left for the reader.
protected:
DDS::DomainParticipant_ptr GetParticipant(::DDS::DomainId_t domainId) {
DomainParticipantMap::iterator it=participantMap_.find(domainId);
if (it==participantMap_.end()) {
::DDS::DomainParticipant_ptr participant = dpf_->create_participant(
domainId,
PARTICIPANT_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == participant)
throw DDSException("DDSBase::GetParticipant(): create_participant failed");
participantMap_[domainId] = participant;
}
return ::DDS::DomainParticipant::_duplicate(participantMap_[domainId].in());
}
This method does both the search for a previously-created DomainParticipant
, and creating it if not found, as described above. The methods for the creation of DataReader
s and DataWriter
s, however, will be performed in two stages. The CreateReader()
and CreateWriter()
methods represent the "create if missing" aspect, while the search for a previously created one will be performed elsewhere.
We also define a class, DDSException
, a subclass of std::exception
, to represent errors in the process.
// Common/DDSBase.h
class DDSException : public std::exception {
public:
DDSException(const char *const& _What) : std::exception(_What) {}
};
Continuing with class DDSBase
, we desire methods to be able to create any DataReader
and DataWriter
, so must be templatized. As many unrelated types are needed, they can be collected into a TypeTraits
class, identified by a single template parameter that is representative of the collection. This technique is also demonstrated in the code of an earlier Middleware News Brief. [4]
In the file TypeTraits.h
we define a templated struct, TypeTraits
, to reference the various DDS entities associated with a given data sample structure. Unlike in the aforementioned MNB, however, we define this template via a macro to reduce the amount of code that needs to be replicated later in the application.
// TypeTraits.h
template <typename DDS_STRUCT_T>
struct TypeTraits
{
};
#define DEFINE_TYPETRAITS(TYPE) \
template <> \
struct TypeTraits<TYPE> \
{ \
typedef TYPE##TypeSupport TypeSupport; \
typedef TYPE##TypeSupportImpl TypeSupportImpl; \
typedef TYPE##DataWriter Writer; \
typedef TYPE##DataReader Reader; \
};
Returning to DDSBase
, we can now define CreateWriter()
and CreateReader()
methods with just one template parameter which then provides access to the additional types. The return from CreateWriter()
is a wrapper around a type-specific DataWriter
for the given domain and topic.
template <typename TStructure>
Writer<TStructure> CreateWriter(::DDS::DomainId_t domainId,
const char *topic_name) {
We obtain the DomainParticipant
for the domain, creating it if needed.
DDS::DomainParticipant_var participant = GetParticipant(domainId);
Next, we use the type support class to register the type in the domain, and to provide the type name for topic creation. Upon failure, an exception will be thrown.
// Create Topic
typename TypeTraits<TStructure>::TypeSupport::_var_type ts =
new TypeTraits<TStructure>::TypeSupportImpl();
if (ts->register_type(participant, "") != DDS::RETCODE_OK)
throw DDSException("DDSBase::CreateWriter(): register_type failed");
CORBA::String_var type_name = ts->get_type_name();
DDS::Topic_var topic =
participant->create_topic(topic_name,
type_name.in(),
TOPIC_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == topic)
throw DDSException("DDSBase::CreateWriter(): create_topic failed");
The Publisher
is then created, with a default quality of service.
// Create Publisher
DDS::Publisher_var publisher =
participant->create_publisher(PUBLISHER_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == publisher)
throw DDSException("DDSBase::CreateWriter(): create_publisher failed");
We now create a transport that uses the SimpleTcp
protocol, and attach it to the Publisher
— hard-coding a particular transport is a way to simplify the wrapper's interface, but could be parameterized if needed. As SimpleTcp
provides reliable sample delivery, it is an appropriate choice.
// Initialize and attach Transport
OpenDDS::DCPS::TransportImpl_rch transport_impl =
TheTransportFactory->create_transport_impl(
OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
OpenDDS::DCPS::AUTO_CONFIG);
if (transport_impl->attach(publisher.in()) != OpenDDS::DCPS::ATTACH_OK)
throw DDSException("DDSBase::CreateWriter(): transport_impl->attach failed");
Finally, we create the type-specific DataWriter
by creating a DataWriter
which is narrowed to the desired type. The DataWriter
is then returned to the caller.
// Create DataWriter
DDS::DataWriter_var writer =
publisher->create_datawriter(topic.in(),
DATAWRITER_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == writer)
throw DDSException("DDSBase::CreateWriter(): create_datawriter failed");
return Writer<TStructure>(writer.in());
}
The CreateReader()
method will complete the DDSBase
class. As with CreateWriter()
, we want this function to be generic as well. Two template parameters are needed — the key for the TypeTraits
, and an additional type, representing a listener for the DataReader
is provided, which will be described later in this article.
template <typename TStructure, typename TListener>
Reader<TStructure, TListener> CreateReader(
::DDS::DomainId_t domainId, const char *topic_name,
TListener *listener) {
Code to obtain a DomainParticipant
and to register the type are the same as in CreateWriter()
.
DDS::DomainParticipant_var participant = GetParticipant(domainId);
// Create Topic
typename TypeTraits<TStructure>::TypeSupport::_var_type ts =
new TypeTraits<TStructure>::TypeSupportImpl();
if (ts->register_type(participant, "") != DDS::RETCODE_OK)
throw DDSException("DDSBase::CreateReader(): register_type failed");
CORBA::String_var type_name = ts->get_type_name();
DDS::Topic_var topic =
participant->create_topic(topic_name,
type_name.in(),
TOPIC_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == topic)
throw DDSException("DDSBase::CreateReader(): create_topic failed");
We now perform actions in parallel to those in CreateWriter()
, but as a subscriber — create a Subscriber
, and create and attach the SimpleTcp
transport.
// Create Subscriber
DDS::Subscriber_var subscriber =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
0,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == subscriber)
throw DDSException("DDSBase::CreateReader(): create_subscriber failed");
// Initialize and attach Transport
OpenDDS::DCPS::TransportImpl_rch transport_impl =
TheTransportFactory->create_transport_impl(
OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
OpenDDS::DCPS::AUTO_CONFIG);
OpenDDS::DCPS::AttachStatus status = transport_impl->attach(subscriber.in());
if (status != OpenDDS::DCPS::ATTACH_OK)
throw DDSException("DDSBase::CreateReader(): transport_impl->attach failed");
We create a DataReader
, specifying the listener, and a non-default quality of service. The RELIABILITY QoS policy is set to RELIABLE, instead of the default of BEST_EFFORT. Later, we will use wait_for_acknowledgements()
for the publisher to ensure that all published data has been received, and this operation requires that a DataReader
must be RELIABLE. As of this writing (OpenDDS v2.1.3), the OpenDDS Developer's Guide shows the Messenger
application using wait_for_acknowledgements()
with a BEST_EFFORT DataReader
, which is not correct.
// Create DataReader - set RELIABLE reliability
DDS::DataReaderQos dr_qos;
if (subscriber->get_default_datareader_qos(dr_qos) != ::DDS::RETCODE_OK)
throw DDSException("DDSBase::CreateReader(): get_default_datareader_qos failed");
dr_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
DDS::DataReader_var reader =
subscriber->create_datareader(topic.in(),
dr_qos,
listener,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (0 == reader)
throw DDSException("DDSBase::CreateReader(): create_datareader failed");
Finally, we return a wrapped, narrowed DataReader
to the caller.
return Reader<TStructure, TListener>(reader, listener);
}
};
As with CreateReader()
and CreateWriter()
, at this level we also want the listener for the DataReader
to be generic, so it too is templatized and has a data processing method which must be overridden in a subclass. The DataReaderListenerImplBase
class also provides empty implementations for various listener methods that are not needed by the wrapper.
// Common/DataReaderListenerImplBase.h
template <typename TDataReader, typename TStructure>
class DataReaderListenerImplBase
: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
public:
virtual void on_requested_deadline_missed(
DDS::DataReader_ptr,
const DDS::RequestedDeadlineMissedStatus&) {}
virtual void on_requested_incompatible_qos(
DDS::DataReader_ptr,
const DDS::RequestedIncompatibleQosStatus&) {}
virtual void on_sample_rejected(
DDS::DataReader_ptr,
const DDS::SampleRejectedStatus&) {}
virtual void on_liveliness_changed(
DDS::DataReader_ptr,
const DDS::LivelinessChangedStatus&) {}
virtual void on_subscription_matched(
DDS::DataReader_ptr,
const DDS::SubscriptionMatchedStatus&) {}
virtual void on_sample_lost(
DDS::DataReader_ptr,
const DDS::SampleLostStatus&) {}
The data sample processing is handled by on_data_available()
. If a data sample is successfully received from OpenDDS, and valid_data
is true, it is passed to Process()
to be consumed by the application. The valid_data
flag is true only when data is associated with the sample, as the sample that was taken may only represent a change of instance state, and not actual data.
The sample processing proceeds until take_next_sample()
returns DDS::RETCODE_NO_DATA
, or an error has occurred. Only samples containing valid data will be processed — for instance, since valid_data
is false for a disposed sample, disposed samples will be ignored. A while
loop is used as performance is improved if all samples currently available are taken, rather than taking just one sample on each invocation of on_data_available
.
virtual void Process(const TStructure &) = 0;
virtual void on_data_available(DDS::DataReader_ptr reader) {
typename TDataReader::_var_type reader_i = TDataReader::_narrow(reader);
// if an exception is thrown here, it is consumed by the transport thread,
// so ignore narrowing errors at this time
if (0 == reader_i)
return;
TStructure sample;
DDS::SampleInfo info;
DDS::ReturnCode_t status = reader_i->take_next_sample(sample, info);
while (status == DDS::RETCODE_OK) {
if (info.valid_data)
Process(sample);
status = reader_i->take_next_sample(sample, info);
}
}
};
CreateReader()
and CreateWriter()
return wrappers for the corresponding DDS entities. These wrappers maintain a reference to the contained entity, and provide a minimal interface for interacting with the wrapped entity, to keep things simple.
As the Writer
wrapper for a DataWriter
is specific to the type of DataWriter
corresponding to the data sample type being written, it must be parameterized with the appropriate TypeTraits
key.
// Common/Writer.h
template <typename TStructure>
class Writer {
typedef typename TypeTraits<TStructure>::Writer WriterType;
typename WriterType::_var_type writer_i;
public:
Writer() : writer_i(0) {}
Writer(DDS::DataWriter_ptr writer) :
writer_i(WriterType::_duplicate(WriterType::_narrow(writer))) {}
For aid in testing, helper functions are used to manage publication. These methods are based on ones used in various OpenDDS tests that are part of the OpenDDS software distribution. WaitForSubscriber()
does not return until at least one subscriber has been matched with this publisher — this allows a publisher to block until a subscriber is available to receive the data samples. The code waits on a "publication matched" condition, where the wait continues until at least a match count of one is achieved.
bool WaitForSubscriber() {
// Block until Subscriber is available
DDS::StatusCondition_var condition = writer_i->get_statuscondition();
condition->set_enabled_statuses(DDS::PUBLICATION_MATCHED_STATUS);
DDS::WaitSet_var ws = new DDS::WaitSet;
ws->attach_condition(condition);
DDS::ConditionSeq conditions;
DDS::PublicationMatchedStatus matches = { 0, 0, 0, 0, 0 };
DDS::Duration_t timeout = { 30, 0 }; // 30 seconds
do {
if (ws->wait(conditions, timeout) != DDS::RETCODE_OK)
return false;
if (writer_i->get_publication_matched_status(matches) != ::DDS::RETCODE_OK)
return false;
} while (matches.current_count < 1);
ws->detach_condition(condition);
return true;
}
Conversely, WaitForAcknowledgements()
can be used for clean termination of a publisher to ensure that all data samples have been received by a subscriber.
bool WaitForAcknowledgements() {
// Wait for samples to be acknowledged
DDS::Duration_t timeout = { 30, 0 }; // 30 seconds
return (writer_i->wait_for_acknowledgments(timeout) == DDS::RETCODE_OK);
}
Finally, Write()
is used to publish a data sample. register_instance
is used to obtain an instance handle, to improve publication performance.
DDS::ReturnCode_t Write(const TStructure &s) {
return writer_i->write(s, ::DDS::HANDLE_NIL);
}
};
The structure of the Reader
wrapper for a DataReader
is analagous to that of Writer
. It also stores the specific DataReader
type, so must be templatized.
// Common/Reader.h
template <typename TStructure, typename TListener>
class Reader {
typedef typename TypeTraits<TStructure>::Reader ReaderType;
typename ReaderType::_var_type reader_i;
std::tr1::shared_ptr<TListener> listener_;
public:
Reader() {}
Reader(DDS::DataReader_ptr reader, TListener *listener) :
reader_i(ReaderType::_duplicate(ReaderType::_narrow(reader))),
listener_(listener) {}
TListener *GetListener() { return listener_.get(); }
Reader
provides a helper function, WaitForPublisherToComplete()
, to ensure that a publisher has disconnected — the code loops until the count of matched subscriptions is zero.
bool WaitForPublisherToComplete() {
// Block until Publisher completes
DDS::StatusCondition_var condition = reader_i->get_statuscondition();
condition->set_enabled_statuses(DDS::SUBSCRIPTION_MATCHED_STATUS);
DDS::WaitSet_var ws = new DDS::WaitSet;
ws->attach_condition(condition);
DDS::ConditionSeq conditions;
DDS::SubscriptionMatchedStatus matches = { 0, 0, 0, 0, 0 };
DDS::Duration_t timeout = { 30, 0 }; // 30 seconds
do {
if (ws->wait(conditions, timeout) != DDS::RETCODE_OK)
return false;
if (reader_i->get_subscription_matched_status(matches) != DDS::RETCODE_OK)
return false;
} while (matches.current_count > 0);
ws->detach_condition(condition);
return true;
}
};
Although Writer
provided a Write()
method, no corresponding Read()
method is needed — data samples are retrieved by the listener that was associated with the contained DataReader
.
One more generic class is part of the wrapper — a generic container for Reader
and Writer
objects. The EntityMap
stores entities based on their associated domain and topic. This class manages a map of maps — given a DomainId
, a map is retrieved which associates topics with entities. If the desired entity is not found, a generic Create()
function is called to instantiate it, before it is added to the map and returned to the caller.
// Common/EntityMap.h
template <typename TEntity>
class EntityMap {
typedef std::map<std::string, TEntity> TTopicEntityMap;
typedef std::map<::DDS::DomainId_t, TTopicEntityMap> TDomainEntityMap;
TDomainEntityMap map_;
protected:
virtual TEntity Create(DDSBase *ddsBase, ::DDS::DomainId_t domainId,
const char *topic_name) = 0;
public:
TEntity Get(DDSBase *ddsBase,::DDS::DomainId_t domainId, const char *topic_name) {
TDomainEntityMap::iterator itDomain = map_.find(domainId);
if (itDomain == map_.end())
// domain entry not found, so create topic map and entity
map_[domainId][topic_name] = Create(ddsBase, domainId, topic_name);
else {
TTopicEntityMap::iterator itTopic = map_[domainId].find(topic_name);
if (itTopic == map_[domainId].end())
// domain found, but topic not found, so create entity
map_[domainId][topic_name] = Create(ddsBase, domainId, topic_name);
}
return map_[domainId][topic_name];
}
};
A CONCRETE EXAMPLE
Now that the generic wrapper has been described, we can see how to use in a specific case — that of the Messenger
application as described in the OpenDDS Developer's Guide [5].
As defined in Messenger.idl
in the Messenger_IDL
project, the structure representing the Message
type of data sample is as follows:
// Messenger_IDL/Messenger.idl
module Messenger {
#pragma DCPS_DATA_TYPE "Messenger::Message"
#pragma DCPS_DATA_KEY "Messenger::Message subject_id"
struct Message {
string from;
string subject;
long subject_id;
string text;
long count;
};
};
After compiling Messenger.idl
with the OpenDDS IDL compiler tool chain, a number of files are produced that contain various generated entities. MessengerC.h
contains a definition of struct Message
, in the Messenger
namespace, which is an IDL-to-C++ mapping of the Message
IDL structure. The Message
structure is as follows:
// MessengerC.h
namespace Messenger
{
...
struct Message
{
typedef Message_var _var_type;
typedef Message_out _out_type;
TAO::String_Manager from;
TAO::String_Manager subject;
::CORBA::Long subject_id;
TAO::String_Manager text;
::CORBA::Long count;
};
...
}
The file MessengerTypeSupportC.h
provides definitions of classes MessageTypeSupport
, MessageDataReader
, and MessageDataWriter
, also in the Messenger
namespace. Class MessageTypeSupport
provides the type support for the Message
structure, and MessageDataReader
and MessageDataWriter
provide Message
-based specializations of the DDS entities DataReader
and DataWriter
respectively. It is these types that we will use with the generic wrapper to provide a simplified publish and subscribe interface for Messenger
data samples.
To provide a concrete wrapper, we create the project Messenger_CPP_DDSImpLib
, containing the files generated by the OpenDDS IDL compiler tool chain from Messenger.idl
, plus DDSImpl.h
and DDSImpl.cpp
. As our objective is to provide a simple interface for applications that use the .NET Framework, we begin by defining a .NET version of the Message
structure, and functions to convert from the unmanaged C++ representation to the managed, C++/CLI one. In DDSImpl.h
, we define the class MessageNet
, with corresponding .NET types. For clarity, we also maintain a parallel namespace/class representation.
// Messenger_CPP_DDSImpLib/DDSImpl.h
namespace MessengerNet {
public value class MessageNet
{
public:
System::String^ from;
System::String^ subject;
System::Int32 subject_id;
System::String^ text;
System::Int32 count;
};
}
TypeTraits
must be defined for the Message
structure, and it is done with one line of code:
DEFINE_TYPETRAITS(Messenger::Message)
We add prototypes for the functions to convert from the Messenger
to the MessengerNet
types, and vice versa.
// Messenger_CPP_DDSImpLib/DDSImpl.h
Messenger::Message Convert(const gcroot<MessengerNet::MessageNet> &sampleParam);
gcroot<MessengerNet::MessageNet> Convert(const Messenger::Message &sample);
These conversion functions are implemented in DDSImpl.cpp
and are simple copies from one representation to the other.
// Messenger_CPP_DDSImpLib/DDSImpl.cpp
Messenger::Message Convert(const gcroot<MessengerNet::MessageNet> &sampleNetParam) {
MessengerNet::MessageNet sampleNet = (MessengerNet::MessageNet)sampleNetParam;
Messenger::Message sample;
sample.subject_id = sampleNet.subject_id;
sample.from = Convert(sampleNet.from);
sample.subject = Convert(sampleNet.subject);
sample.text = Convert(sampleNet.text);
sample.count = sampleNet.count;
return sample;
}
gcroot<MessengerNet::MessageNet> Convert(const Messenger::Message &sample) {
MessengerNet::MessageNet sampleNet;
sampleNet.subject_id = sample.subject_id;
sampleNet.from = Convert(sample.from);
sampleNet.subject = Convert(sample.subject);
sampleNet.text = Convert(sample.text);
sampleNet.count = sample.count;
return sampleNet;
}
The string conversion functions are available in the Common
library in StringConvert.[h,cpp]
.
We now implement subclasses of the generic C++ wrapper classes, specialized for the Messenger
type. First, we subclass DataReaderListenerImplBase
such that the processing of a data sample causes it to be fired as a .NET event. For more information on this technique, or for the use of OpenDDS with .NET in general, see [6]. The Process()
method is overridden from the base class to post the data sample as an event, and an AddHandler()
method is provided to allow a .NET event handler to be attached to the listener. That is, when the event is posted, the handler is invoked with the event as a parameter. The event support classes are implemented in Common.cpp
in the Common
library.
// Messenger_CPP_DDSImpLib/DDSImpl.h
class MessengerMessageDataReaderListenerImpl :
public DataReaderListenerImplBase<Messenger::MessageDataReader, Messenger::Message> {
gcroot<EventManager<MessengerNet::MessageNet>^> eventManager_;
public:
MessengerMessageDataReaderListenerImpl(
gcroot<EventManager<MessengerNet::MessageNet>^> eventManager) :
eventManager_(eventManager) {}
void Process(const Messenger::Message &sample) {
eventManager_->Process(eventManager_,
gcnew ProcessEventArgs<MessengerNet::MessageNet>(Convert(sample)));
}
void AddHandler(gcroot<EventManager<MessengerNet::MessageNet>::ProcessEventHandler^> handler) {
eventManager_->Process += handler;
}
};
Next, we implement subclasses of the EntityMap
class to manage instances of MessageDataReader
and MessageDataWriter
. The map lookup logic is provided in the base class, but the Create()
methods are overloaded to instantiate the correct type.
// Messenger_CPP_DDSImpLib/DDSImpl.h
typedef Writer<Messenger::Message> MessengerMessageDataWriter;
typedef Reader<Messenger::Message, MessengerMessageDataReaderListenerImpl>
MessengerMessageDataReader;
class MessengerMessageDataWriterMap : public EntityMap<MessengerMessageDataWriter> {
virtual MessengerMessageDataWriter Create(DDSBase *ddsBase,
::DDS::DomainId_t domainId, const char *topic_name) {
return ddsBase->CreateWriter<Messenger::Message>(domainId, topic_name);
}
};
class MessengerMessageDataReaderMap : public EntityMap<MessengerMessageDataReader> {
virtual MessengerMessageDataReader Create(DDSBase *ddsBase,
::DDS::DomainId_t domainId, const char *topic_name) {
return ddsBase->CreateReader<Messenger::Message>(domainId, topic_name,
new MessengerMessageDataReaderListenerImpl(
gcnew EventManager<MessengerNet::MessageNet>()));
}
};
We now create a subclass of DDSBase
that is specialized for the Messenger
type. The implementation of each of the methods is a one-liner delegation to the appropriate contained entity.
// Messenger_CPP_DDSImpLib/DDSImpl.h
class DDSImpl : public DDSBase {
MessengerMessageDataWriterMap mapMessengerMessageDataWriter_;
MessengerMessageDataReaderMap mapMessengerMessageDataReader_;
public:
DDSImpl(int argc, ACE_TCHAR *argv[]) : DDSBase(argc, argv) {}
DDS::ReturnCode_t Publish(::DDS::DomainId_t domainId, const char *topic_name,
gcroot<MessengerNet::MessageNet> sample) {
return mapMessengerMessageDataWriter_.Get(
this, domainId, topic_name).Write(Convert(sample));
}
bool MessengerMessageWaitForSubscriber(::DDS::DomainId_t domainId,
const char *topic_name) {
return mapMessengerMessageDataWriter_.Get(
this, domainId, topic_name).WaitForSubscriber();
}
bool MessengerMessageWaitForAcknowledgements(::DDS::DomainId_t domainId,
const char *topic_name) {
return mapMessengerMessageDataWriter_.Get(
this, domainId, topic_name).WaitForAcknowledgements();
}
void Subscribe(::DDS::DomainId_t domainId, const char *topic_name,
gcroot<EventManager<MessengerNet::MessageNet>::ProcessEventHandler^> handler) {
mapMessengerMessageDataReader_.Get(
this, domainId, topic_name).GetListener()->AddHandler(handler);
}
bool MessengerMessageWaitForPublisherToComplete(::DDS::DomainId_t domainId,
const char *topic_name) {
return mapMessengerMessageDataReader_.Get(
this, domainId, topic_name).WaitForPublisherToComplete();
}
};
One more level of indirection is needed. The class DDSImpl
provides a simple publish/subscribe interface for C++, but the method parameters are not compatible with the .NET Framework Common Type System [7]. A .NET ref class DDSNet
wraps DDSImpl
and provides the required interface.
// Messenger_CPP_DDSImpLib/DDSImpl.h
public ref class DDSNet {
DDSImpl *pDDSImpl;
public:
DDSNet() {
// convert .NET arguments to standard argc/argv
int argc;
char **argv;
GetArguments(argc, argv);
pDDSImpl = new DDSImpl(argc, argv);
}
~DDSNet() {
delete pDDSImpl;
}
int Publish(int domainId, System::String^ topic_name, MessengerNet::MessageNet sample) {
NET_RETHROW_EXCEPTION(return pDDSImpl->Publish(domainId, \
ConvertToString(topic_name).c_str(), sample));
}
bool MessengerMessageWaitForSubscriber(int domainId, System::String^ topic_name) {
NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForSubscriber(domainId, \
ConvertToString(topic_name).c_str()));
}
bool MessengerMessageWaitForAcknowledgements(int domainId, System::String^ topic_name) {
NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForAcknowledgements(domainId, \
ConvertToString(topic_name).c_str()));
}
void Subscribe(int domainId, System::String^ topic_name,
EventManager<MessengerNet::MessageNet>::ProcessEventHandler^ handler) {
NET_RETHROW_EXCEPTION(pDDSImpl->Subscribe(domainId, \
ConvertToString(topic_name).c_str(), handler));
}
bool MessengerMessageWaitForPublisherToComplete(int domainId, System::String^ topic_name) {
NET_RETHROW_EXCEPTION(return pDDSImpl->MessengerMessageWaitForPublisherToComplete(domainId, \
ConvertToString(topic_name).c_str()));
}
};
NET_RETHROW_EXCEPTION
is a macro to aid in rethrowing exceptions raised by the unmanaged code as managed, .NET exceptions, where:
// Common/NetException.h
#define NET_RETHROW_EXCEPTION(x) \
try { \
x; \
} \
catch(std::exception &e) { \
throw gcnew DDSNetException(gcnew System::String(e.what())); \
}\
and:
// Common/NetException.cpp
public ref class DDSNetException : public Exception {
public:
DDSNetException(String^ what) : Exception(what) {}
};
The diagram below illustrates the relationships between the classes that were developed above. Please click on the image for a larger version.
MESSENGER IN C#
Now that the .NET interface has been established, as a test, we can replicate the behavior of the C++ Messenger
sample in C#. In the Messenger_CS_Publisher
project, we create the file Publisher.cs
, and begin a standard console-mode C# application. Although this application does not use COM interop, Main()
is marked with [STAThread]
to set a single threaded apartment COM threading model, which is emitted by default by the Visual Studio project generator.
// Messenger_CS_Publisher/Publisher.cs
using System;
using System.Collections.Generic;
namespace Publisher
{
static class Program
{
[STAThread]
static void Main()
{
try
{
We instantiate an object of the DDSNet
class.
DDSNet dds = new DDSNet();
We then wait for a subscriber on the "Movie Discussion List" topic in domain 42.
dds.MessengerMessageWaitForSubscriber(42, "Movie Discussion List");
Once a subscriber is found, we publish reviews in the same manner as the Messenger
example.
for (int i = 0; i < 10; i++) {
MessengerNet.MessageNet messageNet;
messageNet.subject_id=99;
messageNet.from = "Comic Book Guy";
messageNet.subject = "Review";
messageNet.text = "Worst. Movie. Ever.";
messageNet.count = i;
dds.Publish(42, "Movie Discussion List", messageNet);
}
We then wait for the subscriber to receive the messages, free the DDSNet
instance, and exit.
dds.MessengerMessageWaitForAcknowledgements(42, "Movie Discussion List");
dds.Dispose();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
The publisher is thus reduced from many lines of code in the C++ example to two helper methods to align the publisher with the subscriber for test purposes, and one method to publish a data sample.
The subscriber is as straightforward. In the Messenger_CS_Subscriber
project, we create the file Subscriber.cs
. We implement a class with a method which is called when a data sample arrives — in this case, the contents of the sample are printed to the console.
// Messenger_CS_Subscriber/Subscriber.cs
using System;
using System.Collections.Generic;
namespace Subscriber
{
public class Print {
public void MessengerNetMessageNetEventHandler(Object sender,
ProcessEventArgs<MessengerNet.MessageNet> args) {
Console.WriteLine("MessageNetEventHandler: subject = {0}",
args.Sample.subject);
Console.WriteLine("MessageNetEventHandler: subject_id = {0}",
args.Sample.subject_id);
Console.WriteLine("MessageNetEventHandler: from = {0}",
args.Sample.from);
Console.WriteLine("MessageNetEventHandler: count = {0}",
args.Sample.count);
Console.WriteLine("MessageNetEventHandler: text = {0}",
args.Sample.text);
}
};
Next, we begin the Program
class as before, and instantiate an object of type DDSNet
.
static class Program
{
[STAThread]
static void Main()
{
try
{
DDSNet dds = new DDSNet();
We establish the MessengerNetMessageNetEventHandler()
method as the subscriber's listener, for the "Movie Discussion List" on domain 42.
dds.Subscribe(42, "Movie Discussion List",
new EventManager<MessengerNet.MessageNet>.ProcessEventHandler(
new Print().MessengerNetMessageNetEventHandler));
We allow the subscriber to process samples until the publisher exits, free the DDSNet
instance, and exit.
dds.MessengerMessageWaitForPublisherToComplete(42,
"Movie Discussion List");
dds.Dispose();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
The subscriber consists of one helper method to align the subscriber with the publisher for test purposes, and one method to establish the subscription itself.
Running run_test_Messenger.pl
from the Test
subdirectory will execute the DCPSInfoRepo, C# publisher and subscriber, and demonstrate that the output matches the Messenger sample in the OpenDDS Developer's Guide.
SUMMARY
As demonstrated in the article, the OpenDDS interface can be reduced to a simple publish/subscribe. Although rote, it is burdensome to generate the concrete classes by hand for the data sample type. Part II [8] of this article will show how the concrete classes can be generated automatically based on a user-defined type used for the data sample, to make using OpenDDS with .NET even easier.
REFERENCES
[1] OpenDDS
http://www.opendds.org/
[2] Introduction to OpenDDS
http://www.opendds.org/Article-Intro.html
[3] The RAII Programming Idiom
http://www.hackcraft.net/raii/
[4] Using QuickFAST and OpenDDS for a Low Latency Market Data Feed
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-201001.html
[5] OpenDDS Developer's Guide
http://downloads.ociweb.com/OpenDDS/OpenDDS-latest.pdf
[6] Using TAO and OpenDDS with .NET, Part II
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-200902.html
[7] Common Type System
http://msdn.microsoft.com/en-us/library/zcx1eb1e%28VS.90%29.aspx
[8] Code Generation with OpenDDS, Part II
http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-201007.html