Using TAO and OpenDDS with .NET, Part III — DCPSInfoRepo Collocation

Using TAO and OpenDDS with .NET, Part III —
DCPSInfoRepo Collocation

By Charles Calkins, OCI Senior Software Engineer

March 2009


Introduction

For several years, OCI has been engaged with a customer in the maintenance of a legacy data-acquisition application. Data is collected by remote sensing devices and stored in a database, and the sensing devices are managed, and the data viewed, by an application written for Microsoft Windows.

Although originally a single-user application referencing a local database, over time the application has evolved into one where multiple users can simultaneously connect to a single centralized database. If one user makes a change to the database, all other connected users must be made aware of the change so their local states can be updated.

A solution to this problem is to create a single process to manage access to the database, and to provide database change notifications to interested client applications. In part I of this article, we described the architecture of DataServer, an application written in a combination of C++ and C#, that manages access to a database, and interacts with client applications via the use of CORBA for control, such as adding, updating, and deleting database records. In part II, we integrated DDS for client notification. The Object Computing, Inc. distributions of TAO and OpenDDS were used as the CORBA and DDS implementations, respectively.

In this article we will improve the user experience by reducing the number of processes in the system, and simplify the command-line arguments that must be provided.

Motivation

In part II of this article, we demonstrated the system by starting three types of processes. The DCPS Information Repository (DCPSInfoRepo.exe) is necessary for OpenDDS components to locate each other, so it must be started first. The command-line that we used is as follows, where the port on which to listen for OpenDDS connections must be specified, in addition to other arguments.

DCPSInfoRepo -ORBSvcConf lib_tcp.conf -ORBListenEndpoints iiop://:12345 
    -ORBDottedDecimalAddresses 0

DataServer is started next, and it must be provided the port on which to listen for CORBA requests, where the DCPSInfoRepo resides, as well as OpenDDS configuration options.

DataServer -ORBSvcConf lib_tcp.conf -ORBListenEndpoints iiop://:12346 
    -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo 
    -DCPSConfigFile tcp_conf.ini

Lastly, multiple instances of the Client application are started, and each must be provided the location of the DCPSInfoRepo, the location of DataServer, as well as other parameters.

Client -ORBSvcConf lib_tcp.conf -ORBDottedDecimalAddresses 0 
    -ORBInitRef DataServer=corbaloc:iiop:localhost:12346/DataServer 
    -DCPSInfoRepo corbaloc:::12345/DCPSInfoRepo 
    -DCPSConfigFile tcp_conf.ini

From the user's perspective, this is complex. In a client-server system, one expects to have only clients and servers, without additional helper processes. One also expects that the configuration should be simple — set the server to listen on a particular port, and to inform each client of the host and port of the server process to establish a connection.

We will simplify the system by collocating the DCPS Information Repository (IR, for brevity) into the DataServer process, so the system will only contain server and client processes. We will also reduce the number of command-line arguments by creating command-lines internally, based on a reduced number of user-specified options.

The DCPS Information Repository (IR)

The IR acts as an intermediary between OpenDDS publishers and subscribers, allowing them to locate topics, and to find each other. In OpenDDS 1.3, it is implemented as a CORBA server, using TAO as its CORBA implementation.

As seen by the source in %DDS_ROOT%\dds\InfoRepo\DCPSInfoRepo.cpp, the behavior of the process can be distilled to:

  1. #include "DCPSInfoRepoServ.h"
  2. ...
  3. InfoRepo infoRepo(argc, argv);
  4. infoRepo.run();

where the DCPSInfoRepo process is a shell around the code in the DCPSInfoRepoServ.dll library. Since the IR's functionality is implemented in a library, we can integrate it into DataServer in a similar way, with three caveats. Looking into the implementation in %DDS_ROOT%\dds\InfoRepo\DCPSInfoRepoServ.cpp shows why.

The constructor for the InfoRepo class calls InfoRepo::init() to perform TAO and OpenDDS initialization. This line, at the beginning of the method, shows the first issue:

  1. orb_ = CORBA::ORB_init (cvt.get_argc(), cvt.get_ASCII_argv(), "");

Because the IR is implemented as a CORBA server, it uses an ORB internally, and, as seen above, the ORB is not given an ID. In part I of this article, DataServer was written also to use an unnamed ORB. As the behavior of ORB_init() is to return an existing ORB if the ID of the ORB being created matches an existing ORB, the ORB that is used for DataServer would be the same ORB as used for the IR. We wish DataServer's ORB to be separate, so we must use a named ORB in DataServer.

