The CORBA Notification Service: JacORB and TAO Interoperability

The CORBA Notification Service: JacORB and TAO Interoperability

by Heather Drury, Product Services Manager and Senior Software Engineer

June 2002

This month's CORBA News Brief from OCI provides a high-level overview of the OMG Notification Service and illustrates using the TAO 1.2a Notification Service interoperating with a JacORB v1.4 consumer.

Overview of the Notification Service

The Notification Service Specification was originally produced by the Telecommunications Working Group within the Object Management Group (OMG), an international consortium of over 800 companies, and was later adopted as a standard service.  The goal was to extend the more basic OMG Event Service specification to support telecommunication applications yet remain backward compatible with the standard Event Service. The Notification Service preserves all the semantics specified for the OMG Event Service, allowing for interoperability between basic Event Service and Notification Service clients.  

Both the Notification and Event services enable events (with an optional data payload) to be sent and received between objects in a decoupled fashion. This provides a more flexible mechanism for message transmission than if events were transmitted directly.  Analogous to the standard OMG Event Service, the Notification model utilizes an event channel as the substrate for the communication of messages between client applications. Applications that provide messages are termed suppliers while applications that receive (or consume) messages are known as consumers. Suppliers and consumers are completely decoupled from one another (i.e., suppliers have no knowledge of which consumers are listening for their messages and vice versa).  While the OMG Event Service supports asynchronous exchange of event messages, it has several serious limitations including no support for event filtering and no ability to be configured for varying levels of qualities of service (QoS). The Notification Service was designed to enhance the OMG Event Service by adding the following features:

Figure 1 shows the general architecture of the Notification Service. Suppliers and consumers of a notification channel are connected via their associated proxies. Administration interfaces on the supplier side and consumer side provide the ability to cluster a set of proxies based on common configurations.

Figure 1: Notification Service Architecture 

Figure 1: Notification Service Architecture

TAO Notification Service

TAO 1.2a includes an implementation of the OMG Notification Service. Highlights of the 1.2a release include support for the following features:

Example using TAO and JacORB

We illustrate the utility and interoperability of the Notification Service by creating a simple supplier application that sends events through the TAO Notification Channel to a JacORB consumer. The TAO application (written in C++) is a push supplier that connects to the Notification Channel and pushes events.  These events are simple text messages that are comprised of a "sender", "subject" and "body".  The JacORB (written in Java) consumer is a push consumer that connects to the channel and asynchronously receives event notifications from the channel. Both the supplier and consumer use the structured event type. The example is composed of three applications as illustrated in Figure 2 and further described below:

Figure 2: Overview of TAO/JacORB example

Figure 2: Overview of TAO/JacORB Example

 

  1. TAO MessengerClient: a simple C++ application that connects to the TAO Naming Service, looks up the MessengerServer object reference and invokes the send_message() method on that object to send two messages, one with "TAO" as the subject and the other with "JacORB" as the subject.  The MessengerClient application does not directly interact with the Notification service in any way. We will not describe the MessengerClient application in any further detail.
  2. TAO MessengerServer: a C++ application that plays the role of the server for the MessengerClient and the supplier for the JacORB consumer. The Notification channel is created within this application. The implementation for the Messenger servant contains a send_message() method that is invoked from the MessengerClient and packages the incoming data (with the associated "subject", "sender", and "body") into a structured event and invokes the push_structured_event() method on the consumer proxy object.
  3. JacORB Consumer: a Java application that uses JacORBv1.4 to connect to the TAO Naming Service, look up the Notification Event channel, and receive incoming events.  This application also utilizes the filtering functionality of the TAO Notification Service to limit incoming events to those that contain "JacORB" in their subject and to ignore all others.
  4. TAO Notification Service: service use to set up notification channel and associated administrative objects.
  5. TAO Naming Service: naming service used to locate CORBA objects.

TAO C++ Push Supplier Application

