Using Futures to Simplify CORBA AMI

Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.

Introduction

Many applications perform actions that take a significant amount of time, such as performing a database query, retrieving data from a web server, or generating a report. It is beneficial to run these actions separately from the main thread of the application. Multiple tasks can then execute simultaneously, improving overall application performance. In addition, moving long-running operations out of the thread that manages the user interface yields a better user experience: the application no longer "locks up" while a long-running task is executing.

Concurrency, however, adds complexity to the code of the application. What had been simple, sequential execution is now cluttered with thread management, synchronization, and, depending on the method used, callback functions that move result processing out of the linear flow of the application. Subtle bugs can be introduced, and debugging multithreaded code can be a challenge.

Hiding the complexity inherent in asynchronous execution, so programming logic still appears sequential, is beneficial. One technique to accomplish this is known as a future.

Futures

Futures were first discussed in the latter half of the 1970s [1], and have been implemented in a number of programming languages, such as Python [2, 3], Ruby [4], and Java 5 [5]. While not officially part of C++, implementations can be found [6, 7]. In this article, we show how futures can be used to simplify asynchronous execution. We apply the technique to CORBA Asynchronous Method Invocation (AMI) [8, 9] as an example.

A future satisfies two criteria. For ease of discussion, we refer to the thread of execution that hosts the future object as the "main" thread, and to the means of executing the asynchronously running action as a "background" thread.

  1. The future object manages a task running on a background thread that produces a result.
  2. The future provides a means, from the main thread, to obtain the result produced by the task, blocking the execution of the main thread if the task has not yet completed. If, instead of producing a result, the task threw an exception in the background thread, the future provides a means for the exception to be thrown in the main thread.
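
Both criteria can be seen in miniature with std::async and std::future from the C++11 standard library. This is only an illustrative sketch: the code in this article does not use these facilities, and SlowRank() with its parameters is a hypothetical stand-in for a slow remote call.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <stdexcept>
#include <thread>

// Hypothetical stand-in for a slow remote call: sleeps for delay_ms
// milliseconds, then either returns `rank` or throws.
int SlowRank(int rank, int delay_ms, bool throw_exception) {
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
  if (throw_exception)
    throw std::runtime_error("SlowRank exception");
  return rank;
}

// Criterion 1: each future manages a task running on a background thread.
// Criterion 2: get() blocks until the task is done, and rethrows in the
// calling thread any exception that the task threw.
bool FutureRethrows() {
  std::future<int> ok =
      std::async(std::launch::async, SlowRank, 7, 20, false);
  std::future<int> bad =
      std::async(std::launch::async, SlowRank, 7, 20, true);
  assert(ok.get() == 7);  // blocks until the first task has finished
  try {
    bad.get();            // the task's exception surfaces here
  } catch (const std::runtime_error&) {
    return true;
  }
  return false;
}
```

Calling FutureRethrows() from main() returns true when the exception thrown on the background thread is caught on the calling thread.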

The first criterion can be implemented in two different ways. The task can be started on the background thread as soon as the future object is created, or the starting of the task can be deferred until the result of the task is needed. In the former instance, greater asynchrony can be achieved, but in the latter, less work overall needs to be performed if the result of the task is never actually required.
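
The trade-off between the two start policies can be observed with std::async from C++11, which offers both. The sketch below is an illustration only and is not how the Future<T> class in this article is built; Task() and g_runs are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <future>

// Counts how many times the task body has actually executed.
std::atomic<int> g_runs(0);

int Task() {
  ++g_runs;
  return 42;
}

// Deferred start: a future that is never asked for its value never
// runs its task, so no work is wasted.
int RunsAfterUnusedDeferred() {
  g_runs = 0;
  {
    std::future<int> unused = std::async(std::launch::deferred, Task);
  }  // destroyed without get(): Task() was never called
  return g_runs.load();
}

// Eager start: the task runs even though the value is never requested.
int RunsAfterUnusedEager() {
  g_runs = 0;
  {
    std::future<int> unused = std::async(std::launch::async, Task);
  }  // the destructor of a std::async future waits for the task
  return g_runs.load();
}

// Deferred start: the task runs inside get(), on the calling thread.
int DeferredValue() {
  g_runs = 0;
  std::future<int> f = std::async(std::launch::deferred, Task);
  assert(g_runs.load() == 0);  // nothing has run yet
  int v = f.get();             // Task() executes here
  assert(g_runs.load() == 1);
  return v;
}
```

The eager policy buys overlap with the caller's other work at the cost of possibly computing a value nobody reads; the deferred policy makes the opposite choice.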

The second criterion can also be implemented in two different ways. The result can be returned explicitly, such as by a method named Get(). The result can be returned implicitly, where the future object is directly convertible to the result type. An explicit Get() method on the future object makes it clear when blocking may occur, but an implicit conversion can make code easier to read.
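
The two retrieval styles can be compared side by side in a small sketch. ValueFuture<T> is a hypothetical wrapper around the C++11 std::shared_future, named differently from the Future<T> class developed later in this article to avoid confusion with it.

```cpp
#include <cassert>
#include <future>

// Hypothetical wrapper offering both retrieval styles.
template <typename T>
class ValueFuture {
  std::shared_future<T> f_;
public:
  explicit ValueFuture(std::future<T> f) : f_(f.share()) {}

  // Explicit retrieval: the potential blocking point is visible.
  T Get() const { return f_.get(); }

  // Implicit retrieval: reads like a plain value, but may block.
  operator T() const { return Get(); }
};

int Six() { return 6; }
int Seven() { return 7; }

int Product() {
  ValueFuture<int> a(std::async(std::launch::async, Six));
  ValueFuture<int> b(std::async(std::launch::async, Seven));
  int explicit_result = a.Get() * b.Get();
  int implicit_result = a * b;  // both operands convert to int
  assert(explicit_result == implicit_result);
  return implicit_result;
}
```

The implicit form is shorter, but nothing in the expression a * b signals that it may wait for two background tasks.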

Case Study: A Client-Server System

Consider a client-server system where a client invokes a CORBA method on objects in each of several different CORBA servers, and collects the results. To simulate such a system, we define an interface, MyInterface, in IDL as follows:

// IDL/MyInterface.idl
module MyModule {
  exception MyException { string what; };

  interface MyInterface {
    long DoWork(in long delay_ms, in boolean throw_exception) 
        raises(MyException);
  };
};

where DoWork() is implemented by a server as:

// Server/MyInterface_i.cpp
CORBA::Long MyModule_MyInterface_i::DoWork(::CORBA::Long delay_ms, 
  CORBA::Boolean throw_exception) 
{
  Sleep(delay_ms);
  if (throw_exception)
    // The generated constructor copies the string, so no string_dup()
    throw MyModule::MyException("DoWork exception");
  return rank_;
}

The parameter delay_ms simulates the time taken for the method to perform useful work. If throw_exception is true, the method fails by throwing the CORBA user exception MyException. If the method executes successfully, it returns the value of rank_, an integer set from a value passed on the server command line. The test scripts in the Tests subdirectory of the code archive that accompanies this article start five servers in this manner. The code has been tested with Visual Studio 2010 on 32-bit Windows XP and 64-bit Windows 7, and with GCC 4.4.2 on 64-bit GNU/Linux.

Synchronous Method Invocation

We start with a client that invokes the DoWork() method on each server, in turn. Results are accumulated in a std::vector<>, and then displayed:

// Client_Sync/Client_Sync.cpp
void DoWorkTest(CORBA::ORB_ptr orb, int num_servers) {
  try {
    std::cout << "DoWorkTest" << std::endl;
 
    // Call methods
    std::vector<CORBA::Long> results;
    for (int i=0; i<num_servers; i++) {
      MyModule::MyInterface_var myInterface = GetReference(orb, i);
      results.push_back(myInterface->DoWork(1000, false));
    }
 
    // Loop and display results
    for (std::vector<CORBA::Long>::iterator it = results.begin(); 
      it != results.end(); it++) 
      std::cout << "Rank: " << *it << std::endl;
  }
  catch (MyModule::MyException& ex) {
    std::cerr << "MyException exception: " << ex.what << std::endl;
  }
  catch (CORBA::Exception& ex) {
    std::cerr << "CORBA exception: " << ex << std::endl;
  }
}

Passing 1000 to DoWork() causes the method to wait for one second before returning, so, as shown by Tests/run_test_sync.pl, running this client against five servers takes approximately five seconds, as the calls are made synchronously.

DoWorkTest
Rank: 0
Rank: 1
Rank: 2
Rank: 3
Rank: 4
Elapsed: 5041 ms

Asynchronous Method Invocation

We can improve performance by introducing AMI, though it adds complexity to the code. We first activate a POA, as the client must act as a CORBA server in order to process the AMI callbacks.

// Client_AMI/Client_AMI.cpp
int main(int argc, char* argv[]) {
  // ...code
  
  CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
  PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());
  PortableServer::POAManager_var mgr = poa->the_POAManager();
  mgr->activate();

Next, for each request, we create a reply-handler servant, and pass it to sendc_DoWork(). The handler will be invoked when the method completes. We maintain a std::vector<> of servants for later use.

// Client_AMI/Client_AMI.cpp
void DoWorkTest(CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, 
  int num_servers) 
{
  try {
    std::cout << "DoWorkTest" << std::endl;
 
    std::vector<PortableServer::Servant_var<
      MyModule_AMI_MyInterfaceHandler_i> > servants;
    for (int i=0; i<num_servers; i++) {
      MyModule::MyInterface_var myInterface = GetReference(orb, i);
 
      // Create a reply-handler servant
      PortableServer::Servant_var<MyModule_AMI_MyInterfaceHandler_i>
        replyHandler_servant = new MyModule_AMI_MyInterfaceHandler_i();
      PortableServer::ObjectId_var oid =
        poa->activate_object(replyHandler_servant.in());
      CORBA::Object_var handler_obj = poa->id_to_reference(oid.in());
      MyModule::AMI_MyInterfaceHandler_var replyHandler =
        MyModule::AMI_MyInterfaceHandler::_narrow(handler_obj.in());
      servants.push_back(replyHandler_servant);
 
      // Call the method
      myInterface->sendc_DoWork(replyHandler.in(), 1000, false);
    }

The reply-handler implements two methods. The first receives and stores the return value from DoWork() when the method executes successfully:

// Client_AMI/MyReplyHandler.cpp
void MyModule_AMI_MyInterfaceHandler_i::DoWork(
  CORBA::Long ami_return_val) 
{
  ACE_Guard<ACE_Thread_Mutex> guard(lock_);
  reply_received_ = true;
  rank_ = ami_return_val;
}

The other is for processing any exceptions that are thrown.

// Client_AMI/MyReplyHandler.cpp
void MyModule_AMI_MyInterfaceHandler_i::DoWork_excep(
  ::Messaging::ExceptionHolder *excep_holder) 
{
  try {
    excep_holder->raise_exception();
  }
  catch (CORBA::SystemException& e) {
    std::cout << "DoWork System Exception " << e << std::endl;
  }
  catch (CORBA::UserException& e) {
    std::cout << "DoWork User Exception " << e << std::endl;
  }
}

The client's ORB must be active to process the reply handlers, so the client calls perform_work() on the ORB until all replies have been received.

// Client_AMI/Client_AMI.cpp
    while (1) {

      // Check to see if reply has been returned.
      if (orb->work_pending()) {
        orb->perform_work(); // ORB will invoke reply handler here
                
        // continue to loop until all replies have been received
        int replies_received = 0;
        for (std::vector<PortableServer::Servant_var<
          MyModule_AMI_MyInterfaceHandler_i> >::iterator it = 
            servants.begin(); it != servants.end(); it++) 
        {
            if ((*it)->ReplyReceived())
              replies_received++;
        }

        if (replies_received == num_servers)
          break;
      }
    }

Once all asynchronous method invocations have been completed, the client then displays the results.

// Client_AMI/Client_AMI.cpp
    // Loop and display results
    for (std::vector<PortableServer::Servant_var<
      MyModule_AMI_MyInterfaceHandler_i> >::iterator it = 
        servants.begin(); it != servants.end(); it++) 
    {
      std::cout << "Rank: " << (*it)->GetRank() << std::endl;
    }
  }
  catch (MyModule::MyException& ex) {
    std::cerr << "MyException exception: " << ex.what << std::endl;
  }
  catch (CORBA::Exception& ex) {
    std::cerr << "CORBA exception: " << ex << std::endl;
  }
}

Executing Tests/run_test_ami.pl shows that execution now takes just over one second, as the calls are executed in parallel. Performance has been gained at the expense of code complexity.

DoWorkTest
Rank: 0
Rank: 1
Rank: 2
Rank: 3
Rank: 4
Elapsed: 1019 ms

Futures

Although performance has improved, the code to perform an asynchronous invocation and process its result has become significantly more complex. We can return to the simplicity of the synchronous version with the use of futures.

Our implementation of a future consists of two parts: FutureData<T>, which stores the result or exception returned by the asynchronous method, and Future<T>, the future itself.

FutureData<T> contains a lock, in addition to the result or exception returned from the method, and, as before, an indication of whether or not the reply has been received.

// Client_Future/Future.h
template <typename T>
class FutureData {
  ACE_Thread_Mutex lock_;
  T result_;
  ::Messaging::ExceptionHolder_var excep_holder_;
  bool haveReply_;
public:
  FutureData() : haveReply_(false) {}

The methods SetResult() and SetException() will be called by the reply-handler, while HaveReply() will be called by the future, running in the main thread, so the lock is used to ensure access to the haveReply_ boolean is synchronized.

// Client_Future/Future.h
  void SetResult(T result) {
    result_ = result;
    ACE_Guard<ACE_Thread_Mutex> guard(lock_);
    haveReply_ = true;
  }

  void SetException(::Messaging::ExceptionHolder *excep_holder) {
    excep_holder->_add_ref();
    excep_holder_ = excep_holder;
    ACE_Guard<ACE_Thread_Mutex> guard(lock_);
    haveReply_ = true;
  }

  bool HaveReply() {
    ACE_Guard<ACE_Thread_Mutex> guard(lock_);
    return haveReply_;
  }

GetResult() and GetException() will only be called once the AMI has completed, so there is no need to acquire the lock.

// Client_Future/Future.h
  T GetResult() const { return result_; }
  ::Messaging::ExceptionHolder_var GetException() const 
    { return excep_holder_; }
};
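
The reason GetResult() can skip the lock is that the result is written before the flag is set under the lock, and is read only after the flag has been observed under the same lock. A minimal sketch of that ordering follows, with std::mutex and std::thread from C++11 standing in for the ACE classes; ReplySlot<T> is a hypothetical name.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical analogue of FutureData<T>, reduced to the result path.
template <typename T>
class ReplySlot {
  std::mutex lock_;
  T result_;
  bool have_reply_;
public:
  ReplySlot() : result_(), have_reply_(false) {}

  void SetResult(const T& r) {
    result_ = r;  // written before the flag is published
    std::lock_guard<std::mutex> guard(lock_);
    have_reply_ = true;
  }

  bool HaveReply() {
    std::lock_guard<std::mutex> guard(lock_);
    return have_reply_;
  }

  // Safe without the lock only after HaveReply() has returned true.
  T GetResult() const { return result_; }
};

int PublishAndRead() {
  ReplySlot<int> slot;
  std::thread writer([&slot] { slot.SetResult(5); });
  while (!slot.HaveReply())
    std::this_thread::yield();  // poll until the reply is published
  assert(slot.HaveReply());
  int v = slot.GetResult();
  writer.join();
  return v;
}
```

Reading the result before HaveReply() has returned true would be a data race, which is why the sketch, like the class above, relies on the caller to respect that order.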

A Future is constructed with a reference to the ORB, the asynchronous method to execute (represented as a boost::function object [10]), and a pointer to the FutureData that is to be associated with the future. The constructor makes the asynchronous invocation by evaluating the function that was passed in.

// Client_Future/Future.h
template <typename T>
class Future {
  boost::shared_ptr<FutureData<T> > data_;
  CORBA::ORB_var orb_;
public:
  Future(CORBA::ORB_ptr orb, 
    boost::function<void()> f, 
    boost::shared_ptr<FutureData<T> > data) : 
    data_(data), orb_(CORBA::ORB::_duplicate(orb)) 
  {
    f();
  }

The Get() method returns the result of the invocation, or throws the exception that the call generated. If the result is not yet available, Get() calls Wait() to block until the call is complete. Note that instead of evaluating f() in the constructor, it could be evaluated here, so the call would not be made unless the result was requested.

// Client_Future/Future.h
  // explicit Get()
  T Get() {
    if (!data_->HaveReply())
      Wait();
    if (data_->GetException())
      data_->GetException()->raise_exception();
    return data_->GetResult();
  }

We also provide an implicit conversion of the Future<T> object to the return type, so Get() need not be explicitly used.

// Client_Future/Future.h
  // implicit Get()
  operator T() { return Get(); }

Finally, if the method invocation has not yet completed, Wait() calls perform_work() on the ORB until a reply is returned.

// Client_Future/Future.h
  void Wait() {
    while (!data_->HaveReply()) {
      if (orb_->work_pending()) 
        orb_->perform_work(); 
    }
  }
};
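
The waiting strategy, pumping an event loop on the calling thread until the awaited reply has been dispatched, can be sketched without an ORB. MiniLoop and its members are hypothetical and are not part of CORBA or TAO; they only mimic the roles of work_pending() and perform_work().

```cpp
#include <cassert>
#include <functional>
#include <queue>

// Hypothetical miniature event loop holding queued callbacks.
class MiniLoop {
  std::queue<std::function<void()> > pending_;
public:
  void Post(std::function<void()> cb) { pending_.push(cb); }
  bool WorkPending() const { return !pending_.empty(); }
  void PerformWork() {
    std::function<void()> cb = pending_.front();
    pending_.pop();
    cb();  // the "reply handler" runs here, on the caller's thread
  }
};

// Pump the loop until the flag is set; returns the number of
// callbacks dispatched while waiting.
int WaitFor(MiniLoop& loop, const bool& have_reply) {
  int dispatched = 0;
  while (!have_reply) {
    if (loop.WorkPending()) {
      loop.PerformWork();
      ++dispatched;
    }
  }
  return dispatched;
}

int PumpUntilReply() {
  MiniLoop loop;
  bool have_reply = false;
  int result = 0;
  loop.Post([] {});                                   // unrelated work
  loop.Post([&] { result = 9; have_reply = true; });  // awaited reply
  int dispatched = WaitFor(loop, have_reply);
  assert(dispatched == 2);
  return result;
}
```

As in Wait(), other pending work may be dispatched while waiting for one particular reply, so a wait on one future can also complete others.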

The binding of the AMI method to the boost::function is simplified through the use of two function templates. First, CreateReplyHandler() is used to create a reply handler for an arbitrary servant type.

// Client_Future/Future.h
template <typename SERVANT_TYPE, typename HANDLER, 
  typename METHOD_RETURN>
HANDLER * CreateReplyHandler(PortableServer::POA_ptr poa, 
  boost::shared_ptr<FutureData<METHOD_RETURN> > data) 
{
  PortableServer::Servant_var<POA_Messaging::ReplyHandler> 
    replyHandler_servant = new SERVANT_TYPE(data);
  PortableServer::ObjectId_var oid = poa->activate_object(
    replyHandler_servant.in());
  CORBA::Object_var handler_obj = poa->id_to_reference(oid.in());
  return HANDLER::_narrow(handler_obj.in());
}

Second, CreateFuture() is used to create the future itself. This version of the template accepts an AMI call that takes two parameters; analogous templates can be written for methods that take a different number of parameters.

// Client_Future/Future.h
template <typename METHOD_RETURN, typename SERVANT_TYPE, 
  typename HANDLER, typename P1, typename P2, typename INTERFACE>
Future<METHOD_RETURN> CreateFuture(CORBA::ORB_ptr orb, 
  PortableServer::POA_ptr poa, INTERFACE *myInterface, 
  void (INTERFACE::*method)(HANDLER *, P1, P2), P1 param1, P2 param2) 
{
  // create a Future object with the supplied method as its function
  boost::shared_ptr<FutureData<METHOD_RETURN> > data(
    new FutureData<METHOD_RETURN>);
  return Future<METHOD_RETURN>(
    orb, 
    boost::bind(method, myInterface, 
      CreateReplyHandler<SERVANT_TYPE, HANDLER>(poa, data), 
      param1, param2), 
    data);
}
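
With a C++11 compiler, one variadic template could cover every parameter count instead of one overload per count. The sketch below shows only the binding step, under that assumption; Iface, Send1(), Send2(), and BindCall() are hypothetical names, and a lambda takes the place of boost::bind().

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for an interface whose sendc_-style methods
// take different numbers of parameters after the handler.
struct Iface {
  int last;
  Iface() : last(0) {}
  void Send1(int* handler, int a) { last = *handler + a; }
  void Send2(int* handler, int a, int b) { last = *handler + a + b; }
};

// P... is deduced from the method's parameter list, and A... from the
// arguments actually supplied, so one template serves every arity.
template <typename INTERFACE, typename HANDLER,
          typename... P, typename... A>
std::function<void()> BindCall(INTERFACE* obj,
    void (INTERFACE::*method)(HANDLER*, P...),
    HANDLER* handler, A... args) {
  return [=]() { (obj->*method)(handler, args...); };
}

int CallBoth() {
  Iface obj;
  int handler = 100;
  std::function<void()> one =
      BindCall(&obj, &Iface::Send1, &handler, 1);
  std::function<void()> two =
      BindCall(&obj, &Iface::Send2, &handler, 1, 2);
  one();
  assert(obj.last == 101);
  two();
  return obj.last;  // 100 + 1 + 2
}
```

The compiler used for this article's code predates reliable variadic template support, which is one reason a fixed two-parameter version is shown above.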

The heart of CreateFuture() is the call to boost::bind(), which wraps the asynchronous method as a boost::function. In the case of DoWork(), the direct call

myInterface->sendc_DoWork(replyHandler.in(), 1000, false)

becomes:

boost::bind(&MyModule::MyInterface::sendc_DoWork, myInterface, 
  CreateReplyHandler<...>(poa, data), 1000, false)

as the first parameter to boost::bind() is a method to call, the second parameter an instance of an object to call it on, and the remaining parameters those to the method being called.
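
The same argument order can be tried in isolation with std::bind and std::function, the C++11 standard library counterparts of boost::bind() and boost::function. Recorder and Send() are hypothetical; Send() records its arguments instead of contacting a server.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical class standing in for an interface with a sendc_-style
// method.
struct Recorder {
  std::string log;
  void Send(const std::string& handler, int delay_ms, bool fail) {
    log = handler + ":" + std::to_string(delay_ms) + ":" +
          (fail ? "1" : "0");
  }
};

std::string BindAndCall() {
  Recorder r;
  // First argument: the member function; second: the object to call
  // it on; the rest: the arguments passed when the wrapper is invoked.
  std::function<void()> call =
      std::bind(&Recorder::Send, &r, std::string("handler"), 1000, false);
  assert(r.log.empty());  // binding alone does not make the call
  call();                 // same as r.Send("handler", 1000, false)
  return r.log;
}
```

Because every argument is bound in advance, the wrapper has the signature void(), which is what allows a future to store and invoke it without knowing anything about the underlying method.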

With the Future<T> infrastructure in place, we only need to create a method handler which stores the method return into a FutureData<T> object. One way to do this is to create a base class that stores the FutureData<T> and provides empty methods for all of the methods in the CORBA interface, and then to create a specific subclass to handle the method that is to be used with a future. This keeps the support code to a minimum, and may even lend itself to automatic code generation. For DoWork(), we do the following:

// Client_Future/Client_Future.cpp
template <typename T>
class InterfaceHandlerBase : public virtual 
  POA_MyModule::AMI_MyInterfaceHandler 
{
protected:
  boost::shared_ptr<FutureData<T> > data_;
public:
  InterfaceHandlerBase(boost::shared_ptr<FutureData<T> > data) : 
    data_(data) {}
  boost::shared_ptr<FutureData<T> > GetData() const { return data_; }
 
  virtual void DoWork(CORBA::Long ami_return_val) {}
  virtual void DoWork_excep(
    ::Messaging::ExceptionHolder* excep_holder) {}
  // ...
};
 
 
class DoWorkHandler : public virtual InterfaceHandlerBase<CORBA::Long> 
{
public:
  DoWorkHandler(boost::shared_ptr<FutureData<CORBA::Long> > data) : 
    InterfaceHandlerBase<CORBA::Long>(data) {}
  virtual void DoWork(CORBA::Long ami_return_val) 
    { data_->SetResult(ami_return_val); }
  virtual void DoWork_excep(::Messaging::ExceptionHolder* excep_holder) 
    { data_->SetException(excep_holder); }
};

We can now use futures with DoWork().

// Client_Future/Client_Future.cpp
void DoWorkTest(CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, 
  int num_servers) 
{
  try {
    std::cout << "DoWorkTest" << std::endl;

    // Call methods
    std::vector<Future<CORBA::Long> > futures;
    for (int i=0; i<num_servers; i++) {
      MyModule::MyInterface_var myInterface = GetReference(orb, i);
      futures.push_back(CreateFuture<CORBA::Long, DoWorkHandler>(
        orb, poa, myInterface.in(), 
        &MyModule::MyInterface::sendc_DoWork, 1000, false));
    }

    // Loop over Future objects and display results
    for (std::vector<Future<CORBA::Long> >::iterator it = 
      futures.begin(); it != futures.end(); it++) 
    {
      std::cout << "Rank: " << *it << std::endl;
    }
  }
  catch (MyModule::MyException& ex) {
    std::cerr << "MyException exception: " << ex.what << std::endl;
  }
  catch (CORBA::Exception& ex) {
    std::cerr << "CORBA exception: " << ex << std::endl;
  }
}

Instead of storing results directly in the std::vector<>, Future<CORBA::Long> objects are stored, and this call in the synchronous version:

results.push_back(myInterface->DoWork(1000, false));

becomes this in the futures version:

futures.push_back(CreateFuture<CORBA::Long, DoWorkHandler>(
  orb, poa, myInterface.in(), 
  &MyModule::MyInterface::sendc_DoWork, 1000, false));

Simple, synchronous-looking code is once again written, but the performance is still that of the AMI version, as shown by executing Tests/run_test_future.pl:

DoWorkTest
Rank: 0
Rank: 1
Rank: 2
Rank: 3
Rank: 4
Elapsed: 1020 ms

Supporting additional methods of MyInterface is also now trivial. If the method DoMoreWork() is added to the interface,

// IDL/MyInterface.idl
string DoMoreWork(in long delay_ms, in boolean throw_exception, 
  out long val) raises(MyException);

where the method return is now a CORBA string, and an out parameter is added, the only changes to the code needed to support it are to add empty methods to InterfaceHandlerBase so DoWorkHandler will still compile,

virtual void DoMoreWork(const char *ami_return_val, 
  CORBA::Long val) {}
virtual void DoMoreWork_excep(
  ::Messaging::ExceptionHolder *excep_holder) {}

and to add a handler, DoMoreWorkHandler:

// Client_Future/Client_Future.cpp
class DoMoreWorkHandler : public virtual InterfaceHandlerBase<
  std::pair<std::string, CORBA::Long> > {
public:
  DoMoreWorkHandler(boost::shared_ptr<FutureData<
    std::pair<std::string, CORBA::Long> > > data) : 
    InterfaceHandlerBase<std::pair<std::string, CORBA::Long> >(data) {}
  virtual void DoMoreWork(const char * ami_return_val, CORBA::Long val) 
    { data_->SetResult(std::pair<std::string, 
      CORBA::Long>(ami_return_val, val)); }
  virtual void DoMoreWork_excep(
    ::Messaging::ExceptionHolder* excep_holder) 
    { data_->SetException(excep_holder); }
};

Here, the value stored by the future is std::pair<std::string,CORBA::Long>, to capture both the returned string and the out parameter. Invoking DoMoreWork() is analogous to invoking DoWork(), so even a more complex return from the asynchronous method can still be processed easily.

// Client_Future/Client_Future.cpp
void DoMoreWorkTest(CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, 
  int num_servers) 
{
  try {
    std::cout << "DoMoreWorkTest" << std::endl;

    // Call methods
    std::vector<Future<std::pair<std::string, CORBA::Long> > > futures;
    for (int i=0; i<num_servers; i++) {
      MyModule::MyInterface_var myInterface = GetReference(orb, i);
      futures.push_back(
        CreateFuture<std::pair<std::string, CORBA::Long>, 
        DoMoreWorkHandler>(orb, poa, myInterface.in(), 
        &MyModule::MyInterface::sendc_DoMoreWork, 1000, false));
    }

    // Loop over Future objects and display results
    for (std::vector<Future<
      std::pair<std::string, CORBA::Long> > >::iterator it = 
      futures.begin(); it != futures.end(); it++) 
    {
      std::pair<std::string, CORBA::Long> r = *it;
      std::cout << "Rank: " << r.first << " " << r.second << std::endl;
    }
  }
  catch (MyModule::MyException& ex) {
    std::cerr << "MyException exception: " << ex.what << std::endl;
  }
  catch (CORBA::Exception& ex) {
    std::cerr << "CORBA exception: " << ex << std::endl;
  }
}

Composing Futures

One item to keep in mind when using futures is how to compose them, and retain parallelism. For clarity of discussion, let this helper function create a call to DoWork():

Future<CORBA::Long> f(CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, 
  int i, int delay_ms) 
{
  return CreateFuture<CORBA::Long, DoWorkHandler>(orb, poa, 
      GetReference(orb, i).in(), 
      &MyModule::MyInterface::sendc_DoWork, delay_ms, false);
}

Recall that as DoWork() returns the rank number of the server that the method is invoked upon, the above produces a value equal to the parameter i, e.g., f(orb, poa, 3, ms) = 3.

If we now execute

CORBA::Long result = f(orb, poa, 1, 3000) + 
  f(orb, poa, 2, f(orb, poa, 4, 1000) * 1000);

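the correct value 3 is calculated, but it takes 8 seconds to do it. This occurs because each temporary future is converted to its value as soon as it is used as an operand or argument, so the above expression is evaluated, in effect, as: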

a = f(orb, poa, 1, 3000).Get();
b = f(orb, poa, 4, 1000).Get();
c = f(orb, poa, 2, b * 1000).Get();
CORBA::Long result = a + c;

Since the value of a is needed first, 3 seconds elapse while it is calculated. Only then is b requested. It takes 1 second to calculate b and a further 4 seconds to calculate c, so the elapsed times add up sequentially: 3 + (1 + 4) = 8.

If the expression is instead written as:

Future<CORBA::Long> f1 = f(orb, poa, 1, 3000);
Future<CORBA::Long> f2 = f(orb, poa, 2, f(orb, poa, 4, 1000) * 1000);
CORBA::Long result = f1 + f2;

then the futures f1 and f2 execute concurrently, as their Get() methods are not called until result is calculated, when their values are needed as operands of the addition. While it takes 3 seconds to calculate f1 and 5 seconds to calculate f2, the overall time is 5 seconds, as the two calculations execute in parallel.
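
The difference between the two forms can be reproduced without CORBA. The following sketch uses std::async from C++11 with the delays scaled down from seconds to tens of milliseconds; Work(), F(), and TimeMs() are hypothetical helpers, with F() playing the role of f().

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Hypothetical stand-in for the remote call: finishes after delay_ms
// milliseconds and yields `rank`.
int Work(int rank, int delay_ms) {
  std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
  return rank;
}

std::future<int> F(int rank, int delay_ms) {
  return std::async(std::launch::async, Work, rank, delay_ms);
}

// Every future is resolved as soon as it is created, so the delays
// add up: 30 + (10 + 40) ms.
int Sequential() {
  int a = F(1, 30).get();
  int b = F(4, 10).get();
  int c = F(2, b * 10).get();
  return a + c;
}

// f1 is started first and runs in the background while f2 and its
// argument are computed: the longer of 30 and (10 + 40) ms.
int Concurrent() {
  std::future<int> f1 = F(1, 30);
  std::future<int> f2 = F(2, F(4, 10).get() * 10);
  return f1.get() + f2.get();
}

// Returns the elapsed wall-clock time of fn() in milliseconds.
long long TimeMs(int (*fn)()) {
  std::chrono::steady_clock::time_point start =
      std::chrono::steady_clock::now();
  int result = fn();
  assert(result == 3);  // 1 + 2, whichever way it is computed
  return std::chrono::duration_cast<std::chrono::milliseconds>(
      std::chrono::steady_clock::now() - start).count();
}
```

Comparing TimeMs(Sequential) with TimeMs(Concurrent) should show the same gap in proportion as the 8 and 5 seconds discussed above, although exact timings depend on the machine and its scheduler.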

Conclusion

Concurrent execution of tasks can improve overall application performance and responsiveness. However, implementing concurrency can increase the complexity of application code. We have shown that futures can be used to manage this complexity. In our example, we encapsulated CORBA Asynchronous Method Invocation (AMI) in a future object with the help of Boost and C++ templates. The same technique can be used with other types of asynchronous operations as well. The resulting application code, from the perspective of the caller, is nearly as simple as synchronous method invocation, yet has the improved performance that concurrency brings.

References

[1] Futures and promises
http://en.wikipedia.org/wiki/Futures_and_promises

[2] pythonfutures
http://code.google.com/p/pythonfutures/

[3] Deferred Reference
http://twistedmatrix.com/documents/current/core/howto/defer.html

[4] promise
http://rubygems.org/gems/promise

[5] java.util.concurrent, Interface Future
http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Future.html

[6] Worker threads returning futures
http://www.boostcookbook.com/Recipe:/1234841

[7] Boost.Future
http://braddock.com/~braddock/future/

[8] Schmidt, Vinoski. Programming Asynchronous Method Invocations with CORBA Messaging
http://www.cs.wustl.edu/~schmidt/PDF/C++-report-col16.pdf

[9] TAO Developer's Guide for OCI TAO 1.6a, sec 6.2, AMI Callback Model
http://www.theaceorb.com/product/index.html#doc1.6a

[10] Boost.Function
http://www.boost.org/doc/libs/1_44_0/doc/html/function.html