The second issue is due to this line:

  1. ::DDS::DomainParticipantFactory_var dpf
  2. = TheParticipantFactoryWithArgs(cvt.get_argc(),
  3. cvt.get_TCHAR_argv());

As shown in part IITheParticipantFactoryWithArgs() is a macro which expands to:

  1. TheServiceParticipant->get_domain_participant_factory(argc, argv)

where TheServiceParticipant is a process-wide singleton. While a new ORB can be obtained by ensuring a unique name, not so with TheServiceParticipant. The first call to TheParticipantFactoryWithArgs() initializes TheServiceParticipant with the supplied arguments, and all OpenDDS entities in the process share that same configuration. We must ensure that arguments related to OpenDDS that are needed by the IR are not in conflict with ones that are used by DataServer itself.

For example, as shown above we have been passing -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo and -DCPSConfigFile tcp_conf.ini as arguments to DataServer. Looking further at get_domain_participant_factory() as implemented in %DDS_ROOT%\dds\DCPS\Service_Participant.cpp, we see that it will first try to parse command-line arguments via a call to parse_args(). The code to process the -DCPSInfoRepo command-line option is as follows:

  1. else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSInfoRepo"))) != 0)
  2. {
  3. this->set_repo_ior( currentArg, DEFAULT_REPO);
  4. arg_shifter.consume_arg ();
  5. got_info = true;
  6. }

That is, processing the command-line argument results in a call to set_repo_ior() to resolve the supplied object reference of the IR, but, as this argument is being processed by the IR itself, the IR has not yet started, so an OBJECT_NOT_EXIST exception will be thrown.

A similar problem occurs if the object reference was provided in the DCPS configuration file. get_domain_participant_factory() calls load_configuration() to process the configuration file, which calls load_common_configuration(). Here, the DCPSInfoRepo option is processed as follows:

  1. if (got_info)
  2. {
  3. ACE_DEBUG((LM_DEBUG,
  4. ACE_TEXT("(%P|%t)ignore DCPSInfoRepo config value, use command option.\n")));
  5. }
  6. else
  7. {
  8. ACE_TString value;
  9. GET_CONFIG_STRING_VALUE (this->cf_, sect, ACE_TEXT("DCPSInfoRepo"), value)
  10. this->set_repo_ior( value.c_str(), DEFAULT_REPO);
  11. }

If got_info is false, then -DCPSInfoRepo was not present on the command-line, so should be read from the configuration file. TheDCPSInfoRepo entry is read, and the value passed to set_repo_ior().

If an object reference is specified, the same problem as above occurs — it does not refer to a valid object, so an OBJECT_NOT_EXIST exception will be thrown. It is interesting to note that if the DCPSInfoRepo entry is not present in the configuration file, set_repo_ior() will be called with a blank string, leading to an invalid object reference (INV_OBJREF) exception being thrown.

Returning to InfoRepo::init(), we see that, after the call to TheParticipantFactoryWithArgs() that led to the exceptions thrown due to command-line and configuration file parsing, the object reference of the IR is correctly set.

  1. TheServiceParticipant->set_repo_ior(
  2. ACE_TEXT_CHAR_TO_TCHAR(objref_str.in()),
  3. OpenDDS::DCPS::Service_Participant::DEFAULT_REPO
  4. );

So, although specifying the IR object reference via command-line or configuration file will generate an exception, the correct object reference will ultimately be set. Due to the manner of configuration file processing, though, the only way to eliminate any exception being thrown is to not use a command-line option for the reference, and no configuration file at all.

Not using a configuration file raises the third issue. Because the transport for DataServer is specified in the configuration file, another method must be chosen. To avoid hard-coding the transport type, we will allow the user to select the transport via a command-line option.

Taking the above into consideration, our plan to collocate the IR into the DataServer process, we will will perform the following steps:

  1. Create a thread for the IR to execute within.
  2. Assemble appropriate command-line arguments for OpenDDS and TAO initialization.
  3. Integrate the changes into DataServer.
  4. Update the server MPC file to include IR components.

