Bridging XMPP and DDS Messaging Frameworks
By Marc Neeley, OCI Principal Software Engineer, and Kevin Stanley, OCI Partner and Principal Software Engineer
April 2012
Use Case - Integrating Messaging Frameworks
In the course of developing systems, it is often necessary to integrate components that are built using different communication mechanisms. This situation can arise for many reasons:
- Differing requirements
- Constraints imposed by the associated subsystems that must be integrated
- Different skillsets within the organization
- Legacy systems that were developed prior to emerging new standards
- ... and others
A common approach to resolving this situation is to employ a messaging bridge. A messaging bridge connects multiple message channels, managing the connections details and message translation between the channels.
When developing a messaging bridge, you will need to map concepts in each of the communication paradigms. Ideally you would use communication approaches that are in the same family (e.g., Publish/Subscribe, Client Server, Remote Procedure Call, etc.). For the purposes of this article, we consider two modern, message-oriented middleware frameworks:
- Extensible Messaging Presence Protocol (XMPP)
- The Data Distribution Service (DDS) for real-time systems
Both DDS and XMPP provide support for the Publish/Subscribe paradigm.
XMPP is commonly employed for client/server applications involving human interaction. DDS provides a high-performance messaging infrastructure with support for many features that enable support for rich, robust, real-time communications.
Our main objective is to illustrate the considerations in developing a messaging bridge in general, as well as its application specifically through the use of XMPP and DDS messaging frameworks.
A number of considerations may drive the need for bridging.
- Existing systems with disparate interface or connectivity standards may be present.
- Extension of one protocol type into the environment of a foreign system may not be supportable or practical.
- Some or all of the systems may not be able to be modified to support alternative messaging mechanisms.
We will use a scenario that involves human users of an XMPP chat server (OpenFire) that will interact with each other as well as industrial equipment, which cannot utilize the XMPP protocol either practically or technically.
We start with these assumptions:
- There are devices (pumps) attached to a network that can be controlled through a programmable API.
- The devices are attached to a network, which may be invoked through a defined control interface.
On the other side, users will interact in a group chat session with each other and monitor and control multiple devices that will be supplying readings to the group chat session and will accept commands from the users in the chat room to change and get settings while troubleshooting issues (see Figure 1).
The following sections provide a short overview of each messaging framework, followed by a discussion of the design and development of the messaging bridge.
XMPP Background
In 1999 Jeremie Miller provided an open technology protocol standard for instant messaging and presence called Jabber, and from that came the implementation of an open source server called jabberd shortly thereafter. Since that time, the Internet Engineering Task Force (IETF) standards body and the XMPP Standards Foundation (XSF) have matured the XMPP standard from basic chat to more advanced collaboration functions, such as whiteboarding and video presence.
The XMPP series of specifications are delivered as Request For Comment (RFC) internet drafts and matured for adoption by the XSF and IETF. The core specifications, RFC 3920 and RFC 3921, are considered to be the minimal necessary for general server-based chat and presence. These RFCs are currently being superseded by RFC 6120 and RFC 6121, but they have not found their way into mainstream implementations to-date. The specifications for core XMPP functionality are found at the following link: http://xmpp.org/xmpp-protocols/rfcs
The more interesting functionality delivered by most XMPP server implementations include extension specifications to the core set. These are called XMPP Extension Protocols (XEP) and can be found here: http://www.xmpp.org/xmpp-protocols/xmpp-extensions
One of the extensions forming a basis for the use case in this publication is XEP-0045 Muti-User Chat. This extension offers the advantage of group and topic management, group conversation, and presence indication. The implementation is a blackboard type pattern with simple client-server integration between user chat clients and one or more central servers. XMPP servers can be federated such that clients operating in one domain can collaborate with other clients in remote domains without needing to maintain connections to more than one XMPP server. Server-to-Server (s2s) communication is a core feature in RFC 3920, but a more efficient server-to-server (s2s) extension (XEP-0288) is emerging as a candidate to supersede this capability.
The messaging protocols defined across the full set of XMPP specifications are XML-based. Variations of the message passing protocol for basic chat exist for domain-specific needs such as content syndication, file sharing, geo-location, gaming, and binary content distribution. The XML content exchanged between participating users is sent as a stanza. Here is an example:
- <message from="bob@ociweb.com"
- to="louie@ociweb.com"
- type="chat">
- <body>What's for lunch?</body>
- <subject>Query</subject>
- </message>
Disadvantages of XMPP approaches and associated technologies do exist under certain contexts. The XML payload of each XMPP communication, for instance, can potentially raise the overhead level in environments where such overhead isn't tolerable. XMPP does demand centralized services to exchange messages since all roster, presence, and routing functions, along with many other extensions, are only available there. XMPP overcomes this central dependency for geographically divided users that are operating on separate servers, by including in the standard server-to-server federation and adopting a DNS style user addressing scheme similar to email addresses.
DDS Background
The DDS specification was created by the Object Management Group (OMG) and adopted in 2003. It supports a loosely-coupled, typesafe, publish/subscribe communication paradigm including features that make it an effective mechanism for use in real-time and embedded systems. DDS provides an abstraction of strongly typed data where publishers and subscribers write and read data in a conceptual global data space. It is particularly suited for cases where users wish to use a data-centric architecture, enabling applications to focus on changes to data and avoid dependencies on communications in their logic. With this style of messaging, applications communicate with one another anonymously via a logical Data Bus without a centralized server, thereby avoiding a single point of failure for interactions between clients. DDS implementations provide an API for high-level programming languages, including Java and C++.
DDS provides a very compact binary message format, especially compared to an XML-based message format. It also supports a bounded use of resources over potentially intermittent links - able to replay messages upon reconnection. One of the main features supported by DDS is that of a configurable Quality of Service (or QoS) settings. DDS addresses reliability of delivery by supporting persistence, durability and retransmission QoS settings. Through the application of QoS policies, applications are able to manage the use of bandwidth, network and memory resources and control many characteristics of the reliability of the messages between publishers and subscribers. DDS does not expose the application to the messaging aspect of the interaction. The application need only be aware of the data types and any changes that are made to it - hiding the details and associated complexity.
DDS allows publishers and subscribers to be paired based on matching of topics. Publishers specify a unique topic associated with the data they are publishing, and subscribers request data on that topic. Topics, however, are not the only mechanism for controlling the flow of data in DDS -- subscribers can apply filters to further restrict the data to a more narrow area of interest within the published data.
Implementing a messaging bridge
The Messaging Bridge pattern allows applications using different messaging systems to communicate seamlessly. It replicates messages between the messaging systems and is typically implemented using channel adapters on each side of the bridge, providing an interface for the application to translate messages. In our example, the MessageEngine implements the messaging bridge, integrating an XMPP channel and a DDS channel.
Often in the implementation of applications that involve bridging message protocols, care must be taken to ensure that the dynamic behaviors and state of either side of the bridge is maintained according to the expectations of the respective environments. For example, API and message syntax are part of the DDS standard, where XMPP is only governed by XML and wire protocol standards. The bridge must take this into account and ensure the disparate environments are given what they need to operate appropriately.
In the case of bridging DDS and XMPP protocols, there is a comfortable alignment of concepts that make the programming of a bridge easy to follow. A mapping of the concepts is shown in the following table.
XMPP | DDS |
---|---|
Chat Room | Topic |
Client Session | Instance |
Presence | Liveliness |
Messages exchanged between the two environments are also transformed by the bridge and, in this example, the XMPP messages must be defined according to the XMPP standard stanza notation.
On the DDS side, messages are defined in Interface Definition Language (IDL) during design time and generated into source-level types, which are subsequently compiled and linked in with each component that needs to read or write the interface data.
For XMPP, messages are described using XML schema definitions (XSD). A complete listing of the XMPP schema definitions can be found at the following URL: http://xmpp.org/resources/schemas/.
Below we show an example XMPP stanza and the IDL specification of a user-defined DDS "Message" data type. The actual DDS messages would be transmitted using a binary wire format and are not shown.
XMPP Stanza | DDS (IDL) |
---|---|
<message from="bob@ociweb.com" to="louie@ociweb.com" type="chat"> <subject>Query</subject> <body>Lunch?</body> </message> |
module Messenger { #pragma DCPS_DATA_TYPE "Messenger::Message" #pragma DCPS_DATA_KEY "Messenger::Message subject_id" struct Message { string from; string subject; long subject_id; string text; long count; }; }; |
A message from each definition contains its own representation of a From, To, Type, Subject, and Body element. In the case of DDS and the IDL shown, the subject_id
and count
elements don't have a match on the XMPP side, but are not necessary for XMPP messages.
To implement our example we extended an existing example in OpenDDS implementing a Message type whose IDL is shown in the table above.
In the IDL, the message is defined using a struct with a set of attributes that define the data that is being published. The pragmas in the IDL are specific to the DDS framework - namely DCPS_DATA_TYPE and DCPS_DATA_KEY. The DCPS_DATA_TYPE pragma marks the defined type for use within OpenDDS, and the DCPS_DATA_KEY informs OpenDDS which attribute(s) are to be used as keys, marking new instances within the published data.
Inefficiencies in duplicating XMPP messages make it an issue when distributing them to multiple destinations. This is where utilizing DDS for message distribution can be an advantage. By bridging these messaging frameworks, applications on the DDS side of the bridge do not experience this inefficiency.
XMPP Swift Library
To help us with communications between the OpenFire XMPP server and our Message Engine bridge, we have leveraged the very useful XMPP C++ library called Swift (http://www.swift.im/swiften). The library implements all of the RFC-3920 and RFC-3921 requirements, along with several of the XMPP extension standards, including the Multi-User Chat capabilities that we use for our use case.
The major features allow a C++ application to create client connections to an XMPP server just like a human controlled chat client would. The interaction patterns for XMPP are very event driven. The arrival of a message from a client to an XMPP server, for example, triggers the evaluation of the recipient address and the subsequent sending of the message to the end client who is also listening for new events.
This event approach is supported in the Swift functionality which provides callback mechanisms to allow a C++ application to instantiate listeners for these events. The description for the MessageEngineComponent in the next section highlights the callbacks used in our example. The remainder of the Message Engine components use this library throughout their implementation as well and are described next.
The following descriptions detail each of the elements listed above with excerpts of supporting code as needed.
- MessageEngine. This source simply contains the
main()
of the service and instantiates the MessageEngineComponent. - MessageEngineComponent. This class is responsible for initializing a client connection with the OpenFire XMPP server on one side of the bridge an then it is responsible for initializing the subscriber and publisher participants with our example Device Simulations on the DDS side of the bridge. Once the initialization is complete, the component is responsible for listening for stanzas (XMPP chat messages) coming from the OpenFire server that chat clients are supplying to the multi-user chat room. The Swift library, introduced earlier, provides a callback for our C++ component to bind to and listen for incoming messages. Here is how our application binds to the callback :
- XMPPclient_ = new Client(uid, pwd, networkFactories);
-
- XMPPclient->onMessageReceived.connect(
- boost::bind(&MessageEngineComponent::handleMessageReceived,
- this,
- _1));
The code above instantiates a new Swift client connection and then uses the boost::bind()
operation to bind the local method handleMessageReceived()
with the Swift client event onMessageReceived
.
The MessageEngineComponent
also binds local methods with the following Swift client events:
onConnected
onDataRead
onDataWritten
onPresenceReceived
onJoinComplete
Now we will look at the handleMessageReceived
method implementation. This is where the bridge transformation of an XMPP message to a DDS message occurs. The embedded comments explain each step:
- // The input parameter to this method is a Boost smart pointer to a
- // Swift Message data type that has been received
-
- void MessageEngineComponent::handleMessageReceived(Swift::Message::ref message) {
-
- // Create a DDS message to load the XMPP content into
- // This data type is defined in MessageEngine.idl
- Messenger::Message ddsMsg;
-
- // An XMPP message isn’t required to have a subject or subject id
- if(message->hasSubject()) {
- ddsMsg.subject_id = atol(message->getID().c_str());
- ddsMsg.subject = message->getSubject().c_str();
- }
-
- // Now populate the remainder of the DDS message with the XMPP content.
- ddsMsg.from = message->getFrom().toString().c_str();
- ddsMsg.text = message->getBody().c_str();
- ddsMsg.count = 1; // simple number of messages in this transmission
-
-
- // Now publish the newly created DDS message
- publisher_->write(ddsMsg);
-
- }
- MessageEngineDDSDataListenerImpl. This class is responsible for listening for DDS events coming from the Device simulators that are publishing data destined for human recipients in the chat room. The following method is triggered by the DataReader when publishers (devices) have sent data. Again the commenting in the code explains each step.
- void
- MessageEngineDDSDataListenerImpl::on_data_available(DDS::DataReader_ptr reader)
- ACE_THROW_SPEC((CORBA::SystemException))
- {
- // Safely downcast data reader to type-specific data reader
- Messenger::MessageDataReader_var reader_i = Messenger::MessageDataReader::_narrow(reader);
-
- Messenger::Message msg;
- DDS::SampleInfo info;
-
- // Remove (take) the next sample from the data reader
- DDS::ReturnCode_t error = reader_i->take_next_sample(msg, info);
-
- // Make sure take was successful
- if (error == DDS::RETCODE_OK) {
- // Make sure this is not a sample dispose message
-
- if (verbose_) {
- std::cout << "Sample taken ok..." << std::endl;
- }
-
- if (info.valid_data) {
- ++sample_count;
-
- // Create a new XMPP Message for the payload of the message
- // The Swift library creates a new stanza based on this type
-
- boost::shared_ptr<Swift::Message> xmppMsg(new Swift::Message());
-
- // Set the type of message to Groupchat so that the XMPP server will
- // properly allow the message to be used in a Multi-User session
- xmppMsg->setType(Swift::Message::Type::Groupchat);
-
- // Since this message engine handles messages for more than one device,
- // append the sender to the front of the message text so those in the
- // chat room can see which device it is from.
-
- std::string localMsg_;
- xmppMsg->setBody(localMsg_.append("[").append(msg.from.in()).append("] ").append("[").append(msg.subject.in()).append("]").append(msg.text.in()));
-
- // Populate the from and to fields of the stanza
-
- xmppMsg->setFrom(localClient_->getJID());
- xmppMsg->setTo(localRoom_);
-
- // Now send the message to the XMPP server
-
- localClient_->sendMessage(xmppMsg);
-
- // Log the message coming from the device if verbosity is turned on
-
- if (verbose_) {
- std::cout << Msg: subject " << msg.subject.in() << std::endl
- << " subject_id " << msg.subject_id << std::endl
- << " from " << msg.from.in() << std::endl
- << " count " << msg.count << std::endl
- << " text " << msg.text.in() << std::endl;
- }
- }
- } else {
- ACE_ERROR((LM_ERROR,
- ACE_TEXT("ERROR: %N:%l: on_data_available() -")
- ACE_TEXT(" take_next_sample failed!\n")));
- }
- }
A feature that is included in the XMPP standard is the concept of presence. Presence is obviously used to indicate if a given user has an active connection from a chat client. In our scenario, however, it is important that the Message Engine indicate to the human users in the session that devices reporting diagnostic data to the session are "present" (or not). Because the devices are using OpenDDS to communicate to the Message Engine, we can use the on_liveliness_changed
feature of OpenDDS to tell the Message Engine how many instances of a device are active publishers to the Message Engine.
Below we see the portion of code that captures liveliness state from an OpenDDS built-in topic carrying the liveliness state of attached publishers to the DataReader of interest. When liveliness is triggered by the built-in topic, the implementation below simply creates an XMPP message and "chats" it into the chat room indicating how many devices are "present" in the session. The commenting in the code describes what is happening along the way.
void
MessageEngineDDSDataListenerImpl::on_liveliness_changed(DDS::DataReader* dr,
const DDS::LivelinessChangedStatus& status)
{
// Create a new XMPP Message to indicate how many devices are reporting.
boost::shared_ptr<Swift::Message> xmppMsg(new Swift::Message());
// Set the type of message to Groupchat so that the XMPP server will
// properly allow the message to be used in a Multi-User session
xmppMsg->setType(Swift::Message::Type::Groupchat);
// Populate the from and to fields of the stanza
xmppMsg->setFrom(localClient_->getJID());
xmppMsg->setTo(localRoom_);
std::ostringstream localMsg_ (std::ostringstream::out);
// Check to see how many devices are indicated by the built-in topic alive_count
// value and format a message based on the count.
if (status.alive_count > 0) {
localMsg_ << status.alive_count << ((status.alive_count == 1) ? " device is in the room" : " devices are in the room...");
xmppMsg->setBody(localMsg_.str());
} else {
xmppMsg->setBody("No devices are in the room...");
}
// Now send the message to the XMPP server
localClient_->sendMessage(xmppMsg);
if(verbose_) {
std::cout << "Liveliness: alive = " << status.alive_count << " not_alive = " << status.not_alive_count
<< " alive change = " << status.alive_count_change << " not alive count change = " << status.not_alive_count <<
std::endl;
aliveCount_ = status.alive_count;
}
}
- MessageEngine_Idl. This is a library created by the generated source resulting from the compilation of the MessageEngine.idl file. The data types and utility methods in the library are needed by any participant that uses a DDS message. In our case that is the MessageEngine and any Device.
- MessageEngine_DdsUtil. This library is a collection of useful DDS class implementations that allow any of our example participants that need to publish or subscribe to a device message to create a connection via an associated DDS topic.
DEVICE SIMULATION
In our example, we simulate the behavior of devices in software. The Device Simulation is both a DDS publisher and subscriber. It subscribes for commands and publishes sensor data, along with responses to the commands it receives. The fact that these commands are forwarded to the Device through the Message Engine is irrelevant to the Device Simulation. With the publish / subscribe paradigm, communication is anonymous. The Device Simulation is notified when a new sample is available by the Subscriber (implemented by the DDSFilteredSubscriberFacade
class. The mechanism for notification in DDS is through a callback on the Listener, which for our example is implemented by the DeviceSimulation class.
The DeviceSimulation is implemented as a Listener that is registered with the DDSFilteredSubscriberFacade
. Similar to the approach used in the Swift library, DDS uses callbacks on listeners when messages are received.
The Device Simulation callback method (on_data_available
) is invoked whenever a new publication is received by the device process. When an update is received by the client's DDS framework, the subscriber is notified (step 1) and can then invoke the callback on the Listener (step 2). The callback method is shown in the example code below:
- void DeviceSimulation::on_data_available(
- DDS::DataReader_ptr reader)
- ACE_THROW_SPEC((CORBA::SystemException))
- {
- // Safely downcast data reader to type-specific data reader
- Messenger::MessageDataReader_var reader_i = narrow(reader);
-
- Messenger::Message msg;
- DDS::SampleInfo info;
-
- // Remove (take) the next sample from the data reader
- DDS::ReturnCode_t error = reader_i->take_next_sample(msg, info);
-
- . . .
-
- // Parse the message to verify it is a valid command and process it
- this->parseMessage(msg);
-
- }
The DeviceSimulation callback then parses the message and if it is a valid command, that command is subsequently executed. The commands supported include: "set_pressure", "get_pressure", and "who". After executing the command, the Device Simulation sends either a confirmation or reply back through its own Publisher façade (steps 3 & 4). When responding to the "who" command, the device returns it's identifier through the publisher façade.
The main for the device simulation is shown in the following code snippet, which illustrates the setup of the device simulation and associated communication infrastructure.
- int
- ACE_TMAIN(int argc, ACE_TCHAR *argv[])
- {
- try {
- // Initialize DomainParticipantFactory, handling command line args
- DDS::DomainParticipantFactory_ptr dpf = TheParticipantFactoryWithArgs(argc, argv);
-
- if(argc != 2)
- {
- std::cerr << "usage: device_main <device_id>" << std::endl;
- exit(-1);
- }
- else
- {
- std::string device_name(argv[1]);
- }
-
- // Define the ID for the domain used in our example
- const DDS::DomainId_t domain = 42;
-
- // This publisher is used to send messages from
- // Enterprise Message Clients
- const char* deviceTopic = "DeviceMessages";
-
- DDSPublisherFacade publisher(dpf, domain, deviceTopic);
-
- // This subscriber is created so this client can receive messages
- // from XMPP clients
- const char* commandTopic = "CommandMessages";
-
- // Create a Subscriber and listener for messages coming from XMPP
- // Instantiate a DataReaderListener to use for the subscriber */
- DDS::DataReaderListener_var xmppMessagesListener(
- new DeviceSimulation(device_name.c_str(), &publisher));
-
- char command_filter[100];
- ::sprintf(command_filter,
- "(text like 'cmd:%%%s->%%') OR (text like 'cmd:%%who')",
- device_name.c_str());
-
- //Create a subscriber for receiving this client’s own messages */
- DDSFilteredSubscriberFacade xmppMessagesSubFacade(dpf,
- domain,
- commandTopic,
- command_filter,
- xmppMessagesListener);
-
- . . .
DDS segregates messages into Domains, which represent a partition in the global data space, separating the data into independent segments, which are logically isolated from one another. The publisher and subscriber components must agree on unique topics within the domain that identify points of interest for communication between them. Each topic is associated with a data type that is communicated between publishers and subscribers using the concepts of Instance and Sample. An Instance represents some data item that can change over time and each Sample representing a value taken by the datum.
A set of utility classes were created to simplify the interface for the client application (the DeviceSimulation code). The Publisher and Subscriber Façade classes (shown below) support the creation and management of DDS communication endpoints. These classes encapsulate the process of setting up the necessary elements for publishing and subscribing for data updates respectively. All of the setup and tear down operations are performed within these facades, allowing the application to construct them and use them until it is time to exit, without requiring detailed knowledge of the DDS communication mechanisms.
For this application we also take advantage of the ability to define filters in the subscriber process and send them to the publisher process. The filters defined in the subscriber process are forwarded to the publisher process and applied prior to sending publications to subscribers, providing a convenient mechanism to limit unnecessary communications using DDS.
The Device Simulation in this example serves to illustrate the functionality that would be needed to receive the messages and process them. The following code illustrates how the messages are constructed and published by the Device Simulation using the publisher façade. Unrelated to the requirements of communicating with the messaging bridge, we also track the number of messages issued by the Device Simulation.
- DDS::ReturnCode_t DeviceSimulation::sendMessage(
- const char* source,
- const char* subject,
- const char* text)
- {
- message_.subject = subject;
- message_.from = deviceId_.c_str();
- message_.text = text;
-
- DDS::ReturnCode_t status = publisher_->write(message_);
-
- ++message_.count;
- ++message_.subject_id;
-
- return status;
- }
Conclusion
In this article we have provided an overview of two contemporary messaging approaches, XMPP and DDS, and ,described how to integrate them using a bridging approach. Our discussion includes a top-level overview of the technologies and discusses how to map the central concepts from each of them to one another, as well as providing sample code for the major parts of the integrated system. Our motivating example provides a contrived but feasible scenario for bridging a domain supporting multi-user chat and devices reachable via a network interface.
XMPP is an open, extensible, and multi-featured messaging framework that is well suited for applications focused on user collaboration. XMPP is supported by multiple core specifications and extensions. The flexibility of XML for messaging allows a variety of custom functionality to be added while maintaining interoperability via the XMPP standards although XMPP suffers from communications inefficiency due to significant payload size compared to optimized payloads of other message exchange protocols.
DDS provides an efficient publish/subscribe messaging framework that works well in situations where performance is premium. A variety of features related to Quality of Service (QoS) are available to help implement a rich communication foundation. OpenDDS provides a free and open source implementation of the DDS framework with C++ and Java bindings. Additional information about OpenDDS and support options can be found at https://objectcomputing.com/products/opendds.
Our example messaging bridge illustrates integration of these messaging frameworks to ensure that all messages are translated and forwarded to the appropriate applications. Additional considerations, which would be necessary for a real world application, such as dealing with security aspects and fault tolerance, were not included in the example solution. The bridge, however does provide an example of how to accommodate the dynamics of each message framework along with managing the state of both sides of the bridge to ensure a coherent integration of the associated systems.
Example files
Software Engineering Tech Trends (SETT) is a regular publication featuring emerging trends in software engineering.