The MessengerServer main() method initializes the ORB (TAO), creates the Messenger servant, and registers the Messenger object with the TAO Naming Service. The code for theMessengerServer.cpp class is shown below:

  1. #include <orbsvcs/CosNamingC.h>
  2. #include "Messenger_i.h"
  3. #include <ace/streams.h>
  4.  
  5. int
  6. main(int argc, char * argv[])
  7. {
  8. try
  9. {
  10. // Initialize orb
  11. CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
  12.  
  13. // Find the Naming Service.
  14. CORBA::Object_var rootObj =
  15. orb->resolve_initial_references("NameService");
  16. CosNaming::NamingContext_var rootNC =
  17. CosNaming::NamingContext::_narrow(rootObj.in());
  18.  
  19. // Get the Root POA.
  20. CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
  21. PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
  22.  
  23. // Activate POA manager
  24. PortableServer::POAManager_var mgr = poa->the_POAManager();
  25. mgr->activate();
  26.  
  27. // Create our Messenger servant.
  28.  
  29. messenger_servant(orb.in());
  30.  
  31. // Register it with the RootPOA.
  32. PortableServer::ObjectId_var oid =
  33. poa->activate_object( &messenger_servant );
  34. CORBA::Object_var messenger_obj = poa->id_to_reference( oid.in() );
  35.  
  36. // Bind it in the Naming Service.
  37. CosNaming::Name name;
  38. name.length (1);
  39. name[0].id = CORBA::string_dup("MessengerService");
  40. rootNC->rebind(name, messenger_obj.in());
  41.  
  42. // Accept requests
  43. orb->run();
  44. orb->destroy();
  45.  
  46. }
  47. catch (CORBA::Exception& ex) {
  48. cerr << ex << endl;
  49. return 1;
  50. }
  51. return 0;
  52.  
  53. }

We now implement the structured push supplier that connects to the Notification Service and sends structured events when its send_message() method is invoked.  The steps involved in connecting the supplier to the event channel are illustrated in steps 1-6 in Figure 3 and further described below. The corresponding lines of code are also identified below within the MessengerServer example.

Figure 3: TAO C++ Messenger Server Supplier

Figure 3: Tao C++ Messenger Server Supplier

 

  1. Obtain an object reference to the event channel factory. (lines 21-22)
  2. Obtain an event channel. (lines 28-31)
  3. Obtain the SupplierAdmin object reference. (lines 47-48)
  4. Obtain a structured push consumer proxy object. (lines 56-59)
  5. Receive an incoming message from MessengerClient application
  6. Connect to the proxy to push the structured event. (line 119)
     

The Messenger_i class implements the Messenger interface (containing the send_message() method) and also acts as an event supplier to the TAO Notification Channel. The constructor for the Messenger servant initializes the ORB and uses the TAO Naming Service ("NameService") to look up the TAO Notification Service channel factory. The Notification Channel is then bound to the root naming context of the TAO Naming Service under the name "MyEventChannel." 

  1. Messenger_i::Messenger_i (CORBA::ORB_ptr orb)
  2. : orb_ (CORBA::ORB::_duplicate(orb))
  3.  
  4. {
  5. try
  6. {
  7. CORBA::Object_var poa_obj = orb->resolve_initial_references("RootPOA");
  8. PortableServer::POA_var poa = PortableServer::POA::_narrow(poa_obj.in());
  9.  
  10. CORBA::Object_var naming_obj =
  11. orb_->resolve_initial_references ("NameService");
  12.  
  13. if (CORBA::is_nil(naming_obj.in())) {
  14. cerr << "Unable to find naming service" << endl;
  15. }
  16.  
  17. CosNaming::NamingContext_var naming_context =
  18. CosNaming::NamingContext::_narrow(naming_obj.in());
  19.  
  20. // Create an instance of TAO's notification event channel
  21. CosNotifyChannelAdmin::EventChannelFactory_var notify_factory =
  22. TAO_Notify_EventChannelFactory_i::create(poa.in());
  23.  
  24. CosNotifyChannelAdmin::ChannelID id;
  25. CosNotification::QoSProperties initial_qos;
  26. CosNotification::AdminProperties initial_admin;
  27.  
  28. CosNotifyChannelAdmin::EventChannel_var ec =
  29. notify_factory->create_channel (initial_qos,
  30. initial_admin,
  31. id);
  32.  
  33. if (CORBA::is_nil (ec.in())) {
  34. cerr << "Unable to crete event channel" << endl;
  35. }
  36.  
  37. CosNaming::Name name(1);
  38. name.length(1);
  39. name[0].id = CORBA::string_dup("MyEventChannel");
  40.  
  41. naming_context->rebind(name, ec.in());
  42.  
  43. CosNotifyChannelAdmin::AdminID adminid;
  44. CosNotifyChannelAdmin::InterFilterGroupOperator ifgop =
  45. CosNotifyChannelAdmin::AND_OP;
  46.  
  47. CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin =
  48. ec->new_for_suppliers (ifgop, adminid);
  49.  
  50. if (CORBA::is_nil (supplier_admin.in())) {
  51. cerr << "Unable to find supplier admin" << endl;
  52. }
  53.  
  54. CosNotifyChannelAdmin::ProxyID supplieradmin_proxy_id;
  55.  
  56. CosNotifyChannelAdmin::ProxyConsumer_var proxy_consumer =
  57. supplier_admin->obtain_notification_push_consumer(
  58. CosNotifyChannelAdmin::STRUCTURED_EVENT,
  59. supplieradmin_proxy_id);
  60.  
  61. StructuredEventSupplier_i *servant =
  62. new StructuredEventSupplier_i(orb_.in());
  63.  
  64. PortableServer::ObjectId_var oid = poa->activate_object(servant);
  65. CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
  66. CosNotifyComm::StructuredPushSupplier_var supplier =
  67. CosNotifyComm::StructuredPushSupplier::_narrow(supplier_obj.in());
  68.  
  69. consumer_proxy_ =
  70. CosNotifyChannelAdmin::StructuredProxyPushConsumer::
  71. _narrow(proxy_consumer.in());
  72.  
  73. if (CORBA::is_nil (consumer_proxy_.in())) {
  74. cerr << "Unable to find structured proxy push consumer" << endl;
  75. }
  76. consumer_proxy_->connect_structured_push_supplier(supplier.in());
  77.  
  78. }
  79. catch (CORBA::Exception &ex) {
  80. cerr << ex << endl;
  81. }
  82. }