SIDEBAR
The code in this article was developed with Microsoft Visual Studio 2005. It was compiled against TAO version 1.6aOpenDDS versions 1.3 and 2.0, and MPC version 3.7.31. Inline assembly was disabled to prevent the .NET-related compiler warning C4793, as the use of __asm forces native code generation. Wide character support was enabled, as .NET uses Unicode for string representation. The build settings for these features are as follows:

  1. // add to %ACE_ROOT%\ace\config.h
  2. #define ACE_LACKS_INLINE_ASSEMBLY 1
  3. #define ACE_USES_WCHAR 1
  4.  
  5. // add to %ACE_ROOT%\bin\MakeProjectCreator\config\default.features
  6. uses_wchar=1

The code archive associated with this article contains two source trees rooted at DataServer and DataServer2DataServer should be used with OpenDDS version 1.3, and DataServer2 with OpenDDS version 2.0. Differences between the source trees are minimal — in OpenDDS version 2.0, to be compliant with the OMG DDS 1.2 specification, several entity creation functions now take an additional mask parameter, and one method and one type in the DataReader listener have been renamed.

Step 1 — Create the IR Thread

Recall from above that the DCPSInfoRepo process is simply the following:

  1. #include "DCPSInfoRepoServ.h"
  2. ...
  3. InfoRepo infoRepo(argc, argv);
  4. infoRepo.run();

Because the run() method of InfoRepo blocks until the IR is shut down, we must execute this in a separate thread in DataServer. We can do this by creating an ACE task — the svc() method of ACE_Task_Base is executed in its own thread.

Although the code is as simple as calling infoRepo.run(), there is one problem. DataServer cannot perform any OpenDDS operation until the IR is running, but as run() is blocking, there isn't a straightforward way to detect that the IR is ready.

The solution to this problem can be found in a trick used in DDS testing, as is implemented in %DDS_ROOT%\tests\DCPS\LivelinessTest\publisher.cpp. A timer is created that expires virtually immediately, and, as the timer can only fire when it is executed by the IR's reactor, the reactor must therefore be ready to process OpenDDS requests as well. The action that the timer takes when it expires is to signal the outside world that the IR is ready for use. The timer is implemented by an ACE_Event_Handler for the timer itself, and an ACE_Condition for the signal.

We begin by creating a header file, InfoRepoColoc.h, as part of the DataServer project. We create a subclass of ACE_Event_Handler, as follows:

  1. // InfoRepoColoc.h
  2. class InfoRepoStartEvent : public ACE_Event_Handler {
  3. ACE_Recursive_Thread_Mutex lock_;
  4. ACE_Condition<ACE_Recursive_Thread_Mutex> cond_;
  5. public:
  6. InfoRepoStartEvent();
  7. virtual int handle_timeout(const ACE_Time_Value &, const void *);
  8. void Install();
  9. void WaitForStart();
  10. };

We implement InfoRepoStartEvent in InfoRepoColoc.cpp. The constructor is simple — it associates the condition variable with the mutex.

  1. // InfoRepoColoc.cpp
  2. InfoRepoStartEvent::InfoRepoStartEvent() : cond_(lock_) {}

Install() retrieves the ORB used by OpenDDS, and schedules the InfoRepoStartEvent (this) to execute in the ORB's reactor. The timer is set to expire one microsecond after it is started.

  1. void InfoRepoStartEvent::Install() {
  2. CORBA::ORB_var orb = TheServiceParticipant->get_ORB();
  3. ACE_Reactor* reactor = orb->orb_core()->reactor();
  4.  
  5. if (reactor->schedule_timer(this, 0, ACE_Time_Value(0,1)) == -1)
  6. throw std::exception("schedule_timer() returned -1");
  7. }

handle_timeout() is called when the timer expires. When it executes, it signals the condition variable.

  1. int InfoRepoStartEvent::handle_timeout(const ACE_Time_Value &, const void *) {
  2. ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, -1);
  3. return cond_.signal();
  4. }

WaitForStart() blocks until the condition variable is signaled.

  1. void InfoRepoStartEvent::WaitForStart() {
  2. ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
  3. cond_.wait();
  4. }

