Logging Samples as JSON in OpenDDS Applications

Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.

       +------------+       +-----------+       +-----------+       +------------+                
       |            |       |           |       |           |       |            | read
 write | DataWriter |       |  Writer's |       |  Reader's | store | DataReader |------->
------>|   Cache    |------>| Transport |------>| Transport |------>|   Cache    | take
       |            |       |           |       |           |       |            |------->
       +------------+       +-----------+       +-----------+       +------------+

In this article, I demonstrate how to 1) serialize samples to JSON for the purpose of logging and 2) log all samples that are written, received (stored in a DataReader's cache), read, and taken in an OpenDDS application. Such a log may be useful for debugging and auditing.

The corresponding interfaces in OpenDDS are called ValueWriter and Observer. Together, these features allow one to create a log of all sample activity without adding custom code for each DataWriter and DataReader.

ValueWriter and Observer are not in the DDS Specification.

Motivation

A common requirement in distributed applications is to log messages that are exchanged with peers. This message log is often used for debugging but may have other uses, such as creating an audit trail for regulatory compliance. For this type of logging in DDS applications, we are interested in samples. That is, the logging should include a record when a sample is written, placed in a DataReader's cache, read, or taken.

With the exception of the samples placed in a DataReader's cache, all of these messages could be logged by inserting code at the appropriate place in the application. However, maintaining this code can become burdensome as an application grows in size. Thus, we desire a solution where a small amount of generic code can handle all DataWriters and DataReaders in an application. This is the motivation for the Observer interface.

The Observer interface receives metadata about the sample and a raw pointer to the sample data. What is needed for logging is a generic way of converting the sample data into a serialized representation. The ValueWriter interface that is described later allows one to process arbitrary samples by receiving events that describe the structure and content of the sample. In this article, we use an implementation of ValueWriter creatively called JsonValueWriter to serialize the sample data to a JSON representation.

Tutorial

If you wish to follow along, you will need a build of OpenDDS with RapidJSON support. Directions for enabling RapidJSON can be found in the OpenDDS documentation.

For this article, I modified the Messenger example in $DDS_ROOT/DevGuideExamples/Messenger. However, there is nothing special about this example, and the changes I outline should be applicable to your favorite OpenDDS applications.

First, I define my observer implementation SampleObserver by placing the following code in $DDS_ROOT/DevGuideExamples/Messenger/SampleObserver.h:

#ifndef SAMPLE_OBSERVER_H
#define SAMPLE_OBSERVER_H

#include <dds/DCPS/EntityImpl.h>
#include <dds/DCPS/JsonValueWriter.h>
#include <dds/DCPS/Observer.h>

#include <iostream> // std::cout, std::endl

class SampleObserver : public OpenDDS::DCPS::Observer {
public:
  void on_sample_sent(DDS::DataWriter_ptr, const Sample& s)
  {
    std::cout << "Sent " << OpenDDS::DCPS::to_json(s) << std::endl;
  }

  void on_sample_received(DDS::DataReader_ptr, const Sample& s)
  {
    std::cout << "Received " << OpenDDS::DCPS::to_json(s) << std::endl;
  }

  void on_sample_read(DDS::DataReader_ptr, const Sample& s)
  {
    std::cout << "Read " << OpenDDS::DCPS::to_json(s) << std::endl;
  }

  void on_sample_taken(DDS::DataReader_ptr, const Sample& s)
  {
    std::cout << "Taken " << OpenDDS::DCPS::to_json(s) << std::endl;
  }
};

#endif // SAMPLE_OBSERVER_H

I then include this file from $DDS_ROOT/DevGuideExamples/Messenger/Publisher.cpp and $DDS_ROOT/DevGuideExamples/Messenger/Subscriber.cpp with

#include "SampleObserver.h"

Finally, I install the observer by adding two lines of code after the participant is created in $DDS_ROOT/DevGuideExamples/Messenger/Publisher.cpp and $DDS_ROOT/DevGuideExamples/Messenger/Subscriber.cpp:

if (!participant) {
  ACE_ERROR_RETURN((LM_ERROR,
                    ACE_TEXT("ERROR: %N:%l: main() -")
                    ACE_TEXT(" create_participant failed!\n")),
                   1);
}

// Add these two lines.
OpenDDS::DCPS::Observer_rch observer = OpenDDS::DCPS::make_rch<SampleObserver>();
dynamic_cast<OpenDDS::DCPS::EntityImpl*>(participant.in())->set_observer(observer, SampleObserver::e_SAMPLE_SENT | SampleObserver::e_SAMPLE_RECEIVED | SampleObserver::e_SAMPLE_READ | SampleObserver::e_SAMPLE_TAKEN);

When I build and run the example, I see output that includes the following lines:

...
Taken {"instance":11,"instance_state":1,"timestamp":{"sec":1620928055,"nanosec":900743000},"sequence_number":2,"data":{"from":"Comic Book Guy","subject":"Review","subject_id":99,"text":"Worst. Movie. Ever.","count":0}}
...
Sent {"instance":13,"instance_state":1,"timestamp":{"sec":1620928055,"nanosec":900743000},"sequence_number":2,"data":{"from":"Comic Book Guy","subject":"Review","subject_id":99,"text":"Worst. Movie. Ever.","count":0}}
...
Received {"instance":11,"instance_state":1,"timestamp":{"sec":1620928055,"nanosec":900743000},"sequence_number":2,"data":{"from":"Comic Book Guy","subject":"Review","subject_id":99,"text":"Worst. Movie. Ever.","count":0}}
...

I also see lines like:

...
Received {"instance":8,"instance_state":1,"timestamp":{"sec":1620928055,"nanosec":875746000},"sequence_number":2,"data":{"key":{"value":[1,3,0,0,63,254,244,203,0,0,0,3,0,0,8,69]},"name":"Movie Discussion List","type_name":"Messenger::Message","durability":{"kind":"VOLATILE_DURABILITY_QOS"},"durability_service":{"service_cleanup_delay":{"sec":0,"nanosec":0},"history_kind":"KEEP_LAST_HISTORY_QOS","history_depth":1,"max_samples":-1,"max_instances":-1,"max_samples_per_instance":-1},"deadline":{"period":{"sec":2147483647,"nanosec":2147483647}},"latency_budget":{"duration":{"sec":0,"nanosec":0}},"liveliness":{"kind":"AUTOMATIC_LIVELINESS_QOS","lease_duration":{"sec":2147483647,"nanosec":2147483647}},"reliability":{"kind":"BEST_EFFORT_RELIABILITY_QOS","max_blocking_time":{"sec":2147483647,"nanosec":2147483647}},"transport_priority":{"value":0},"lifespan":{"duration":{"sec":2147483647,"nanosec":2147483647}},"destination_order":{"kind":"BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS"},"history":{"kind":"KEEP_LAST_HISTORY_QOS","depth":1},"resource_limits":{"max_samples":-1,"max_instances":-1,"max_samples_per_instance":-1},"ownership":{"kind":"SHARED_OWNERSHIP_QOS"},"topic_data":{"value":[]},"representation":{"value":[]}}}
...

Discussion

The file $DDS_ROOT/dds/DCPS/Observer.h defines the OpenDDS::DCPS::Observer interface. My implementation of the Observer interface overrides four methods that will be invoked when a sample is:

  • written (on_sample_sent)
  • received by the DataReader (on_sample_received)
  • read (on_sample_read)
  • taken (on_sample_taken)

These methods take a pointer to the DataWriter/DataReader and an Observer::Sample.

The Observer::Sample type is also defined in $DDS_ROOT/dds/DCPS/Observer.h and includes all of the following:

  • An instance handle
  • An instance state
  • A timestamp
  • A sequence number
  • A raw pointer to the sample data
  • A reference to a ValueWriterDispatcher

The ValueWriterDispatcher knows how to apply a ValueWriter to the raw pointer (more on this later). My implementation calls OpenDDS::DCPS::to_json for each sample to convert it to a string containing a JSON representation of the sample.

OpenDDS::DCPS::to_json is defined in $DDS_ROOT/dds/DCPS/JsonValueWriter.h.

To use my SampleObserver, I dynamically allocate one with OpenDDS::DCPS::make_rch and then set it on the participant, which requires a cast to OpenDDS::DCPS::EntityImpl*, as the Observer interface is not in the DDS specification.

("rch" means "reference counted handle", i.e., a smart pointer. OpenDDS::DCPS::make_rch is a helper function that dynamically allocates an object and returns the reference counted handle.)

When attaching the observer, I pass a mask indicating the events that my observer should receive. The events for which I register correspond to the methods implemented in SampleObserver.

The output produced by my sample observer contains entries for the DataWriter (prefixed by Sent) and the DataReader (prefixed by Received and Taken). In the Messenger example, the reader takes samples instead of reading them, so there are no entries prefixed with Read. The JSON corresponds to the members of the Observer::Sample sans the ValueWriterDispatcher. The actual sample data is stored in the data member.

At first glance, the order of the console output doesn't make sense because the sample appears to be taken before it is sent and received. However, this is explained by the non-determinism introduced by multiple threads and output buffering.

Since the observer is attached to the participant, it also receives samples from the built-in topics. Attaching an observer to a built-in topic DataReader, the built-in topic Subscriber, or a participant allows one to log information about discovery in a generic way as well.

Observer Interface Details

The Observer interface is defined in $DDS_ROOT/dds/DCPS/Observer.h. Implementations of the Observer interface can receive an event when a DataWriter/DataReader

  1. is enabled, deleted, or has its QoS changed
  2. associates or disassociates with a remote peer (a DataReader or DataWriter)
  3. writes (sends), receives, reads, or takes a sample

The actual events that will be received are controlled by a mask that is set when attaching the observer to an entity.

An instance of the Observer interface can be attached to any DDS Entity, including Participants, Publishers, Subscribers, DataWriters, and DataReaders by casting to OpenDDS::DCPS::EntityImpl. This forms a natural hierarchy where the lowest observer that is interested in an event will be the one to receive it.

For example, suppose that an observer A that is interested in associated/disassociated events and sample sent events has been attached to a Participant, and another observer B interested solely in sample sent events is attached to a DataWriter belonging to a Publisher belonging to that Participant. When a sample is written, observer B will receive the event. When an association/disassociation occurs, observer A will receive the event.
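
The lookup rule in this example can be sketched in a few lines of standalone C++. This is only an illustration of the behavior described above, not the OpenDDS implementation: ToyEntity, ToyObserver, the Event constants, find_observer, and receiver_for are all hypothetical names invented for the sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-ins for illustration; these are NOT the OpenDDS types.
enum Event : std::uint32_t {
  kAssociated = 1u << 0,
  kSampleSent = 1u << 1
};

struct ToyObserver {
  std::string name;
};

struct ToyEntity {
  ToyEntity* parent = nullptr;      // e.g. DataWriter -> Publisher -> Participant
  ToyObserver* observer = nullptr;  // may be unset
  std::uint32_t mask = 0;           // events the attached observer asked for

  void set_observer(ToyObserver* o, std::uint32_t m)
  {
    observer = o;
    mask = m;
  }

  // Walk from this entity toward the root and return the first observer
  // whose mask includes the event, or nullptr if nobody is interested.
  ToyObserver* find_observer(Event e)
  {
    for (ToyEntity* cur = this; cur != nullptr; cur = cur->parent) {
      if (cur->observer != nullptr && (cur->mask & e) != 0) {
        return cur->observer;
      }
    }
    return nullptr;
  }
};

// Rebuilds the A/B scenario from the text and returns the name of the
// observer that receives an event raised on the DataWriter.
std::string receiver_for(Event e)
{
  ToyObserver a{"A"};
  ToyObserver b{"B"};
  ToyEntity participant, publisher, writer;
  publisher.parent = &participant;
  writer.parent = &publisher;
  participant.set_observer(&a, kAssociated | kSampleSent);
  writer.set_observer(&b, kSampleSent);
  ToyObserver* o = writer.find_observer(e);
  return o ? o->name : std::string();
}
```

In this sketch, receiver_for(kSampleSent) selects B because the DataWriter's own observer is interested, while receiver_for(kAssociated) falls through the Publisher (which has no observer) to A on the Participant.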

ValueWriter Interface Details

The ValueWriter interface is defined in $DDS_ROOT/dds/DCPS/ValueWriter.h. The OpenDDS IDL compiler (opendds_idl) generates a function called vwrite that takes a ValueWriter and an object and invokes events on the ValueWriter according to the structure and composition of the object.

For types not defined by IDL, one can define a vwrite function manually. For example, the $DDS_ROOT/dds/DCPS/Observer.cpp file defines a vwrite function for the Observer::Sample class:

void vwrite(ValueWriter& vw, const Observer::Sample& sample)
{
   vw.begin_struct();
   vw.begin_struct_member("instance");
   vw.write_int32(sample.instance);
   vw.end_struct_member();
   vw.begin_struct_member("instance_state");
   vw.write_uint32(sample.instance_state);
   vw.end_struct_member();
   vw.begin_struct_member("timestamp");
   vwrite(vw, sample.timestamp);
   vw.end_struct_member();
   vw.begin_struct_member("sequence_number");
   vw.write_int64(sample.sequence_number.getValue());
   vw.end_struct_member();
   vw.begin_struct_member("data");
   sample.data_dispatcher.write(vw, sample.data);
   vw.end_struct_member();
   vw.end_struct();
}

This example shows how an Observer::Sample is conveyed to a ValueWriter. It is a struct (begin_struct/end_struct). Each member is called out (begin_struct_member/end_struct_member), and the value for each member is written, e.g., write_int32.

A vwrite function may call other vwrite functions to handle aggregation, e.g., vwrite(vw, sample.timestamp). The vwrite function for Observer::Sample uses the ValueWriterDispatcher to invoke the appropriate vwrite method for the raw data pointer:

sample.data_dispatcher.write(vw, sample.data);
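
One plausible way to build such a dispatcher is type erasure: an abstract interface that accepts a const void*, plus a template that restores the static type and forwards to the matching vwrite overload. The sketch below assumes that design for illustration only. ToyWriter, ToyMessage, ToyDispatcher, ToyDispatcherImpl, and dispatch_demo are hypothetical names, and the actual ValueWriterDispatcher in OpenDDS may be structured differently.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins; these are not the OpenDDS classes.
struct ToyWriter {
  std::string log;
  void write_string(const std::string& s) { log += s; }
};

struct ToyMessage {
  std::string text;
};

// The per-type function that knows the structure of ToyMessage.
void vwrite(ToyWriter& vw, const ToyMessage& m)
{
  vw.write_string(m.text);
}

// Type-erased interface: callers only hold a const void*.
struct ToyDispatcher {
  virtual ~ToyDispatcher() {}
  virtual void write(ToyWriter& vw, const void* data) const = 0;
};

// One instantiation per sample type restores the static type and
// forwards to the matching vwrite overload.
template <typename T>
struct ToyDispatcherImpl : ToyDispatcher {
  void write(ToyWriter& vw, const void* data) const override
  {
    vwrite(vw, *static_cast<const T*>(data));
  }
};

// Mimics generic code that sees only a raw pointer and a dispatcher.
std::string dispatch_demo()
{
  ToyMessage msg{"Worst. Movie. Ever."};
  ToyDispatcherImpl<ToyMessage> dispatcher;
  const ToyDispatcher& erased = dispatcher;
  const void* raw = &msg;
  ToyWriter vw;
  erased.write(vw, raw);
  return vw.log;
}
```

The design keeps the generic code free of any knowledge of the sample type; only the code that creates the dispatcher needs to know T.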

The OpenDDS::DCPS::to_json function used in this article uses an implementation of ValueWriter called JsonValueWriter which is defined in $DDS_ROOT/dds/DCPS/JsonValueWriter.h. The implementation of JsonValueWriter dispatches to the RapidJSON library for the heavy lifting of serialization. However, the reader familiar with JSON will correctly assume that begin_struct inserts a { in the output stream, begin_struct_member inserts "NAME":, and so on.
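
To make the mapping from events to JSON text concrete, here is a heavily reduced, standalone sketch. It is not the real JsonValueWriter (which delegates to RapidJSON and handles many more event types, plus string escaping). ToyJsonWriter, ToyTimestamp, and to_json_sketch are hypothetical names, and the writer supports only the handful of events needed for a two-member struct.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical, reduced writer for illustration only.
class ToyJsonWriter {
public:
  void begin_struct()
  {
    out_ << '{';
    member_count_.push_back(0);  // track members of this nesting level
  }

  void end_struct()
  {
    out_ << '}';
    member_count_.pop_back();
  }

  void begin_struct_member(const std::string& name)
  {
    // Separate members with a comma, except before the first one.
    if (member_count_.back()++ > 0) {
      out_ << ',';
    }
    out_ << '"' << name << "\":";  // no escaping of name in this sketch
  }

  void end_struct_member() {}

  void write_int32(std::int32_t v) { out_ << v; }
  void write_uint32(std::uint32_t v) { out_ << v; }

  std::string str() const { return out_.str(); }

private:
  std::ostringstream out_;
  std::vector<int> member_count_;
};

struct ToyTimestamp {
  std::int32_t sec;
  std::uint32_t nanosec;
};

// Same shape as the vwrite functions in the article, for a toy type.
void vwrite(ToyJsonWriter& vw, const ToyTimestamp& t)
{
  vw.begin_struct();
  vw.begin_struct_member("sec");
  vw.write_int32(t.sec);
  vw.end_struct_member();
  vw.begin_struct_member("nanosec");
  vw.write_uint32(t.nanosec);
  vw.end_struct_member();
  vw.end_struct();
}

std::string to_json_sketch(const ToyTimestamp& t)
{
  ToyJsonWriter vw;
  vwrite(vw, t);
  return vw.str();
}
```

Calling to_json_sketch(ToyTimestamp{1620928055, 900743000u}) yields {"sec":1620928055,"nanosec":900743000}, the same shape as the timestamp member in the log output shown earlier.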

The ValueWriter interface is designed to accommodate a variety of serialization formats, whether they are hierarchical text-based formats like JSON, HTML, XML, and YAML or binary formats like CDR/XCDR, BSON, or Google Protocol Buffers.

Conclusion

Logging is a necessary feature for distributed applications, including applications built using OpenDDS. A generic approach to intercepting relevant events and samples and serializing those samples is necessary for long-term maintainability.

The Observer interface in OpenDDS allows one to attach an object to any DDS Entity that can receive events for enabled status changes, QoS changes, association changes, and sample creation, storage, and access. The ValueWriter interface allows one to define generic objects for serializing samples.

OpenDDS comes with an implementation of ValueWriter called JsonValueWriter which can serialize samples as JSON. Users of OpenDDS are free to create other implementations of ValueWriter to satisfy their logging needs.
