June 18, 2002 - By Heather Drury, OCI Product Services Manager & Senior Software Engineer
Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.
Overview of the Notification Service
The Notification Service Specification was originally produced by the Telecommunications Working Group within the Object Management Group (OMG), an international consortium of over 800 companies, and was later adopted as a standard service. The goal was to extend the more basic OMG Event Service specification to support telecommunication applications, yet remain backward compatible with the standard Event Service. The Notification Service preserves all the semantics specified for the OMG Event Service, allowing for interoperability between basic Event Service and Notification Service clients.
Both the Notification and Event services enable events (with an optional data payload) to be sent and received between objects in a decoupled fashion. This provides a more flexible mechanism for message transmission than if events were transmitted directly.
Analogous to the standard OMG Event Service, the Notification model utilizes an event channel as the substrate for the communication of messages between client applications. Applications that provide messages are termed suppliers while applications that receive (or consume) messages are known as consumers. Suppliers and consumers are completely decoupled from one another (i.e., suppliers have no knowledge of which consumers are listening for their messages and vice versa).
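The decoupling described above can be sketched without any CORBA machinery. The following is a minimal in-process illustration, not the Notification Service API: the EventChannel class and its connect_consumer() and push() members are hypothetical names chosen for this sketch. The point it shows is that suppliers and consumers each talk only to the channel.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-process stand-in for an event channel (not the CORBA API).
class EventChannel {
public:
  using Consumer = std::function<void(const std::string&)>;

  // Consumers register with the channel, never with a supplier.
  void connect_consumer(Consumer consumer) {
    assert(consumer && "consumer callback must be callable");
    consumers_.push_back(std::move(consumer));
  }

  // Suppliers hand events to the channel, which forwards them to every
  // connected consumer. Returns the number of consumers that were notified.
  std::size_t push(const std::string& event) const {
    for (const Consumer& consumer : consumers_) {
      consumer(event);
    }
    return consumers_.size();
  }

private:
  std::vector<Consumer> consumers_;
};
```

A supplier calls push() without knowing how many consumers are connected, and a consumer's callback runs without knowing which supplier produced the event.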
While the OMG Event Service supports asynchronous exchange of event messages, it has several serious limitations, including no support for event filtering and no ability to be configured for varying levels of qualities of service (QoS). The Notification Service was designed to enhance the OMG Event Service by adding the following features:
- Structured event type. The Notification Service introduces a new event type, the structured event, that has a well-defined data structure into which messages can be inserted and sent to interested consumers. Messages can be defined using standard IDL and provide a much more strongly typed event than was available in the OMG Event Service. In addition, a sequence of structured events can be transmitted in a single operation, thereby increasing communication efficiency.
- Event filtering. The Notification Service allows consumers of events to use filters to selectively specify which events they are interested in receiving. Event filtering occurs within the Notification Service so consumers receive only events in which they are interested and do not spend processing cycles dealing with unwanted events.
- Publication of offers and subscriptions. The Notification Service provides a way for suppliers to inform consumers of the types of events they produce, thereby allowing developers of event consumers to make informed decisions as to whether they should continue to receive events of a particular type. Likewise, the Notification Service provides a way for consumers of events to inform suppliers of the types of events they are interested in receiving, thereby allowing developers of event suppliers to make informed decisions as to whether they should continue to produce events of a particular type.
- Quality of Service properties. The Notification Service specifies how various QoS properties can be configured including properties that control reliability, priority, order, and discard policies for message delivery (as well as many others).
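To make the first two features concrete, the sketch below models a structured event and channel-side filtering in plain C++. The field names (header, fixed_header, event_type, domain_name, type_name, event_name, filterable_data) follow those used in the example code later in this article. Everything else is an assumption made for illustration: values are plain strings rather than CORBA Any values, the optional header fields and remainder of the body are omitted, and FilteringChannel is a hypothetical class, not part of TAO or JacORB.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins for the structured event layout.
struct EventType { std::string domain_name; std::string type_name; };
struct FixedEventHeader { EventType event_type; std::string event_name; };
struct EventHeader { FixedEventHeader fixed_header; };
struct Property { std::string name; std::string value; };
struct StructuredEvent {
  EventHeader header;
  std::vector<Property> filterable_data;
};

using Filter = std::function<bool(const StructuredEvent&)>;
using Consumer = std::function<void(const StructuredEvent&)>;

// Hypothetical channel that evaluates each consumer's filter itself, so a
// consumer callback only ever runs for events that passed its filter.
class FilteringChannel {
public:
  void connect_consumer(Filter filter, Consumer consumer) {
    assert(filter && consumer);
    subscriptions_.emplace_back(std::move(filter), std::move(consumer));
  }

  // Returns the number of consumers the event was delivered to.
  std::size_t push(const StructuredEvent& event) const {
    std::size_t delivered = 0;
    for (const auto& subscription : subscriptions_) {
      if (subscription.first(event)) {
        subscription.second(event);
        ++delivered;
      }
    }
    return delivered;
  }

private:
  std::vector<std::pair<Filter, Consumer>> subscriptions_;
};
```

Because the filter runs inside push(), an unwanted event costs the consumer nothing, which is the benefit the event filtering feature describes.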
Figure 1 shows the general architecture of the Notification Service. Suppliers and consumers of a notification channel are connected via their associated proxies. Administration interfaces on the supplier side and consumer side provide the ability to cluster a set of proxies based on common configurations.
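One practical consequence of grouping proxies under an administration object is that filters attached to the admin apply to every proxy it created. The sketch below is a simplified model of that idea, not the CosNotifyChannelAdmin API. The Admin class and its member names are hypothetical, events are plain strings, and two behaviors are assumptions of this sketch: an empty filter list passes every event, and a non-empty list passes an event if any one filter accepts it. The AND_OP and OR_OP names mirror the InterFilterGroupOperator values used in the example code later in this article.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Event = std::string;
using Filter = std::function<bool(const Event&)>;

enum class InterFilterGroupOperator { AND_OP, OR_OP };

// Assumption: no filters means "pass everything"; otherwise any match passes.
inline bool passes(const std::vector<Filter>& filters, const Event& event) {
  if (filters.empty()) return true;
  for (const Filter& filter : filters) {
    if (filter(event)) return true;
  }
  return false;
}

// Hypothetical admin object that owns a cluster of proxies.
class Admin {
public:
  explicit Admin(InterFilterGroupOperator op) : op_(op) {}

  // Admin-level filters are shared by all proxies created by this admin.
  void add_filter(Filter filter) {
    assert(filter);
    admin_filters_.push_back(std::move(filter));
  }

  // Creates a proxy and returns its id (an index into this admin's proxies).
  std::size_t new_proxy() {
    proxy_filters_.emplace_back();
    return proxy_filters_.size() - 1;
  }

  void add_proxy_filter(std::size_t proxy_id, Filter filter) {
    assert(filter);
    proxy_filters_.at(proxy_id).push_back(std::move(filter));
  }

  // Combines the admin-level and proxy-level results with the group operator.
  bool delivers(std::size_t proxy_id, const Event& event) const {
    const bool admin_ok = passes(admin_filters_, event);
    const bool proxy_ok = passes(proxy_filters_.at(proxy_id), event);
    return op_ == InterFilterGroupOperator::AND_OP ? (admin_ok && proxy_ok)
                                                   : (admin_ok || proxy_ok);
  }

private:
  InterFilterGroupOperator op_;
  std::vector<Filter> admin_filters_;
  std::vector<std::vector<Filter>> proxy_filters_;
};
```

With AND_OP, a proxy-level filter can only narrow what the admin already allows. How a real implementation treats empty filter lists under OR_OP may differ from this sketch.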
TAO Notification Service
TAO 1.2a includes an implementation of the OMG Notification Service. Highlights of the 1.2a release include support for the following features:
- Multithreaded event dispatching and filter evaluation;
- Full ETCL filtering constraint grammar;
- SequencePushConsumer interface;
- Most of the standard quality of service (QoS) properties (with the exception of reliability);
- Both untyped and structured events (but not typed events);
- Push model (but not pull);
- Event channel factory; and
- Offers and subscriptions.
Example using TAO and JacORB
We illustrate the utility and interoperability of the Notification Service by creating a simple supplier application that sends events through the TAO Notification Channel to a JacORB consumer.
The TAO application (written in C++) is a push supplier that connects to the Notification Channel and pushes events. These events are simple text messages that comprise a "sender," a "subject," and a "body."
The JacORB consumer (written in Java) is a push consumer that connects to the channel and asynchronously receives event notifications from the channel. Both the supplier and consumer use the structured event type.
The example is composed of three applications and two supporting TAO services, as illustrated in Figure 2 and further described below:
- TAO MessengerClient. A simple C++ application that connects to the TAO Naming Service, looks up the MessengerServer object reference, and invokes the send_message() method on that object to send two messages, one with "TAO" as the subject and the other with "JacORB" as the subject. The MessengerClient application does not directly interact with the Notification Service in any way. We will not describe the MessengerClient application in any further detail.
- TAO MessengerServer. A C++ application that plays the role of the server for the MessengerClient and the supplier for the JacORB consumer. The Notification channel is created within this application. The implementation for the Messenger servant contains a send_message() method that is invoked from the MessengerClient. It packages the incoming data (with the associated "subject", "sender", and "body") into a structured event and invokes the push_structured_event() method on the consumer proxy object.
- JacORB Consumer. A Java application that uses JacORB v1.4 to connect to the TAO Naming Service, look up the Notification Event channel, and receive incoming events. This application also utilizes the filtering functionality of the TAO Notification Service to limit incoming events to those whose subject is "JacORB" and to ignore all others.
- TAO Notification Service. Service used to set up the notification channel and associated administrative objects.
- TAO Naming Service. Naming service used to locate CORBA objects.
TAO C++ Push Supplier Application
The MessengerServer main() method initializes the ORB (TAO), creates the Messenger servant, and registers the Messenger object with the TAO Naming Service. The code for MessengerServer.cpp is shown below:
- #include <orbsvcs/CosNamingC.h>
- #include "Messenger_i.h"
- #include <ace/streams.h>
-
- int
- main(int argc, char * argv[])
- {
- try
- {
- // Initialize orb
- CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
-
- // Find the Naming Service.
- CORBA::Object_var rootObj =
- orb->resolve_initial_references("NameService");
- CosNaming::NamingContext_var rootNC =
- CosNaming::NamingContext::_narrow(rootObj.in());
-
- // Get the Root POA.
- CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
- PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
-
- // Activate POA manager
- PortableServer::POAManager_var mgr = poa->the_POAManager();
- mgr->activate();
-
- // Create our Messenger servant.
- Messenger_i messenger_servant(orb.in());
-
- // Register it with the RootPOA.
- PortableServer::ObjectId_var oid =
- poa->activate_object( &messenger_servant );
- CORBA::Object_var messenger_obj = poa->id_to_reference( oid.in() );
-
- // Bind it in the Naming Service.
- CosNaming::Name name;
- name.length (1);
- name[0].id = CORBA::string_dup("MessengerService");
- rootNC->rebind(name, messenger_obj.in());
-
- // Accept requests
- orb->run();
- orb->destroy();
-
- }
- catch (CORBA::Exception& ex) {
- cerr << ex << endl;
- return 1;
- }
- return 0;
-
- }
We now implement the structured push supplier that connects to the Notification Service and sends structured events when its send_message() method is invoked.
The steps involved in connecting the supplier to the event channel are illustrated in steps 1-6 in Figure 3 and further described below. The corresponding lines of code are also identified below within the MessengerServer example.
- Obtain an object reference to the event channel factory. (lines 21-22)
- Obtain an event channel. (lines 28-31)
- Obtain the SupplierAdmin object reference. (lines 47-48)
- Obtain a structured push consumer proxy object. (lines 56-59)
- Receive an incoming message from the MessengerClient application.
- Connect to the proxy to push the structured event. (line 119)
The Messenger_i class implements the Messenger interface (containing the send_message() method) and also acts as an event supplier to the TAO Notification Channel.
The constructor for the Messenger servant stores a reference to the ORB, resolves the TAO Naming Service ("NameService"), and creates an instance of the TAO Notification Service event channel factory, from which it creates the notification channel.
The Notification Channel is then bound to the root naming context of the TAO Naming Service under the name "MyEventChannel."
- Messenger_i::Messenger_i (CORBA::ORB_ptr orb)
- : orb_ (CORBA::ORB::_duplicate(orb))
-
- {
- try
- {
- CORBA::Object_var poa_obj = orb->resolve_initial_references("RootPOA");
- PortableServer::POA_var poa = PortableServer::POA::_narrow(poa_obj.in());
-
- CORBA::Object_var naming_obj =
- orb_->resolve_initial_references ("NameService");
-
- if (CORBA::is_nil(naming_obj.in())) {
- cerr << "Unable to find naming service" << endl;
- }
-
- CosNaming::NamingContext_var naming_context =
- CosNaming::NamingContext::_narrow(naming_obj.in());
-
- // Create an instance of TAO's notification event channel
- CosNotifyChannelAdmin::EventChannelFactory_var notify_factory =
- TAO_Notify_EventChannelFactory_i::create(poa.in());
-
- CosNotifyChannelAdmin::ChannelID id;
- CosNotification::QoSProperties initial_qos;
- CosNotification::AdminProperties initial_admin;
-
- CosNotifyChannelAdmin::EventChannel_var ec =
- notify_factory->create_channel (initial_qos,
- initial_admin,
- id);
-
- if (CORBA::is_nil (ec.in())) {
- cerr << "Unable to create event channel" << endl;
- }
-
- CosNaming::Name name(1);
- name.length(1);
- name[0].id = CORBA::string_dup("MyEventChannel");
-
- naming_context->rebind(name, ec.in());
-
- CosNotifyChannelAdmin::AdminID adminid;
- CosNotifyChannelAdmin::InterFilterGroupOperator ifgop =
- CosNotifyChannelAdmin::AND_OP;
-
- CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin =
- ec->new_for_suppliers (ifgop, adminid);
-
- if (CORBA::is_nil (supplier_admin.in())) {
- cerr << "Unable to find supplier admin" << endl;
- }
-
- CosNotifyChannelAdmin::ProxyID supplieradmin_proxy_id;
-
- CosNotifyChannelAdmin::ProxyConsumer_var proxy_consumer =
- supplier_admin->obtain_notification_push_consumer(
- CosNotifyChannelAdmin::STRUCTURED_EVENT,
- supplieradmin_proxy_id);
-
- StructuredEventSupplier_i *servant =
- new StructuredEventSupplier_i(orb_.in());
-
- PortableServer::ObjectId_var oid = poa->activate_object(servant);
- CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
- CosNotifyComm::StructuredPushSupplier_var supplier =
- CosNotifyComm::StructuredPushSupplier::_narrow(supplier_obj.in());
-
- consumer_proxy_ =
- CosNotifyChannelAdmin::StructuredProxyPushConsumer::
- _narrow(proxy_consumer.in());
-
- if (CORBA::is_nil (consumer_proxy_.in())) {
- cerr << "Unable to find structured proxy push consumer" << endl;
- }
- consumer_proxy_->connect_structured_push_supplier(supplier.in());
-
- }
- catch (CORBA::Exception &ex) {
- cerr << ex << endl;
- }
- }
The send_message() method of the Messenger servant prints the incoming message, then creates a new structured event and populates it with the contents of the message. The push_structured_event() operation of the structured push consumer proxy object is used to push the event to the notification channel.
- CORBA::Boolean Messenger_i::send_message (const char * user_name,
- const char * subject,
- char *& message)
- throw (CORBA::SystemException)
- {
-
- cout << "Message from: " << user_name << endl;
- cout << "Subject: " << subject << endl;
- cout << "Message: " << message << endl;
-
- try
- {
-
- // Event Definition
- CosNotification::StructuredEvent event;
-
- event.header.fixed_header.event_type.domain_name =
- CORBA::string_dup("OCI");
- // string
- event.header.fixed_header.event_type.type_name =
- CORBA::string_dup("examples");
- // string
- event.header.fixed_header.event_name =
- CORBA::string_dup("myevent");
-
- // sequence<Property>: string name, any value
- event.filterable_data.length (1);
- event.filterable_data[0].name = CORBA::string_dup("From");
- event.filterable_data[0].value <<= (const char *)user_name;
- event.filterable_data.length (2);
- event.filterable_data[1].name = CORBA::string_dup("Subject");
- event.filterable_data[1].value <<= (const char *)subject;
- event.filterable_data.length (3);
- event.filterable_data[2].name = CORBA::string_dup("Message");
- event.filterable_data[2].value <<= (const char *)message;
-
- consumer_proxy_->push_structured_event(event);
- }
-
- catch (CosNotifyComm::InvalidEventType &) {
- cerr << "Invalid Event Type Exception " << endl;
- return 1;
- }
-
- catch (CORBA::Exception &ex) {
- cerr << ex << endl;
- return 1;
- }
- return 0;
- }
Lastly, the supplier class StructuredEventSupplier_i, which implements the CosNotifyComm::StructuredPushSupplier IDL interface, must be implemented. It contains two operations:
- disconnect_structured_push_supplier(). Invoked to disconnect the supplier from the channel, for example when it is finished providing event data; and
- subscription_change(). Invoked when consumers change the set of event types to which they subscribe.
The constructor for this class simply duplicates the ORB reference.
- StructuredEventSupplier_i::StructuredEventSupplier_i(CORBA::ORB_ptr orb)
- : orb_(CORBA::ORB::_duplicate(orb))
- {
- }
The subscription_change() method does nothing in the current implementation. The implementation of disconnect_structured_push_supplier() deactivates the supplier object.
- void StructuredEventSupplier_i::disconnect_structured_push_supplier ()
- throw (CORBA::SystemException)
- {
-
- CORBA::Object_var obj = orb_->resolve_initial_references ("POACurrent");
- PortableServer::Current_var current =
- PortableServer::Current::_narrow (obj.in());
- PortableServer::POA_var poa = current->get_POA ();
- PortableServer::ObjectId_var objectId = current->get_object_id ();
- poa->deactivate_object (objectId.in());
-
- }
JacORB Java Push Consumer Application
We now implement the JacORB consumer, a structured push consumer that connects to the TAO Notification Channel and prints all the structured events it receives. The consumer uses the filtering functionality of the Notification Service to set up a constraint so that only messages whose subject is "JacORB" are received.
The steps involved in connecting the consumer to the event channel are listed below, along with the corresponding lines of code in the consumer listing:
- Obtain an object reference to the event channel. (line 36)
- Obtain the ConsumerAdmin object reference. (line 42)
- Use the default filter factory to create a new filter; add the filter to the ConsumerAdmin. (lines 64-65, 76)
- Obtain a structured push supplier proxy object. (lines 95, 105)
- Connect to the proxy. (lines 129-133)
The JacORB consumer is implemented in the Java class Consumer (in the package JacORBConsumer) shown below:
- package JacORBConsumer;
-
- import org.omg.CosNotification.*;
- import org.omg.CosNotifyComm.*;
- import org.omg.CosNotifyChannelAdmin.*;
- import org.omg.CosNotifyFilter.*;
- import org.omg.CosNaming.*;
- import org.omg.CosNaming.NamingContextPackage.*;
- import org.omg.PortableServer.*;
-
- import java.io.*;
- import java.util.*;
- import java.net.*;
-
- public class Consumer extends StructuredPushConsumerPOA
- {
- public static void main( String[] args )
- {
- try {
-
- System.out.println ("Initialize orb\n");
- org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
-
-
- // get naming service reference and context
- BufferedReader reader = new BufferedReader(new FileReader("ns.ior"));
- String ns = reader.readLine();
- org.omg.CORBA.Object obj = orb.string_to_object(ns);
-
- NamingContext rootContext = NamingContextHelper.narrow( obj );
- System.out.println("Got Root context: "+ obj);
-
- // find the event channel reference
- NameComponent[] name = { new NameComponent( "MyEventChannel", "" )};
- obj = rootContext.resolve(name);
- EventChannel channel = EventChannelHelper.narrow(obj);
- System.out.println ("**************MyChannel is " + channel.toString ());
-
- // get the admin interface and the supplier proxy
- InterFilterGroupOperator ifgop = InterFilterGroupOperator.AND_OP;
- org.omg.CORBA.IntHolder adminId = new org.omg.CORBA.IntHolder(0);
- ConsumerAdmin consumerAdmin = channel.new_for_consumers(ifgop, adminId);
-
- // get the default filter factory
- FilterFactory filterFactory = channel.default_filter_factory();
- Filter filter = null;
- if ( filterFactory == null ){
- System.err.println ("No default filter factory found!");
- }
- try {
- filter = filterFactory.create_filter("TCL");
- }
- catch (Exception e){
- System.err.println( "ERROR: " + e );
- e.printStackTrace( System.err );
- }
-
- String expr = "Subject == 'JacORB'";
-
- ConstraintExp exp[] = new ConstraintExp[1];
- EventType eventType[] = new EventType [0];
- exp[0] = new ConstraintExp (eventType, expr);
- try {
- ConstraintInfo info[] = filter.add_constraints (exp);
- consumerAdmin.add_filter(filter);
- }
- catch (InvalidConstraint e) {
- System.err.println( "ERROR: " + e );
- e.printStackTrace( System.err );
- }
-
- EventType added[] = new EventType[1];
- EventType removed[] = new EventType [0];
- added[0] = new EventType ("*", "*");
- try{
- consumerAdmin.subscription_change(added, removed);
- }
- catch (Exception e){
- System.err.println( "ERROR: " + e );
- e.printStackTrace( System.err );
- }
-
- POA poa = org.omg.PortableServer.POAHelper.narrow
- (orb.resolve_initial_references("RootPOA"));
-
- // create and implicitly activate the client
- StructuredPushConsumer structuredPushConsumer =
- (StructuredPushConsumer)new Consumer()._this(orb);
-
- // get the structured proxy push supplier
- ClientType clientType = ClientType.STRUCTURED_EVENT;
- org.omg.CORBA.IntHolder proxyId = new org.omg.CORBA.IntHolder (0);
- ProxySupplier proxySupplier = null;
- try{
- proxySupplier = consumerAdmin.obtain_notification_push_supplier (clientType, proxyId);
- }
- catch( AdminLimitExceeded e ){
- System.err.println ( "ERROR: " + e);
- e.printStackTrace( System.err );
- }
- StructuredProxyPushSupplier proxyPushSupplier =
- StructuredProxyPushSupplierHelper.narrow(proxySupplier);
-
- // connect ourselves to the event channel
- proxyPushSupplier.connect_structured_push_consumer(structuredPushConsumer);
-
- poa.the_POAManager().activate();
- System.out.println ("run the orb");
- orb.run();
- }
- // Catch exceptions
- catch ( Exception e ) {
- System.err.println( "ERROR: " + e );
- e.printStackTrace( System.err );
- }
- System.out.println("Normal Termination...");
- }
-
- public void disconnect_structured_push_consumer (){
- System.out.println ("disconnect_structured_push_consumer invoked");
- }
-
- public void offer_change (EventType added[], EventType removed[]){
- System.out.println ("offer_change invoked");
- }
-
- public void push_structured_event (StructuredEvent event){
- try {
- System.out.println ("\nevent name is: " + event.header.fixed_header.event_name);
- System.out.println ("domain name is: " + event.header.fixed_header.event_type.domain_name);
- System.out.println ("type name is: " + event.header.fixed_header.event_type.type_name);
- for (int i = 0; i < event.filterable_data.length; i++){
- System.out.println (event.filterable_data[i].name + ":\t" + event.filterable_data[i].value);
- }
- }
- catch ( Exception e ) {
- System.err.println( "ERROR: " + e );
- e.printStackTrace( System.err );
- }
- }
- }
- The consumer locates the TAO Notification Service event channel and invokes the new_for_consumers() method on the channel interface to get the ConsumerAdmin object reference.
- The consumer uses the default_filter_factory() to create a filter that utilizes the TCL constraint language.
- A constraint is created ("Subject == 'JacORB'") and added to the filter using the add_constraints() method on the filter.
- The filter is then added to the ConsumerAdmin object using the add_filter() method.
- Invoking the obtain_notification_push_supplier() operation on the ConsumerAdmin object obtains the supplier push proxy.
- Finally, the proxy is connected, and the consumer waits for events to be pushed. Events are printed in the push_structured_event() method.
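To show what the constraint "Subject == 'JacORB'" asks of each event, the sketch below parses a constraint of exactly that shape and tests it against an event's filterable data. It is a toy evaluator written for this article, not the constraint language implementation in TAO or JacORB. The names EqualityConstraint, parse_equality(), and matches() are hypothetical, only the single form name == 'literal' is handled, and filterable data is modeled as a map from string names to string values.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical parsed form of a constraint such as: Subject == 'JacORB'
struct EqualityConstraint {
  std::string property;
  std::string literal;
};

using FilterableData = std::map<std::string, std::string>;

// Removes leading and trailing spaces and tabs.
inline std::string trim(const std::string& s) {
  const std::size_t first = s.find_first_not_of(" \t");
  if (first == std::string::npos) return "";
  const std::size_t last = s.find_last_not_of(" \t");
  return s.substr(first, last - first + 1);
}

// Parses "<name> == '<literal>'" into out. Returns false for any other form.
inline bool parse_equality(const std::string& expr, EqualityConstraint& out) {
  const std::size_t op = expr.find("==");
  if (op == std::string::npos) return false;
  const std::string name = trim(expr.substr(0, op));
  const std::string rhs = trim(expr.substr(op + 2));
  if (name.empty() || rhs.size() < 2 ||
      rhs.front() != '\'' || rhs.back() != '\'') {
    return false;
  }
  out.property = name;
  out.literal = rhs.substr(1, rhs.size() - 2);
  return true;
}

// True only when the named property exists and equals the literal exactly.
inline bool matches(const EqualityConstraint& c, const FilterableData& data) {
  const FilterableData::const_iterator it = data.find(c.property);
  return it != data.end() && it->second == c.literal;
}
```

Note that the comparison is an exact match: of the two messages sent by the MessengerClient, the one with subject "JacORB" passes and the one with subject "TAO" does not, and a subject that merely contained the word "JacORB" would also be rejected.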
The channel may invoke the disconnect_structured_push_consumer() operation at any point in time to disconnect the consumer (care should be taken in the disconnection of suppliers and consumers).
The offer_change() operation does nothing in this example beyond printing a message, but an intelligent consumer may, for example, disconnect when the channel no longer offers the types of events that are of interest.
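As an illustration of what such an intelligent consumer might track, the sketch below keeps a running set of offered event types and reports whether any type of interest is still on offer. It is a plain C++ model rather than CORBA code. The OfferTracker class and its any_interest_offered() member are hypothetical, and an event type is represented as a (domain_name, type_name) pair of strings. Only the added/removed shape of offer_change() is taken from the consumer code above.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>
#include <vector>

// An event type modeled as a (domain_name, type_name) pair.
using EventTypeName = std::pair<std::string, std::string>;

// Hypothetical helper a consumer could consult from its offer_change().
class OfferTracker {
public:
  explicit OfferTracker(std::set<EventTypeName> interests)
      : interests_(std::move(interests)) {
    assert(!interests_.empty() && "a consumer should care about some type");
  }

  // Applies one offer_change() notification: new offers in, withdrawn out.
  void offer_change(const std::vector<EventTypeName>& added,
                    const std::vector<EventTypeName>& removed) {
    for (const EventTypeName& type : added) offered_.insert(type);
    for (const EventTypeName& type : removed) offered_.erase(type);
  }

  // True while at least one type of interest is currently offered.
  bool any_interest_offered() const {
    for (const EventTypeName& type : interests_) {
      if (offered_.count(type) > 0) return true;
    }
    return false;
  }

private:
  std::set<EventTypeName> interests_;
  std::set<EventTypeName> offered_;
};
```

A consumer using this helper would call offer_change() with the same added and removed lists it receives, then consider disconnecting once any_interest_offered() returns false. Before the first notification arrives nothing is recorded as offered, so the result is only meaningful after at least one offer has been seen.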
Summary
This article focused on illustrating how to use the TAO Notification Service in conjunction with a JacORB consumer.
Many resources are available that provide greater detail on the OMG Notification Service, TAO, and JacORB. These include the OMG Notification Service specification and Java Programming with CORBA by Brose, Vogel, and Duddy.
In-depth information on CORBA programming with TAO can be found in the recently released TAO 1.2a Developer's Guide. Other related resources can be found by visiting the links listed in the References section.
References
- [1] TAO at Washington University, St. Louis
http://tao.doc.wustl.edu
- [2] OCI TAO: The ACE ORB
http://www.theaceorb.com
- [3] OCI TAO Resources
http://www.theaceorb.com/references/
- [4] JacORB
http://www.jacorb.org
- [5] OMG Notification Service Specification
http://www.omg.org/cgi-bin/doc?formal/2000-06-20