With the timer event completed, we can now create the IR thread. We create another class definition in InfoRepoColoc.hInfoRepoTask, with member variables for the command-line arguments, the IR, and the timer event.

  1. // InfoRepoColoc.h
  2. class InfoRepoTask : public ACE_Task_Base {
  3. ACE_ARGV_T<ACE_TCHAR> args_;
  4. InfoRepo *infoRepo_;
  5. InfoRepoStartEvent infoRepoStartEvent_;
  6. public:
  7. InfoRepoTask(ACE_ARGV_T<ACE_TCHAR> &args);
  8. ~InfoRepoTask();
  9. virtual int svc();
  10. void WaitForStart();
  11. };

Returning to InfoRepoColoc.cpp, implement the constructor to initialize the member variables. In step 2 we will create command lines using ACE_ARGV_T<>, but as the class does not implement a copy constructor, we initialize our member variable explicitly by an argc/argv pair.

  1. // InfoRepoColoc.cpp
  2. InfoRepoTask::InfoRepoTask(ACE_ARGV_T<ACE_TCHAR> &args) :
  3. infoRepo_(0), args_(args.argc(), args.argv()) {}

The destructor for InfoRepoTask shuts down the IR, and waits for its termination.

  1. InfoRepoTask::~InfoRepoTask() {
  2. if (infoRepo_ != 0)
  3. infoRepo_->shutdown();
  4. if (thr_mgr() != 0)
  5. thr_mgr()->wait();
  6. delete infoRepo_;
  7. }

In the svc() method we call run() on the IR, and display any errors that may occur. The error handling used here is the same as in %DDS_ROOT%\dds\InfoRepo\DCPSInfoRepo.cpp.

  1. int InfoRepoTask::svc() {
  2. if (infoRepo_==0) {
  3. std::cerr << "InfoRepo not created" << std::endl;
  4. return 1;
  5. }
  6.  
  7. try {
  8. infoRepo_->run();
  9. }
  10. catch (InfoRepo::InitError& ex) {
  11. std::cerr << "Unexpected initialization Error: "
  12. << ex.msg_ << std::endl;
  13. return 1;
  14. }
  15. catch (const CORBA::Exception& ex) {
  16. ex._tao_print_exception (
  17. "ERROR: ::DDS DCPS Info Repo caught exception");
  18. return 1;
  19. }
  20.  
  21. return 0;
  22. }

WaitForStart() creates the IR object, installs the timer event, starts the ACE task with a call to activate(), and then waits until the condition variable is signaled by the expiration of the timer. Thus, when WaitForStart() exits, the IR is up and running.

  1. void InfoRepoTask::WaitForStart() {
  2. int argc = args_.argc();
  3. ACE_TCHAR **argv = args_.argv();
  4.  
  5. try {
  6. infoRepo_ = new InfoRepo(argc, argv);
  7. infoRepoStartEvent_.Install();
  8. activate(THR_BOUND, 1, 0, ACE_DEFAULT_THREAD_PRIORITY);
  9. infoRepoStartEvent_.WaitForStart();
  10. } catch (InfoRepo::InitError& ex) {
  11. // can log or take other action, but rethrow as a std::exception
  12. throw std::exception(ex.msg_.c_str());
  13. } catch (const CORBA::Exception& /*ex*/) {
  14. // can log or take other action, but rethrow
  15. // ex._tao_print_exception("DCPSInfoRepo exception");
  16. throw; // rethrow
  17. }
  18. }

Step 2 — Create Command-line Arguments

As shown in part II, command-lines must be explicitly constructed for both TAO and OpenDDS, as sharing options between them can cause conflicts. For one, we saw that passing the same -ORBListenEndpoints argument to both TAO and OpenDDS initialization would cause the application to fail as both TAO and OpenDDS will try to listen for incoming requests on the same port.

The two command-lines that we have been using are:

DCPSInfoRepo -ORBSvcConf lib_tcp.conf -ORBListenEndpoints iiop://:12345 
    -ORBDottedDecimalAddresses 0

DataServer -ORBSvcConf lib_tcp.conf -ORBListenEndpoints iiop://:12346 
    -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo 
    -DCPSConfigFile tcp_conf.ini

We see that we must instruct the IR to listen on port 12345, and DataServer to listen on port 12346. From the discussion above, we do not need to pass -DCPSInfoRepo and -DCPSConfigFile, though we desire to include any other TAO or OpenDDS arguments that a user may wish to include.

In order to simplify this, let us presume that the user need only supply a base port number which defaults to 12345 if not specified, and let all of the other arguments be automatically created. We can use the same ACE class that the IR does to parse arguments, ACE_Arg_Shifter, and form the command lines using ACE_ARGV_T<>.

