Tuesday, March 25, 2014

Shark on Cassandra (w/ Cash) : Interrogating cached data from C* using HiveQL

As promised, here is part deux of the Spark/Shark on Cassandra series.

In the previous post, we got up and running with Spark on Cassandra. Spark gave us a way to report off of data stored in Cassandra. It was an improvement over MR/Hadoop, but we still had to articulate our operations in code. Shark provides an integration layer between Hive and Spark, which lets us articulate those operations in HiveQL at the shark prompt. This opens the data in Cassandra up to a non-developer community for exploration and analysis.

Setup a Spark Cluster

Before we jump to Shark, let's get a Spark cluster going. First, start the master server with:
$SPARK_HOME> bin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2/bin/../logs/spark-bone-org.apache.spark.deploy.master.Master-1-zen.local.out

To ensure the master started properly, tail the logs:
14/03/25 19:54:42 INFO ActorSystemImpl: RemoteServerStarted@akka://sparkMaster@zen.local:7077
14/03/25 19:54:42 INFO Master: Starting Spark master at spark://zen.local:7077
14/03/25 19:54:42 INFO MasterWebUI: Started Master web UI at http://10.0.0.5:8080

In the log output, you will see the master Spark URL (e.g. spark://zen.local:7077).  You will also see the URL for the web UI.  Cut and paste that URL into a browser and have a look at the UI. You'll notice that no workers are available.  So, let's start one:
$SPARK_HOME> bin/start-slaves.sh
Password:
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2/bin/../logs/spark-bone-org.apache.spark.deploy.worker.Worker-1-zen.local.out

Again, tail the logs.  You should see the worker successfully register with the master.  You should also see the worker show up in the web UI.  And now, we are ready to get moving with Shark.

Setup Shark

First, download Shark and Hive. I used shark-0.8.1-bin-hadoop1.tgz and hive-0.9.0-bin.tgz. Untar each of those. Then, in the $SHARK_HOME/conf directory, copy the shark-env.sh.template file to shark-env.sh and edit it so the settings match your environment. For example:
export SPARK_MEM=4g
export SHARK_MASTER_MEM=4g
export SCALA_HOME="/Users/bone/tools/scala-2.9.3"
export HIVE_HOME="/Users/bone/tools/hive-0.9.0-bin"
export SPARK_HOME="/Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2"
export MASTER="spark://zen.local:7077"
Note that MASTER is set to the master URL from the Spark cluster. Make sure that HADOOP_HOME is *NOT* set; Shark can operate directly on Spark, so you need not have Hadoop deployed.

As with Spark, we are going to use an integration layer developed by TupleJump. This one is called Cash:
https://github.com/tuplejump/cash/

To get started, clone the cash repo and follow the instructions here.   In summary, build the project and copy target/*.jar and target/dependency/cassandra-*.jar into $HIVE_HOME/lib.
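
In case it helps, here is a rough sketch of those steps, assuming a standard Maven build (which the target/ layout suggests) and a clone under ~/tools/cash; defer to the cash README if anything differs:
bone@zen:~/tools/cash-> mvn clean package -DskipTests   # assuming a Maven build; see the cash README
bone@zen:~/tools/cash-> cp target/*.jar $HIVE_HOME/lib/
bone@zen:~/tools/cash-> cp target/dependency/cassandra-*.jar $HIVE_HOME/lib/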

Play with Shark

Fun time.  Start shark with the following:
bone@zen:~/tools/shark-> bin/shark

Note that there are two other versions of this command (bin/shark-withinfo and bin/shark-withdebug).  Both are *incredibly* useful if you run into trouble. 

Once you see the shark prompt, you should be able to refresh the Spark Web UI and see Shark under Running Applications.  To get started, first create a database.  Using the schema from our previous example/post, let's call our database "northpole":
shark> create database northpole;
OK
Time taken: 0.264 seconds

Next, create an external table that maps to your Cassandra table:
shark> CREATE EXTERNAL TABLE northpole.children(child_id string, country string, first_name string, last_name string, state string, zip string)
     >    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
     >    WITH SERDEPROPERTIES ("cql.primarykey"="child_id", "comment"="check", "read_repair_chance"="0.1", "cassandra.host"="localhost", "cassandra.port"="9160", "dclocal_read_repair_chance"="0.0", "gc_grace_seconds"="864000", "bloom_filter_fp_chance"="0.1", "cassandra.ks.repfactor"="1", "compaction"="{'class' : 'SizeTieredCompactionStrategy'}", "replicate_on_write"="false", "caching"="all");
OK
Time taken: 0.419 seconds
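
For reference, the Cassandra table this maps to looks roughly like the following. This is only a sketch reconstructed from the columns above; the actual cql (from the previous post) may use different types or options:
CREATE TABLE northpole.children (
    child_id text PRIMARY KEY,  -- sketch only; defer to the cql from the previous post
    country text,
    first_name text,
    last_name text,
    state text,
    zip text
);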

At this point, you are free to execute some HiveQL queries!  Let's do a simple select:
shark> select * from northpole.children;
977.668: [Full GC 672003K->30415K(4054528K), 0.2639420 secs]
OK
michael.myers USA Michael Myers PA 18964
bart.simpson USA Bart Simpson CA 94111
johny.b.good USA Johny Good CA 94333
owen.oneill IRL Owen O'Neill D EI33
richie.rich USA Richie Rich CA 94333
collin.oneill IRL Collin O'Neill D EI33
dennis.menace USA Dennis Menace CA 94222
Time taken: 13.251 seconds

Caching

How cool is that? Now, let's create a cached table!
shark> CREATE TABLE child_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM northpole.children;
Moving data to: file:/user/hive/warehouse/child_cache
OK
Time taken: 10.294 seconds

And finally, let's try that select again, this time against the cached table:
shark> select * from child_cache;
OK
owen.oneill IRL Owen O'Neill D EI33
bart.simpson USA Bart Simpson CA 94111
michael.myers USA Michael Myers PA 18964
dennis.menace USA Dennis Menace CA 94222
richie.rich USA Richie Rich CA 94333
johny.b.good USA Johny Good CA 94333
collin.oneill IRL Collin O'Neill D EI33
Time taken: 6.511 seconds

Woot!
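
From here, you can throw richer HiveQL at the cached table, and it runs as a Spark job over the in-memory data. For example, a hypothetical aggregate (output omitted):
shark> select country, count(*) from child_cache group by country;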

Alrighty, that should get you started.
Again -- kudos to TupleJump for all their work on the Spark/Shark -> C* bridge.

Wednesday, March 19, 2014

Pleasantly Uncomfortable : Innovation run rampant.

Over the last three years, we've built out a kick-ass platform for data management and analytics.

Early on, it was a lot of new technology.  We were integrating and deploying technologies at a rapid rate, almost one per month: dropwizard, spring, extjs then angular, cassandra, solr then elastic search, kafka, zookeeper, storm, titan, github, puppet, etc.  It was a whirlwind.

The new technologies substantially increased productivity and agility.  The platform was working. Product development became capabilities composition.

But recently, it occurred to me that the people we hired along the way and the processes we implemented to support that rapid technical evolution are more powerful than the platform itself. To support that platform approach, we adopted open-source dynamics internally.  Anyone can contribute to any module, just submit a pull request.  Teams are accountable for their products, but they are free to enhance and contribute to any module in the platform.  Those dynamics have allowed us to keep the innovative/collaborative spirit as we grow the company.

And oh my, are we growing...

We now have half-a-dozen product development teams. Each is a cross-discipline (dev, qa, ba, pm) mix of onshore and offshore resources.  The product development teams are enabled by another half-dozen teams that provide infrastructure and support services (ux/ui design, data/information architecture, infrastructure, maintenance, and devops).  The support teams pave the way for the product development teams, so they can rock out features and functionality at warp speed.  For the most part, the product teams use Scrum while the support teams use Kanban so they can react quickly to changing priorities and urgent needs.

Each month we have sprint reviews that mimic a science fair.  Teams have started dressing up and wearing costumes.  It is fun.  Plain and simple.   But at this last sprint review, something happened to me that has never happened in my career.

I've spent my years at two different types of companies:  startups and large enterprises.  At the startups, every innovation (and all of the code) was generated by hands at keyboards within a single room (or two).  You knew everything that was going into the product, every second of the day.  At large enterprises, innovation was stifled by process.  You knew everything that was happening because things happened at a turtle's pace.  At HMS, we've got something special, a balance between those worlds.

At the last sprint review, I was surprised... for the first time in my career.  The teams presented innovations that I didn't know about and never would have come up with on my own.  There were beautiful refactorings and enabling technical integrations.  But honestly, I was uncomfortable.  I thought maybe I was losing my edge.  I questioned whether I had reached a point where I could no longer keep up with everything.  It was disconcerting.

I spent a couple of hours moping.  Then, in a moment of clarity, I realized that I was a member of an Innovative Organization: an organization at an optimal balance point between process and productivity, where the reins of innovation were in everyone's hands -- with a platform and processes that supported them.

Yeah, this sounds corny.  But I kid you not, it is amazing.  We've gone from a situation where a few champions were moving boulders up mountains, to a state where entire teams are leaning forward and pulling the organization along.  I'm now happy to enjoy the ride. (but you can bet your ass, I'm going to try to race ahead and meet them at the pass =)

(Kudos to @acollautt, @irieksts, @jmosco, @pabrahamsson, @bflad for giving me this uncomfortable wedgie-like feeling)


Friday, March 7, 2014

Spark on Cassandra (w/ Calliope)


We all know that reporting off of data stored in NoSQL repositories can be cumbersome.  Either you built the queries into your data model, or you didn't.  If you are lucky, you've paired Cassandra with an indexer like Solr or Elasticsearch, but sometimes an index isn't enough to perform complex analytics on your data.  Alternatively, maybe you just want to do a simple transformation on the data.  That is often easier said than done.

What we all need is a generic way to run functions over data stored in Cassandra.   Sure, you could go grab Hadoop, and be locked into articulating analytics/transformations as MapReduce constructs.  But that just makes people sad.  Instead, I'd recommend Spark.  It makes people happy.

When I set out to run Spark against Cassandra, however, I found relatively little information.  This is my attempt to rectify that.  If you are impatient, you can just go clone the repo I made:
https://github.com/boneill42/spark-on-cassandra-quickstart

Get Calliope

First stop, Calliope.
http://tuplejump.github.io/calliope/
Then go here so you know how to pronounce it. =)

Again, for reasons I've mentioned before, I wanted to access Cassandra via CQL.  Unfortunately, at the time of this writing, the CQL version of Calliope wasn't generally available; you need to request early access.  Fortunately, Rohit and crew are very responsive.  Once you have access, create a new project that uses it and drop the dependency in your pom:

<dependency>
    <groupId>com.tuplejump</groupId>
    <artifactId>calliope_2.9.3</artifactId>
    <version>0.8.1-EA</version>
</dependency>

Get Scala'd Up

Now, if you want to fully immerse yourself in the Spark experience, you'll want to develop in Scala.  For me, that meant switching over to IntelliJ, because I had some challenges using Eclipse with specific (older) versions of Scala.  Calliope 0.8.1 early access was compiled against Scala 2.9.3, so you'll want an IDE that supports that version.  To get Maven support for Scala (the scala-tools plugin repository, a scalatest dependency for the test below, and the maven-scala-plugin), drop the following into your pom:

<pluginRepositories>
   <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
   </pluginRepository>
</pluginRepositories>

<dependency>
   <groupId>org.scalatest</groupId>
   <artifactId>scalatest_2.9.3</artifactId>
   <version>2.0.M5b</version>
</dependency>

<plugin>
   <groupId>org.scala-tools</groupId>
   <artifactId>maven-scala-plugin</artifactId>
   <version>2.15.2</version>
   <executions>
      <execution>
         <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
         </goals>
      </execution>
   </executions>
   <configuration>
      <scalaVersion>2.9.3</scalaVersion>
      <launchers>
         <launcher>
            <id>run-scalatest</id>
            <mainClass>org.scalatest.tools.Runner</mainClass>
            <args>
               <arg>-p</arg>
               <arg>${project.build.testOutputDirectory}</arg>
            </args>
            <jvmArgs>
               <jvmArg>-Xmx512m</jvmArg>
            </jvmArgs>
         </launcher>
      </launchers>
      <jvmArgs>
         <jvmArg>-Xms64m</jvmArg>
         <jvmArg>-Xmx1024m</jvmArg>
      </jvmArgs>
   </configuration>
</plugin>


Get Spark

Now, the simple part.  Add Spark. =)
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.9.3</artifactId>
   <version>0.8.1-incubating</version>
</dependency>

Sling Code

Now that we have our project stood up, let's race over a column family and do some processing!

All of the code to perform a Spark job is contained in FindChildrenTest.  There are two components: a transformer and the job.  The transformer is very similar to the Mapper concept we have in storm-cassandra-cql; it translates CQL rows into objects that can be used in the job.  Here is the code for the transformer:

private object Cql3CRDDTransformers {
  // Implicit ByteBuffer conversions from Calliope, so raw column values can be read as Strings.
  import RichByteBuffer._

  // Translates a Thrift row into the list of its column names.
  implicit def row2String(key: ThriftRowKey, row: ThriftRowMap): List[String] = {
    row.keys.toList
  }

  // Translates a CQL row (key columns + value columns) into a Child object.
  implicit def cql3Row2Child(keys: CQLRowKeyMap, values: CQLRowMap): Child = {
    Child(keys.get("child_id").get, values.get("first_name").get, values.get("last_name").get,
      values.get("country").get, values.get("state").get, values.get("zip").get)
  }
}


The only really important part is the function that translates a row (keys and values) into a Child object.
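
For reference, Child itself isn't shown here. Given the fields used above, it is presumably just a simple case class along these lines (the field names, other than country, are illustrative):
// A sketch of the Child case class assumed by the transformer above.
case class Child(childId: String, firstName: String, lastName: String,
                 country: String, state: String, zip: String)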

With a transformer in place, it is quite simple to create a job:

class FindChildrenTest extends FunSpec {
  import Cql3CRDDTransformers._

  // A local Spark context with a single worker thread.
  val sc = new SparkContext("local[1]", "castest")

  describe("Find All Children in IRL") {
    it("should be able find children in IRL") {
      // Point Calliope at the northpole keyspace and the children table.
      val cas = CasBuilder.cql3.withColumnFamily("northpole", "children")

      // Create an RDD of Child objects, using the implicit transformers above.
      val cqlRdd = sc.cql3Cassandra[Child](cas)

      // Collect the rows back to the driver, then filter and print the Irish children.
      val children = cqlRdd.collect().toList
      children.filter((child) => child.country.equals("IRL")).map((child) => println(child))

      sc.stop()
    }
  }
}


The CasBuilder line connects to a keyspace and table.  For this example, I reused a schema from my webinar a few years ago.  You can find the cql here.  The cql3Cassandra line creates a Resilient Distributed Dataset (RDD) containing Child objects.  An RDD is the primary dataset abstraction in Spark.  Once you have an RDD, you can operate on it much as you would any other Scala collection.  (pretty powerful stuff)

In the code above, we collect the RDD back to the driver, filter the result for children in Ireland, and print them out.  If all goes well, you should end up with the following output:

Child(collin.oneill,Collin,O'Neill,IRL,D,EI33)
Child(owen.oneill,Owen,O'Neill,IRL,D,EI33)
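
As a side note, you could also let Spark do the filtering before anything comes back to the driver, which matters once the table no longer fits comfortably on one machine. A minimal sketch, reusing the cqlRdd from above:
// Filter on the workers; only the matching children are collected to the driver.
val irishChildren = cqlRdd.filter(_.country == "IRL").collect().toList
irishChildren.foreach(println)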

OK -- That should be enough to make you dangerous.  I have to give a *HUGE* pile of kudos to Rohit Rai and his team at TupleJump for developing the Calliope project; they are doing great things.  I'm keeping an eye on Stargate and Cash as well.  In fact, next time I'll take this a step further and show Shark against Cassandra using Cash.