The CORBA Notification Service: JacORB and TAO Interoperability

Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.

Overview of the Notification Service

The Notification Service Specification was originally produced by the Telecommunications Working Group within the Object Management Group (OMG), an international consortium of over 800 companies, and was later adopted as a standard service. The goal was to extend the more basic OMG Event Service specification to support telecommunication applications, yet remain backward compatible with the standard Event Service. The Notification Service preserves all the semantics specified for the OMG Event Service, allowing for interoperability between basic Event Service and Notification Service clients.

Both the Notification and Event services enable events (with an optional data payload) to be sent and received between objects in a decoupled fashion. This provides a more flexible mechanism for message transmission than if events were transmitted directly.

Analogous to the standard OMG Event Service, the Notification model utilizes an event channel as the substrate for the communication of messages between client applications. Applications that provide messages are termed suppliers while applications that receive (or consume) messages are known as consumers. Suppliers and consumers are completely decoupled from one another (i.e., suppliers have no knowledge of which consumers are listening for their messages and vice versa).

While the OMG Event Service supports asynchronous exchange of event messages, it has several serious limitations, including no support for event filtering and no ability to be configured for varying levels of qualities of service (QoS). The Notification Service was designed to enhance the OMG Event Service by adding the following features:

  • Structured event type. The Notification Service introduces a new event type, the structured event, that has a well-defined data structure into which messages can be inserted and sent to interested consumers. Messages can be defined using standard IDL and provide a much more strongly typed event than was available in the OMG Event Service. In addition, a sequence of structured events can be transmitted simultaneously thereby increasing communication efficiency.
  • Event filtering. The Notification Service allows consumers of events to use filters to selectively specify which events they are interested in receiving. Event filtering occurs within the Notification Service so consumers receive only events in which they are interested and do not spend processing cycles dealing with unwanted events.
  • Publication of offers and subscriptions. The Notification Service provides a way for suppliers to inform consumers of the types of events they produce, thereby allowing developers of event consumers to make informed decisions as to whether they should continue to receive events of a particular type. Likewise, the Notification Service provides a way for consumers of events to inform suppliers of the types of events they are interested in receiving, thereby allowing developers of event suppliers to make informed decisions as to whether they should continue to produce events of a particular type.
  • Quality of Service properties. The Notification Service specifies how various QoS properties can be configured including properties that control reliability, priority, order, and discard policies for message delivery (as well as many others).

Figure 1 shows the general architecture of the Notification Service. Suppliers and consumers of a notification channel are connected via their associated proxies. Administration interfaces on the supplier side and consumer side provide the ability to cluster a set of proxies based on common configurations.

Figure 1. Notification Service Architecture

Figure 1. Notification Service Architecture

TAO Notification Service

TAO 1.2a includes an implementation of the OMG Notification Service. Highlights of the 1.2a release include support for the following features:

  • Multithreaded event dispatching and filter evaluation;
  • Full ETCL filtering constraint grammar;
  • SequencePushConsumer interface;
  • Most of the standard quality of service (QoS) properties (with the exception of reliability);
  • Both untyped and structured events (but not typed events);
  • Push model (but not pull);
  • Event channel factory; and
  • Offers and subscriptions.

Example using TAO and JacORB

We illustrate the utility and interoperability of the Notification Service by creating a simple supplier application that sends events through the TAO Notification Channel to a JacORB consumer.

The TAO application (written in C++) is a push supplier that connects to the Notification Channel and pushes events.  These events are simple text messages that comprise a "sender," a "subject," and a "body."

The JacORB (written in Java) consumer is a push consumer that connects to the channel and asynchronously receives event notifications from the channel. Both the supplier and consumer use the structured event type.

The example is composed of three applications as illustrated in Figure 2 and further described below:

Figure 2. Overview of TAO/JacORB example

Figure 2. Overview of TAO/JacORB Example
  1. TAO MessengerClient. A simple C++ application that connects to the TAO Naming Service, looks up the MessengerServer object reference and invokes the send_message() method on that object to send two messages, one with "TAO" as the subject and the other with "JacORB" as the subject. The MessengerClient application does not directly interact with the Notification service in any way. We will not describe the MessengerClient application in any further detail.
  2. TAO MessengerServer. A C++ application that plays the role of the server for the MessengerClient and the supplier for the JacORB consumer. The Notification channel is created within this application. The implementation for the Messenger servant contains a send_message() method that is invoked from the MessengerClient and packages the incoming data (with the associated "subject", "sender", and "body") into a structured event and invokes the push_structured_event() method on the consumer proxy object.
  3. JacORB Consumer. A Java application that uses JacORBv1.4 to connect to the TAO Naming Service, look up the Notification Event channel, and receive incoming events. This application also utilizes the filtering functionality of the TAO Notification Service to limit incoming events to those that contain "JacORB" in their subject and to ignore all others.
  4. TAO Notification Service. Service use to set up notification channel and associated administrative objects.
  5. TAO Naming Service. Naming service used to locate CORBA objects.

