Spark tutorial: Get started with Apache Spark

A step-by-step guide to loading a dataset, applying a schema, writing simple queries, and querying real-time data with Structured Streaming

Apache Spark has become the de facto standard for processing data at scale, whether for querying large datasets, training machine learning models to predict future trends, or processing streaming data.

In this article, we’ll show you how to use Apache Spark to analyze data in both Python and Spark SQL. We’ll then extend our code to support Structured Streaming, the current state of the art for handling streaming data within the platform. We’ll be using Apache Spark 2.2.0 here, but the code in this tutorial should also work on Spark 2.1.0 and above.

How to run Apache Spark

Before we begin, we’ll need an Apache Spark installation. You can run Spark in a number of ways. If you’re already running a Hortonworks, Cloudera, or MapR cluster, then you might have Spark installed already, or you can install it easily through Ambari, Cloudera Manager, or the MapR custom packages.

If you don’t have such a cluster at your fingertips, then Amazon EMR or Google Cloud Dataproc are both easy ways to get started. These cloud services allow you to spin up a Hadoop cluster with Apache Spark installed and ready to go. You’ll be billed for compute resources with an extra fee for the managed service. Remember to shut the clusters down when you’re not using them!

Of course, you could instead download the latest release from spark.apache.org and run it on your own laptop. You will need a Java 8 runtime installed (Java 7 is deprecated in Spark 2.1 and is no longer supported as of Spark 2.2.0). Although you won’t have the compute power of a cluster, you will be able to run the code snippets in this tutorial.

I’ll be using a third cloud option, the Databricks Community Edition. The Databricks Community Edition provides a free mini-cluster for your use, a web notebook interface that is similar to Jupyter or Apache Zeppelin, and small amounts of storage to play with. Databricks also allows public sharing of notebooks. You can find all of the code I run in this article here.

If you’re just playing around with Apache Spark and you don’t want to have to set it up either locally or in the cloud, the Databricks Community Edition is a great way to experiment. And you can upgrade to the full Databricks platform later if you wish, or you can take your code and run it on any other Apache Spark platform instead.

As we’ll be writing Python code in this tutorial, your first step after getting Spark running will be to bring up Spark’s Python shell, PySpark:

$> PATH_TO_SPARK/bin/pyspark

A sample Spark dataset

Having loaded Apache Spark, we need some data to work on. I’ve created a small dataset for us to explore—data from a fictional video streaming service. The customers watch shows on the app and the app sends event data back to us in JSON format, letting us know the time of the event (in epoch format), the customer ID, the show ID, and the state of the event, which can be one of four different types:

  1. "open": the customer starts watching a new show
  2. "heartbeat": the customer is still watching this show
  3. "finish_completed": the customer finishes the entire show
  4. "finish_incomplete": the customer closes the show without finishing it

An example event will look like this:

{"customer_id":88,"show_id":216,"state":"open","timestamp":1515502045}
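
To make the event format concrete, here is a minimal plain-Python sketch (standard library only, no Spark involved) that parses the sample event and converts its epoch timestamp into a readable UTC time. The variable names are our own and are not part of the dataset or of any Spark API.

```python
import json
from datetime import datetime, timezone

# The sample event from the text, as a JSON string.
raw_event = '{"customer_id":88,"show_id":216,"state":"open","timestamp":1515502045}'

event = json.loads(raw_event)

# The timestamp is in seconds since the Unix epoch; convert it to a UTC datetime.
event_time = datetime.fromtimestamp(event["timestamp"], tz=timezone.utc)

print(event["customer_id"], event["show_id"], event["state"])
print(event_time.isoformat())
```

Working with a real datetime rather than a raw integer is what we will later ask Spark to do for us by declaring the column as a timestamp.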

Loading data in Spark

Loading data in Apache Spark is remarkably easy, especially when handling structured information like our JSON events. All you need to do is create a new dataframe, by entering a line in the PySpark shell:

eventsDF = spark.read.json("/FileStore/tables/videodata.json")

If you’re storing your data on S3, you would use the URL to the bucket instead:

eventsDF = spark.read.json("s3a://bucket-name/videodata.json")

Same goes for Google Cloud Storage:

eventsDF = spark.read.json("gs://bucket-name/videodata.json")

If you’re running Spark locally, you can use the file:/// URI to point to the data file.

The Apache Spark runtime will read the JSON file from storage and infer a schema based on the contents of the file. This is wonderful, but it does pose a few issues you need to be aware of. First, there is a runtime cost associated with this process. It won’t impact us much here, but on large datasets it can incur a performance penalty. Second, if you infer a schema on data where fields may be optional, you may end up with a schema that doesn’t describe all of your fields. Finally, even when all your fields show up in the input data, you may not get exactly what you want in the auto-generated schema.
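
The optional-field problem can be illustrated with a toy sketch in plain Python. This is not how Spark’s inference is implemented (Spark also merges and widens types across records). It only shows why a schema inferred from records that happen to lack an optional field will be incomplete. The "device" field and the infer_fields helper are hypothetical and exist only for this illustration.

```python
import json

def infer_fields(json_lines):
    """Return the field names and Python type names seen in the given records."""
    fields = {}
    for line in json_lines:
        for name, value in json.loads(line).items():
            # Remember the type of the first value seen for each field.
            fields.setdefault(name, type(value).__name__)
    return fields

# Hypothetical records: "device" is an optional field that only some events carry.
records = [
    '{"customer_id": 88, "show_id": 216, "state": "open", "timestamp": 1515502045}',
    '{"customer_id": 12, "show_id": 7, "state": "heartbeat", "timestamp": 1515502050, "device": "tv"}',
]

print(infer_fields(records[:1]))  # inferred from a sample that lacks "device"
print(infer_fields(records))      # inferred from all of the records
```

Note that the timestamp comes back as a plain integer in both cases, which mirrors the third issue: the inferred type is valid, but not the one we would choose.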

If we look at the dataframe created from our JSON file, we can see that it’s close but not quite right:

eventsDF:pyspark.sql.dataframe.DataFrame
  customer_id:long
  show_id:long
  state:string
  timestamp:long

The customer_id, show_id, and state types are OK, but while Spark has determined that timestamp is a long, it would make more sense to use an actual timestamp so we don’t have to deal with epoch time. Instead of inferring the schema automatically, we can define one ourselves and instruct Spark to apply it to the data as it is loaded. This also avoids the cost of the inference pass. Be aware that, in Spark’s default permissive JSON parsing mode, fields that our schema doesn’t define are dropped rather than reported as errors. The schema is a StructType object containing a list of StructField objects, one per field:

from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

jsonSchema = StructType([
    StructField("customer_id", LongType(), True),
    StructField("show_id", LongType(), True),
    StructField("state", StringType(), True),
    StructField("timestamp", TimestampType(), True),
])

The third argument to each StructField (True here) marks the field as nullable, meaning its value may be missing or null. The schema is then applied as we open the input data:

eventsSchemaDF = spark.read.schema(jsonSchema).json("/FileStore/tables/videodata.json")

If we do a head() on this dataframe to get the first entry, we can see that the epoch time has been turned into a datetime object:

eventsSchemaDF.head() => Row(customer_id=24, show_id=308, state=u'open', timestamp=datetime.datetime(2017, 11, 12, 16, 58, 5))

Exploring data in Spark

Now that we have our dataset loaded as a dataframe, we can query it to obtain information. Let’s start simple: How many events are there in our dataframe? We can use the count() method to find out:

eventsSchemaDF.count() => Out[]: 1166450

Now let’s count the number of “open” events. We can use the where() method for this:

eventsSchemaDF.where(eventsSchemaDF.state == "open").count() => Out[61]: 666258

Now, if we register the dataframe as a table, we can issue Spark SQL queries against it:

eventsSchemaDF.createOrReplaceTempView("video_events")
spark.sql("SELECT count(*) FROM video_events WHERE state = 'open'").show()

We can use standard SQL syntax to create more complex queries. Below, we obtain how many open events are registered against each show and display the results in descending order (the show() method at the end of the query prints out the first 20 rows of the dataframe to the console):

spark.sql("SELECT show_id, count(*) AS open_events FROM video_events WHERE state = 'open' GROUP BY show_id ORDER BY open_events DESC").show()

[Figure: open events per show (IDG)]
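
If it helps to see what this query computes, here is a plain-Python sketch of the same filter, group, count, and sort steps on a handful of made-up events. It uses only the standard library and is meant as a mental model, not as a substitute for Spark. The sample events are hypothetical.

```python
from collections import Counter

# Hypothetical sample events, shaped like the ones in our dataset.
events = [
    {"customer_id": 1, "show_id": 216, "state": "open"},
    {"customer_id": 2, "show_id": 216, "state": "open"},
    {"customer_id": 1, "show_id": 216, "state": "heartbeat"},
    {"customer_id": 3, "show_id": 308, "state": "open"},
]

# WHERE state = 'open', GROUP BY show_id, count(*)
open_events = Counter(e["show_id"] for e in events if e["state"] == "open")

# ORDER BY open_events DESC
ranked = open_events.most_common()
print(ranked)
```

The difference, of course, is that Spark distributes this work across a cluster, so the same logic scales to datasets that would never fit in a single Python process.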

In dataframe syntax, this would look like:

from pyspark.sql.functions import desc

(eventsSchemaDF
    .where(eventsSchemaDF.state == "open")
    .groupBy("show_id")
    .count()
    .orderBy(desc("count"))
    .show())

We can even create time windows into the data to see what is happening with our shows over a period of time. The next query creates five-minute windows and counts the states for each show within each window:

windowDF = spark.sql("SELECT show_id, state, window(timestamp, '5 minutes') AS window, count(*) AS count FROM video_events GROUP BY state, show_id, window ORDER BY window, count, show_id")

[Figure: event counts per five-minute time window (IDG)]
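
The idea behind a fixed five-minute window can be sketched in plain Python: round each timestamp down to the nearest multiple of 300 seconds and group on the result. This sketch assumes windows aligned to the Unix epoch, which we believe matches the default behavior of Spark’s window() function when no start offset is given. The sample events and the window_start helper are hypothetical.

```python
from collections import Counter

WINDOW_SECONDS = 5 * 60

def window_start(epoch_seconds):
    """Round an epoch timestamp down to the start of its five-minute window."""
    return epoch_seconds - (epoch_seconds % WINDOW_SECONDS)

# Hypothetical events for a single show, spread over two windows.
events = [
    {"show_id": 216, "state": "open", "timestamp": 1515502045},
    {"show_id": 216, "state": "heartbeat", "timestamp": 1515502105},
    {"show_id": 216, "state": "open", "timestamp": 1515502210},
    {"show_id": 216, "state": "open", "timestamp": 1515502400},
]

# GROUP BY window, show_id, state with count(*)
counts = Counter(
    (window_start(e["timestamp"]), e["show_id"], e["state"]) for e in events
)

for (start, show_id, state), count in sorted(counts.items()):
    print(start, show_id, state, count)
```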

Having created our time window counts, we can persist these results back out to JSON (or any other store Apache Spark is connected to, like a Hive table) using the dataframe’s write interface:

windowDF.write.json("/FileStore/tables/windowdata.json")

The Spark web UI

Apache Spark comes with a web interface that allows us to inspect the status of a cluster. By default, the web UI for a running application is served by the Spark driver process (where you have started PySpark) on port 4040. In a standalone cluster, the master’s web UI is found at port 8080 and each worker’s at port 8081. Most of the time you’ll be looking at the driver page, which will look something like this:

[Figure: the Spark web UI (IDG)]

The Spark web UI allows you to follow any currently running jobs as well as inspect jobs that have run previously. If you click on an active job, you’ll be shown active tasks on the cluster, completed tasks, and tasks yet to be scheduled. You can also see a visualization of the graph that Spark has created to process your data. Additionally, the web UI allows you to examine logs on the driver and workers, important JVM metrics such as time spent in garbage collection, straggling tasks, and a timeline of the entire job process to assist in debugging.

Structured Streaming

Our examples above work on static data; they assume we have a file that contains all the events we wish to examine. However, in the case of a video player application sending data home, that’s unlikely to be the case. Data will be arriving all the time from all over the world. How can we deal with that? We could store the data as it arrives and run a nightly batch job to see trends over the past 24 hours, but what if that’s not good enough? Apache Spark allows us to process data and obtain insights from that data in real time through its streaming support.

Streaming data processing in Apache Spark was previously handled by Spark Streaming. Spark Streaming processed streams in small batches (known as “microbatches”) and provided a functional-style API to manipulate the data. This API is still available in Apache Spark 2.x, but has been superseded by a new approach known as Structured Streaming. This presents us with the same dataframe or Spark SQL API we have already seen, but allows us to run our queries on live data.

If we were building a fully-fledged analytics platform, we would likely place incoming events into Apache Kafka or Apache Pulsar and then hook up Spark to read from those log systems. To reduce the number of moving parts, we’ll instead take advantage of the ability of Structured Streaming to read from a filesystem. New files will be processed as they are added to the supplied path.

Building a streaming dataframe is a snap: We simply replace read with readStream. We also adjust the input path so that it accepts a wildcard for multiple JSON files. Although we only have one file there right now, we’ll add another shortly.

eventsStreamingDF = (spark.readStream
                         .schema(jsonSchema)
                         .json("/FileStore/tables/video*.json"))

Now we stream that dataframe into an in-memory table where we can run ongoing queries against it, and instruct Apache Spark to start the stream processing with start():

query = (eventsStreamingDF.writeStream
                         .format("memory")
                         .queryName("streaming_events")
                         .start())

Again, if we were creating a real application here, we would likely serialize the updates into a Hive table or Parquet files or the like rather than storing the table in memory.

The resulting table is called streaming_events. We can query it just like any other table in Spark SQL:

spark.sql("SELECT count(*) FROM streaming_events WHERE state = 'open'").show()

This should give us the same answer we got from our batch operations above. However, if you make a copy of the input file, name it videodata2.json, upload it to the same location as videodata.json, and re-run this query, you should see Structured Streaming process the new file and add the data to the in-memory table. The count should now be doubled.
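
Conceptually, the file source behaves like a loop that remembers which files it has already read and only processes new arrivals. The following plain-Python sketch imitates that behavior with a temporary directory and two tiny files. It is a simplified mental model under our own assumptions, not a description of how Structured Streaming is implemented, and the helper functions are hypothetical.

```python
import glob
import json
import os
import tempfile

def process_new_files(pattern, seen, open_count=0):
    """Add the "open" events from not-yet-seen files matching pattern to open_count."""
    for path in sorted(glob.glob(pattern)):
        if path in seen:
            continue  # already processed on an earlier pass
        with open(path) as handle:
            for line in handle:
                if json.loads(line)["state"] == "open":
                    open_count += 1
        seen.add(path)
    return open_count

def write_events(path, events):
    """Write events to path as one JSON object per line."""
    with open(path, "w") as handle:
        for event in events:
            handle.write(json.dumps(event) + "\n")

sample = [{"show_id": 216, "state": "open"}, {"show_id": 216, "state": "heartbeat"}]

with tempfile.TemporaryDirectory() as folder:
    pattern = os.path.join(folder, "video*.json")
    seen = set()

    write_events(os.path.join(folder, "videodata.json"), sample)
    first_count = process_new_files(pattern, seen)

    # A copy of the same data arrives later; only the new file is read.
    write_events(os.path.join(folder, "videodata2.json"), sample)
    second_count = process_new_files(pattern, seen, first_count)

print(first_count, second_count)
```

As in the Spark example, adding a copy of the input doubles the running count, because the first file is not read a second time.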

Finally, you can stop the streaming by calling the stop() method on the query handle:

query.stop()

Build on our Spark example

A useful exercise that builds on what we’ve shown so far is to implement a query that can detect whether a customer is sharing their account with others. Maybe we’re tolerant of some sharing in our video application, but we don’t want customers going overboard. You could write a query that checks for more than five open events in a given five-minute time window for any customer ID by extending the code already shown in this tutorial. Try to get that query running in both batch and Structured Streaming modes.
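
If you want something to check your Spark query against, the rule itself is small enough to express in plain Python and run on a sample of the data. The sketch below is one possible reading of the exercise (more than five “open” events per customer within a fixed, epoch-aligned five-minute window). The threshold constant, the function name, and the test data are our own assumptions, and your Spark version may reasonably differ, for example by using sliding windows.

```python
from collections import Counter

WINDOW_SECONDS = 5 * 60
MAX_OPENS_PER_WINDOW = 5  # assumption from the exercise: flag more than five opens

def find_sharing_suspects(events):
    """Return sorted (customer_id, window_start) pairs with too many "open" events."""
    opens = Counter(
        (e["customer_id"], e["timestamp"] - e["timestamp"] % WINDOW_SECONDS)
        for e in events
        if e["state"] == "open"
    )
    return sorted(key for key, count in opens.items() if count > MAX_OPENS_PER_WINDOW)

# Hypothetical data: customer 88 opens six shows in one window, customer 24 opens one.
base = 1515501900  # a multiple of 300, so it marks the start of a window
events = [
    {"customer_id": 88, "show_id": show, "state": "open", "timestamp": base + 10 * show}
    for show in range(6)
]
events.append({"customer_id": 24, "show_id": 1, "state": "open", "timestamp": base + 5})

print(find_sharing_suspects(events))
```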

Beyond that, the Apache Spark website has a good set of documentation for all the APIs in the platform, and the Databricks website has extensive documentation and very helpful tutorials using a variety of techniques and datasets. Go forth and experiment!

Copyright © 2017 IDG Communications, Inc.