February 12, 2009 - By Charles Calkins, OCI Senior Software Engineer
Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.
INTRODUCTION
For several years, Object Computing has been engaged with a client in the maintenance of a legacy data-acquisition application. Data is collected by remote sensing devices and stored in a database; the sensing devices are managed and the data viewed by an application written for Microsoft Windows.
Although originally a single-user application referencing a local database, over time the application has evolved into one in which multiple users can simultaneously connect to a single centralized database. If one user makes a change to the database, all other connected users must be made aware of the change, so their local states can be updated.
A solution to this problem is to create a single process to manage access to the database and to provide database change notifications to interested client applications.
In part I of this article, we described the architecture of DataServer, an application written in a combination of C++ and C#, that manages access to a database and interacts with client applications via the use of CORBA for control, such as adding, updating, and deleting database records, and DDS for client notification.
The Object Computing distributions of TAO and OpenDDS were used as the CORBA and DDS implementations, respectively.
As described in part I, TAO and OpenDDS were selected as the middleware to accomplish this task for two main reasons:
- The first is that it is easy to be up and running quickly — the prototype that was developed to illustrate the architecture was completed in under three days.
- The second is that, because both TAO and OpenDDS are Open Source, no license fees or other costs are involved. The resulting application can be deployed widely without incurring a per-seat, per-CPU, or even a development/SDK charge.
To review, the architecture is as follows, with components written in C# shown with box hatching and components written in C++ shown with angled hatching.
In this article, we continue the development of DataServer by implementing the database notifications with OpenDDS. While this article describes features of OpenDDS that are useful for our purpose, please see this article for more detailed information on OpenDDS itself.
As our objective is to notify all clients whenever any client performs a database operation, DataServer must be able to generate notifications as needed, and clients need to be able to receive them. The distributed publish and subscribe architecture of DDS is exactly what we need.
SIDEBAR
The code in this article was developed with Microsoft Visual Studio 2005. It was compiled against TAO version 1.6a, OpenDDS version 1.3, and MPC version 3.7.2. Inline assembly was disabled to prevent the .NET-related compiler warning C4793, as the use of __asm
forces native code generation. Wide character support was enabled, as .NET uses Unicode for string representation. The build settings for these features are as follows:
// add to %ACE_ROOT%\ace\config.h
#define ACE_LACKS_INLINE_ASSEMBLY 1
#define ACE_USES_WCHAR 1
// add to %ACE_ROOT%\bin\MakeProjectCreator\config\default.features
uses_wchar=1
DDS/OpenDDS OVERVIEW
We shall begin by reviewing the elements of DDS and, where applicable, areas where OpenDDS provides more specialized functionality.
OpenDDS supports the Data-Centric Publish and Subscribe (DCPS) layer of the DDS specification, so it is that layer of DDS that we will be concerned with in this article.
Processes that wish to exchange messages via DDS are part of one or more domains, and are known as domain participants. A domain is a global data space identified by an integer. While a given process can join multiple domains, messages sent in one domain can be received only by other participants in that same domain.
A publisher is responsible for distributing data in a domain. It may publish data of different types, with each type published by a data writer associated with that type.
Data is received by a subscriber, with each data type received by a type-specific data reader.
Quality of service (QoS) policies exist at various levels to manage the data transfer process. Entities in a domain such as these may also have associated listeners, where events can be received asynchronously when states are changed, data is received, or the like.
The data that is transferred between publishers and subscribers is represented by a topic. A topic has:
- A name that's unique in the domain
- A data type that's expressed in IDL
- QoS policies associated with the data
A sample is a particular data element of a topic, and a sample may have one or more fields defined as a key.
An instance is a set of samples that have the same key.
OpenDDS allows samples to be transmitted using various protocols, including: TCP (SimpleTcp
), UDP (SimpleUdp
), unreliable multicast (SimpleMcast
), and reliable multicast (ReliableMulticast
). This is known as a transport.
In this article, we will develop a database notification topic and add the ability for DataServer to publish samples of that topic and for a client application to subscribe to samples of that topic. We will build upon the code developed for part I, focusing on OpenDDS integration, so please review the earlier article for the details of using TAO in DataServer. Also, while relevant code segments will be highlighted, please see the code archive that accompanies this article for the full details.
DATABASE NOTIFICATION
We wish to notify all clients of all operations that are performed on the database by other clients — reads, writes, updates and deletes of records. Although, for purposes of demonstration, DataServer manages only one table named Items
, we will include the table name in the notification for completeness, in addition to the ID of the row that was acted upon.
For use by DDS, the database notification must be expressed in IDL. We create a directory, DatabaseNotification
off of the root and add a file, DatabaseNotification.idl
with the following contents:
- // DatabaseNotification.idl
- enum NotificationType
- {
- NOTIFICATION_CREATED,
- NOTIFICATION_READ,
- NOTIFICATION_UPDATED,
- NOTIFICATION_DELETED
- };
-
- #pragma DCPS_DATA_TYPE "DatabaseNotification"
- #pragma DCPS_DATA_KEY "DatabaseNotification id"
- #pragma DCPS_DATA_KEY "DatabaseNotification table_name"
- struct DatabaseNotification
- {
- long long id;
- string table_name;
- NotificationType notification_type;
- };
The DatabaseNotification
structure represents the notification itself, containing both a 64-bit table row ID and the table name with which the ID is associated and referencing an enumeration that specifies the various types of actions that can occur.
Additional information could also be provided, such as a unique identifier representing the client that performed the action, but for now, the above will suffice.
In order for a structure expressed in IDL to be used as an OpenDDS type, the structure must be identified by the DCPS_DATA_TYPE
pragma. A key can be defined by the DCPS_DATA_KEY
pragma, where both the containing structure and the field name are provided.
Here, the key is composed of two fields:
id
table_name
A key is not required, but in this example, we use keys to identify each unique combination of table_name
and id
as a separate instance.
To compile the IDL definition, we next create an MPC file named DatabaseNotification.mpc
in the same directory
- // DatabaseNotification.mpc
- project : dcps, CPPBase {
- sharedname = DatabaseNotificationCommon
- dynamicflags = DATABASENOTIFICATIONCOMMON_BUILD_DLL
- requires += tao_orbscvs
- includes += $(TAO_ROOT)/orbsvcs
- idlflags += -I$(TAO_ROOT)/orbsvcs
- idlflags += -Wb,export_macro=DatabaseNotificationCommon_Export
- idlflags += \
- -Wb,export_include=DatabaseNotificationCommon_Export.h
- dcps_ts_flags += --export=DatabaseNotificationCommon_Export
-
- TypeSupport_Files {
- DatabaseNotification.idl >> \
- DatabaseNotificationTypeSupport.idl \
- DatabaseNotificationTypeSupportImpl.h \
- DatabaseNotificationTypeSupportImpl.cpp
- }
-
- IDL_Files {
- DatabaseNotificationTypeSupport.idl
- DatabaseNotification.idl
- }
-
- Header_Files {
- DatabaseNotificationTypeSupportImpl.h
- }
-
- Source_Files {
- DatabaseNotificationTypeSupportImpl.cpp
- }
- }
This project inherits from the dcps
base project, which provides many definitions needed for compiling with OpenDDS. The result of compiling the IDL is a C++ library, DatabaseNotificationCommon
.
The project also inherits from CPPBase
, as defined in part I, to set output directories and include paths and other properties.
OpenDDS uses elements of TAO, so a dependence on tao_orbscvs
is needed.
Macros must be defined to manage the declaration and exporting of generated symbols, with the primary macro set to the name of the project, DatabaseNotificationCommon
, suffixed with _Export
. The definition of this and other macros is in a header file with the same name and .h
extension. This command is used to create that file (typed on one line):
generate_export_file.pl DatabaseNotificationCommon > DatabaseNotificationCommon_Export.h
The remaining sections of the MPC file describe the files that will be compiled into the library. Note that the >>
indicates that the result of processing one file (here, running the tao_idl compiler on DatabaseNotification.idl
) results in additional files being generated, which themselves must be processed to produce the library.
Now that the MPC file is complete, it must be added to the workspace so it can be compiled as part of the project.
After adding DatabaseNotification
to the DataServer.mwc
file, the result is as follows:
- // DataServer.mwc
- workspace {
- specific {
- cmdline += -language csharp
- DataLib
- Client
- }
- DataServer
- DataServerConnectorLib
- IDL
- DatabaseNotification
- }
PUBLISHER
With the definition of the database notification complete, we can add the ability to DataServer to publish notifications when database operations are performed. We will update DataServer to be a DDS publisher by adding code to create the relevant DDS entities and to publish samples when database operations are successfully executed by extending the servant that had been created in part I.
We start with defining a few constants, for convenience, to represent the domain, as well as the notification type and topic names.
In DataServer.cpp
, we add the following before ACE_TMAIN()
:
- DDS::DomainId_t DATABASENOTIFICATION_DOMAIN_ID = 1066;
- const char* DATABASENOTIFICATION_TYPE =
- "DatabaseNotification Type";
- const char* DATABASENOTIFICATION_TOPIC =
- "DatabaseNotification Topic";
At the start of ACE_TMAIN()
, we define two variables that represent fundamental DDS elements:
- int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) {
- DDS::DomainParticipantFactory_var dpf =
- DDS::DomainParticipantFactory::_nil();
- DDS::DomainParticipant_var participant =
- DDS::DomainParticipant::_nil();
The DomainParticipantFactory
allows creation of DomainParticipant
objects, while the DomainParticipant
acts as a container of – and a factory for – DDS entities within a specific domain.
When mixing TAO and OpenDDS in the same application, it's necessary to create a separate argc
/argv
set for OpenDDS initialization for two reasons.
- The first is that the initialization of OpenDDS (or TAO) will cause command-line arguments to be consumed, so necessary but already-consumed arguments will be unavailable for the initialization of the other.
- The second is that it is important to ensure that arguments used by TAO and OpenDDS do not conflict.
For instance, DataServer would typically be started with an argument such as the following on the command line to specify a listening port:
-ORBListenEndpoints iiop://:12346
Passing this same argument to both TAO and OpenDDS results in an error because both would attempt to listen on the same port, so a CORBA BAD_PARAM
exception will be thrown. We want that option to be used only for the initialization of TAO and not of OpenDDS, so we must create a new argc/argv
set that does not contain -ORBListenEndpoints
to be used for OpenDDS.
- try {
- // must duplicate argc/argv as each ORB absorbs parameters
- int argcDDS = 0;
- ACE_TCHAR **argvDDS=new ACE_TCHAR *[argc];
- for (int i=0; i<argc; i++)
- // do not pass -ORBListenEndpoints and its argument
- if (!ACE_OS::strcmp(
- ACE_TEXT("-ORBListenEndpoints"),
- argv[i]))
- i++;
- else
- argvDDS[argcDDS++] = ACE_OS::strdup(argv[i]);
We can now initialize the DomainParticipantFactory
with the new argc
/argv
set.
- dpf = TheParticipantFactoryWithArgs(argcDDS, argvDDS);
TheParticipantFactoryWithArgs()
is a macro that expands to:
TheServiceParticipant->get_domain_participant_factory(argc, argv)
where TheServiceParticipant
is a singleton that allows applications to configure OpenDDS, as well as providing other management features. For instance, calling TheServiceParticipant->shutdown()
will terminate OpenDDS processing.
Returning to DataServer, we can create the DomainParticipant
for our domain. DataServer does not need to participate in more than one domain, so only one DomainParticipant
for the DATABASENOTIFICATION_DOMAIN_ID
domain is needed.
The default QoS policy is sufficient for the DomainParticipant
, so the PARTICIPANT_QOS_DEFAULT
constant is specified.
For a list of what QoS policies are applied as a default, please refer to Appendix A at the end of this article.
DataServer does not need to monitor any events associated with the DomainParticipant
, so no listener is needed.
- participant = dpf->create_participant(
- DATABASENOTIFICATION_DOMAIN_ID,
- PARTICIPANT_QOS_DEFAULT,
- DDS::DomainParticipantListener::_nil());
- if (CORBA::is_nil(participant.in()))
- throw std::exception("create_participant failed");
We must now initialize the transport. Please refer to Appendix B for a discussion on various ways that this can be done.
For flexibility, we will choose to have the transport specified via configuration files, so we add the following to DataServer.cpp
:
- const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
-
- OpenDDS::DCPS::TransportImpl_rch trans_impl =
- TheTransportFactory->create_transport_impl(
- TRANSPORT_IMPL_ID,
- OpenDDS::DCPS::AUTO_CONFIG);
We now create a Publisher
using the DomainParticipant
as a factory. As before, we do not need a listener, and the default QoS is sufficient.
- DDS::Publisher_var pub = participant->create_publisher(
- PUBLISHER_QOS_DEFAULT,
- DDS::PublisherListener::_nil());
- if (CORBA::is_nil(pub.in()))
- throw std::exception("create_publisher failed");
With the transport defined, we can attach the Publisher
to the transport.
- OpenDDS::DCPS::AttachStatus status =
- trans_impl->attach(pub.in());
- if (status != OpenDDS::DCPS::ATTACH_OK) {
- std::string msg("Cannot attach to the transport: ");
- switch (status) {
- case OpenDDS::DCPS::ATTACH_BAD_TRANSPORT:
- throw std::exception(
- (msg+"ATTACH_BAD_TRANSPORT").c_str());
- case OpenDDS::DCPS::ATTACH_ERROR:
- throw std::exception(
- (msg+"ATTACH_ERROR").c_str());
- case OpenDDS::DCPS::ATTACH_INCOMPATIBLE_QOS:
- throw std::exception(
- (msg+"ATTACH_INCOMPATIBLE_QOS").c_str());
- default:
- throw std::exception(
- (msg+"unknown status").c_str());
- }
- }
Next, we register the type. This informs the middleware that the database notification type exists and allows it to manage data of that type.
- DatabaseNotificationTypeSupport_var
- databaseNotification_servant
- = new DatabaseNotificationTypeSupportImpl();
- if (DDS::RETCODE_OK !=
- databaseNotification_servant->register_type(
- participant.in(),
- DATABASENOTIFICATION_TYPE))
- throw std::exception("register_type failed");
As the data type itself has been registered, we now inform the middleware of the topic.
A topic is composed of:
- A data type
- A topic name
- QoS
- A listener (optional)
A listener is still not needed, and once again the default QoS is sufficient.
Although the constant TOPIC_QOS_DEFAULT
could be used, for demonstration purposes, we choose to retrieve the default QoS into a structure via get_default_topic_qos()
and apply that structure as the QoS.
- DDS::TopicQos default_topic_qos;
- participant->get_default_topic_qos(default_topic_qos);
-
- DDS::Topic_var databaseNotification_topic =
- participant->create_topic(
- DATABASENOTIFICATION_TOPIC,
- DATABASENOTIFICATION_TYPE,
- default_topic_qos,
- DDS::TopicListener::_nil());
- if (CORBA::is_nil(databaseNotification_topic.in()))
- throw std::exception("create_topic failed");
Next, we create a data writer for the database notification. At this point, we still do not require more than the default QoS, and no listener is needed.
- DDS::DataWriterQos dw_default_qos;
- pub->get_default_datawriter_qos(dw_default_qos);
-
- DDS::DataWriter_var databaseNotification_base_dw =
- pub->create_datawriter(databaseNotification_topic.in(),
- dw_default_qos,
- DDS::DataWriterListener::_nil());
- if (CORBA::is_nil(databaseNotification_base_dw.in()))
- throw std::exception("create_datawriter failed");
-
- DatabaseNotificationDataWriter_var databaseNotification_dw
- = DatabaseNotificationDataWriter::_narrow(
- databaseNotification_base_dw.in());
- if (CORBA::is_nil(databaseNotification_dw.in()))
- throw std::exception(
- "DatabaseNotificationDataWriter could not be narrowed");
To make use of the data writer, we pass it to the servant which implements the CORBA Database interface that we developed in part I.
Database_i servant(%database, databaseNotification_dw.in());
Finally, we must clean up the various DDS components that were allocated.
delete_contained_entities()
is called to free all entities that are associated with the participant, and the participant itself is freed with a call to delete_participant()
. If any other DomainParticipants
for other domains were allocated, they must also be freed at this point.
When no others remain, the TransportFactory
is freed and DDS processing is terminated by calling shutdown()
on TheServiceParticipant
.
- try {
- if (!CORBA::is_nil(participant.in()))
- participant->delete_contained_entities();
- if (!CORBA::is_nil(dpf.in()))
- dpf->delete_participant(participant.in());
- } catch (CORBA::Exception& e) {
- std::cerr << "Exception during cleanup: " << e << std::endl;
- }
- TheTransportFactory->release();
- TheServiceParticipant->shutdown();
This completes the changes to DataServer.cpp
. We must now update the servant to make use of the data writer.
We first add a member variable to store a pointer to the data writer and update the constructor to set the variable.
- // Database_i.h
- class Database_i
- : public virtual POA_Database
- {
- gcroot<DataLib::Database^> database_;
- DatabaseNotificationDataWriter_ptr dataWriter_;
- ...
- public:
- Database_i(gcroot<DataLib::Database^> database,
- DatabaseNotificationDataWriter_ptr dataWriter);
- // Database_i.cpp
- Database_i::Database_i(gcroot<DataLib::Database^> database,
- DatabaseNotificationDataWriter_ptr dataWriter) :
- database_(database), dataWriter_(dataWriter)
- {
- }
Next, we add a helper method, WriteNotification()
, which is called to publish the database notification itself.
The DatabaseNotification
structure is populated based on the method parameters and then passed to write()
of the DataWriter to be published as a DDS sample. For simplicity, a sample instance handle is not used, and the constant DDS::HANDLE_NILM
is passed instead.
- // Database_i.cpp
- void Database_i::WriteNotification(::CORBA::LongLong id,
- ::CORBA::Char *tableName, NotificationType notificationType) {
- DatabaseNotification notification;
- notification.id = id;
- notification.table_name = ::CORBA::string_dup(tableName);
- notification.notification_type = notificationType;
-
- int ret = dataWriter_->write(notification, DDS::HANDLE_NIL);
- if (ret != DDS::RETCODE_OK)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT("(%P|%t)ERROR: write returned %d.\n"), ret));
- }
Each database method is now updated to call WriteNotification()
. For instance, we modify CreateItem()
as follows; the other functions are updated in a similar way.
- ::CORBA::Boolean Database_i::CreateItem (
- const ::CORBA::WChar * description,
- ::CORBA::LongLong_out id)
- {
- System::String^ netDescription =
- gcnew System::String(description);
- ::CORBA::Boolean result =
- database_->CreateItem(netDescription, id);
- delete netDescription;
- if (result)
- WriteNotification(id, "Items", NOTIFICATION_CREATED);
- return result;
- }
To complete the changes to DataServer, we must update DataServer.mpc
. Only two changes are necessary:
- The project must inherit from
dcpsexe
, as this application uses elements of OpenDDS - A dependency upon
DatabaseNotification
is needed because DataServer must link to theDatabaseNotificationCommon
library.
The resulting file is as follows:
- // DataServer.mpc
- project : taoserver, dcpsexe, CPPBase, iortable {
- exename = DataServer
- after += IDL
- after += DataLib
- after += DatabaseNotification
-
- includes += ../IDL
- Source_Files {
- Database_i.cpp
- DataServer.cpp
- ../IDL/DatabaseC.cpp
- ../IDL/DatabaseS.cpp
- }
- IDL_Files {
- }
-
- managed = 1
- }
SUBSCRIBER
With the publisher complete, we can begin the subscriber.
As we mixed DDS publisher code with the TAO server in DataServer.cpp
, we mix DDS subscriber code with the TAO client in DataServerConnectorLib.cpp
. As with the publisher, we again define constants representing the domain, notification type, and topic, and create variables for the DomainParticipantFactory
and the DomainParticipant
.
- DDS::DomainId_t DATABASENOTIFICATION_DOMAIN_ID = 1066;
- const char* DATABASENOTIFICATION_TYPE =
- "DatabaseNotification Type";
- const char* DATABASENOTIFICATION_TOPIC =
- "DatabaseNotification Topic";
-
- void DataServerConnector::Run() {
- DDS::DomainParticipantFactory_var dpf =
- DDS::DomainParticipantFactory::_nil();
- DDS::DomainParticipant_var participant =
- DDS::DomainParticipant::_nil();
We must create separate argc/argv
sets for OpenDDS and TAO initialization, but as this is the .NET world, it is done slightly differently.
Command-line arguments in .NET are available as an array of .NET strings — these must be converted to standard argc/argv
.
- int argc = 0, argc2 = 0;
- wchar_t **argv = NULL, **argv2 = NULL;
-
- try {
- array<String^>^ arguments = Environment::GetCommandLineArgs();
- argc = arguments->Length;
- argc2 = argc;
- argv = new wchar_t *[argc];
- argv2 = new wchar_t *[argc2];
- for (int i=0; i<argc; i++) {
- pin_ptr<const wchar_t> arg = PtrToStringChars(arguments[i]);
- argv[i] = _wcsdup(arg);
- argv2[i] = _wcsdup(arg);
- }
The initialization of the participant factory, the participant, and the transport are identical to that of the publisher.
- dpf = TheParticipantFactoryWithArgs(argc, argv);
-
- participant = dpf->create_participant(
- DATABASENOTIFICATION_DOMAIN_ID,
- PARTICIPANT_QOS_DEFAULT,
- DDS::DomainParticipantListener::_nil());
- if (CORBA::is_nil(participant.in()))
- throw std::exception("create_participant failed");
-
- const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
- OpenDDS::DCPS::TransportImpl_rch trans_impl =
- TheTransportFactory->create_transport_impl(
- TRANSPORT_IMPL_ID,
- OpenDDS::DCPS::AUTO_CONFIG);
Next, instead of creating a publisher, we create a subscriber. No listener is needed and the default QoS is sufficient.
- DDS::Subscriber_var sub =
- participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
- DDS::SubscriberListener::_nil());
- if (CORBA::is_nil(sub.in()))
- throw std::exception("create_subscriber failed");
We attach the subscriber to the transport in the same way that the publisher was attached to the transport.
- OpenDDS::DCPS::AttachStatus status =
- trans_impl->attach(sub.in());
- if (status != OpenDDS::DCPS::ATTACH_OK) {
- std::string msg("Cannot attach to the transport: ");
- switch (status) {
- case OpenDDS::DCPS::ATTACH_BAD_TRANSPORT:
- throw std::exception(
- (msg+"ATTACH_BAD_TRANSPORT").c_str());
- case OpenDDS::DCPS::ATTACH_ERROR:
- throw std::exception(
- (msg+"ATTACH_ERROR").c_str());
- case OpenDDS::DCPS::ATTACH_INCOMPATIBLE_QOS:
- throw std::exception(
- (msg+"ATTACH_INCOMPATIBLE_QOS").c_str());
- default:
- throw std::exception(
- (msg+"unknown status").c_str());
- }
- }
We also register the type and topic in the same manner as the publisher.
- DatabaseNotificationTypeSupport_var
- databaseNotification_servant =
- new DatabaseNotificationTypeSupportImpl();
- if (DDS::RETCODE_OK !=
- databaseNotification_servant->register_type(
- participant.in(),
- DATABASENOTIFICATION_TYPE))
- throw std::exception("register_type failed");
-
- DDS::TopicQos default_topic_qos;
- participant->get_default_topic_qos(default_topic_qos);
-
- DDS::Topic_var databaseNotification_topic =
- participant->create_topic(
- DATABASENOTIFICATION_TOPIC,
- DATABASENOTIFICATION_TYPE,
- default_topic_qos, DDS::TopicListener::_nil());
- if (CORBA::is_nil(databaseNotification_topic.in()))
- throw std::exception("create_topic failed");
We do need a listener for the data reader, so we create an instance of its implementation and we pass this
as a parameter so the listener can interact with this class.
The purpose of the listener is to notify the application when samples arrive indicating database changes have occurred.
The next section will describe the listener further.
- DDS::DataReaderListener_var
- databaseNotification_listener(
- new DatabaseNotificationDataReaderListenerImpl(this));
- if (CORBA::is_nil(databaseNotification_listener.in()))
- throw std::exception("cannot create the listener");
We create a data reader in a manner similar to the way we created the data writer, although this time we pass a listener reference to create_datareader()
.
- DDS::DataReaderQos dr_default_qos;
- sub->get_default_datareader_qos(dr_default_qos);
-
- DDS::DataReader_var databaseNotification_dr =
- sub->create_datareader(databaseNotification_topic.in(),
- dr_default_qos,
- databaseNotification_listener.in());
- if (CORBA::is_nil(databaseNotification_dr.in()))
- throw std::exception("create_datareader failed");
At the end of DataConnector::Run()
, we add the same cleanup code as we did at the end of ACE_TMAIN()
, although we format the exception as a DataConnectorException
for processing by the client.
- try {
- if (!CORBA::is_nil(participant.in()))
- participant->delete_contained_entities();
- if (!CORBA::is_nil(dpf.in()))
- dpf->delete_participant(participant.in());
- } catch (CORBA::Exception& e) {
- std::stringstream ss;
- ss << "Exception during cleanup: " << e;
- throw gcnew
- DataConnectorException(gcnew String(ss.str().c_str()));
- }
- TheTransportFactory->release();
- TheServiceParticipant->shutdown();
DatabaseNotification LISTENER
Samples must be processed when they arrive in order to pass the database notification to the user of DataServerConnector
, so a listener for the DataReader is needed.
In the DataServerConnector project, create a file DatabaseNotificationDataReaderListener.h
to implement a subclass of DDS::DataReaderListener
. We store a reference to the DataServerConnector
as a member variable for later use.
As DataServerConnector
is a .NET type and DatabaseNotificationDataReaderListenerImpl
is not, gcroot<>
is needed.
- // DatabaseNotificationDataReaderListener.h
- class DatabaseNotificationDataReaderListenerImpl
- : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener>
- {
- gcroot<DataServerConnector^> dataConnector_;
- public:
- DatabaseNotificationDataReaderListenerImpl(
- gcroot<DataServerConnector^> dataConnector) :
- dataConnector_(dataConnector) {}
Only one listener method needs to be implemented, as only the arrival of data samples is of interest. We declare on_data_available()
to be implemented in the CPP file, but provide empty bodies for the other methods.
- virtual ~DatabaseNotificationDataReaderListenerImpl() {}
-
- virtual void on_requested_deadline_missed(
- DDS::DataReader_ptr /*reader*/,
- const DDS::RequestedDeadlineMissedStatus & /*status*/)
- throw (CORBA::SystemException) {}
-
- virtual void on_requested_incompatible_qos(
- DDS::DataReader_ptr /*reader*/,
- const DDS::RequestedIncompatibleQosStatus & /*status*/)
- throw (CORBA::SystemException) {}
-
- virtual void on_liveliness_changed(
- DDS::DataReader_ptr /*reader*/,
- const DDS::LivelinessChangedStatus & /*status*/)
- throw (CORBA::SystemException) {}
-
- virtual void on_subscription_match(
- DDS::DataReader_ptr /*reader*/,
- const DDS::SubscriptionMatchStatus & /*status*/)
- throw (CORBA::SystemException) {}
-
- virtual void on_sample_rejected(
- DDS::DataReader_ptr /*reader*/,
- const DDS::SampleRejectedStatus & /*status*/)
- throw (CORBA::SystemException) {}
-
- virtual void on_data_available(
- DDS::DataReader_ptr reader)
- throw (CORBA::SystemException);
-
- virtual void on_sample_lost(
- DDS::DataReader_ptr /*reader*/,
- const DDS::SampleLostStatus & /*status*/)
- throw (CORBA::SystemException) {}
- };
We next create a CPP file named DatabaseNotificationDataReaderListener.cpp
for the implementation of on_data_available()
.
A DataReader is passed into the method as a parameter and must be narrowed to the specific type it represents — here, a DatabaseNotificationDataReader
.
- // DatabaseNotificationDataReaderListener.cpp
- void DatabaseNotificationDataReaderListenerImpl::
- on_data_available(DDS::DataReader_ptr reader)
- throw (CORBA::SystemException)
- {
- try {
- DatabaseNotificationDataReader_var databaseNotification_dr
- = DatabaseNotificationDataReader::_narrow(reader);
- if (CORBA::is_nil(databaseNotification_dr.in()))
- throw std::exception(
- "DatabaseNotificationDataReaderListenerImpl::
- "on_data_available: _narrow failed.");
We must now call take_next_sample()
until no more data remains.
A DatabaseNotification
structure is populated with the data from the sample, and information on the sample is returned in the SampleInfo
structure.
- while (true) {
- DatabaseNotification notification;
- DDS::SampleInfo si;
- DDS::ReturnCode_t status =
- databaseNotification_dr->
- take_next_sample(notification, si);
If the status returned from take_next_sample()
is DDS::RETCODE_OK
, further processing is needed; if the return is DDS::RETCODE_NO_DATA
, the while
loop can be exited because no more samples are currently available.
If data is available, the instance_state
of the SampleInfo
must be examined.
If the instance_state
is either ALIVE (there are live DataWriters writing the instance, and the samples have not been disposed) or NOT_ALIVE_NO_WRITERS (there are no live DataWriters, but the samples have not been disposed), the sample is valid and should be processed. The fields of the sample are then converted to .NET types after extracting them from the DatabaseNotification
structure, and the method ProcessNotification()
is called on the DataServerConnector
.
If the instance_state
is NOT_ALIVE_DISPOSED, the samples have been deleted and should not be consumed.
- if (status == DDS::RETCODE_OK) {
- if ((si.instance_state == DDS::ALIVE_INSTANCE_STATE) ||
- (si.instance_state == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE))
- dataConnector_->ProcessNotification(
- dataConnector_,
- gcnew ProcessNotificationEventArgs(
- notification.id,
- gcnew String(notification.table_name.in()),
- (Notification)notification.notification_type));
- }
- else if (status == DDS::RETCODE_NO_DATA)
- break;
- }
We conclude the method by handling exceptions and re-throwing them as .NET ones.
- }
- catch (CORBA::Exception& ex) {
- std::stringstream ss;
- ss << "Exception in read: " << ex;
- throw gcnew DataConnectorException(
- gcnew String(ss.str().c_str()));
- }
- catch (std::exception& ex) {
- std::stringstream ss;
- ss << "Exception in read: " << ex.what();
- throw gcnew DataConnectorException(
- gcnew String(ss.str().c_str()));
- }
- }
The listener is now complete, and we can return to DataServerConnector
.
DataServerConnector
must provide an interface to allow users of the library to receive the data received from the DDS sample. This is accomplished by a .NET event.
To create the event, we first construct a delegate. A delegate is essentially a type-safe function pointer, although it may reference more than one function at a time.
For a discussion of delegates, see references [1] and [2].
We return to DataServerConnectorLib.h
and declare:
- A delegate in the
DataServerConnector
class - A class member variable of that type
- // DataServerConnectorLib.h, class DataServerConnector
- delegate void ProcessNotificationEventHandler(Object^ sender, ProcessNotificationEventArgs ^args);
- ProcessNotificationEventHandler^ pProcessNotification_;
The parameters of the delegate match the pattern used by events, as explained in reference [3].
- The first parameter is a reference to a
System::Object
, a fundamental .NET type. - The second argument,
ProcessNotificationEventArgs
, is a custom class that's a subclass of the .NETEventArgs
class. This class is no more than a container for the values from the DDS sample, with accessor properties to easily retrieve the stored values.
Also in DataServerConnectorLib.h
, we declare the class as follows:
- // DataServerConnectorLib.h
- public ref class ProcessNotificationEventArgs : public EventArgs {
- Int64 id_;
- String^ tableName_;
- Notification notificationType_;
- public:
- ProcessNotificationEventArgs(Int64 id, String^ tableName,
- Notification notificationType) : id_(id),
- tableName_(tableName), notificationType_(notificationType) {}
- property Int64 ID { Int64 get() { return id_; } }
- property String^ TableName { String^ get() { return tableName_; } }
- property Notification NotificationType {
- Notification get() { return notificationType_; }
- }
- };
We now create the event itself. In class DataServerConnector
, declare the event as follows:
- // DataServerConnectorLib.h, class DataServerConnector
- event ProcessNotificationEventHandler^ ProcessNotification
- {
- void add(ProcessNotificationEventHandler^ p)
- { pProcessNotification_ +=p; }
- void remove(ProcessNotificationEventHandler^ p)
- { pProcessNotification_ -=p; }
- void raise(Object^ obj, ProcessNotificationEventArgs^ args) {
- if (pProcessNotification_!=nullptr)
- pProcessNotification_(obj, args);
- }
- }
The add()
method of the event is called whenever an object wishes to become an observer of the event. The reference to the object is added to the delegate variable, and as many observers as desired are allowed.
Conversely, remove()
is called whenever an object no longer wishes to be an observer of the event.
The raise()
method is called whenever an event is fired, and all observers are notified of the event. Note that before invoking the delegate, a test for the existence of observers must first be performed (a comparison against nullptr
). Invoking a null delegate is an error.
Invoking the event as a function, as the listener does in on_data_available()
...
- // DatabaseNotificationDataReaderListener.cpp
- dataConnector_->ProcessNotification(dataConnector_,
- gcnew ProcessNotificationEventArgs(
- notification.id,
- gcnew String(notification.table_name.in()),
- (Notification)notification.notification_type));
... implicitly calls raise()
, which then propagates the sample data to all observers.
Now that DDS elements have been added to DataServerConnectorLib
, we must update the MPC file to reflect the changes. The project must now do all of the following:
- Inherit from
dcps
- Link with
DatabaseNotificationCommon
- Reference TAO's
orbsvcs
- Include
DatabaseNotificationDataReaderListenerImpl.cpp
in its compilation
- // DataServerConnectorLib.mpc
- project : taoexe, dcps, CPPBase {
- sharedname = DataServerConnectorLib
-
- after += IDL
- includes += ../IDL
- includes += $(TAO_ROOT)/orbsvcs
-
- after += DatabaseNotification
-
- Source_Files {
- DataServerConnectorLib.cpp
- ../IDL/DatabaseC.cpp
- DatabaseNotificationDataReaderListenerImpl.cpp
- }
- IDL_Files {
- }
-
- managed = 1
- }
Adding an observer of the event in the Client
is simple. We add a new method to class Client
in Client.cs
to handle the event — in this case, log that the event arrived:
- // Client.cs
- void OnNotification(object obj, ProcessNotificationEventArgs args)
- {
- Log("EVENT: Table '" + args.TableName + "', item " +
- args.ID + " " + args.NotificationType);
- }
In the constructor of class Client
, we register the OnNotification()
function with the event to register the object as an event observer.
- dataConnector_.ProcessNotification +=
- new DataServerConnector.
- ProcessNotificationEventHandler(OnNotification);
In C#, the +=
operation invokes the event's add()
method to perform the observer registration. This completes the database notification subscriber.
CONCLUSION
The following screen shots demonstrate the system.
We start two Client
s, as well as the DataServer and the DCPSInfoRepo, which are not shown. For this run, the DCPSInfoRepo was run on the machine oci1373
and started with the following command (on a single line):
DCPSInfoRepo -ORBSvcConf lib_tcp.conf -ORBListenEndpoints iiop://:12345 -ORBDottedDecimalAddresses 0
The server was started, also on oci1373
, with this command (on a single line):
DataServer -ORBSvcConf lib_tcp.conf -ORBListenEndpoints iiop://:12346 -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo -DCPSConfigFile tcp_conf.ini
Each of the Client
instances were started with this command (on a single line):
Client
-ORBSvcConf lib_tcp.conf -ORBDottedDecimalAddresses 0
-ORBInitRef DataServer=corbaloc:iiop:localhost:12346/DataServer
-DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo
-DCPSConfigFile tcp_conf.ini
Note that, when running on multiple hosts, addresses will need to be updated to use host names or IP addresses as appropriate, instead of localhost
.
We enter "My First Item" into the TextBox associated with the Create button on the first Client
.
Pressing the Create button creates the database item, and the generated ID of 1 is reflected in the ListView. An event indicating that item 1 in the Items table was created is also shown in both clients.
On the second Client, we enter the ID of 1 into the TextBox associated with the Read button.
Pressing the Read button displays "My First Item" as the item description, demonstrating that the second Client
has referenced the same database as the first Client
. An event indicating that item 1 was read is also shown in both clients.
This article has described how to mix TAO and OpenDDS in a .NET application to implement a client-server system.
The next article in this series will illustrate various refinements to DataServer.
REFERENCES
- [1] An Introduction to Delegates
http://msdn.microsoft.com/en-us/magazine/cc301810.aspx - [2] Delegates, Part 2
http://msdn.microsoft.com/en-us/magazine/cc301816.aspx - [3] Implementation of Events with Delegates
http://msdn.microsoft.com/en-us/magazine/cc301822.aspx
APPENDIX A
As of OpenDDS 1.3, the default quality of service policies correspond to the following table.
DomainParticipant | ||
---|---|---|
USER_DATA | value | (not set) |
ENTITY_FACTORY | autoenable_created_entities | 1 |
Topic | ||
TOPIC_DATA | value | (not set) |
DURABILITY | kind service_cleanup_delay.sec service_cleanup_delay.nanosec |
VOLATILE_DURABILITY_QOS DURATION_ZERO_SEC DURATION_ZERO_NSEC |
DURABILITY_SERVICE | service_cleanup_delay.sec service_cleanup_delay.nanosec history_kind history_depth max_samples max_instances max_samples_per_instance |
DURATION_ZERO_SEC DURATION_ZERO_NSEC KEEP_LAST_HISTORY_QOS 1 LENGTH_UNLIMITED LENGTH_UNLIMITED LENGTH_UNLIMITED |
DEADLINE | period.sec period.nanosec |
DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
LATENCY_BUDGET | duration.sec duration.nanosec |
DURATION_ZERO_SEC DURATION_ZERO_NSEC |
LIVELINESS | kind lease_duration.sec lease_duration.nanosec |
AUTOMATIC_LIVELINESS_QOS DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
RELIABILITY | kind max_blocking_time.sec max_blocking_time.nanosec |
BEST_EFFORT_RELIABILITY_QOS DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
DESTINATION_ORDER | kind | BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS |
HISTORY | kind depth |
KEEP_LAST_HISTORY_QOS 1 |
RESOURCE_LIMITS | max_samples max_instances max_samples_per_instance |
LENGTH_UNLIMITED LENGTH_UNLIMITED LENGTH_UNLIMITED |
TRANSPORT_PRIORITY | value | 0 |
LIFESPAN | duration.sec duration.nanosec |
DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
OWNERSHIP | kind | SHARED_OWNERSHIP_QOS |
Publisher | ||
PRESENTATION | access_scope coherent_access ordered_access |
INSTANCE_PRESENTATION_QOS 0 0 |
PARTITION | name | (empty sequence) |
GROUP_DATA | value | (not set) |
ENTITY_FACTORY | autoenable_created_entities | 1 |
Subscriber | ||
PRESENTATION | access_scope coherent_access ordered_access |
INSTANCE_PRESENTATION_QOS 0 0 |
PARTITION | name | (empty sequence) |
GROUP_DATA | value | (not set) |
ENTITY_FACTORY | autoenable_created_entities | 1 |
DataWriter | ||
DURABILITY | kind service_cleanup_delay.sec service_cleanup_delay.nanosec |
VOLATILE_DURABILITY_QOS DURATION_ZERO_SEC DURATION_ZERO_NSEC |
DURABILITY_SERVICE | service_cleanup_delay.sec service_cleanup_delay.nanosec history_kind history_depth max_samples max_instances max_samples_per_instance |
DURATION_ZERO_SEC DURATION_ZERO_NSEC KEEP_LAST_HISTORY_QOS 1 LENGTH_UNLIMITED LENGTH_UNLIMITED LENGTH_UNLIMITED |
DEADLINE | period.sec period.nanosec |
DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
LATENCY_BUDGET | duration.sec duration.nanosec |
DURATION_ZERO_SEC DURATION_ZERO_NSEC |
LIVELINESS | kind lease_duration.sec lease_duration.nanosec |
AUTOMATIC_LIVELINESS_QOS DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
RELIABILITY | kind max_blocking_time.sec max_blocking_time.nanosec |
BEST_EFFORT_RELIABILITY_QOS DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
DESTINATION_ORDER | kind | BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS |
HISTORY | kind depth |
KEEP_LAST_HISTORY_QOS 1 |
RESOURCE_LIMITS | max_samples max_instances max_samples_per_instance |
LENGTH_UNLIMITED LENGTH_UNLIMITED LENGTH_UNLIMITED |
TRANSPORT_PRIORITY | value | 0 |
LIFESPAN | duration.sec duration.nanosec |
DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
USER_DATA | value | (not set) |
OWNERSHIP | kind | SHARED_OWNERSHIP_QOS |
OWNERSHIP_STRENGTH | value | 0 |
WRITER_DATA_LIFECYCLE | autodispose_unregistered_instances | 1 |
DataReader | ||
DURABILITY | kind service_cleanup_delay.sec service_cleanup_delay.nanosec |
VOLATILE_DURABILITY_QOS DURATION_ZERO_SEC DURATION_ZERO_NSEC |
DEADLINE | period.sec period.nanosec |
DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
LATENCY_BUDGET | duration.sec duration.nanosec |
DURATION_ZERO_SEC DURATION_ZERO_NSEC |
LIVELINESS | kind lease_duration.sec lease_duration.nanosec |
AUTOMATIC_LIVELINESS_QOS DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
RELIABILITY | kind max_blocking_time.sec max_blocking_time.nanosec |
BEST_EFFORT_RELIABILITY_QOS DURATION_INFINITY_SEC DURATION_INFINITY_NSEC |
DESTINATION_ORDER | kind | BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS |
HISTORY | kind depth |
KEEP_LAST_HISTORY_QOS 1 |
RESOURCE_LIMITS | max_samples max_instances max_samples_per_instance |
LENGTH_UNLIMITED LENGTH_UNLIMITED LENGTH_UNLIMITED |
USER_DATA | value | (not set) |
TIME_BASED_FILTER | minimum_separation.sec minimum_separation.nanosec |
DURATION_ZERO_SEC DURATION_ZERO_NSEC |
READER_DATA_LIFECYCLE | autopurge_nowriter_samples_delay.sec autopurge_nowriter_samples_delay.nanosec |
DURATION_ZERO_SEC DURATION_ZERO_NSEC |
OWNERSHIP | kind | SHARED_OWNERSHIP_QOS |
APPENDIX B
A transport can be selected in several different ways. One way is to defer the selection of the transport to the user at runtime. For example, the following process allows the SimpleTcp
transport to be used.
METHOD 1
For this method, create two files in the filesystem. Name the first file lib_tcp.conf
, and add the following to it (as one line):
- // lib_tcp.conf
- dynamic DCPS_SimpleTcpLoader Service_Object *
- SimpleTcp:_make_DCPS_SimpleTcpLoader() "-type SimpleTcp"
Name the second file tcp_conf.ini
, and include a section to configure SimpleTcp transport parameters, as follows
- // tcp_conf.ini
- [transport_impl_1]
- transport_type=SimpleTcp
In DataServer.cpp
, define a constant equal to the same value as the number of the section in tcp_conf.ini
— here, the value 1, to match transport_impl_1
— and then use create_transport_impl()
to create the transport:
- // DataServer.cpp (method 1)
- const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
-
- OpenDDS::DCPS::TransportImpl_rch trans_impl =
- TheTransportFactory->create_transport_impl(
- TRANSPORT_IMPL_ID,
- OpenDDS::DCPS::AUTO_CONFIG);
When DataServer is run with these command-line arguments:
-ORBSvcConf lib_tcp.conf -DCPSConfigFile tcp_conf.ini
the SimpleTcp
transport will be loaded by lib_tcp.conf
, referenced as the transport to use in tcp_conf.ini
, and instantiated by create_transport_impl()
.
METHOD 2
A second way to load a transport is entirely within code, by name, so external configuration files are not required. A call to get_or_create_configuration()
will load a specific transport at the specified ID for use by create_transport_impl()
:
- // DataServer.cpp (method 2)
- const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
-
- TheTransportFactory->get_or_create_configuration(
- TRANSPORT_IMPL_ID, ACE_TEXT("SimpleTcp"));
-
- OpenDDS::DCPS::TransportImpl_rch trans_impl =
- TheTransportFactory->create_transport_impl(
- TRANSPORT_IMPL_ID,
- OpenDDS::DCPS::AUTO_CONFIG);
METHOD 3
A third way is to use constants for transport IDs that have been predefined for the standard transports, rather than defining an ID of our own:
- // DataServer.cpp (method 3)
- OpenDDS::DCPS::TransportImpl_rch trans_impl =
- TheTransportFactory->create_transport_impl(
- ::OpenDDS::DCPS::DEFAULT_SIMPLE_TCP_ID,
- OpenDDS::DCPS::AUTO_CONFIG);