TAO C++ Push Supplier Application

The MessengerServer main() method initializes the ORB (TAO), creates the Messenger servant, and registers the Messenger object with the TAO Naming Service. The code for theMessengerServer.cpp class is shown below:

  1. #include <orbsvcs/CosNamingC.h>
  2. #include "Messenger_i.h"
  3. #include <ace/streams.h>
  4.  
  5. int
  6. main(int argc, char * argv[])
  7. {
  8. try
  9. {
  10. // Initialize orb
  11. CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
  12.  
  13. // Find the Naming Service.
  14. CORBA::Object_var rootObj =
  15. orb->resolve_initial_references("NameService");
  16. CosNaming::NamingContext_var rootNC =
  17. CosNaming::NamingContext::_narrow(rootObj.in());
  18.  
  19. // Get the Root POA.
  20. CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
  21. PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
  22.  
  23. // Activate POA manager
  24. PortableServer::POAManager_var mgr = poa->the_POAManager();
  25. mgr->activate();
  26.  
  27. // Create our Messenger servant.
  28.  
  29. messenger_servant(orb.in());
  30.  
  31. // Register it with the RootPOA.
  32. PortableServer::ObjectId_var oid =
  33. poa->activate_object( &messenger_servant );
  34. CORBA::Object_var messenger_obj = poa->id_to_reference( oid.in() );
  35.  
  36. // Bind it in the Naming Service.
  37. CosNaming::Name name;
  38. name.length (1);
  39. name[0].id = CORBA::string_dup("MessengerService");
  40. rootNC->rebind(name, messenger_obj.in());
  41.  
  42. // Accept requests
  43. orb->run();
  44. orb->destroy();
  45.  
  46. }
  47. catch (CORBA::Exception& ex) {
  48. cerr << ex << endl;
  49. return 1;
  50. }
  51. return 0;
  52.  
  53. }

We now implement the structured push supplier that connects to the Notification Service and sends structured events when its send_message() method is invoked.

The steps involved in connecting the supplier to the event channel are illustrated in steps 1-6 in Figure 3 and further described below. The corresponding lines of code are also identified below within the MessengerServer example.

Figure 3. TAO C++ Messenger Server Supplier

Figure 3. TAO C++ Messenger Server Supplier
  1. Obtain an object reference to the event channel factory. (lines 21-22)
  2. Obtain an event channel. (lines 28-31)
  3. Obtain the SupplierAdmin object reference. (lines 47-48)
  4. Obtain a structured push consumer proxy object. (lines 56-59)
  5. Receive an incoming message from MessengerClient application
  6. Connect to the proxy to push the structured event. (line 119)

The Messenger_i class implements the Messenger interface (containing the send_message() method) and also acts as an event supplier to the TAO Notification Channel.

The constructor for the Messenger servant initializes the ORB and uses the TAO Naming Service ("NameService") to look up the TAO Notification Service channel factory.