Let us begin by creating a function in DataServer to parse command lines. We create a function ParseArgs() which will extract arguments passed on the command-line. We wish to allow users to provide additional TAO and OpenDDS arguments if desired, such as -ORBDebugLevelParseArgs() is passed argc and argv from ACE_TMAIN() and returns, by reference parameters, the parsed arguments.

  1. // DataServer.cpp
  2. void ParseArgs(int argc, ACE_TCHAR *argv[],
  3. int &port, ACE_TString& transport,
  4. std::vector<ACE_TString> &otherTAOArgs,
  5. std::vector<ACE_TString> &otherDDSArgs) {

The argument parsing itself is performed by the ACE_Arg_Shifter class. Methods of this class allow the current command-line argument, optionally with its parameter, to be examined, and either consumed or ignored. We create a command-line option, -p, which has one argument, the port number to use. We detect it, and consume, it, as follows:

  1. ACE_Arg_Shifter arg_shifter(argc, argv);
  2. while (arg_shifter.is_anything_left()) {
  3. const ACE_TCHAR *currentArg = 0;
  4. if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-p"))) != 0) {
  5. port = ACE_OS::atoi(currentArg);
  6. arg_shifter.consume_arg();
  7. }

We create a second command line option, -t, also with one argument, the transport to use. Please see the sidebar at the end of the section for more details.

  1. else if ((currentArg =
  2. arg_shifter.get_the_parameter(ACE_TEXT("-t"))) != 0) {
  3. transport = currentArg;
  4. arg_shifter.consume_arg();
  5. }

Because we want to accumulate TAO and OpenDDS arguments for later use, we check to see if the current argument being examined begins with -ORB or -DCPS, and, if so, add it to the appropriate list.

  1. else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-ORB")) != -1) {
  2. // add both the argument itself and its parameter
  3. otherTAOArgs.push_back(arg_shifter.get_current());
  4. arg_shifter.consume_arg();
  5. otherTAOArgs.push_back(arg_shifter.get_current());
  6. arg_shifter.consume_arg();
  7. }
  8. else if (arg_shifter.cur_arg_strncasecmp(ACE_TEXT("-DCPS")) != -1) {
  9. // add both the argument itself and its parameter
  10. otherDDSArgs.push_back(arg_shifter.get_current());
  11. arg_shifter.consume_arg();
  12. otherDDSArgs.push_back(arg_shifter.get_current());
  13. arg_shifter.consume_arg();
  14. }

If an argument is not recognized, we will ignore it.

  1. else
  2. arg_shifter.ignore_arg();
  3. }
  4. }

