Look out, Spark and Storm, here comes Apache Apex

A new open source streaming analytics solution derived from DataTorrent's RTS platform, Apex offers blazing speed and simplified programmability. Let's give it a spin

Look out, Spark and Storm, here comes Apache Apex
Thinkstock

When you think of streaming analytics, very likely Spark comes to mind. Even though Spark is a quasi-streaming offering, the Structured Streaming functionality introduced in Spark 2.0 is a big step up. You might also think about Storm, a true streaming solution that in version 1.0 is trying to buck its reputation for being hard to use.

Add Apache Apex, which debuted in June 2015, to your list of stream processing possibilities. Apex comes from DataTorrent and its impressive RTS platform, which includes a core processing engine; a suite of dashboarding, ingesting, and monitoring tools; and dtAssemble, a graphical flow-based programming system aimed at data scientists.

RTS may not have the buzz of Spark, but it has seen production deployments at GE and Capital One. It has demonstrated that it can scale to billions of events per second and respond to events in less than 20ms.

Apex, the processing engine at the core of their RTS platform, is DataTorrent's submission to Apache. Apex is designed to run in your existing Hadoop ecosystem, using YARN to scale up or down as required and leveraging HDFS for fault tolerance. Although it doesn’t provide all the bells and whistles of the full RTS platform, Apex delivers the major functionality you’d expect from a data processing platform.

A sample Apex application

Let's look at a very basic Apex pipeline to examine some of the core concepts. In this example, I'll read logging lines from Kafka, taking a count of the types of log lines seen and writing the counts out to the console. I'll include code snippets here, but you can also find the complete application on GitHub.

Apex's core concept is the operator, which is a Java class that implements methods receiving input and generating output. (If you know Storm, you know they're similar in concept to bolts and spouts.) In addition, each operator defines a set of ports for either input or output of data. The methods will either read input from an InputPort or send data downstream through an OutportPort.

The flow of data through an operator is modeled by breaking the stream down into time-based windows of data, but unlike Spark's microbatching, processing the input data does not have to wait until the end of the window.

apache apex operators DataTorrent

In the example below, we need three operators, each of which corresponds to the three types of operator Apex supports: an input operator for reading lines from Kafka, a generic operator for counting the logging types, and an output operator for writing to the console. For the first and last, we can turn to Apex's Malhar library, but we need to implement our custom business logic for counting the different types of logging we're seeing.

Here's the code that implements our LogCounterOperator:

public class LogCounterOperator extends BaseOperator {

private HashMap<String, Integer> counter;

public transient DefaultInputPort<String> input = new DefaultInputPort<String>() {

    @Override

    public void process(String text) {

     String type = text.substring(0, text.indexOf(' '));

     Integer currentCounter = counter.getOrDefault(type, 0);

     counter.put(type, currentCounter+1);

    }

      };

public transient DefaultOutputPort<Map<String, Integer>> output = new DefaultOutputPort<>();

@Override

public void endWindow() {

      output.emit(counter);

}

@Override

public void setup(OperatorContext context){

      counter = new HashMap();

}

}

We're using a simple HashMap for counting our types of log, and we define two ports on handling data flowing through the operator: one for input, and one for output. As these are typed, trying to fit incompatible operators will be a compile-time failure rather than something you find out after deployment. Note that although I've only defined one input and one output port here, it's possible to have multiple inputs and outputs.

The lifecycle of a Generic Operator is simple. Apex will first call setup() for any needed initialization; in the above example, setup() handles the creation of the HashMap. It will then call beginWindow() to indicate that a new window/batch of input processing is beginning, then call process() on every item of data that flows through the operator during the window. When there’s no more time left in the current window, Apex calls endWindow(). We don't need any per-window logic, so we leave ourselves with the empty beginWindow() definition that you can find in the abstract BaseOperator. However, at the end of every window, we want to send out our current counts, so we emit the HashMap through the outport port.

Meanwhile, the overridden process() method handles our business logic of taking the first word from the log line and updating our counters. Finally, we have a teardown() method that is called when Apex brings down the pipeline for any clean-up that may be needed -- in this case, we don't need to do anything, but I've cleared the HashMap as an example.

Having constructed our operator, we can now construct the pipeline itself. Again, if you have experience of making a Storm topology, you'll be right at home with this piece of code:

public void populateDAG(DAG dag, Configuration conf) {

      KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("KafkaInput", new KafkaSinglePortStringInputOperator());

      kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());

      LogCounterOperator logCounter = dag.addOperator("LogCounterOperator", new LogCounterOperator());

      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

  

      dag.addStream("LogLines", kafkaInput.outputPort, logCounter.input);

      dag.addStream("Console", logCounter.output, console.input);

}

First, we define the nodes of our DAG -- the operators. Then we define the edges of the graph ("streams" in Apex parlance). These streams connect an outport port of an operator to an input port of another operator. Here we connect Kafka to LogCounterOperator and connect the outport port to ConsoleOutputOperator. That's it! If we compile and run the application, we can see the HashMap printed to standard output:

{INFO=1}

{ERROR=1, INFO=1}

{ERROR=1, INFO=2}

{ERROR=1, INFO=2, DEBUG=1}

And so on.

Malhar: A box of useful bricks

The great thing about operators is that they are small, well-defined bits of code, so they're easy to construct and test. They snap together like Lego bricks, with the slight difference that you don't normally have to make your own Lego bricks.

Enter Malhar, essentially a giant bucket of Lego that includes everything from your standard 2-by-4 to that up-down bit you "just need" from time to time. Do you need to read from Splunk, merge that information with text files stored on an FTP site, then store the result in HBase? Malhar has you covered.

Thus, Apex is really appealing to work with because Malhar comes with such an array of included operators that you often only have to worry about your business logic. Sometimes the documentation on the Malhar operators is a bit sparse, but almost everything in the repository has a brace of tests, so you can see how they work with a little effort.

Apex has a few more tricks up its sleeve, too. Along with the usual assortment of metrics and reporting, the dtCli application allows you to dynamically change a submitted application at runtime. Did you want to add a set of operators that write the logging lines to HDFS without bringing your entire application down? You can do that with Apex, unlike most other DAG-based systems available today.

The world of open source data stream processing engines is crowded, but Apex is a formidable entrant. With the Malhar library providing an impressive array of connectors, and Apex itself providing a stable base of fault tolerance, low latency, and scalability, it's easy to get up to speed and be productive with the framework. One caveat is that the operator concept is a little closer to the nuts and bolts of processing instead of Flink and Spark's higher-level constructs.

I would suggest that DataTorrent would be wise to implement an Apex runner for Apache Beam to make it easier for developers to port their application from existing frameworks. Nonetheless, I’d definitely recommend giving Apex a whirl when you evaluate streaming data processing engines.

Copyright © 2016 IDG Communications, Inc.