The send_message() method of the Messenger Servant formats the message and then creates a new structured event and populates it with the contents of the message.  The push_structured_event() operation of the structured push consumer proxy object is used to push the event to the notification channel.

  1. CORBA::Boolean Messenger_i::send_message (const char * user_name,
  2. const char * subject,
  3. char *& message)
  4. throw (CORBA::SystemException)
  5. {
  6.  
  7. cout << "Message from: " << user_name << endl;
  8. cout << "Subject: " << subject << endl;
  9. cout << "Message: " << message << endl;
  10.  
  11. try
  12. {
  13.  
  14. // Event Definition
  15. CosNotification::StructuredEvent event;
  16.  
  17. event.header.fixed_header.event_type.domain_name =
  18. CORBA::string_dup("OCI");
  19. // string
  20. event.header.fixed_header.event_type.type_name =
  21. CORBA::string_dup("examples");
  22. // string
  23. event.header.fixed_header.event_name =
  24. CORBA::string_dup("myevent");
  25.  
  26. // sequence<Property>: string name, any value
  27. event.filterable_data.length (1);
  28. event.filterable_data[0].name = CORBA::string_dup("From");
  29. event.filterable_data[0].value <<= (const char *)user_name;
  30. event.filterable_data.length (2);
  31. event.filterable_data[1].name = CORBA::string_dup("Subject");
  32. event.filterable_data[1].value <<= (const char *)subject;
  33. event.filterable_data.length (3);
  34. event.filterable_data[2].name = CORBA::string_dup("Message");
  35. event.filterable_data[2].value <<= (const char *)message;
  36.  
  37. consumer_proxy_->push_structured_event(event);
  38. }
  39.  
  40. catch (CosNotifyComm::InvalidEventType &) {
  41. cerr << "Invalid Event Type Exception " << endl;
  42. return 1;
  43. }
  44.  
  45. catch (CORBA::Exception &ex) {
  46. cerr << ex << endl;
  47. return 1;
  48. }
  49. return 0;
  50. }

Lastly, the supplier class StructuredEventSupplier_i, which implements theCosNotifyComm:StructuredPushSupplier IDL interface, must be implemented. It contains two operations:

The constructor for this class simply duplicates the ORB reference.

  1. StructuredEventSupplier_i::StructuredEventSupplier_i(CORBA::ORB_ptr orb)
  2. : orb_(CORBA::ORB::_duplicate(orb))
  3. {
  4. }

The implementation for the subscription_change() method does nothing for the current implementation. The implementation for the disconnect_structured_push_supplier() deactivates the supplier object.

  1. void StructuredEventSupplier_i::disconnect_structured_push_supplier ()
  2. throw (CORBA::SystemException)
  3. {
  4.  
  5. CORBA::Object_var obj = orb_->resolve_initial_references ("POACurrent");
  6. PortableServer::Current_var current =
  7. PortableServer::Current::_narrow (obj.in());
  8. PortableServer::POA_var poa = current->get_POA ();
  9. PortableServer::ObjectId_var objectId = current->get_object_id ();
  10. poa->deactivate_object (objectId.in());
  11.  
  12. }