We now create a function which builds the command-lines. It takes as parameters what ParseArgs() has parsed, and returns, via reference parameter, command-lines in the form of ACE_ARGV_T<> objects.

  1. // DataServer.cpp
  2. void BuildCommandLines(int dcpsInfoRepoPort,
  3. ACE_TCHAR *argv0,
  4. const std::vector<ACE_TString> &otherTAOArgs,
  5. const std::vector<ACE_TString> &otherDDSArgs,
  6. ACE_ARGV_T<ACE_TCHAR> &irArgs,
  7. ACE_ARGV_T<ACE_TCHAR> &taoArgs) {

For convenience, we will assume that the port used for TAO is one greater than the one used by OpenDDS, though we could have created a separate parameter if we had desired. We first create string representations of the port numbers for later use.

  1. int dataServerPort = dcpsInfoRepoPort+1;
  2. ACE_TCHAR dcpsInfoRepoPortBuf[20];
  3. ACE_OS::itoa(dcpsInfoRepoPort, dcpsInfoRepoPortBuf, 10);
  4. ACE_TCHAR dataServerPortBuf[20];
  5. ACE_OS::itoa(dataServerPort, dataServerPortBuf, 10);

We now build the IR arguments by constructing and adding each argument to irArgs. Each call to add() corresponds to an argument that will be separated by a space on the command-line.

  1. irArgs.add(argv0);
  2. irArgs.add(ACE_TEXT("-ORBSvcConf"));
  3. irArgs.add(ACE_TEXT("lib_tcp.conf"));
  4. irArgs.add(ACE_TEXT("-ORBListenEndpoints"));
  5. // for simplicity, no hostname or IP address is specified in order to listen
  6. // on all interfaces
  7. irArgs.add(ACE_OS::strdup((ACE_TString(ACE_TEXT("iiop://:")) +
  8. dcpsInfoRepoPortBuf).c_str()));
  9. irArgs.add(ACE_TEXT("-ORBDottedDecimalAddresses"));
  10. irArgs.add(ACE_TEXT("0"));

Because, internally, the IR uses TAO, we wish to pass all TAO and OpenDDS arguments, such as -ORBDebugLevel, to the IR.

  1. for (std::vector<ACE_TString>::const_iterator it = otherDDSArgs.begin();
  2. it!=otherDDSArgs.end(); it++)
  3. irArgs.add(it->c_str());
  4. for (std::vector<ACE_TString>::const_iterator it = otherTAOArgs.begin();
  5. it!=otherTAOArgs.end(); it++)
  6. irArgs.add(it->c_str());

We construct the arguments for TAO in a similar way. We do not need to pass OpenDDS arguments to TAO.

  1. taoArgs.add(argv0);
  2. taoArgs.add(ACE_TEXT("-ORBListenEndpoints"));
  3. taoArgs.add(ACE_OS::strdup((ACE_TString(ACE_TEXT("iiop://:")) +
  4. dataServerPortBuf).c_str()));
  5. taoArgs.add(ACE_TEXT("-ORBSvcConf"));
  6. taoArgs.add(ACE_TEXT("lib_tcp.conf"));
  7. taoArgs.add(ACE_TEXT("-ORBDottedDecimalAddresses"));
  8. taoArgs.add(ACE_TEXT("0"));
  9. // just pass TAO args to TAO
  10. for (std::vector<ACE_TString>::const_iterator it = otherTAOArgs.begin();
  11. it!=otherTAOArgs.end(); it++)
  12. taoArgs.add(it->c_str());
  13. }

We also build command-lines for the client in a similar way, although we also include a -h option to specify the hostname of the machine running DataServer (defaulting to localhost), in addition to a -p option to specify the base port the port that DataServer is listening on (defaulting to 12345). Please see the code archive that accompanies this article for details.


SIDEBAR
In the code above, a single option, -t, to select the transport, is provided. The various transports have options which, while defaults are available, would need to be able to be modified by the user in the implementation of a complete system. For instance, the multicast transports allow a multicast group address to be supplied.

As we are not using a configuration file, these options need to be set directly in code. To do so, the DONT_AUTO_CONFIG enumeration is passed to create_transport_impl(), and an instance of a TransportConfiguration object is created. The various properties associated with the transport are set on the TransportConfiguration object, and the configuration is applied by a call to configure() on the Transport_Impl.

The OpenDDS test FooTest5 demonstrates how various transports are configured (see%DDS_ROOT%\tests\DCPS\FooTest5\common.cpp). For example, the configuration of the reliable multicast transport looks like:

  1. OpenDDS::DCPS::TransportImpl_rch writer_reliable_multicast_impl
  2. = TheTransportFactory->create_transport_impl(
  3. PUB_TRAFFIC_RELIABLE_MULTICAST,
  4. ACE_TEXT("ReliableMulticast"),
  5. OpenDDS::DCPS::DONT_AUTO_CONFIG);
  6.  
  7. OpenDDS::DCPS::TransportConfiguration_rch writer_config
  8. = TheTransportFactory->create_configuration(
  9. PUB_TRAFFIC_RELIABLE_MULTICAST,
  10. ACE_TEXT("ReliableMulticast"));
  11.  
  12. OpenDDS::DCPS::ReliableMulticastTransportConfiguration*
  13. writer_reliable_multicast_config
  14. = static_cast<
  15. OpenDDS::DCPS::ReliableMulticastTransportConfiguration*>
  16. (writer_config.in());
  17.  
  18. ACE_INET_Addr writer_address(writer_address_str);
  19. writer_reliable_multicast_config->multicast_group_address_ =
  20. writer_address;
  21. writer_reliable_multicast_config->multicast_group_address_str_ =
  22. writer_address_str;
  23.  
  24. if (writer_reliable_multicast_impl->configure(writer_config.in()) != 0)
  25. // ...

A full implementation of providing and setting arguments for desired transport parameters is beyond the scope of this article.


Step 3 — Integrate the Changes into DataServer

