Monday, March 9, 2015

Delta Architectures: Unifying the Lambda Architecture and leveraging Storm from Hadoop/REST


Recently, I've been asked by a bunch of people to go into more detail on the Druid/Storm integration that I wrote for our book: Storm Blueprints for Distributed Real-time Computation.  Druid is great. Storm is great. And the two together appear to solve the real-time dimensional query/aggregations problem.

In fact, it looks like people are taking it mainstream, calling it the RAD Stack, and adding the "Lambda Architecture" label. Honestly though, there may be a better way.  Lambda Architectures rest on a supposition that has always bothered me.

From Nathan's article on Lambda Architectures:
Computing arbitrary functions on an arbitrary dataset in real time is a daunting problem. There is no single tool that provides a complete solution. Instead, you have to use a variety of tools and techniques to build a complete Big Data system.

The lambda architecture solves the problem of computing arbitrary functions on arbitrary data in real time by decomposing the problem into three layers: the batch layer, the serving layer, and the speed layer.

This advice has led most people to deploy separate infrastructure/frameworks for batch, speed/processing, and query, which is good because it allows you to "use the right tool for each job".  And that has led to things like the "RAD Stack".  People select a technology for each layer.  (e.g. Speed = Storm, Batch = Hadoop, and Serving = Impala)

But if you have lived in environments like these, you know they require an awful lot of resources, because there is very little re-use across the systems.  More and more, I believe people are starting to question the distinction between layers. Others are proposing a Unified Lambda Architecture.

And lately, I've found myself in the camp of the unificationists...

At HMS, we've been iterating on our Lambda architecture for a few years now.  We have Storm, Hadoop and a real-time Web Services layer.  Each of these functions as a data ingestion mechanism.

They all process the same kinds of data, and only differ by interface, capacity and client-side expectations:
  • Transactional Processing:
    • Our transactional processing is our web services layer.  (We still use and love Dropwizard)  In this scenario, the client expects that the data is ingested and integrated into analytics within a very short time period (e.g. seconds).  Capacity must always match or exceed demand, or else the system is considered "down".

  • Stream/Queue-based Processing
    • Often, we find ourselves leaning more on our transactional processing capabilities.  More and more clients are demanding real-time integrations, which means web services API integrations.  If that is the case, and transactions are continuous, then there is only a small distinction between stream processing and "transactional processing".  However, the distinction is important.  First, with our "stream processing" we introduce a queue.  With the queue in place, capacity need not always exceed demand.  The queue can capture overrun, and the system will work it off later.  Clients tolerate a delay (e.g. minutes) in data/analytics availability, but in exchange the system is more tolerant of downstream availability issues.  Since data is logged to a queue, the system tolerates disruptions in the persistence layer(s).

  • Batch Processing
    • For batch processing, client expectations are lowered even further.  It is often hours until the client expects their data to be available.  Additionally, with batch there is a functional difference: there is an end.  With streams and transactional input, the data set is unbounded.  With batch, however, we often want to know the status of processing a set of data.  If you try to use stream processing for batch interactions, then you need to build in the concept of a "set", and somehow add the ability to track status.  (Initially, to avoid having to maintain separate infrastructure for Hadoop, we did this on top of Storm... painfully)

Like many others, we found ourselves needing to support all of these paradigms.  Quite literally, we were rewriting code across the different frameworks/systems, which caused major pain when those implementations differed (even slightly).  Numbers didn't line up, etc.

We were forced to come up with a solution, and collapse the systems a bit.  

We looked at DRPC with Storm, and considered calling Storm from our web services tier, but DRPC seemed clunky and under-supported.  Also, it seemed unwise to call DRPC from Hadoop.  (Has anyone tried this?)

Instead, we decided to lock in on an abstraction for persistence.  We looked around at ORMs and DAO patterns, but most did not support the concept of micro-batching, which is an abstraction we wanted the option to leverage across the different processing mechanisms.  In the end, we decided to leverage the Storm/Trident State abstraction as a universal mechanism for persistence.  We built out storm-cassandra-cql, and embedded it in our web services and in Hadoop.
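
To make that concrete, here is a minimal sketch of what a batching State implementation looks like.  This is illustrative only -- the class and method names are hypothetical and are not the actual storm-cassandra-cql API; the only real pieces are Trident's State interface and its beginCommit()/commit() contract.

  import java.util.ArrayList;
  import java.util.List;

  import storm.trident.state.State;

  // A minimal, hypothetical batching State -- illustrative only, not the actual
  // storm-cassandra-cql implementation. It accumulates statements and "flushes"
  // them when Trident (or our own code) calls commit().
  public class BatchingCqlState implements State {

      // Statements accumulated since the last commit.
      private final List<String> pendingStatements = new ArrayList<String>();

      public void addStatement(String cql) {
          pendingStatements.add(cql);
      }

      @Override
      public void beginCommit(Long txid) {
          // Nothing to do at the start of a batch in this sketch.
      }

      @Override
      public void commit(Long txid) {
          // The real implementation would execute the batch against Cassandra here
          // (e.g. via the DataStax driver). This sketch just clears the buffer.
          pendingStatements.clear();
      }
  }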

From both Hadoop and our web services, we instantiate our own Tuples, which implement the Storm Tuple interface.  From there, we can use the State abstraction, and re-use Mappers, to ensure a consistent data model across all three processing paradigms.

From Hadoop, as a shortcut, we used the State object directly from the reduce phase, setting the output format to NullOutputFormat.  Ideally, we probably should have implemented a new OutputFormat that was StormCassandraCqlFormat or something, but I'm not sure that would have bought us much.
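
As a rough sketch (re-using the hypothetical BatchingCqlState from above, not our actual code), the reduce-side shortcut looks something like this, with the driver calling job.setOutputFormatClass(NullOutputFormat.class) so the job emits nothing to HDFS:

  import java.io.IOException;

  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  // A sketch of the reduce-side shortcut: write through the State directly and emit
  // nothing. The CQL construction is a placeholder for the shared Tuple/Mapper logic.
  public class StatePersistingReducer extends Reducer<Text, Text, NullWritable, NullWritable> {

      private BatchingCqlState state;

      @Override
      protected void setup(Context context) {
          state = new BatchingCqlState();
          state.beginCommit(null); // no real transaction id in this context
      }

      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
              throws IOException, InterruptedException {
          for (Text value : values) {
              state.addStatement("INSERT ... " + key + " / " + value); // placeholder mapping
          }
          // Nothing is written to the Hadoop context; persistence happens via the State.
      }

      @Override
      protected void cleanup(Context context) {
          state.commit(null); // flush the batch when the reduce task finishes
      }
  }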

For the web services, the immediate integration was straightforward.  Convert the JSON to a Tuple, call update() on the StateUpdater, then call commit() on the State object.  But we also wanted to be able to batch, and perform dimensional aggregations prior to committing to "deep storage".  This introduced a problem: we would have data that was acknowledged (200 response code), but not yet persisted.  Not good.  In the event of a node failure, we would lose data.  Really not good.
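
For reference, here is roughly what that first, commit-per-request integration looked like as a Dropwizard/JAX-RS resource.  Again, this is a sketch under assumptions: BatchingCqlState is the hypothetical class from the earlier sketch, and toCql() stands in for our Tuple/Mapper conversion.

  import javax.ws.rs.POST;
  import javax.ws.rs.Path;
  import javax.ws.rs.core.Response;

  // A sketch of the naive, commit-per-request integration.
  @Path("/events")
  public class EventResource {

      private final BatchingCqlState state = new BatchingCqlState();

      @POST
      public Response ingest(String json) {
          state.addStatement(toCql(json));           // re-use the same mapping logic Storm uses
          state.commit(System.currentTimeMillis());  // flush before acknowledging
          return Response.ok().build();
      }

      private String toCql(String json) {
          // Placeholder: the real code converts the JSON into a Tuple and runs it
          // through the shared Mapper to produce a statement.
          return "INSERT ... " + json;
      }
  }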

So, what was the solution?  We could have integrated Druid, but instead we decided to keep it lightweight, and... leverage Storm as our safety net!

Consider the following "traditional" interpretation of the Lambda Architecture:


In this traditional approach, the batch layer (Hadoop) is often used to "correct" errors in processing introduced in the speed layer (Storm).  Hadoop is the safety net, correcting numbers (typically via overnight batch jobs).  We decided to flip that model, and use Storm as our safety net, with this approach:



In this case, we use the embedded State object to aggregate data across a batch, but we also write to a Kafka queue for persistence before we acknowledge the HTTP request.  The sequence diagram looks like this:


We persist the event to a queue, update the Trident State object, and *then* return a 200.  Then, periodically, we flush the State to storage (Cassandra, in this case).  It is okay if we drop a node, because Storm will eventually (re)process the event and (re)incorporate the data if need be.  (And here is where I'm going to gloss over some really important details -- to be addressed in my next post)
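
A sketch of that flow is below.  The topic name, the producer properties, and the helper methods are illustrative assumptions (kafkaProps would need bootstrap servers and string serializers configured); the point is simply the ordering: queue first, in-memory state second, 200 last, flush later.

  import java.util.Properties;
  import java.util.concurrent.ExecutionException;

  import javax.ws.rs.POST;
  import javax.ws.rs.Path;
  import javax.ws.rs.core.Response;

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  // A sketch of the "Storm as safety net" ordering. Names here are illustrative.
  @Path("/events")
  public class SafetyNetEventResource {

      private final KafkaProducer<String, String> producer;
      private final BatchingCqlState state = new BatchingCqlState();

      public SafetyNetEventResource(Properties kafkaProps) {
          // kafkaProps is assumed to contain bootstrap servers and string serializers.
          this.producer = new KafkaProducer<String, String>(kafkaProps);
      }

      @POST
      public Response ingest(String json) throws ExecutionException, InterruptedException {
          // 1. Log the raw event to the queue; block until the write is acknowledged.
          producer.send(new ProducerRecord<String, String>("events", json)).get();

          // 2. Fold the event into the in-memory aggregate (not yet in deep storage).
          state.addStatement(toCql(json));

          // 3. Now it is safe to return 200: if this node dies before the flush,
          //    Storm can replay the event from Kafka and (re)incorporate the data.
          return Response.ok().build();
      }

      // Called periodically (e.g. on a timer or a batch-size threshold) to flush to Cassandra.
      public void flush() {
          state.commit(System.currentTimeMillis());
      }

      private String toCql(String json) {
          return "INSERT ... " + json; // placeholder for the shared Tuple/Mapper conversion
      }
  }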

The point being... we've begun to collapse our layers, starting with persistence.  We are re-using the Trident State abstraction from both Hadoop and Web Services, and we've moved Storm into a "re-processing / safety net" layer, which was previously filled by Hadoop/Batch processing.

For lack of a better term, we've been calling this a Delta Architecture because the whole system is focused on incremental updates to state, made from any and all processing paradigms.

Hopefully, this gets people thinking.  In my next post, I'll explain how you can use the same architecture to deliver dimensional aggregations (like Druid), without incorporating Druid directly.

We also have open questions --
Can we execute an embedded topology!?
Does it make sense to do so?

For more detail, have a look at the presentation I did at the Storm NYC meetup, Data Pipelines and Improving on the Lambda Architecture.

I fully appreciate that much of Lambda is a matter of perspective.  And FWIW -- this is mine (currently -- and subject to change =).   And thanks to Nathan for articulating the concept of a Lambda Architecture; materializing the "Big Data" view has given people a common vernacular with which to discuss solutions to some really hard problems.

fwiw.


Wednesday, March 4, 2015

Getting started with Cassandra Development in Eclipse (BEWARE of jdk8_40_ea, NoClassDefFoundError: ExtendedPlatformComponent)

I finally got back around to getting my environment set up for Cassandra development.  I ran into one snag, and a couple of things have changed, so I figured I would capture the experience here.

Fork and Build

First, fork and clone from here:
https://github.com/apache/cassandra

Then run,

$> ant generate-eclipse-files

Eclipse Setup & Run Configuration

Then from Eclipse,
File -> Import -> Existing Projects into Workspace...

Navigate to your cassandra folder, and import the project.

Now, configure the run with:

Run Configurations -> Java Application -> Right-Click -> New

Main class:
org.apache.cassandra.service.CassandraDaemon

Then use the following VM arguments:
-Dcassandra.config=file:///Users/bone/git/boneill42/cassandra/conf/cassandra.yaml -Dcassandra.storagedir=/tmp/cassandra -Dcassandra-foreground=yes

The gotcha: Make sure you are running jdk8_40 and NOT jdk8_40_ea!!!

If you are still running early access (_ea), you may see:
java.lang.NoClassDefFoundError: sun/management/ExtendedPlatformComponent

Logging

Next, it's useful to configure logging.  Cassandra changed from log4j to logback.  You can find the logback configuration in test/conf (logback-test.xml).

If you want to see DEBUG and/or INFO messages, edit this file and change the STDOUT element to:

  <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
...
      <level>INFO</level>
...
  </appender>


After that, you are good to go.  Click run, and you should have a live Cassandra running in Eclipse.