JacORB Java Push Consumer Application

We now implement the JacORB consumer which implements a structured push consumer that connects to the TAO Notification Channel and prints all the structured events it receives. The consumer uses the filtering functionality of the Notification Service to set up a constraint to allow only messages whose subjects contain "JacORB" to be received.

Figure 4: JacORB Consumer

Figure 4: JacORB Consumer

 

  1. Obtain an object reference to the event channel. (line 36)
  2. Obtain the ConsumerAdmin object reference. (line 42)
  3. Use the default filter factory to create a new filter; add the filter to the ConsumerAdmin. (lines 64-65, 76)
  4. Obtain a structured push supplier proxy object. (lines 95, 105)
  5. Connect to the proxy. (lines 129-133)

The JacORB consumer is implemented in the Java class JacORBConsumer shown below:

  1. package JacORBConsumer;
  2.  
  3. import org.omg.CosNotification.*;
  4. import org.omg.CosNotifyComm.*;
  5. import org.omg.CosNotifyChannelAdmin.*;
  6. import org.omg.CosNotifyFilter.*;
  7. import org.omg.CosNaming.*;
  8. import org.omg.CosNaming.NamingContextPackage.*;
  9. import org.omg.PortableServer.*;
  10.  
  11. import java.io.*;
  12. import java.util.*;
  13. import java.net.*;
  14.  
  15. public class Consumer extends StructuredPushConsumerPOA
  16. {
  17. public static void main( String[] args )
  18. {
  19. try {
  20.  
  21. System.out.println ("Initialize orb\n");
  22. org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
  23.  
  24.  
  25. // get naming service reference and context
  26. BufferedReader reader = new BufferedReader(new FileReader("ns.ior"));
  27. String ns = reader.readLine();
  28. org.omg.CORBA.Object obj = orb.string_to_object(ns);
  29.  
  30. NamingContext rootContext = NamingContextHelper.narrow( obj );
  31. System.out.println("Got Root context: "+ obj);
  32.  
  33. // find the event channel reference
  34. NameComponent[] name = { new NameComponent( "MyEventChannel", "" )};
  35. obj = rootContext.resolve(name);
  36. EventChannel channel = EventChannelHelper.narrow(obj);
  37. System.out.println ("**************MyChannel is " + channel.toString ());
  38.  
  39. // get the admin interface and the supplier proxy
  40. InterFilterGroupOperator ifgop = InterFilterGroupOperator.AND_OP;
  41. org.omg.CORBA.IntHolder adminId = new org.omg.CORBA.IntHolder(0);
  42. ConsumerAdmin consumerAdmin = channel.new_for_consumers(ifgop, adminId);
  43.  
  44. // get the default filter factory
  45. FilterFactory filterFactory = channel.default_filter_factory();
  46. Filter filter = null;
  47. if ( filterFactory == null ){
  48. System.err.println ("No default filter factory found!");
  49. }
  50. try {
  51. filter = filterFactory.create_filter("TCL");
  52. }
  53. catch (Exception e){
  54. System.err.println( "ERROR: " + e );
  55. e.printStackTrace( System.err );
  56. }
  57.  
  58. String expr = "Subject == 'JacORB'";
  59.  
  60. ConstraintExp exp[] = new ConstraintExp[1];
  61. EventType eventType[] = new EventType [0];
  62. exp[0] = new ConstraintExp (eventType, expr);
  63. try {
  64. ConstraintInfo info[] = filter.add_constraints (exp);
  65. consumerAdmin.add_filter(filter);
  66. }
  67. catch (InvalidConstraint ex) {
  68. System.err.println( "ERROR: " + e );
  69. e.printStackTrace( System.err );
  70. }
  71.  
  72. EventType added[] = new EventType[1];
  73. EventType removed[] = new EventType [0];
  74. added[0] = new EventType ("*", "*");
  75. try{
  76. consumerAdmin.subscription_change(added, removed);
  77. }
  78. catch (Exception e){
  79. System.err.println( "ERROR: " + e );
  80. e.printStackTrace( System.err );
  81. }
  82.  
  83. POA poa = org.omg.PortableServer.POAHelper.narrow
  84. (orb.resolve_initial_references("RootPOA"));
  85.  
  86. // create and implicitly activate the client
  87. StructuredPushConsumer structuredPushConsumer =
  88. (StructuredPushConsumer)new Consumer()._this(orb);
  89.  
  90. // get the stuctured proxy push supplier
  91. ClientType clientType = ClientType.STRUCTURED_EVENT;
  92. org.omg.CORBA.IntHolder proxyId = new org.omg.CORBA.IntHolder (0);
  93. ProxySupplier proxySupplier = null;
  94. try{
  95. proxySupplier = consumerAdmin.obtain_notification_push_supplier (clientType, proxyId);
  96. }
  97. catch( AdminLimitExceeded e ){
  98. System.err.println ( "ERROR: " + e);
  99. e.printStackTrace( System.err );
  100. }
  101. StructuredProxyPushSupplier proxyPushSupplier =
  102. StructuredProxyPushSupplierHelper.narrow(proxySupplier);
  103.  
  104. // connect ourselves to the event channel
  105. proxyPushSupplier.connect_structured_push_consumer(structuredPushConsumer);
  106.  
  107. poa.the_POAManager().activate();
  108. System.out.println ("run the orb");
  109. orb.run();
  110. }
  111. // Catch exceptions
  112. catch ( Exception e ) {
  113. System.err.println( "ERROR: " + e );
  114. e.printStackTrace( System.err );
  115. }
  116. System.out.println("Normal Termination...");
  117. }
  118.  
  119. public void disconnect_structured_push_consumer (){
  120. System.out.println ("disconnect_structured_push_consumer invoked");
  121. }
  122.  
  123. public void offer_change (EventType added[], EventType removed[]){
  124. System.out.println ("offer_change invoked");
  125. }
  126.  
  127. public void push_structured_event (StructuredEvent event){
  128. try {
  129. System.out.println ("\nevent name is: " + event.header.fixed_header.event_name);
  130. System.out.println ("domain name is: " + event.header.fixed_header.event_type.domain_name);
  131. System.out.println ("type name is: " + event.header.fixed_header.event_type.type_name);
  132. for (int i = 0; i < event.filterable_data.length; i++){
  133. System.out.println (event.filterable_data[i].name + ":\t" + event.filterable_data[i].value);
  134. }
  135. }
  136. catch ( Exception e ) {
  137. System.err.println( "ERROR: " + e );
  138. e.printStackTrace( System.err );
  139. }
  140. }
  141. }

