Fault-tolerant collection of elements that can be operated on in parallel
Two ways to create RDDs
1. Parallelizing an existing collection in your driver program
SparkContext’s parallelize method applied on an existing collection in your driver program
Copied to form a distributed dataset that can be operated on in parallel
Important parameter for parallel collections is the number of partitions to cut the dataset into
Typically you want 2-4 partitions for each CPU in your cluster
set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10))
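A minimal sketch, assuming an existing SparkContext named sc and a small local collection:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)       // distributed dataset, default number of partitions
val distData10 = sc.parallelize(data, 10) // explicitly cut the dataset into 10 partitions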
2. Referencing a dataset in an external storage system
local file system, HDFS, Cassandra, HBase, Amazon S3, etc
Spark supports text files, SequenceFiles, and any other Hadoop InputFormat
Text file RDDs can be created using SparkContext’s textFile method
Notes on reading files
If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes
support running on directories, compressed files, and wildcards
example: you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs
For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file
Other Hadoop InputFormats: you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source.
RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects.
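A hedged sketch of the reading and saving calls above; sc is a SparkContext and all paths are placeholders:
val lines = sc.textFile("/my/directory/*.txt")                // RDD[String], one element per line
val files = sc.wholeTextFiles("/my/directory")                // RDD of (filename, content) pairs
val pairs = sc.sequenceFile[String, Int]("/path/to/seqfile")  // K and V must match the file's key/value types
lines.saveAsObjectFile("/tmp/lines-as-objects")               // saved as serialized Java objects
val restored = sc.objectFile[String]("/tmp/lines-as-objects") // read them back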
RDD Operations
transformations
Creates a new dataset from an existing one
Example: map is a transformation that passes each dataset element through a function and returns a new RDD representing the results
Each transformed RDD is recomputed each time you run an action on it unless the RDD is made persistent
Common Transformations
Other Common Transformations
actions
Return a value to the driver program after running a computation on the dataset
Example: reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program
Common Actions
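A small sketch of a transformation followed by an action, assuming sc and a placeholder file data.txt:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)            // transformation: evaluated lazily
lineLengths.persist()                                 // optional: keep the intermediate RDD for reuse (see RDD Persistence below)
val totalLength = lineLengths.reduce((a, b) => a + b) // action: triggers the actual computation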
Passing Functions to Spark
Two ways to do this:
1. Passing anonymous functions (Scala)
2. Static methods in a global singleton object
Example: You can define object MyFunctions and then pass MyFunctions.func1, as in the sketch below
While it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method.
To avoid this issue, the simplest way is to copy the field into a local variable instead of accessing it externally
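A hedged sketch of both approaches; rdd is assumed to be an RDD[String] and the names are illustrative:
// 1. anonymous function syntax
val lengths = rdd.map(s => s.length)
// 2. static method in a global singleton object
object MyFunctions {
  def func1(s: String): String = s.toUpperCase
}
val upper = rdd.map(MyFunctions.func1)
// copying a field into a local variable avoids shipping the whole enclosing instance
class MyClass {
  val field = "prefix-"
  def doStuff(rdd: org.apache.spark.rdd.RDD[String]) = { val field_ = this.field; rdd.map(x => field_ + x) }
}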
Printing elements
To print all elements on the driver, one can use the collect() method to first bring the RDD's elements to the driver node
WRONG WAY:
rdd.foreach(println)
rdd.map(println)
println called by the executors writes to each executor's stdout, not to the driver's stdout
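The driver-side alternatives, noting that collect() can exhaust driver memory on a large RDD:
rdd.collect().foreach(println) // bring all elements to the driver first, then print
rdd.take(100).foreach(println) // print only a sample when the RDD is large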
Working with Key-Value Pairs
A few special operations are only available on RDDs of key-value pairs
The key-value pair operations are available in the PairRDDFunctions class
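A minimal pair-RDD sketch, assuming sc and a placeholder file path:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))              // RDD[(String, Int)] picks up PairRDDFunctions implicitly
val counts = pairs.reduceByKey((a, b) => a + b) // combine all values for each key
val sorted = counts.sortByKey()                 // another key-value-only operation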
Shuffle operations
Shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions
Involves copying data across executors and machines, making the shuffle a complex and costly operation that can reduce performance
Can consume significant amounts of heap memory
When data does not fit in memory Spark will spill these tables to disk
Results in additional overhead of disk I/O
Results in increased garbage collection
Generates a large number of intermediate files on disk
Files are not cleaned up from Spark’s temporary storage until Spark is stopped
Operations which can cause a shuffle
repartition operations like repartition and coalesce
'ByKey' operations (except for counting) like groupByKey and reduceByKey
join operations like cogroup and join
Predictably ordered data following a shuffle (which can make downstream operations more efficient) can be achieved using:
mapPartitions to sort each partition using, for example, .sorted
repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
sortBy to make a globally ordered RDD
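A sketch of the ordering options above, assuming pairs is an RDD[(Int, String)]:
import org.apache.spark.HashPartitioner
val sortedWithin = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(8)) // 8 partitions, sorted within each
val global = pairs.sortBy(_._1)                                                     // globally ordered RDD by key
val perPartition = pairs.mapPartitions(iter => iter.toSeq.sorted.iterator)          // sort each partition in memory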
Shuffle behavior can be tuned and configured to optimize performance
RDD Persistence
Caching a dataset in memory across operations
Cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it
You can mark an RDD to be persisted using the persist() or cache() methods on it.
Each node stores any partitions of "persistent" data that it computes in memory and reuses them in other actions on that dataset
Persisted RDDs can be stored using a specified storage level
Storage Level Options
Storage levels are meant to provide different trade-offs between memory usage and CPU efficiency.
If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
If the RDD does not fit in memory, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access.
Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
Use the replicated storage levels if you want fast fault recovery
In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages: executors can share a pool of memory, garbage-collection cost is significantly reduced, and cached data is not lost if individual executors crash
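A sketch of marking an RDD for persistence at a chosen storage level (path is a placeholder):
import org.apache.spark.storage.StorageLevel
val lines = sc.textFile("data.txt")
lines.persist(StorageLevel.MEMORY_ONLY_SER) // serialized in memory; cache() is shorthand for persist(MEMORY_ONLY)
lines.count()                               // first action computes and caches the partitions
lines.unpersist()                           // manually evict when no longer needed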
Shared Variables
Spark SQL and DataFrames
Spark SQL is a Spark module for structured data processing
DataFrames
A DataFrame is a distributed collection of data organized into named columns
DataFrames can be constructed from a wide array of sources
structured data files
tables in Hive
external databases
existing RDDs
Two different methods for converting existing RDDs into DataFrames
1. infer the schema of an RDD that contains specific types of objects (uses reflection)
Scala interface automatically converts an RDD containing case classes into a DataFrame
case class defines the schema of the table
names of the arguments to the case class are read using reflection and become the names of the columns
2. construct a schema and then apply it to an existing RDD
allows you to construct DataFrames when the columns and their types are not known until runtime
a DataFrame can be created programmatically in three steps: create an RDD of Rows from the original RDD, build a StructType schema that matches the structure of those Rows, and apply the schema to the RDD of Rows via SQLContext's createDataFrame method (sketched below)
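A hedged sketch of both conversion methods, assuming a SQLContext named sqlContext:
// 1. reflection: a case class defines the schema, its argument names become column names
case class Person(name: String, age: Int)
import sqlContext.implicits._
val peopleDF = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 25))).toDF()
// 2. programmatic: RDD of Rows + StructType schema + createDataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val rowRDD = sc.parallelize(Seq(Row("Alice", 30), Row("Bob", 25)))
val schema = StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))
val peopleDF2 = sqlContext.createDataFrame(rowRDD, schema)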
The default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.
You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source
Can also use the short names (json, parquet, jdbc) for built-in sources
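A sketch of the generic load/save calls, with placeholder paths:
val df = sqlContext.read.load("users.parquet")                  // the default source (parquet) is used
val jsonDF = sqlContext.read.format("json").load("people.json") // source specified manually, by short name
jsonDF.write.format("parquet").save("people.parquet")           // write using another built-in source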
Apache Hive
DataFrames can also be saved as persistent tables using the saveAsTable command
saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore
by default, saveAsTable will create a "managed table"
location of the data will be controlled by the metastore
data deleted automatically when a table is dropped
Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore
A DataFrame for a persistent table can be created by calling the table method on a SQLContext with the name of the table
Users who do not have an existing Hive deployment can still create a HiveContext
When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory
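A sketch of Hive usage, assuming sc exists and df is a DataFrame created from this context (assumed):
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)            // works without an existing Hive deployment
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
df.write.saveAsTable("people_managed")           // managed table; the metastore controls the data location
val people = hiveContext.table("people_managed") // DataFrame for a persistent table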
Hive Commands
Common Hive Statements
Hive Data Types
Parquet Files
Support for both reading and writing Parquet files that automatically preserves the schema of the original data.
Partition discovery
able to discover and infer partitioning information automatically
By passing path/to/table to either SQLContext.read.parquet or SQLContext.read.load, Spark SQL will automatically extract the partitioning information from the paths
Schema merging
Users can start with a simple schema, and gradually add more columns to the schema as needed
Configuration
Configuration of Parquet can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.
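A sketch of the Parquet calls above, with placeholder paths:
df.write.parquet("people.parquet")                                               // schema of the data is preserved automatically
val parquetDF = sqlContext.read.parquet("people.parquet")                        // partition columns are discovered from the directory layout
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")                      // enable schema merging globally
val merged = sqlContext.read.option("mergeSchema", "true").parquet("data/table") // or enable it per read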
Parquet Commands
JSON Datasets
Can automatically infer the schema of a JSON dataset and load it as a DataFrame
conversion can be done using SQLContext.read.json() on either an RDD of String, or a JSON file
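A sketch of JSON loading, assuming one JSON object per line and placeholder paths:
val people = sqlContext.read.json("people.json") // infer the schema from a JSON file
people.printSchema()
val jsonRDD = sc.parallelize(Seq("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""))
val other = sqlContext.read.json(jsonRDD)        // infer the schema from an RDD of JSON strings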
JDBC To Other Databases
Spark SQL also includes a data source that can read data from other databases using JDBC
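A sketch of a JDBC read; the URL, table name, and driver class are placeholders for your database:
val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:postgresql://host:5432/dbname",
  "dbtable" -> "schema.tablename",
  "driver" -> "org.postgresql.Driver"
)).load()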
Save Modes
Save operations can optionally take a SaveMode, which specifies how to handle existing data if present
WARNING
save modes do not utilize any locking
are not atomic
not safe to have multiple writers attempting to write to the same location
when performing an Overwrite, the data will be deleted before writing out the new data.
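A sketch of specifying a SaveMode on write (paths and table names are placeholders):
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet("out/people.parquet") // existing data is deleted first, not atomically
df.write.mode(SaveMode.Append).saveAsTable("people")            // append to an existing table if present
df.write.mode("ignore").save("out/people")                      // modes can also be given as strings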
Save Commands
Data Types
SQLContext
the entry point into all functionality in Spark SQL is the SQLContext class, or one of its descendants
HiveContext
ability to write queries using the more complete HiveQL parser
access to Hive UDFs
ability to read data from Hive tables
do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available
The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame
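A sketch of the entry points and a programmatic query; the file path and temp table name are placeholders:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc) // or new HiveContext(sc) for HiveQL, Hive UDFs, and Hive tables
val df = sqlContext.read.json("people.json")
df.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") // result is a DataFrame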
Performance Tuning
Caching Data In Memory
Spark SQL can cache tables using an in-memory columnar format by calling sqlContext.cacheTable("tableName") or dataFrame.cache()
You can call sqlContext.uncacheTable("tableName") to remove the table from memory
Caching Commands
Common Tuning Commands
Examples
Spark ML
Why?
Make it easier to combine multiple algorithms into a single pipeline, or workflow
Main Concepts
ML Dataset
Spark ML uses the DataFrame from Spark SQL as a dataset which can hold a variety of data types. E.g., a dataset could have different columns storing text, feature vectors, true labels, and predictions
DataFrame supports many basic and structured types. In addition to the types listed in the Spark SQL guide, DataFrame can use ML Vector types.
A DataFrame can be created either implicitly or explicitly from a regular RDD
ML Algorithms
Properties of ML Algorithms
Estimator
An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a dataset and produces a model.
An Estimator implements a method fit() which accepts a DataFrame and produces a Transformer.
A learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel.
The LogisticRegressionModel produced by the logistic regression algorithm is a Transformer.
Pipeline (all Pipelines are Estimators)
A Pipeline is specified as a sequence of stages, and each stage of a pipeline is either a Transformer or an Estimator.
Param
All Transformers and Estimators now share a common API for specifying parameters.
Each instance of a Transformer or Estimator has a unique ID, which is useful in specifying parameters
Stages are run in order, and the input dataset is modified as it passes through each stage
For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the dataset.
Example of a pipeline
Stages of Pipeline
Stage 1
The Pipeline.fit() method is called on the original dataset which has raw text documents and labels
The Tokenizer.transform() method splits the raw text documents into words, adding a new column with words into the dataset.
Stage 2
The HashingTF.transform() method converts the words column into feature vectors, adding a new column with those vectors to the dataset.
Stage 3
LogisticRegression is an Estimator, so the Pipeline calls LogisticRegression.fit() to produce a LogisticRegressionModel.
A Pipeline’s stages are specified as an ordered array
Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
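A sketch of the three-stage pipeline described above in the spark.ml Scala API; column names and parameter values are illustrative, and training/test are assumed DataFrames with "text" and "label" columns:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) // stages as an ordered array
val model = pipeline.fit(training)      // fitting produces a PipelineModel (a Transformer)
val predictions = model.transform(test) // test data goes through the identical feature processing steps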
Transformer
A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
A Transformer implements a method transform() which converts one DataFrame into another, generally by appending one or more columns
Example functionality
A learning model might take a dataset, read the column containing feature vectors, predict the label for each feature vector, append the labels as a new column, and output the updated dataset
A feature transformer might take a dataset, read a column (e.g., text), convert it into a new column (e.g., feature vectors), append the new column to the dataset, and output the updated dataset.
PipelineModel (all PipelineModels are Transformers)
For Transformer stages, the transform() method is called on the dataset
Example of pipelineModel
Stages are run in order, and the input dataset is modified as it passes through each stage
The PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have become Transformers.
Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
Parameters
A Param is a named parameter with self-contained documentation. A ParamMap is a set of (parameter, value) pairs.
1. Set parameters for an instance. E.g., if lr is an instance of LogisticRegression, one could call lr.setMaxIter(10) to make lr.fit() use at most 10 iterations.
2. Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap will override parameters previously specified via setter methods.
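A sketch of both ways, assuming lr is a LogisticRegression instance and training is a DataFrame:
import org.apache.spark.ml.param.ParamMap
lr.setMaxIter(10)                                             // 1. setter method on the instance
val model1 = lr.fit(training)
val paramMap = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.1) // 2. ParamMap passed to fit()/transform()
val model2 = lr.fit(training, paramMap)                       // overrides the previously set maxIter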