Complex Event Processing with Esper

By Paul Jensen, OCI Partner

October 2008


Introduction

Automated analysis of real-time data plays an increasingly important role in our society. Real-time or near real-time systems in a variety of domains demand the ability to extract meaning from potentially disparate events from a variety of sources. The financial domain requires such processing in the areas of fraud detection, algorithmic trading, and risk management. Network management requires intelligent system monitoring to detect intrusions and error conditions. As a general case, any reasonably complicated modern system involves a variety of interworking processes and related events. Leveraging the underlying value of these events can provide competitive advantage and early recognition of hazardous conditions.

While systems performing event processing in the vein of that outlined above have long been in existence, over the past several years various products have emerged, providing an infrastructure upon which such systems may be based. Such products are grouped under the category of CEP (Complex Event Processing) engines. CEP implies the ability to process multiple event streams, recognize significant events, and deliver data derived from one or multiple streams when the event is triggered. Typically, CEP products provide a query language which supports pattern matching, joining events on arbitrary criteria and creating time-based windows.

Often the distinction is made between Event Stream Processing (ESP), which processes an ordered stream of events, and CEP, which processes a number of streams. The former focuses on high-speed processing and mathematical algorithms, while the latter focuses on detecting patterns of events. However, as many products (Esper included) provide both capabilities, we will use the CEP acronym exclusively here. Formal definitions of terminology can be found in the CEP Glossary.

To demonstrate a usage of CEP, we present some potential aspects of intelligent order routing and CEP-based solutions. While this example is in the context of a stock trading system, similar rules and processing could be applicable to any large volume system (e.g. an online retailer). The example is implemented using the open-source product Esper. Esper offers a mature feature set and is licensed under GPLv2. Additional functionality is available under a commercial license.

Esper Basics

Esper implements some common ESP/CEP concepts. Following CEP glossary terminology, events are sent on event channels. Users may specify arbitrary Event Processing Language (EPL) statements which process the events. While EPL is a generally accepted CEP acronym, no standard language syntax exists. The EPL shown here is therefore Esper-specific. The following code sample illustrates simple initialization of Esper and publication of an event.

  import com.espertech.esper.client.Configuration;
  import com.espertech.esper.client.EPAdministrator;
  import com.espertech.esper.client.EPRuntime;
  import com.espertech.esper.client.EPServiceProvider;
  import com.espertech.esper.client.EPServiceProviderManager;

  // Register the alias "TheOrder" for the Order event class
  Configuration configuration = new Configuration();
  configuration.addEventTypeAlias("TheOrder", Order.class.getName());
  // Obtain the engine instance named "OrderRouting"
  EPServiceProvider provider =
      EPServiceProviderManager.getProvider("OrderRouting", configuration);
  EPRuntime runtime = provider.getEPRuntime();
  // Create order with id, symbol, price, and quantity
  Order order = new Order(1, "AAPL", 100.0f, 10);
  runtime.sendEvent(order);

The Esper Configuration class provides the capability of defining aliases for event classes to be used in EPL statements. For our examples, "TheOrder" will be used to represent Order objects. (Unfortunately "order" is a keyword in EPL and no escape mechanism exists.) EPServiceProviderManager.getProvider() creates an instance of an Esper engine. From the engine, we can access the EPRuntime to send events.

As events represent a record of an occurrence at a point in time, the event should be immutable. Esper supports a variety of event representations, including POJOs, Maps, and XML DOM objects. While the following examples only utilize POJOs, the discussions are equally applicable to the other representations. Similar to JSP Expression Language (EL), EPL may reference simple, indexed, mapped, and nested properties using a dotted notation.
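As a minimal sketch of the immutability advice above, an Order event POJO might look like the following. The field names and getters are assumptions inferred from the constructor call `new Order(1, "AAPL", 100.0f, 10)` and the `o.symbol` property used in this article; the actual class is not shown in the source.

```java
// Hypothetical immutable event class. In a real project this would
// normally be a public class in its own file (Order.java).
final class Order {
    // All fields are final and set once in the constructor,
    // so an event cannot change after it has been published.
    private final int id;
    private final String symbol;
    private final float price;
    private final int quantity;

    Order(int id, String symbol, float price, int quantity) {
        this.id = id;
        this.symbol = symbol;
        this.price = price;
        this.quantity = quantity;
    }

    // JavaBean-style getters expose the properties (id, symbol, ...)
    // without offering any setters.
    public int getId() { return id; }
    public String getSymbol() { return symbol; }
    public float getPrice() { return price; }
    public int getQuantity() { return quantity; }
}
```

With getters following the JavaBean naming convention, a property such as `symbol` can be referenced from EPL by name.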

EPL statements are created via the EPAdministrator and have a syntax very similar to SQL with streams replacing tables and events replacing rows. EPL provides many of the features of SQL including joins, ordering, grouping, and views. Here is the complete EPL syntax:

  [INSERT INTO insert_into_def]
  SELECT select_list
  FROM stream_def [AS name] [, stream_def [AS name]] [,...]
  [WHERE search_conditions]
  [GROUP BY grouping_expression_list]
  [HAVING grouping_search_conditions]
  [OUTPUT output_specification]
  [ORDER BY order_by_expression_list]
  [LIMIT num_rows]

While a SQL statement follows a request/response model, an EPL statement typically executes repeatedly based on the arrival of events or triggering of timers. EPL statements start executing immediately upon being created. For example, the following code creates an EPL statement which publishes an event whenever an Order is sent for security symbol 'A'.

  EPAdministrator admin = provider.getEPAdministrator();
  EPStatement epStmt = admin.createEPL(
      "select o from TheOrder as o where o.symbol = 'A'");

These events may be received by one or more listeners which implement the UpdateListener interface:

  epStmt.addListener(
      new UpdateListener() {
          @Override
          public void update(EventBean[] newEvents, EventBean[] oldEvents) {
              // "o" is the name given to the order in the select clause
              Order order = (Order) newEvents[0].get("o");
              System.out.println("Received order " + order.getId());
          }
      });

Why so many arguments to update()? An EventBean is essentially a Map containing all the fields in the select of the EPL. In general, statements may result in multiple result rows as reflected in the use of an array of EventBeans for newEvents. The oldEvents parameter can be used to communicate removal of events from window views or old values of aggregate expressions (covered below). Typically, this parameter is null.

Temporal Event Support

Esper provides the capability to deal with multiple events in a temporal fashion. While the above example shows the invocation of a callback immediately based on a single event, Esper can correlate and aggregate events over time by maintaining explicit or implicit windows. As part of an intelligent order routing decision, we may wish to track the latency of placing an order and route to the venue with the lowest latency. For the sake of this example, we introduce a SubmitOrderEvent and an OrderAcceptedEvent and calculate the delay between them.

  SELECT a.exchange.name AS exchangeName,
    a.timestamp.time - s.timestamp.time AS latency
  FROM pattern
    [every s=SubmitOrderEvent -> a=OrderAcceptedEvent(theOrder.id=s.theOrder.id)]

Let's break this down. Esper provides a rich language for expressing patterns which can be triggered an arbitrary number of times and maintain needed state for the duration of their lifetime. When a pattern expression is fulfilled, it creates an event. Here, the engine is informed to start a pattern match on a SubmitOrderEvent (called "s") "every" time it sees one and looking for a subsequent (->) OrderAcceptedEvent (called "a") which has the same order id. When this occurs, the pattern fires and causes the evaluation of the remainder of the statement, sending the latest latency information for the related exchange.
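To make the correlation concrete, here is a hand-rolled Java sketch of what the pattern computes: remember each submit, and when an accept with the same order id arrives, emit the time difference. This is not how Esper implements patterns. The class and method names are hypothetical, timestamps are assumed to be milliseconds, and the sketch assumes each order id is submitted only once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Illustration only: correlates a submit with the following accept
// for the same order id, as the pattern expression does.
final class LatencyCorrelator {
    // Submit timestamps (ms) of orders still awaiting acceptance, keyed by order id.
    private final Map<Integer, Long> pendingSubmits = new HashMap<>();

    // Corresponds to "every s=SubmitOrderEvent": start tracking this order.
    void onSubmit(int orderId, long timestampMillis) {
        pendingSubmits.put(orderId, timestampMillis);
    }

    // Corresponds to "-> a=OrderAcceptedEvent(theOrder.id=s.theOrder.id)".
    // Returns the latency if a matching submit was seen, otherwise empty.
    OptionalLong onAccept(int orderId, long timestampMillis) {
        Long submittedAt = pendingSubmits.remove(orderId);
        return submittedAt == null
                ? OptionalLong.empty()
                : OptionalLong.of(timestampMillis - submittedAt);
    }
}
```

The map of pending submits is the "needed state" the pattern maintains; the engine manages this bookkeeping itself, so application code only declares the pattern.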

While latency for only the most recent order is somewhat useful, some statistical measure for a time period would likely be more representative. This introduces Esper's support for Window Views. As with a SQL view, an Esper view is a collection of data derived from underlying data. Data windows, or data window views, are views that retain incoming events until an expiry policy (e.g. time, length) indicates to release events. For this example, we will employ the win:time data window view to maintain a list of SubmitOrderEvent/OrderAcceptedEvent pairs which occurred within the last hour. Then a simple use of the familiar SQL group by and avg() provides the desired output.

  SELECT a.exchange.name AS exchangeName,
    avg(a.timestamp.time - s.timestamp.time) AS latency
  FROM pattern
    [every s=SubmitOrderEvent -> a=OrderAcceptedEvent(theOrder.id=s.theOrder.id)].win:time(1 hour)
  GROUP BY a.exchange.name

The key aspect of the above statement is the appended .win:time(1 hour). This causes the submitted and accepted events captured each time the pattern triggers to be placed in a window view for a period of 1 hour and then removed from the view. The statement invokes the callback on its listeners with updated latency values whenever a pattern match event enters or leaves the time window.
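The effect of combining a time window with avg() and group by can be sketched in plain Java as follows. This is only an illustration of the semantics, not Esper's implementation: the class name is hypothetical, time is passed in explicitly as milliseconds, and samples are assumed to arrive in non-decreasing time order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustration only: per-exchange average latency over a sliding time window.
final class SlidingLatencyAverage {
    private static final class Sample {
        final String exchange;
        final long latencyMillis;
        final long arrivedAtMillis;

        Sample(String exchange, long latencyMillis, long arrivedAtMillis) {
            this.exchange = exchange;
            this.latencyMillis = latencyMillis;
            this.arrivedAtMillis = arrivedAtMillis;
        }
    }

    private final long windowMillis;
    // Samples currently inside the window, oldest first.
    private final Deque<Sample> window = new ArrayDeque<>();
    // Running {sum, count} per exchange, kept in step with the window.
    private final Map<String, long[]> totals = new HashMap<>();

    SlidingLatencyAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // A new sample enters the window.
    void add(String exchange, long latencyMillis, long nowMillis) {
        expire(nowMillis);
        window.addLast(new Sample(exchange, latencyMillis, nowMillis));
        long[] t = totals.computeIfAbsent(exchange, k -> new long[2]);
        t[0] += latencyMillis;
        t[1]++;
    }

    // Average latency for the exchange, or NaN if it has no samples in the window.
    double average(String exchange, long nowMillis) {
        expire(nowMillis);
        long[] t = totals.get(exchange);
        return t == null ? Double.NaN : (double) t[0] / t[1];
    }

    // Samples older than the window leave it and are subtracted from the totals.
    private void expire(long nowMillis) {
        while (!window.isEmpty()
                && window.peekFirst().arrivedAtMillis <= nowMillis - windowMillis) {
            Sample old = window.removeFirst();
            long[] t = totals.get(old.exchange);
            t[0] -= old.latencyMillis;
            if (--t[1] == 0) {
                totals.remove(old.exchange);
            }
        }
    }
}
```

Both arrivals and expirations change the average, which is why listeners are notified when an event enters the window and again when it leaves.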

We are also interested if an order has not been accepted for any reason. The following statement will trigger if the order is not accepted within 30 seconds of being submitted.

  SELECT s
  FROM pattern [every s=SubmitOrderEvent ->
    (timer:interval(30 sec) and not OrderAcceptedEvent(theOrder.id=s.theOrder.id))]

When employed in a simple select such as this, patterns may be defined and executed independently of an EPL statement, so the above could be implemented as the following:

  EPStatement pattern = admin.createPattern(
      "every s=SubmitOrderEvent -> "
      + "(timer:interval(30 sec) and not OrderAcceptedEvent(theOrder.id=s.theOrder.id))");
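For comparison, the 30-second acceptance timeout rule can be sketched by hand in Java as below. This is an illustration under stated assumptions rather than Esper's mechanism: the class and method names are hypothetical, timestamps are milliseconds, and the caller must poll checkTimeouts(), whereas the engine schedules the timer itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: reports orders not accepted within a timeout.
final class AcceptanceTimeoutMonitor {
    private final long timeoutMillis;
    // Submit timestamps (ms) of orders not yet accepted, keyed by order id.
    private final Map<Integer, Long> pending = new LinkedHashMap<>();

    AcceptanceTimeoutMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Each submit starts its own timeout, like "every s=SubmitOrderEvent".
    void onSubmit(int orderId, long timestampMillis) {
        pending.put(orderId, timestampMillis);
    }

    // An accept for the same order id cancels the timeout ("and not ...").
    void onAccept(int orderId) {
        pending.remove(orderId);
    }

    // Returns the ids whose timeout has elapsed and stops tracking them.
    List<Integer> checkTimeouts(long nowMillis) {
        List<Integer> timedOut = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }
}
```

The declarative pattern replaces all of this bookkeeping, including the polling loop that a hand-written monitor would need.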

Named Windows

OK, we've got some criteria to act on now, but where is it being stored? Certainly we can take the data we receive in update() and store it ourselves, but wouldn't it be nice if Esper could store it for us? Named windows address this issue. Named windows are global data windows and can be referenced in EPL statements similarly to event streams. Named windows must be explicitly created and deleted via EPL statements. The following statement creates a named window to hold the output of our latency calculations:

  EPStatement namedWindowStmt = admin.createEPL(
      "create window ExchangeLatency.std:unique(exchangeName) "
      + "(exchangeName String, latency double)");

A named window definition must include its name (ExchangeLatency), a data window view (std:unique) and column names and types. The std:unique view restricts the view to only one entry for each unique value of exchangeName.
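Conceptually, a window with std:unique(exchangeName) behaves like a map keyed by exchange name in which a new entry replaces the previous one for the same key. The sketch below illustrates that idea in plain Java and adds a lookup for the lowest-latency venue, the routing criterion mentioned earlier. The class and method names are hypothetical and this is not Esper code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustration only: keeps the most recent latency per exchange.
final class ExchangeLatencyTable {
    private final Map<String, Double> latestByExchange = new HashMap<>();

    // Inserting for an existing exchange replaces its previous entry,
    // so at most one entry exists per exchange name.
    void insert(String exchangeName, double latency) {
        latestByExchange.put(exchangeName, latency);
    }

    int size() {
        return latestByExchange.size();
    }

    // The exchange with the lowest current latency, or empty if none is known.
    Optional<String> lowestLatencyExchange() {
        return latestByExchange.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }
}
```

With a named window, the engine holds this state and other EPL statements can query it directly.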

Now we change our previous statement to store updates in the named window.

  INSERT INTO ExchangeLatency
  SELECT a.exchange.name AS exchangeName, avg(a.timestamp.time - s.timestamp.time) AS latency
  FROM pattern [every s=SubmitOrderEvent -> a=OrderAcceptedEvent(theOrder.id=s.theOrder.id)].win:time(1 hour)
  GROUP BY a.exchange.name

Listeners may be attached to the named window just as with any EPStatement. Callbacks will be invoked whenever events are inserted into the window.

Integrating non-event based data

Systems frequently require a mixture of real time and non real time data in order to perform a calculation. Our order routing example may restrict the destination of the order based on fairly static attributes of a venue. Some venues may not offer a given security, may charge higher rates in some cases, or may have other restrictions which apply. Another possibility is that orders may be internally rejected and not routed to any destination. Reasons for such a rejection may be internally mandated trading limits or SEC restrictions (e.g. a ban on naked short selling).

Esper provides a few mechanisms for incorporating external data without sending it as events: variables, methods, and SQL queries.

Esper variables may be included in EPL and their values changed via the EPRuntime.setVariableValue() method. They may also be modified based on an EPL statement.

EPL supports calling methods on events (and objects nested within events) as well as static methods. We'll continue to add functionality to our order routing by creating a static method which returns the eligible exchanges for a ticker symbol:

  public class InstrumentRepository {
      public static Exchange[] getExchangesForSymbol(String symbol) {
          ...
      }
  }

The following statement invokes the static method, passing data from an event property as a parameter, and returns the order along with its eligible exchanges. This event data could be inserted into a named window as shown above and additional downstream processing can incorporate other data or events.

  SELECT ord, eligibleExchanges FROM TheOrder AS ord,
    method:com.ociweb.jnb.esper.InstrumentRepository.getExchangesForSymbol(ord.symbol) AS eligibleExchanges

Relational database information may be accessed via SQL from within an EPL statement. These queries are treated in most respects like a standard event stream.

Other Esper Capabilities

Esper possesses many capabilities not covered in this article.

The Esper site includes excellent documentation and many examples. A benchmark kit is also provided. Esper can reportedly process over 500,000 events/second with 1000 statements. A .NET version, NEsper, is also available.

Summary

Esper and CEP are complex topics and this article has only provided a brief introduction to both. CEP technology is in a formative state and much work remains in determining its applicability to various problem spaces and defining patterns of usage. As with any new technology, many questions exist. How does CEP differ from a rules engine? How does state management relate to CEP? CEP standardization is also lacking. For example, while many products use a variation of SQL to process events, no standard exists for such expressions.

CEP is arguably immature, but many companies, including major vendors such as IBM and Tibco, are aggressively driving the technology forward. While a limited number of case studies are available (often due to privacy concerns), various industries have provided positive feedback on their experiences. Given its momentum and potential, CEP is worth watching and a technology to consider adding to your toolbox.

References