The consumer locates the TAO Notification service and invokes the new_for_consumers() method on the channel interface to get the ConsumerAdmin object reference. The consumer uses the default_filter_factory() to create a filter that utilizes the TCL constraint language. A constraint is created ("Subject == 'JacORB'") and added to the filter using the add_constraints() method on the filter. The filter is then added to the ConsumerAdmin object using the add_filter() method. Invoking the obtain_notification_push_supplier() operation on the ConsumerAdmin object obtains the supplier push proxy. Finally, the proxy is connected and the consumer waits for events to be pushed. Events are printed in the push_structured_event() method. Invoking the disconnect_structured_push_consumer() operation, by the channel, will disconnect the consumer at any point in time (care should be taken in the disconnection of suppliers and consumers). The offered_change() operation does nothing in this example, but an intelligent consumer may, for example, disconnect when the channel no longer offers the types of events that are of interest.

Summary

This article focused on illustrating how to use the TAO Notification Service in conjunction with a JacORB consumer. There are many resources available that provide greater detail into the OMG Notification Service, TAO, and JacORB. These include the OMG Notification Service specification and Java Programming with CORBA by Brose, Vogel, and Duddy. In depth information on CORBA programming with TAO can be found from the the recently released TAO 1.2a Developer's Guide. Other related resources can be found by visiting the links listed in the References section.

References

[1] TAO at Washington University, St. Louis
http://tao.doc.wustl.edu 
[2] OCI TAO: The ACE ORB
http://www.theaceorb.com
[3] OCI TAO Resources
http://www.theaceorb.com/references/
[4] JacORB
http://www.jacorb.org
[5] OMG Notification Service Specification
http://www.omg.org/cgi-bin/doc?formal/2000-06-20

The Middleware News Brief is a periodic newsletter. The purpose and intent of this publication is to advance and inform on features, news, and technical information about Open Source, middleware technologies (e.g., TAO, OpenDDS, JacORB), including case studies and other technical content.

© Copyright Object Computing, Inc. 1993, 2016. All rights reserved

Subscribe

secret