The Notification Channel is then bound to the root naming context of the TAO Naming Service under the name "MyEventChannel."

  1. Messenger_i::Messenger_i (CORBA::ORB_ptr orb)
  2. : orb_ (CORBA::ORB::_duplicate(orb))
  3.  
  4. {
  5. try
  6. {
  7. CORBA::Object_var poa_obj = orb->resolve_initial_references("RootPOA");
  8. PortableServer::POA_var poa = PortableServer::POA::_narrow(poa_obj.in());
  9.  
  10. CORBA::Object_var naming_obj =
  11. orb_->resolve_initial_references ("NameService");
  12.  
  13. if (CORBA::is_nil(naming_obj.in())) {
  14. cerr << "Unable to find naming service" << endl;
  15. }
  16.  
  17. CosNaming::NamingContext_var naming_context =
  18. CosNaming::NamingContext::_narrow(naming_obj.in());
  19.  
  20. // Create an instance of TAO's notification event channel
  21. CosNotifyChannelAdmin::EventChannelFactory_var notify_factory =
  22. TAO_Notify_EventChannelFactory_i::create(poa.in());
  23.  
  24. CosNotifyChannelAdmin::ChannelID id;
  25. CosNotification::QoSProperties initial_qos;
  26. CosNotification::AdminProperties initial_admin;
  27.  
  28. CosNotifyChannelAdmin::EventChannel_var ec =
  29. notify_factory->create_channel (initial_qos,
  30. initial_admin,
  31. id);
  32.  
  33. if (CORBA::is_nil (ec.in())) {
  34. cerr << "Unable to crete event channel" << endl;
  35. }
  36.  
  37. CosNaming::Name name(1);
  38. name.length(1);
  39. name[0].id = CORBA::string_dup("MyEventChannel");
  40.  
  41. naming_context->rebind(name, ec.in());
  42.  
  43. CosNotifyChannelAdmin::AdminID adminid;
  44. CosNotifyChannelAdmin::InterFilterGroupOperator ifgop =
  45. CosNotifyChannelAdmin::AND_OP;
  46.  
  47. CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin =
  48. ec->new_for_suppliers (ifgop, adminid);
  49.  
  50. if (CORBA::is_nil (supplier_admin.in())) {
  51. cerr << "Unable to find supplier admin" << endl;
  52. }
  53.  
  54. CosNotifyChannelAdmin::ProxyID supplieradmin_proxy_id;
  55.  
  56. CosNotifyChannelAdmin::ProxyConsumer_var proxy_consumer =
  57. supplier_admin->obtain_notification_push_consumer(
  58. CosNotifyChannelAdmin::STRUCTURED_EVENT,
  59. supplieradmin_proxy_id);
  60.  
  61. StructuredEventSupplier_i *servant =
  62. new StructuredEventSupplier_i(orb_.in());
  63.  
  64. PortableServer::ObjectId_var oid = poa->activate_object(servant);
  65. CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
  66. CosNotifyComm::StructuredPushSupplier_var supplier =
  67. CosNotifyComm::StructuredPushSupplier::_narrow(supplier_obj.in());
  68.  
  69. consumer_proxy_ =
  70. CosNotifyChannelAdmin::StructuredProxyPushConsumer::
  71. _narrow(proxy_consumer.in());
  72.  
  73. if (CORBA::is_nil (consumer_proxy_.in())) {
  74. cerr << "Unable to find structured proxy push consumer" << endl;
  75. }
  76. consumer_proxy_->connect_structured_push_supplier(supplier.in());
  77.  
  78. }
  79. catch (CORBA::Exception &ex) {
  80. cerr << ex << endl;
  81. }
  82. }

The send_message() method of the Messenger Servant formats the message and then creates a new structured event and populates it with the contents of the message.  The push_structured_event() operation of the structured push consumer proxy object is used to push the event to the notification channel.

  1. CORBA::Boolean Messenger_i::send_message (const char * user_name,
  2. const char * subject,
  3. char *& message)
  4. throw (CORBA::SystemException)
  5. {
  6.  
  7. cout << "Message from: " << user_name << endl;
  8. cout << "Subject: " << subject << endl;
  9. cout << "Message: " << message << endl;
  10.  
  11. try
  12. {
  13.  
  14. // Event Definition
  15. CosNotification::StructuredEvent event;
  16.  
  17. event.header.fixed_header.event_type.domain_name =
  18. CORBA::string_dup("OCI");
  19. // string
  20. event.header.fixed_header.event_type.type_name =
  21. CORBA::string_dup("examples");
  22. // string
  23. event.header.fixed_header.event_name =
  24. CORBA::string_dup("myevent");
  25.  
  26. // sequence<Property>: string name, any value
  27. event.filterable_data.length (1);
  28. event.filterable_data[0].name = CORBA::string_dup("From");
  29. event.filterable_data[0].value <<= (const char *)user_name;
  30. event.filterable_data.length (2);
  31. event.filterable_data[1].name = CORBA::string_dup("Subject");
  32. event.filterable_data[1].value <<= (const char *)subject;
  33. event.filterable_data.length (3);
  34. event.filterable_data[2].name = CORBA::string_dup("Message");
  35. event.filterable_data[2].value <<= (const char *)message;
  36.  
  37. consumer_proxy_->push_structured_event(event);
  38. }
  39.  
  40. catch (CosNotifyComm::InvalidEventType &) {
  41. cerr << "Invalid Event Type Exception " << endl;
  42. return 1;
  43. }
  44.  
  45. catch (CORBA::Exception &ex) {
  46. cerr << ex << endl;
  47. return 1;
  48. }
  49. return 0;
  50. }