We can now update DataServer's ACE_TMAIN() to allow IR collocation. In part II, we iterated over the provided command-line arguments, and generated a new set by duplicating arguments as appropriate. We replace that code with calls to ParseArgs() and BuildCommandLines(), after setting appropriate defaults.

  1. int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) {
  2. try {
  3. // create proper command lines
  4. int port = 12345;
  5. ACE_TString transport(ACE_TEXT("SimpleTcp"));
  6. std::vector<ACE_TString> otherTAOArgs, otherDDSArgs;
  7. ParseArgs(argc, argv, port, transport, otherTAOArgs, otherDDSArgs);
  8.  
  9. ACE_ARGV_T<ACE_TCHAR> irArgs, taoArgs;
  10. BuildCommandLines(port, argv[0], otherTAOArgs, otherDDSArgs, irArgs, taoArgs);

We now create an instance of the InfoRepoTask on the local stack, passing the appropriate command-line arguments.

  1. InfoRepoTask irTask(irArgs);

Before DataServer is allowed to proceed further, we wait for the IR to start.

  1. DDS::DomainParticipantFactory_var dpf = DDS::DomainParticipantFactory::_nil();
  2. DDS::DomainParticipant_var participant = DDS::DomainParticipant::_nil();
  3.  
  4. try {
  5. irTask.WaitForStart();

As we saw above, OpenDDS initialization is performed by the IR, so we only need to obtain TheParticipantFactory rather than initialize it ourselves.

  1. dpf = TheParticipantFactory;

We create the DomainParticipant as we did before, and use method 2 from Appendix B in part II of this article to create the transport.

  1. // ... create the DomainParticipant
  2.  
  3. // create the transport based on the command-line argument
  4. const OpenDDS::DCPS::TransportIdType TRANSPORT_IMPL_ID = 1;
  5.  
  6. TheTransportFactory->get_or_create_configuration(
  7. TRANSPORT_IMPL_ID, transport.c_str());
  8.  
  9. OpenDDS::DCPS::TransportImpl_rch trans_impl =
  10. TheTransportFactory->create_transport_impl(TRANSPORT_IMPL_ID,
  11. OpenDDS::DCPS::AUTO_CONFIG);

The various DDS entities are created as before. The initialization of TAO changes, however — the constructed command-line arguments are used, and an ORB ID is provided.

  1. // ... create the various DDS entities
  2.  
  3. // initialize the ORB
  4. int taoArgc = taoArgs.argc();
  5. ACE_TCHAR **taoArgv = taoArgs.argv();
  6. CORBA::ORB_var orb = CORBA::ORB_init(taoArgc, taoArgv, "DataServer");
  7.  
  8. // ... activate the POA manager, etc.

Because the IR cleans up OpenDDS when it terminates, we complete the changes to DataServer by removing the OpenDDS cleanup code that we added in part II.

Step 4 — MPC

Our last step is to update the MPC file for DataServer so it can link against the DCPSInfoRepoServ library (via a libs clause), and to include InfoRepoColoc.cpp for compilation by adding it to the Source_Files section. The resulting file is as follows:

  1. project : taoserver, dcpsexe, CPPBase, iortable {
  2. exename = DataServer
  3. after += IDL
  4. after += DataLib
  5. after += DatabaseNotification
  6.  
  7. includes += ../IDL
  8. Source_Files {
  9. Database_i.cpp
  10. DataServer.cpp
  11. ../IDL/DatabaseC.cpp
  12. ../IDL/DatabaseS.cpp
  13. InfoRepoColoc.cpp
  14. }
  15. IDL_Files {
  16. }
  17.  
  18. managed = 1
  19.  
  20. libs += DCPSInfoRepoServ
  21. }

Conclusion

In this article, we have examined the DCPS Information Repository and shown how it can be colocated into a server process, with the help of various ACE classes. We have reduced the number of processes in the system to just a server and clients, and the number of command-line arguments to the minimum necessary to locate the components of the system. This concludes this series of articles on using TAO and OpenDDS with .NET

References

The Middleware News Brief is a periodic newsletter. The purpose and intent of this publication is to advance and inform on features, news, and technical information about Open Source, middleware technologies (e.g., TAO, OpenDDS, JacORB), including case studies and other technical content.

© Copyright Object Computing, Inc. 1993, 2016. All rights reserved

Subscribe

secret