This post is a follow-up to the talk given at the Big Data AW meetup in Stockholm. It focuses on use cases and design approaches for building scalable data processing platforms with the SMACK (Spark, Mesos, Akka, Cassandra, Kafka) stack. While the stack is concise and consists of only a few components, it can be used to implement a range of system designs: not only pure batch or stream processing, but also the more complex Lambda and Kappa architectures. Let’s start with a short overview to get on the same page, then continue with designs and examples drawn from production projects.
Recap
- Spark - a fast and general engine for distributed, large-scale data processing
- Mesos - a cluster resource management system that provides efficient resource isolation and sharing across distributed applications
- Akka - a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM
- Cassandra - a distributed, highly available database designed to handle large amounts of data across multiple datacenters
- Kafka - a high-throughput, low-latency distributed messaging system/commit log designed for handling real-time data feeds
Storage layer: Cassandra
Cassandra is well known for its high availability and high throughput: it can handle enormous write loads and survive the failure of cluster nodes. In terms of the CAP theorem, Cassandra provides tunable consistency/availability per operation.
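To make "tunable" a bit more concrete, here is a minimal sketch of the arithmetic behind consistency levels. The object and function names are illustrative assumptions and not part of any Cassandra driver API; the rule itself is the usual one that a read is guaranteed to overlap the latest acknowledged write when the number of replicas read plus the number of replicas written exceeds the replication factor.

```scala
// Illustrative sketch only: names are hypothetical, not a driver API.
object TunableConsistency {
  // Number of replicas that form a quorum for a given replication factor.
  def quorum(replicationFactor: Int): Int = replicationFactor / 2 + 1

  // A read overlaps the latest acknowledged write when the replicas
  // consulted on read plus those acknowledged on write exceed the RF.
  def readsSeeLatestWrite(readReplicas: Int, writeReplicas: Int, replicationFactor: Int): Boolean =
    readReplicas + writeReplicas > replicationFactor

  def main(args: Array[String]): Unit = {
    val rf = 3
    val q = quorum(rf)
    println(s"QUORUM with RF=$rf means $q replicas")
    println(s"QUORUM read + QUORUM write overlap: ${readsSeeLatestWrite(q, q, rf)}")
    println(s"ONE read + ONE write overlap: ${readsSeeLatestWrite(1, 1, rf)}")
  }
}
```

Lower levels such as ONE trade that overlap guarantee for latency and availability, which is the tuning knob the paragraph above refers to.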
What is more interesting when it comes to data processing is that Cassandra is linearly scalable (increased load can be addressed by simply adding more nodes to a cluster) and provides cross-datacenter replication (XDCR). Beyond plain replication, XDCR enables a set of really interesting use cases:
- geo-distributed datacenters handling data specific for the region or located closer to customers
- data migration across datacenters: recovery after failures or moving data to a new DC
- separate operational and analytics workloads
But all these features come at a price, and with Cassandra this price is its data model. It can be thought of as a nested sorted map that is distributed across cluster nodes by partition key, with entries sorted/grouped by clustering columns. Here’s a small example:
CREATE TABLE campaign(
id uuid,
year int,
month int,
day int,
views bigint,
clicks bigint,
PRIMARY KEY (id, year, month, day)
);
INSERT INTO campaign(id, year, month, day, views, clicks)
VALUES(40b08953-a…,2015, 9, 10, 1000, 42);
SELECT views, clicks FROM campaign
WHERE id=40b08953-a… and year=2015 and month>8;
To read a range of data, the partition key must be specified in full and the clustering columns must be restricted in the order in which they are declared; a range clause is allowed only on the last clustering column used in the query. This constraint limits multiple scans over different ranges, which would produce random disk access and degrade performance. As a result, the data model has to be designed carefully around the read queries to limit the number of reads/scans, which leaves less flexibility for supporting new queries later. Here are C* data modeling 101 slides that provide several examples of how CQL tables are represented internally.
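The "nested sorted map" view of the campaign table can be sketched in plain Scala. This is only a mental model under simplifying assumptions (a single in-memory map, hypothetical names such as CampaignModel and monthsAfter), not how Cassandra stores data on disk. It shows why a query that fixes the partition key and puts a range on the next clustering column turns into one contiguous slice of a sorted map.

```scala
import java.util.UUID
import scala.collection.immutable.TreeMap

// Mental model only: names and structure are illustrative assumptions.
object CampaignModel {
  type ClusteringKey = (Int, Int, Int) // (year, month, day)
  final case class Stats(views: Long, clicks: Long)

  // partition key -> rows sorted by clustering key
  type Table = Map[UUID, TreeMap[ClusteringKey, Stats]]

  def insert(t: Table, id: UUID, key: ClusteringKey, s: Stats): Table =
    t.updated(id, t.getOrElse(id, TreeMap.empty[ClusteringKey, Stats]).updated(key, s))

  // Counterpart of: WHERE id = ? AND year = ? AND month > ?
  // One partition lookup followed by one contiguous slice of the sorted map.
  def monthsAfter(t: Table, id: UUID, year: Int, month: Int): Seq[Stats] =
    t.get(id).toSeq.flatMap { rows =>
      rows.range((year, month + 1, Int.MinValue), (year + 1, Int.MinValue, Int.MinValue)).values
    }
}
```

A range on month combined with a restriction on day would no longer be a single slice of this map, which is the kind of query the constraint rules out.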
But what if some tables need to be joined with other tables? Let’s consider the following case: calculate total views per campaign for a given month, for all campaigns.
CREATE TABLE event(
id uuid,
ad_id uuid,
campaign uuid,
ts bigint,
type text,
PRIMARY KEY(id)
);
With the given model, the only way to achieve this goal is to read all campaigns, read all events, sum the matching ones (by campaign id), and assign the totals to each campaign. Implementing such an application is challenging because the amount of data stored in Cassandra can be huge and will not fit in memory. Processing of this kind should therefore be done in a distributed manner, and Spark fits this use case perfectly.
Processing layer: Spark
The main abstraction Spark operates with is the RDD (Resilient Distributed Dataset, a distributed collection of elements), and the workflow consists of four main phases:
- RDD operations (transformations and actions) form a DAG (Directed Acyclic Graph)
- DAG is split into stages of tasks which are then submitted to the cluster manager
- stages combine tasks that don’t require shuffling/repartitioning
- tasks run on workers and results then return to the client
Here’s how one can solve the above problem with Spark and Cassandra:
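import java.util.UUID
import org.apache.spark.SparkContext
import com.datastax.spark.connector._ // adds cassandraTable to SparkContext

val sc = new SparkContext(conf)
case class Event(id: UUID, ad_id: UUID, campaign: UUID, ts: Long, `type`: String)
// checkMonth is a helper (not shown) that tests whether ts falls into the given month
sc.cassandraTable[Event]("keyspace", "event")
  .filter(e => e.`type` == "view" && checkMonth(e.ts))
  .map(e => (e.campaign, 1))
  .reduceByKey(_ + _)
  .collect()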
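To see where the stage boundary falls in a job like the one above, here is a plain-Scala simulation that uses no Spark API at all. Partitions are modeled as nested sequences, campaign ids are simplified to strings, and the names (StageSimulation, stage1, stage2) are made up for illustration. The filter, map, and per-partition combine need no data movement and form the first stage; merging partial counts by key stands in for the shuffle and the second stage.

```scala
// Plain-Scala simulation; not Spark code. All names are illustrative.
object StageSimulation {
  final case class Event(campaign: String, eventType: String)

  // Stage 1: narrow transformations (filter, map) plus a map-side combine,
  // executed independently for each partition.
  def stage1(partition: Seq[Event]): Map[String, Int] =
    partition
      .filter(_.eventType == "view")
      .map(e => (e.campaign, 1))
      .groupBy(_._1)
      .map { case (campaign, ones) => campaign -> ones.map(_._2).sum }

  // Shuffle + stage 2: partial counts sharing a key are brought together and merged.
  def stage2(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.flatten
      .groupBy(_._1)
      .map { case (campaign, counts) => campaign -> counts.map(_._2).sum }

  def viewsPerCampaign(partitions: Seq[Seq[Event]]): Map[String, Int] =
    stage2(partitions.map(stage1))
}
```

Only the small per-partition maps cross the stage boundary, which is why reduceByKey is usually cheaper than grouping all raw events by key first.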
Interaction with Cassandra is performed via spark-cassandra-connector which makes it really easy and straightforward. There’s one more interesting option to work with NoSQL stores - SparkSQL, which translates SQL statements into a series of RDD operations.
case class CampaignReport(id: String, views: Long, clicks: Long)
sql("""SELECT campaign.id as id, campaign.views as views,
campaign.clicks as clicks, event.type as type
FROM campaign
JOIN event ON campaign.id = event.campaign
""").rdd
.groupBy(row => row.getAs[String]("id"))
.map{ case (id, rows) =>
val views = rows.head.getAs[Long]("views")
val clicks = rows.head.getAs[Long]("clicks")
val res = rows.groupBy(row => row.getAs[String]("type")).mapValues(_.size)
CampaignReport(id, views = views + res.getOrElse("view", 0), clicks = clicks + res.getOrElse("click", 0))
}.saveToCassandra("keyspace", "campaign_report")
With several lines of code, it’s possible to implement a naive Lambda design. It could of course be much more sophisticated, but this example shows how easily it can be achieved.
Almost MapReduce: bringing processing closer to data
The Spark-Cassandra connector is data locality aware and reads data from the closest node in the cluster, thus minimizing the amount of data transferred over the network. To take full advantage of this locality awareness, Spark workers should be collocated with Cassandra nodes.
Alongside Spark collocation with Cassandra, it makes sense to separate your operational (or write-heavy) cluster from one for analytics:
- clusters can be scaled independently
- data is replicated by Cassandra, no extra work is needed
- analytics cluster has different Read/Write load patterns
- analytics cluster could contain additional data (e.g. dictionaries) and processing results
- Spark resource impact is limited to only one cluster
Let’s look at the deployment options for Spark applications one more time. There are three main options for a cluster resource manager:
- Spark Standalone - Spark master and Workers are installed and executed as standalone applications (which obviously introduces some ops overhead and supports only static resource allocation per worker)
- YARN is really nice if you already have a Hadoop ecosystem
- Mesos, which was designed from the beginning for dynamic allocation of cluster resources and for handling heterogeneous workloads, not only for running Hadoop applications
Mesos architecture
A Mesos cluster consists of Master nodes, which are responsible for resource offers and scheduling, and Slave nodes, which do the actual heavy lifting of task execution. In HA mode with multiple Masters, ZooKeeper is used for leader election and service discovery. Applications executed on Mesos are called Frameworks and use the Mesos API to handle resource offers and submit tasks. Generally, task execution consists of these steps:
- Slaves publish available resources to Master
- Master sends resource offers to Frameworks
- Scheduler replies with tasks and resources needed per task
- Master sends tasks to slaves
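The offer cycle above can be sketched as a small, self-contained model. None of the types below come from the Mesos API: Resources, Offer, Task, Launch, and the greedy first-fit policy are all assumptions made for illustration. The function plays the role of a framework scheduler replying to a batch of offers with the tasks it wants launched and where.

```scala
// Toy model of the offer cycle; not the Mesos API. All names are hypothetical.
object OfferCycle {
  final case class Resources(cpus: Double, memMb: Int)
  final case class Offer(slave: String, available: Resources)
  final case class Task(name: String, needs: Resources)
  final case class Launch(slave: String, task: Task)

  // Greedy first-fit: place each pending task on the first offer that still
  // has enough resources left; tasks that fit nowhere wait for the next round.
  def schedule(offers: Seq[Offer], pending: Seq[Task]): Seq[Launch] = {
    val remaining = scala.collection.mutable.Map(offers.map(o => o.slave -> o.available): _*)
    pending.flatMap { task =>
      offers.map(_.slave).find { slave =>
        val r = remaining(slave)
        r.cpus >= task.needs.cpus && r.memMb >= task.needs.memMb
      }.map { slave =>
        val r = remaining(slave)
        remaining(slave) = Resources(r.cpus - task.needs.cpus, r.memMb - task.needs.memMb)
        Launch(slave, task)
      }
    }
  }
}
```

The point of the two-level design is visible even in this toy: the Master only decides which resources to offer, while the placement policy lives entirely in the framework.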
Bringing Spark, Mesos, and Cassandra together
As mentioned before, Spark workers should be collocated with Cassandra nodes to benefit from data locality awareness, which lowers both network traffic and the load on the Cassandra cluster. Here’s one possible deployment scenario for achieving this with Mesos.
- Mesos Masters and ZooKeepers collocated
- Mesos Slaves and Cassandra nodes collocated to enforce better data locality for Spark
- Spark binaries deployed to all worker nodes and spark-env.sh is configured with proper master endpoints and executor jar location
- Spark Executor JAR uploaded to S3/HDFS
With this setup, a Spark job can be submitted to the cluster with a simple spark-submit invocation from any worker node that has the Spark binaries installed and the assembly jar with the actual job logic uploaded:
spark-submit --class io.datastrophic.SparkJob /etc/jobs/spark-jobs.jar
There exist options to run Dockerized Spark so that there’s no need to distribute binaries across every single cluster node.
Scheduled long-running tasks
Sooner or later, every data processing system needs to run two types of jobs: scheduled/periodic jobs, such as periodic batch aggregations, and long-running ones, which is the case for stream processing. The main requirement for both types is fault tolerance: jobs must continue running even when cluster nodes fail. The Mesos ecosystem comes with two great frameworks, one for each of these job types.
Marathon is a framework for fault-tolerant execution of long-running tasks. It supports HA mode with ZooKeeper, can run Docker containers, and has a nice REST API. A simple Marathon job configuration can, for example, run spark-submit as a shell command.
Chronos has the same characteristics as Marathon but is designed for running scheduled jobs; in general, it is a distributed, highly available cron that supports graphs of jobs. A typical use is an S3 compaction job implemented as a simple bash script.
There are plenty of frameworks, already available or under active development, that aim to integrate widely used systems with the resource management capabilities of Mesos. Just to name some of them:
- Hadoop
- Cassandra
- Kafka
- Myriad: YARN on Mesos
- Storm
- Samza
Ingesting the data
So far so good: the storage layer is designed, resource management is set up, and jobs are configured. The only thing still missing is the data to process. Assuming that incoming data will arrive at high rates, the endpoints that receive it should meet the following requirements:
- provide high throughput/low latency
- be resilient
- allow easy scalability
- support back pressure
Backpressure is not a must, but it would be nice to have this as an option to handle load spikes.
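As a rough illustration of what back pressure means at an ingestion endpoint, here is a minimal sketch built on a bounded JVM queue. It is not how Akka or Akka Streams implement back pressure; the Buffer class and the Accepted/Overloaded replies are hypothetical names. The idea is only that when the downstream writer falls behind, the endpoint signals overload to producers instead of buffering without limit.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Simplified illustration with hypothetical names; not Akka's mechanism.
object BoundedIngest {
  sealed trait Reply
  case object Accepted extends Reply
  case object Overloaded extends Reply // e.g. could be mapped to an HTTP 503

  final class Buffer(capacity: Int) {
    private val queue = new ArrayBlockingQueue[String](capacity)

    // Non-blocking: offer returns false when the queue is full,
    // which is turned into an explicit overload signal for the producer.
    def submit(event: String): Reply =
      if (queue.offer(event)) Accepted else Overloaded

    // Called by the downstream writer; None when nothing is buffered.
    def next(): Option[String] = Option(queue.poll())
  }
}
```

During a load spike, producers that receive Overloaded can retry later, so memory use at the endpoint stays bounded by the queue capacity.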
Akka fits these requirements perfectly; it was basically designed to provide this feature set. So what is Akka:
- actor model implementation for JVM
- message-based and asynchronous
- enforces no shared mutable state
- easily scalable from one process to a cluster of machines
- actors form hierarchies with parental supervision
- not only a concurrency framework: akka-http, akka-streams, and akka-persistence
Here’s a simplified example of three actors that handle a JSON HttpRequest, parse it into a domain model case class, and save it to Cassandra:
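class HttpActor extends Actor {
  def receive = {
    // context.actorOf creates a child actor supervised by this one;
    // for simplicity a new child is spawned per message
    case req: HttpRequest =>
      context.actorOf(Props[JsonParserActor]) ! req.body
    case e: Event =>
      context.actorOf(Props[CassandraWriterActor]) ! e
  }
}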
class JsonParserActor extends Actor {
def receive = {
case s: String => Try(Json.parse(s).as[Event]) match {
case Failure(ex) => //error handling code
case Success(event) => sender ! event
}
}
}
class CassandraWriterActor extends Actor with ActorLogging {
//for demo purposes, session initialized here
val session = Cluster.builder()
.addContactPoint("cassandra.host")
.build()
.connect()
override def receive: Receive = {
case event: Event =>
val statement = new SimpleStatement(event.createQuery)
.setConsistencyLevel(ConsistencyLevel.QUORUM)
Try(session.execute(statement)) match {
case Failure(ex) => //error handling code
case Success(_) => sender ! WriteSuccessfull
}
}
}
It looks like only a few lines of code are needed to make everything work, but while writing raw data (events) to Cassandra with Akka is easy, there are a number of gotchas:
- Cassandra is still designed for fast serving but not batch processing, so pre-aggregation of incoming data is needed
- computation time of aggregations/rollups will grow with the amount of data
- actors are not suitable for performing aggregation due to the stateless design model
- micro-batches could partially solve the problem
- some sort of reliable buffer for raw data is still needed
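The pre-aggregation mentioned in the list above can be sketched as a pure function over one micro-batch. The Event shape is simplified (string campaign ids), ts is assumed to be epoch milliseconds, and the names PreAggregation, Counts, and rollup are illustrative. Each batch of raw events collapses into one row per campaign and day, which is the shape the campaign table expects.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Illustrative sketch; assumes ts is epoch milliseconds in UTC.
object PreAggregation {
  final case class Event(campaign: String, ts: Long, eventType: String)
  final case class Counts(views: Long, clicks: Long) {
    def +(o: Counts): Counts = Counts(views + o.views, clicks + o.clicks)
  }

  private def day(ts: Long): LocalDate =
    Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC).toLocalDate

  // Collapse one micro-batch into a single row per (campaign, day).
  def rollup(batch: Seq[Event]): Map[(String, LocalDate), Counts] =
    batch.foldLeft(Map.empty[(String, LocalDate), Counts]) { (acc, e) =>
      val delta = e.eventType match {
        case "view"  => Some(Counts(1, 0))
        case "click" => Some(Counts(0, 1))
        case _       => None // unknown event types are ignored in this sketch
      }
      delta.fold(acc) { d =>
        val key = (e.campaign, day(e.ts))
        acc.updated(key, acc.getOrElse(key, Counts(0, 0)) + d)
      }
    }
}
```

Writing these rollups instead of raw events keeps the number of Cassandra writes proportional to the number of campaigns per batch rather than to the number of events.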
Kafka as a buffer for incoming data
To keep incoming data with some retention for further pre-aggregation/processing, some sort of distributed commit log can be used. In this case, consumers read data in batches, process it, and store it in Cassandra in the form of pre-aggregates. Here’s an example of publishing JSON data coming over HTTP to Kafka with akka-http:
val config = new ProducerConfig(KafkaConfig())
lazy val producer = new KafkaProducer[A, A](config)
val topic = "raw_events"
val routes: Route = {
post{
decodeRequest{
entity(as[String]){ str =>
JsonParser.parse(str).validate[Event] match {
case s: JsSuccess[String] => producer.send(new KeyedMessage(topic, str))
case e: JsError => BadRequest -> JsError.toFlatJson(e).toString()
}
}
}
}
}
object AkkaHttpMicroservice extends App with Service {
Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port"))
}
Consuming the data: Spark Streaming
While Akka could still be used for consuming stream data from Kafka, having Spark in your ecosystem brings Spark Streaming as an option to solve the problem:
- it supports a variety of data sources
- provides at-least-once semantics
- exactly-once semantics available with Kafka Direct and idempotent storage
Here’s an example of consuming an event stream from Kinesis with Spark Streaming:
val ssc = new StreamingContext(conf, Seconds(10))
val kinesisStream = KinesisUtils.createStream(ssc, appName, streamName,
  endpointURL, regionName, InitialPositionInStream.LATEST,
  Duration(checkpointInterval), StorageLevel.MEMORY_ONLY)
//transforming given stream to Event and saving to C*
kinesisStream.map(JsonUtils.byteArrayToEvent)
.saveToCassandra(keyspace, table)
ssc.start()
ssc.awaitTermination()
Designing for Failure: Backups and Patching
Usually, this is the most boring part of any system, but it’s really important when there is any possibility that the data coming into the system could be invalid, or that the whole analytics data center crashes.
So why not store the data in Kafka/Kinesis? At the time of writing, Kinesis has only one day of retention, so without backups all processing results could be lost in case of a failure. While Kafka supports much longer retention periods, the cost of hardware ownership should be considered: S3 storage, for example, is far cheaper than multiple instances running Kafka, and the S3 SLAs are really good.
Apart from having backups, the restoring/patching strategies should be designed upfront and tested so that any problems with data can be fixed quickly. A programmer’s mistake in an aggregation job or duplicated data can break the accuracy of the computation results, so fixing such errors should not be a big problem. One thing that makes all these operations easier is enforcing idempotency in the data model, so that multiple repetitions of the same operation produce the same result (e.g. an SQL update is an idempotent operation while a counter increment is not).
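The difference between the two kinds of writes shows up as soon as a batch is replayed, for instance when restoring from a backup. The following sketch models the store as an immutable map; the names (Idempotency, Aggregate, upsert, increment) and the string day key are assumptions for illustration only.

```scala
// Illustrative model of a replayed write; names are hypothetical.
object Idempotency {
  final case class Aggregate(campaign: String, day: String, views: Long)
  type Store = Map[(String, String), Long]

  // Idempotent: the row keyed by (campaign, day) is overwritten with the full value,
  // so applying the same aggregate twice leaves the store unchanged.
  def upsert(store: Store, a: Aggregate): Store =
    store.updated((a.campaign, a.day), a.views)

  // Not idempotent: each application adds to whatever is already stored,
  // so a replay double-counts.
  def increment(store: Store, a: Aggregate): Store = {
    val key = (a.campaign, a.day)
    store.updated(key, store.getOrElse(key, 0L) + a.views)
  }
}
```

With upsert-style writes, a restore job can simply be re-run over the affected period without first working out which records were already applied.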
Here is an example of a Spark job which reads S3 backup and loads it into Cassandra:
val sc = new SparkContext(conf)
sc.textFile(s"s3n://bucket/2015/*/*.gz")
.map(s => Try(JsonUtils.stringToEvent(s)))
.filter(_.isSuccess).map(_.get)
.saveToCassandra(config.keyspace, config.table)
The Big picture
The high-level design of data platform built with SMACK
So what does the SMACK stack provide:
- a concise toolbox for a wide variety of data processing scenarios
- battle-tested and widely used software with large communities
- easy scalability and replication of data while preserving low latencies
- unified cluster management for heterogeneous loads
- single platform for any kind of applications
- implementation platform for different architecture designs (batch, streaming, Lambda, Kappa)
- fast time-to-market (e.g. for MVP verification)