Lastly, the supplier class StructuredEventSupplier_i, which implements the CosNotifyComm:StructuredPushSupplier IDL interface, must be implemented. It contains two operations:

  • disconnect_structured_push_supplier(). Supplier invokes this method when it is finished providing event data; and
  • subscription_change(). Method invoked in the event that the consumers change their subscription model.

The constructor for this class simply duplicates the ORB reference.

  1. StructuredEventSupplier_i::StructuredEventSupplier_i(CORBA::ORB_ptr orb)
  2. : orb_(CORBA::ORB::_duplicate(orb))
  3. {
  4. }

The implementation for the subscription_change() method does nothing for the current implementation. The implementation for the disconnect_structured_push_supplier() deactivates the supplier object.

  1. void StructuredEventSupplier_i::disconnect_structured_push_supplier ()
  2. throw (CORBA::SystemException)
  3. {
  4.  
  5. CORBA::Object_var obj = orb_->resolve_initial_references ("POACurrent");
  6. PortableServer::Current_var current =
  7. PortableServer::Current::_narrow (obj.in());
  8. PortableServer::POA_var poa = current->get_POA ();
  9. PortableServer::ObjectId_var objectId = current->get_object_id ();
  10. poa->deactivate_object (objectId.in());
  11.  
  12. }

JacORB Java Push Consumer Application

We now implement the JacORB consumer which implements a structured push consumer that connects to the TAO Notification Channel and prints all the structured events it receives. The consumer uses the filtering functionality of the Notification Service to set up a constraint to allow only messages whose subjects contain "JacORB" to be received.

Figure 4. JacORB Consumer

Figure 4. JacORB Consumer
  1. Obtain an object reference to the event channel. (line 36)
  2. Obtain the ConsumerAdmin object reference. (line 42)
  3. Use the default filter factory to create a new filter; add the filter to the ConsumerAdmin. (lines 64-65, 76)
  4. Obtain a structured push supplier proxy object. (lines 95, 105)
  5. Connect to the proxy. (lines 129-133)

