June 09, 2021 - By Justin Wilson, OCI Principal Software Engineer
Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.
Contents
- Introduction
- Motivation
- Tutorial
- Discussion
- Observer Interface Details
- ValueWriter Interface Details
- Conclusion
- Next Steps
Introduction
        +------------+       +-----------+       +-----------+       +------------+
        |            |       |           |       |           |       |            |  read
 write  | DataWriter |       | Writer's  |       | Reader's  | store | DataReader |------->
------->|   Cache    |------>| Transport |------>| Transport |------>|   Cache    |  take
        |            |       |           |       |           |       |            |------->
        +------------+       +-----------+       +-----------+       +------------+
In this article, I demonstrate how to 1) serialize samples to JSON for the purpose of logging and 2) log all samples that are written, received (stored in a DataReader's cache), read, and taken in an OpenDDS application. Such a log may be useful for debugging and auditing.
The corresponding interfaces in OpenDDS are called ValueWriter and Observer.
Together, these features allow one to create a log of all sample activity without adding custom code for each DataWriter and DataReader.
ValueWriter and Observer are not in the DDS Specification.
Motivation
A common requirement in distributed applications is to log messages that are exchanged with peers. This message log is often used for debugging but may have other uses, such as creating an audit trail for regulatory compliance. For this type of logging in DDS applications, we are interested in samples. That is, the logging should include a record when a sample is written, placed in a DataReader's cache, read, or taken.
With the exception of the samples placed in a DataReader's cache, all of these messages could be logged by inserting code at the appropriate place in the application.
However, maintaining this code can become burdensome as an application grows in size.
Thus, we desire a solution where a small amount of generic code can handle all DataWriters and DataReaders in an application.
This is the motivation for the Observer interface.
The Observer interface receives metadata about the sample and a raw pointer to the sample data.
What is needed for logging is a generic way of converting the sample data into a serialized representation.
The ValueWriter interface that is described later allows one to process arbitrary samples by receiving events that describe the structure and content of the sample.
In this article, we use an implementation of ValueWriter creatively called JsonValueWriter to serialize the sample data to a JSON representation.
Tutorial
If you wish to follow along, you will need a build of OpenDDS with RapidJSON support. Directions for enabling RapidJSON can be found in the OpenDDS documentation.
For this article, I modified the Messenger example in $DDS_ROOT/DevGuideExamples/Messenger.
However, there is nothing special about this example, and the changes I outline should be applicable to your favorite OpenDDS applications.
First, I define my observer implementation SampleObserver by placing the following code in $DDS_ROOT/DevGuideExamples/Messenger/SampleObserver.h:
#ifndef SAMPLE_OBSERVER_H
#define SAMPLE_OBSERVER_H

#include <dds/DCPS/EntityImpl.h>
#include <dds/DCPS/JsonValueWriter.h>
#include <dds/DCPS/Observer.h>

#include <iostream>

// Logs every sample event to standard output as JSON.
class SampleObserver : public OpenDDS::DCPS::Observer {
public:
  // Invoked when a DataWriter writes a sample.
  void on_sample_sent(DDS::DataWriter_ptr, const Sample& s)
  {
    std::cout << "Sent " << OpenDDS::DCPS::to_json(s) << std::endl;
  }

  // Invoked when a sample is stored in a DataReader's cache.
  void on_sample_received(DDS::DataReader_ptr, const Sample& s)
  {
    std::cout << "Received " << OpenDDS::DCPS::to_json(s) << std::endl;
  }

  // Invoked when the application reads a sample.
  void on_sample_read(DDS::DataReader_ptr, const Sample& s)
  {
    std::cout << "Read " << OpenDDS::DCPS::to_json(s) << std::endl;
  }

  // Invoked when the application takes a sample.
  void on_sample_taken(DDS::DataReader_ptr, const Sample& s)
  {
    std::cout << "Taken " << OpenDDS::DCPS::to_json(s) << std::endl;
  }
};

#endif // SAMPLE_OBSERVER_H
I then include this file from $DDS_ROOT/DevGuideExamples/Messenger/Publisher.cpp and $DDS_ROOT/DevGuideExamples/Messenger/Subscriber.cpp with
#include "SampleObserver.h"
Finally, I install the observer by adding two lines of code after the participant is created in $DDS_ROOT/DevGuideExamples/Messenger/Publisher.cpp and $DDS_ROOT/DevGuideExamples/Messenger/Subscriber.cpp:
if (!participant) {
  ACE_ERROR_RETURN((LM_ERROR,
                    ACE_TEXT("ERROR: %N:%l: main() -")
                    ACE_TEXT(" create_participant failed!\n")),
                   1);
}
// Add these two lines.
OpenDDS::DCPS::Observer_rch observer = OpenDDS::DCPS::make_rch<SampleObserver>();
dynamic_cast<OpenDDS::DCPS::EntityImpl*>(participant.in())->set_observer(observer, SampleObserver::e_SAMPLE_SENT | SampleObserver::e_SAMPLE_RECEIVED | SampleObserver::e_SAMPLE_READ | SampleObserver::e_SAMPLE_TAKEN);
When I build and run the example, I see output that includes the following lines:
...
Taken {"instance":11,"instance_state":1,"timestamp":{"sec":1620928055,"nanosec":900743000},"sequence_number":2,"data":{"from":"Comic Book Guy","subject":"Review","subject_id":99,"text":"Worst. Movie. Ever.","count":0}}
...
Sent {"instance":13,"instance_state":1,"timestamp":{"sec":1620928055,"nanosec":900743000},"sequence_number":2,"data":{"from":"Comic Book Guy","subject":"Review","subject_id":99,"text":"Worst. Movie. Ever.","count":0}}
...
Received {"instance":11,"instance_state":1,"timestamp":{"sec":1620928055,"nanosec":900743000},"sequence_number":2,"data":{"from":"Comic Book Guy","subject":"Review","subject_id":99,"text":"Worst. Movie. Ever.","count":0}}
...
I also see lines like:
...
Received {"instance":8,"instance_state":1,"timestamp":{"sec":1620928055,"nanosec":875746000},"sequence_number":2,"data":{"key":{"value":[1,3,0,0,63,254,244,203,0,0,0,3,0,0,8,69]},"name":"Movie Discussion List","type_name":"Messenger::Message","durability":{"kind":"VOLATILE_DURABILITY_QOS"},"durability_service":{"service_cleanup_delay":{"sec":0,"nanosec":0},"history_kind":"KEEP_LAST_HISTORY_QOS","history_depth":1,"max_samples":-1,"max_instances":-1,"max_samples_per_instance":-1},"deadline":{"period":{"sec":2147483647,"nanosec":2147483647}},"latency_budget":{"duration":{"sec":0,"nanosec":0}},"liveliness":{"kind":"AUTOMATIC_LIVELINESS_QOS","lease_duration":{"sec":2147483647,"nanosec":2147483647}},"reliability":{"kind":"BEST_EFFORT_RELIABILITY_QOS","max_blocking_time":{"sec":2147483647,"nanosec":2147483647}},"transport_priority":{"value":0},"lifespan":{"duration":{"sec":2147483647,"nanosec":2147483647}},"destination_order":{"kind":"BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS"},"history":{"kind":"KEEP_LAST_HISTORY_QOS","depth":1},"resource_limits":{"max_samples":-1,"max_instances":-1,"max_samples_per_instance":-1},"ownership":{"kind":"SHARED_OWNERSHIP_QOS"},"topic_data":{"value":[]},"representation":{"value":[]}}}
...
Discussion
The file $DDS_ROOT/dds/DCPS/Observer.h defines the OpenDDS::DCPS::Observer interface.
My implementation of the Observer interface overrides four methods that will be invoked when a sample is:
- written (on_sample_sent)
- received by the DataReader (on_sample_received)
- read (on_sample_read)
- taken (on_sample_taken)
These methods take a pointer to the DataWriter/DataReader and an Observer::Sample.
The Observer::Sample type is also defined in $DDS_ROOT/dds/DCPS/Observer.h and includes all of the following:
- An instance handle
- An instance state
- A timestamp
- A sequence number
- A raw pointer to the sample data
- A reference to a ValueWriterDispatcher
The ValueWriterDispatcher knows how to apply a ValueWriter to the raw pointer (more on this later).
My implementation calls OpenDDS::DCPS::to_json for each sample to convert it to a string containing a JSON representation of the sample.
OpenDDS::DCPS::to_json is defined in $DDS_ROOT/dds/DCPS/JsonValueWriter.h.
To use my SampleObserver, I dynamically allocate one with OpenDDS::DCPS::make_rch and then set it on the participant, which requires a cast to OpenDDS::DCPS::EntityImpl*, as the Observer interface is not in the DDS specification.
("rch" means "reference counted handle", i.e., a smart pointer. OpenDDS::DCPS::make_rch is a helper function that dynamically allocates an object and returns the reference counted handle.)
When attaching the observer, I pass a mask indicating the events that my observer should receive.
The events for which I register correspond to the methods implemented in SampleObserver.
The output produced by my sample observer contains entries for the DataWriter (prefixed by Sent) and the DataReader (prefixed by Received and Taken).
In the Messenger example, the reader takes samples instead of reading them, so there are no entries prefixed with Read.
The JSON corresponds to the members of the Observer::Sample sans the ValueWriterDispatcher.
The actual sample data is stored in the data member.
At first glance, the order of the console output doesn't make sense because the sample appears to be taken before it is sent and received. However, this is easily explained by the non-determinism introduced by multiple threads and output buffering.
Since the observer is attached to the participant, it also receives samples from the built-in topics. Attaching an observer to a built-in topic DataReader, the built-in topic Subscriber, or a participant allows one to log information about discovery in a generic way as well.
Observer Interface Details
The Observer interface is defined in $DDS_ROOT/dds/DCPS/Observer.h.
Implementations of the Observer interface can receive an event when a DataWriter/DataReader
- is enabled, deleted, or has its QoS changed
- associates or disassociates with a remote DataReader/DataWriter
- writes (sends), receives, reads, or takes a sample
The actual events that will be received are controlled by a mask that is set when attaching the observer to an entity.
An instance of the Observer interface can be attached to any DDS Entity, including Participants, Publishers, Subscribers, DataWriters, and DataReaders by casting to OpenDDS::DCPS::EntityImpl.
This forms a natural hierarchy where the lowest observer that is interested in an event will be the one to receive it.
For example, suppose that an observer A that is interested in associated/disassociated events and sample sent events has been attached to a Participant, and another observer B interested solely in sample sent events is attached to a DataWriter belonging to a Publisher belonging to that Participant. When a sample is written, observer B will receive the event. When an association/disassociation occurs, observer A will receive the event.
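The lookup rule in this example can be modeled in a few lines. The sketch below is not OpenDDS code: Entity, ToyObserver, find_observer, and the event bit values are hypothetical stand-ins that only mimic the behavior described above, assuming each entity has an optional observer, a mask, and a link to its parent.

```cpp
#include <string>

// Hypothetical event bits; OpenDDS defines its own in Observer.h.
enum Event : unsigned {
  e_ASSOCIATED = 1u << 0,
  e_DISASSOCIATED = 1u << 1,
  e_SAMPLE_SENT = 1u << 2
};

// Stand-in for an observer: only a name, for illustration.
struct ToyObserver {
  std::string name;
};

// Stand-in for a DDS Entity: optional observer, event mask, and parent link.
struct Entity {
  const Entity* parent = nullptr;
  const ToyObserver* observer = nullptr;
  unsigned mask = 0;
};

// Walk from the entity toward the root and return the first (lowest)
// observer whose mask includes the event, or nullptr if there is none.
const ToyObserver* find_observer(const Entity& start, Event e)
{
  for (const Entity* cur = &start; cur != nullptr; cur = cur->parent) {
    if (cur->observer != nullptr && (cur->mask & e) != 0) {
      return cur->observer;
    }
  }
  return nullptr;
}
```

With observer A on the participant (associated, disassociated, and sample sent) and observer B on the writer (sample sent only), find_observer on the writer yields B for e_SAMPLE_SENT and A for e_ASSOCIATED, matching the scenario above.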
ValueWriter Interface Details
The ValueWriter interface is defined in $DDS_ROOT/dds/DCPS/ValueWriter.h.
The OpenDDS IDL compiler (opendds_idl) generates a function called vwrite that takes a ValueWriter and an object and invokes events on the ValueWriter according to the structure and composition of the object.
For types not defined by IDL, one can define a vwrite function manually. For example, the $DDS_ROOT/dds/DCPS/Observer.cpp file defines a vwrite function for the Observer::Sample class:
void vwrite(ValueWriter& vw, const Observer::Sample& sample)
{
  vw.begin_struct();
  vw.begin_struct_member("instance");
  vw.write_int32(sample.instance);
  vw.end_struct_member();
  vw.begin_struct_member("instance_state");
  vw.write_uint32(sample.instance_state);
  vw.end_struct_member();
  vw.begin_struct_member("timestamp");
  vwrite(vw, sample.timestamp);
  vw.end_struct_member();
  vw.begin_struct_member("sequence_number");
  vw.write_int64(sample.sequence_number.getValue());
  vw.end_struct_member();
  vw.begin_struct_member("data");
  sample.data_dispatcher.write(vw, sample.data);
  vw.end_struct_member();
  vw.end_struct();
}
This example shows how an Observer::Sample is conveyed to a ValueWriter.
It is a struct (begin_struct/end_struct).
Each member is called out (begin_struct_member/end_struct_member), and the value for each member is written, e.g., write_int32.
A vwrite function may call other vwrite functions to handle aggregation, e.g., vwrite(vw, sample.timestamp).
The vwrite function for Observer::Sample uses the ValueWriterDispatcher to invoke the appropriate vwrite method for the raw data pointer:
sample.data_dispatcher.write(vw, sample.data);
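One way to picture this dispatch is as type erasure: generic code holds an untyped pointer plus an object that remembers the real type. The following is a minimal sketch of that idea and not the OpenDDS implementation; ToyValueWriter, ToyDispatcher, ToyDispatcherImpl, ToySample, and Message are hypothetical names, and how OpenDDS builds its dispatchers internally is an assumption here.

```cpp
#include <string>
#include <vector>

// Minimal stand-in for a ValueWriter: it only records the events it receives.
struct ToyValueWriter {
  std::vector<std::string> events;
  void write_string(const std::string& s) { events.push_back("string:" + s); }
  void write_int32(int v) { events.push_back("int32:" + std::to_string(v)); }
};

// Hypothetical sample type with a hand-written vwrite function.
struct Message {
  std::string from;
  int count;
};

void vwrite(ToyValueWriter& vw, const Message& m)
{
  vw.write_string(m.from);
  vw.write_int32(m.count);
}

// Type-erased interface: generic code holds only this and a const void*.
struct ToyDispatcher {
  virtual ~ToyDispatcher() {}
  virtual void write(ToyValueWriter& vw, const void* data) const = 0;
};

// One instantiation per sample type restores the static type and forwards
// to the matching vwrite overload.
template <typename T>
struct ToyDispatcherImpl : ToyDispatcher {
  void write(ToyValueWriter& vw, const void* data) const override
  {
    vwrite(vw, *static_cast<const T*>(data));
  }
};

// Generic record, analogous in spirit to Observer::Sample.
struct ToySample {
  const void* data;
  const ToyDispatcher& data_dispatcher;
};

// Generic code never needs to know the concrete sample type.
void vwrite(ToyValueWriter& vw, const ToySample& sample)
{
  sample.data_dispatcher.write(vw, sample.data);
}
```

The cast is only safe because the dispatcher and the pointer are created together for the same type; in this sketch that pairing is the caller's responsibility.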
The OpenDDS::DCPS::to_json function used in this article uses an implementation of ValueWriter called JsonValueWriter which is defined in $DDS_ROOT/dds/DCPS/JsonValueWriter.h.
The implementation of JsonValueWriter dispatches to the RapidJSON library for the heavy lifting of serialization.
However, the reader familiar with JSON will correctly assume that begin_struct inserts a { in the output stream, begin_struct_member inserts "NAME":, and so on.
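To make that mapping concrete, here is a toy writer that emits JSON text by hand. It is a sketch under simplifying assumptions, not the real JsonValueWriter: ToyValueWriter is a cut-down interface with only a few of the events shown earlier, ToyJsonWriter does no string escaping, and Time and Note are hypothetical types.

```cpp
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Toy event interface loosely modeled on the calls shown above.
// It is NOT the OpenDDS ValueWriter; only a handful of events are included.
class ToyValueWriter {
public:
  virtual ~ToyValueWriter() {}
  virtual void begin_struct() = 0;
  virtual void end_struct() = 0;
  virtual void begin_struct_member(const char* name) = 0;
  virtual void end_struct_member() = 0;
  virtual void write_int32(std::int32_t value) = 0;
  virtual void write_string(const std::string& value) = 0;
};

// Emits JSON text directly; the real JsonValueWriter delegates to RapidJSON.
class ToyJsonWriter : public ToyValueWriter {
public:
  void begin_struct() override { out_ << '{'; first_.push_back(true); }
  void end_struct() override { out_ << '}'; first_.pop_back(); }
  void begin_struct_member(const char* name) override
  {
    // Separate members with commas, except before the first one.
    if (!first_.back()) out_ << ',';
    first_.back() = false;
    out_ << '"' << name << "\":";
  }
  void end_struct_member() override {}
  void write_int32(std::int32_t value) override { out_ << value; }
  void write_string(const std::string& value) override
  {
    // No escaping is done here; a real writer must escape quotes and so on.
    out_ << '"' << value << '"';
  }
  std::string str() const { return out_.str(); }
private:
  std::ostringstream out_;
  std::vector<bool> first_; // one "no member written yet" flag per open struct
};

// Hypothetical user types and their hand-written vwrite functions.
struct Time { std::int32_t sec; std::int32_t nanosec; };
struct Note { std::string text; Time stamp; };

void vwrite(ToyValueWriter& vw, const Time& t)
{
  vw.begin_struct();
  vw.begin_struct_member("sec");
  vw.write_int32(t.sec);
  vw.end_struct_member();
  vw.begin_struct_member("nanosec");
  vw.write_int32(t.nanosec);
  vw.end_struct_member();
  vw.end_struct();
}

void vwrite(ToyValueWriter& vw, const Note& n)
{
  vw.begin_struct();
  vw.begin_struct_member("text");
  vw.write_string(n.text);
  vw.end_struct_member();
  vw.begin_struct_member("stamp");
  vwrite(vw, n.stamp); // aggregation: delegate to the nested type's vwrite
  vw.end_struct_member();
  vw.end_struct();
}
```

Calling vwrite with a ToyJsonWriter and a Note holding the text hi and the stamp {1, 2} leaves {"text":"hi","stamp":{"sec":1,"nanosec":2}} in str().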
The ValueWriter interface is designed to accommodate a variety of serialization formats, whether they are hierarchical text-based formats like JSON, HTML, XML, and YAML or binary formats like CDR/XCDR, BSON, or Google Protocol Buffers.
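As a small illustration of that flexibility, the same stream of events can drive a completely different output format. The sketch below flattens nested structs into dotted name=value lines. It uses a hypothetical, cut-down ToyValueWriter interface rather than the real OpenDDS one, and ToyPathWriter, Time, and Stamped are made up for this example.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Cut-down event interface; NOT the OpenDDS ValueWriter.
class ToyValueWriter {
public:
  virtual ~ToyValueWriter() {}
  virtual void begin_struct() = 0;
  virtual void end_struct() = 0;
  virtual void begin_struct_member(const char* name) = 0;
  virtual void end_struct_member() = 0;
  virtual void write_int32(std::int32_t value) = 0;
};

// Flattens nested structs into "a.b=value" lines instead of JSON.
class ToyPathWriter : public ToyValueWriter {
public:
  void begin_struct() override {}
  void end_struct() override {}
  void begin_struct_member(const char* name) override { path_.push_back(name); }
  void end_struct_member() override { path_.pop_back(); }
  void write_int32(std::int32_t value) override
  {
    // Join the currently open member names with dots.
    std::string line;
    for (std::size_t i = 0; i < path_.size(); ++i) {
      if (i != 0) line += '.';
      line += path_[i];
    }
    lines_.push_back(line + "=" + std::to_string(value));
  }
  const std::vector<std::string>& lines() const { return lines_; }
private:
  std::vector<std::string> path_;  // names of the members currently open
  std::vector<std::string> lines_; // one entry per primitive value written
};

// Hypothetical types with hand-written vwrite functions.
struct Time { std::int32_t sec; std::int32_t nanosec; };
struct Stamped { std::int32_t id; Time when; };

void vwrite(ToyValueWriter& vw, const Time& t)
{
  vw.begin_struct();
  vw.begin_struct_member("sec");
  vw.write_int32(t.sec);
  vw.end_struct_member();
  vw.begin_struct_member("nanosec");
  vw.write_int32(t.nanosec);
  vw.end_struct_member();
  vw.end_struct();
}

void vwrite(ToyValueWriter& vw, const Stamped& s)
{
  vw.begin_struct();
  vw.begin_struct_member("id");
  vw.write_int32(s.id);
  vw.end_struct_member();
  vw.begin_struct_member("when");
  vwrite(vw, s.when);
  vw.end_struct_member();
  vw.end_struct();
}
```

The vwrite functions are unchanged from what a JSON writer would consume; only the writer differs, which is the point of separating the traversal of a sample from its output format.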
Conclusion
Logging is a necessary feature for distributed applications, including applications built using OpenDDS. A generic approach to intercepting relevant events and samples and serializing those samples is necessary for long-term maintainability.
The Observer interface in OpenDDS allows one to attach an object to any DDS Entity that can receive events for enabled status changes, QoS changes, association changes, and sample creation, storage, and access.
The ValueWriter interface allows one to define generic objects for serializing samples.
OpenDDS comes with an implementation of ValueWriter called JsonValueWriter which can serialize samples as JSON.
Users of OpenDDS are free to create other implementations of ValueWriter to satisfy their logging needs.
Next Steps
- See the Observer section in the Developer's Guide.
- Check out the Observer test.
- Check out an example of OpenDDS::DCPS::to_json in the RtpsRelay.
- Implement a ValueWriter for your own logging needs.