The JacORB consumer is implemented in the Java class JacORBConsumer shown below:

  1. package JacORBConsumer;
  2.  
  3. import org.omg.CosNotification.*;
  4. import org.omg.CosNotifyComm.*;
  5. import org.omg.CosNotifyChannelAdmin.*;
  6. import org.omg.CosNotifyFilter.*;
  7. import org.omg.CosNaming.*;
  8. import org.omg.CosNaming.NamingContextPackage.*;
  9. import org.omg.PortableServer.*;
  10.  
  11. import java.io.*;
  12. import java.util.*;
  13. import java.net.*;
  14.  
  15. public class Consumer extends StructuredPushConsumerPOA
  16. {
  17. public static void main( String[] args )
  18. {
  19. try {
  20.  
  21. System.out.println ("Initialize orb\n");
  22. org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
  23.  
  24.  
  25. // get naming service reference and context
  26. BufferedReader reader = new BufferedReader(new FileReader("ns.ior"));
  27. String ns = reader.readLine();
  28. org.omg.CORBA.Object obj = orb.string_to_object(ns);
  29.  
  30. NamingContext rootContext = NamingContextHelper.narrow( obj );
  31. System.out.println("Got Root context: "+ obj);
  32.  
  33. // find the event channel reference
  34. NameComponent[] name = { new NameComponent( "MyEventChannel", "" )};
  35. obj = rootContext.resolve(name);
  36. EventChannel channel = EventChannelHelper.narrow(obj);
  37. System.out.println ("**************MyChannel is " + channel.toString ());
  38.  
  39. // get the admin interface and the supplier proxy
  40. InterFilterGroupOperator ifgop = InterFilterGroupOperator.AND_OP;
  41. org.omg.CORBA.IntHolder adminId = new org.omg.CORBA.IntHolder(0);
  42. ConsumerAdmin consumerAdmin = channel.new_for_consumers(ifgop, adminId);
  43.  
  44. // get the default filter factory
  45. FilterFactory filterFactory = channel.default_filter_factory();
  46. Filter filter = null;
  47. if ( filterFactory == null ){
  48. System.err.println ("No default filter factory found!");
  49. }
  50. try {
  51. filter = filterFactory.create_filter("TCL");
  52. }
  53. catch (Exception e){
  54. System.err.println( "ERROR: " + e );
  55. e.printStackTrace( System.err );
  56. }
  57.  
  58. String expr = "Subject == 'JacORB'";
  59.  
  60. ConstraintExp exp[] = new ConstraintExp[1];
  61. EventType eventType[] = new EventType [0];
  62. exp[0] = new ConstraintExp (eventType, expr);
  63. try {
  64. ConstraintInfo info[] = filter.add_constraints (exp);
  65. consumerAdmin.add_filter(filter);
  66. }
  67. catch (InvalidConstraint ex) {
  68. System.err.println( "ERROR: " + e );
  69. e.printStackTrace( System.err );
  70. }
  71.  
  72. EventType added[] = new EventType[1];
  73. EventType removed[] = new EventType [0];
  74. added[0] = new EventType ("*", "*");
  75. try{
  76. consumerAdmin.subscription_change(added, removed);
  77. }
  78. catch (Exception e){
  79. System.err.println( "ERROR: " + e );
  80. e.printStackTrace( System.err );
  81. }
  82.  
  83. POA poa = org.omg.PortableServer.POAHelper.narrow
  84. (orb.resolve_initial_references("RootPOA"));
  85.  
  86. // create and implicitly activate the client
  87. StructuredPushConsumer structuredPushConsumer =
  88. (StructuredPushConsumer)new Consumer()._this(orb);
  89.  
  90. // get the stuctured proxy push supplier
  91. ClientType clientType = ClientType.STRUCTURED_EVENT;
  92. org.omg.CORBA.IntHolder proxyId = new org.omg.CORBA.IntHolder (0);
  93. ProxySupplier proxySupplier = null;
  94. try{
  95. proxySupplier = consumerAdmin.obtain_notification_push_supplier (clientType, proxyId);
  96. }
  97. catch( AdminLimitExceeded e ){
  98. System.err.println ( "ERROR: " + e);
  99. e.printStackTrace( System.err );
  100. }
  101. StructuredProxyPushSupplier proxyPushSupplier =
  102. StructuredProxyPushSupplierHelper.narrow(proxySupplier);
  103.  
  104. // connect ourselves to the event channel
  105. proxyPushSupplier.connect_structured_push_consumer(structuredPushConsumer);
  106.  
  107. poa.the_POAManager().activate();
  108. System.out.println ("run the orb");
  109. orb.run();
  110. }
  111. // Catch exceptions
  112. catch ( Exception e ) {
  113. System.err.println( "ERROR: " + e );
  114. e.printStackTrace( System.err );
  115. }
  116. System.out.println("Normal Termination...");
  117. }
  118.  
  119. public void disconnect_structured_push_consumer (){
  120. System.out.println ("disconnect_structured_push_consumer invoked");
  121. }
  122.  
  123. public void offer_change (EventType added[], EventType removed[]){
  124. System.out.println ("offer_change invoked");
  125. }
  126.  
  127. public void push_structured_event (StructuredEvent event){
  128. try {
  129. System.out.println ("\nevent name is: " + event.header.fixed_header.event_name);
  130. System.out.println ("domain name is: " + event.header.fixed_header.event_type.domain_name);
  131. System.out.println ("type name is: " + event.header.fixed_header.event_type.type_name);
  132. for (int i = 0; i < event.filterable_data.length; i++){
  133. System.out.println (event.filterable_data[i].name + ":\t" + event.filterable_data[i].value);
  134. }
  135. }
  136. catch ( Exception e ) {
  137. System.err.println( "ERROR: " + e );
  138. e.printStackTrace( System.err );
  139. }
  140. }
  141. }
  • The consumer locates the TAO Notification service and invokes the new_for_consumers() method on the channel interface to get the ConsumerAdmin object reference.
  • The consumer uses the default_filter_factory() to create a filter that utilizes the TCL constraint language.
  • A constraint is created ("Subject == 'JacORB'") and added to the filter using the add_constraints() method on the filter.
  • The filter is then added to the ConsumerAdmin object using the add_filter() method.
  • Invoking the obtain_notification_push_supplier() operation on the ConsumerAdmin object obtains the supplier push proxy.
  • Finally, the proxy is connected, and the consumer waits for events to be pushed. Events are printed in the push_structured_event() method.

Invoking the disconnect_structured_push_consumer() operation, by the channel, will disconnect the consumer at any point in time (care should be taken in the disconnection of suppliers and consumers).

The offered_change() operation does nothing in this example, but an intelligent consumer may, for example, disconnect when the channel no longer offers the types of events that are of interest.

Summary

This article focused on illustrating how to use the TAO Notification Service in conjunction with a JacORB consumer.

Many resources are available that provide greater detail into the OMG Notification Service, TAO, and JacORB. These include the OMG Notification Service specification and Java Programming with CORBA by Brose, Vogel, and Duddy.

In depth information on CORBA programming with TAO can be found from the the recently released TAO 1.2a Developer's Guide. Other related resources can be found by visiting the links listed in the References section.

References