-
Spark General
-
Resilient Distributed Datasets
- Fault-tolerant collection of elements that can be operated on in parallel
-
Two ways to create RDDs
-
1. Parallelizing an existing collection in your driver program
- SparkContext’s parallelize method applied on an existing collection in your driver program
- Copied to form a distributed dataset that can be operated on in parallel
- Important parameter for parallel collections is the number of partitions to cut the dataset into
- Typically you want 2-4 partitions for each CPU in your cluster
- set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10))
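- A minimal sketch of the points above, assuming an existing SparkContext named sc (as in spark-shell):
- val data = Array(1, 2, 3, 4, 5)
- val distData = sc.parallelize(data, 10)  // second parameter = number of partitions
- val sum = distData.reduce(_ + _)  // operate on the distributed copy in parallel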
-
2. Referencing a dataset in an external storage system
- local file system, HDFS, Cassandra, HBase, Amazon S3, etc
- Spark supports text files, SequenceFiles, and any other Hadoop InputFormat
- Text file RDDs can be created using SparkContext’s textFile method
- Notes on reading files
- If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes
- support running on directories, compressed files, and wildcards
- example: you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
- SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs
- For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file
- Other Hadoop InputFormats: you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source.
- RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects.
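- A sketch of the file-reading APIs above; paths are hypothetical and sc is an existing SparkContext:
- val lines = sc.textFile("/my/directory/*.txt")  // RDD of lines
- val files = sc.wholeTextFiles("/my/directory")  // RDD of (filename, content) pairs
- val seq = sc.sequenceFile[Int, String]("/my/seqfile")  // typed keys and values
- lines.saveAsObjectFile("/tmp/lines-obj")  // serialized Java objects
- val restored = sc.objectFile[String]("/tmp/lines-obj")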
-
RDD Operations
-
transformations
- Creates a new dataset from an existing one
- Example: map is a transformation that passes each dataset element through a function and returns a new RDD representing the results
- Each transformed RDD is recomputed each time you run an action on it unless the RDD is made persistent
- Common Transformations
- Other Common Transformations
-
actions
- Return a value to the driver program after running a computation on the dataset
- Example: reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program
- Common Actions
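- A sketch combining a transformation and an action, assuming sc is an existing SparkContext:
- val nums = sc.parallelize(Seq(1, 2, 3, 4))
- val squares = nums.map(x => x * x)  // transformation: lazy, returns a new RDD
- squares.persist()  // keep the result around for reuse across actions
- val total = squares.reduce(_ + _)  // action: triggers computation, returns 30 to the driver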
-
Passing Functions to Spark
- Two ways to do this:
- 1. Passing anonymous functions (Scala anonymous function syntax)
- 2. Static methods in a global singleton object
- Example: You can define object MyFunctions and then pass MyFunctions.func1 to an RDD operation (see the sketch below)
- While it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method.
- To avoid this issue, the simplest way is to copy the field into a local variable instead of accessing it externally, as in the sketch below
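- A sketch of the singleton-object and local-variable patterns; MyFunctions, MyClass, and myRdd are hypothetical names:
- import org.apache.spark.rdd.RDD
- object MyFunctions { def func1(s: String): String = s.toUpperCase }
- myRdd.map(MyFunctions.func1)  // passes a static method; no enclosing object is shipped
- class MyClass {
-   val field = "Hello "
-   def doStuff(rdd: RDD[String]): RDD[String] = {
-     val field_ = this.field  // copy the field to a local variable...
-     rdd.map(x => field_ + x)  // ...so only field_ is captured, not the whole MyClass instance
-   }
- }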
- Printing elements
- To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node
- WRONG WAY:
- rdd.foreach(println)
- rdd.map(println)
- println called by the executors writes to each executor’s stdout, not to the stdout on the driver
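- A sketch of the driver-side printing pattern described above:
- rdd.collect().foreach(println)  // brings the whole RDD to the driver first, then prints there
- rdd.take(100).foreach(println)  // safer for large RDDs: only bring and print a few elements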
-
Working with Key-Value Pairs
- A few special operations are only available on RDDs of key-value pairs
- The key-value pair operations are available in the PairRDDFunctions class
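- A word-count-style sketch of key-value operations, assuming a hypothetical text file path:
- val lines = sc.textFile("data.txt")
- val pairs = lines.map(line => (line, 1))
- val counts = pairs.reduceByKey(_ + _)  // PairRDDFunctions kicks in via implicit conversion
- counts.sortByKey()  // another pair-only operation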
-
Shuffle operations
- Shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions
- Involves copying data across executors and machines, making the shuffle a complex and costly operation that can reduce performance
- Can consume significant amounts of heap memory
- When data does not fit in memory Spark will spill these tables to disk
- Results in additional overhead of disk I/O
- Results in increased garbage collection
- Generates a large number of intermediate files on disk
- Files are not cleaned up from Spark’s temporary storage until Spark is stopped
- Operations which can cause a shuffle
- repartition operations like repartition and coalesce
- ‘ByKey’ operations (except for counting) like groupByKey and reduceByKey
- join operations like cogroup and join
- Predictably ordered data after a shuffle (which increases the efficiency of downstream operations) can be achieved using:
- mapPartitions to sort each partition using, for example, .sorted
- repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
- sortBy to make a globally ordered RDD
- Shuffle behavior can be tuned and configured to optimize performance
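- A sketch of shuffle-producing operations on a hypothetical pair RDD named pairs (e.g., from the word-count sketch above):
- import org.apache.spark.HashPartitioner
- val grouped = pairs.groupByKey()  // shuffles all values for each key
- val summed = pairs.reduceByKey(_ + _)  // also shuffles, but combines on the map side first
- val ordered = summed.sortBy(_._2, ascending = false)  // globally ordered RDD
- val resorted = summed.repartitionAndSortWithinPartitions(new HashPartitioner(4))  // repartition and sort in one pass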
-
RDD Persistence
-
Caching a dataset in memory across operations
- Cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it
- You can mark an RDD to be persisted using the persist() or cache() methods on it.
- Each node stores any partitions of the persisted dataset that it computes in memory and reuses them in other actions on that dataset
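- A sketch of marking an RDD as persistent; MEMORY_ONLY_SER is one of the storage levels discussed below, and the path is hypothetical:
- import org.apache.spark.storage.StorageLevel
- val words = sc.textFile("data.txt").flatMap(_.split(" "))
- words.cache()  // shorthand for persist(StorageLevel.MEMORY_ONLY)
- // words.persist(StorageLevel.MEMORY_ONLY_SER)  // serialized alternative if the data does not fit deserialized
- words.count()  // first action computes and caches the partitions
- words.count()  // later actions reuse the cached data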
-
Persisted RDDs can be stored using a specified storage level
- Storage Level Options
- Storage levels are meant to provide different trade-offs between memory usage and CPU efficiency.
- If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
- If the RDDs do not fit in memory, try MEMORY_ONLY_SER and select a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access.
- Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
- Use the replicated storage levels if you want fast fault recovery
- In environments with high amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages
- Shared Variables
-
Spark SQL and DataFrames
- Spark SQL is a Spark module for structured data processing
-
DataFrames
- A DataFrame is a distributed collection of data organized into named columns
-
DataFrames can be constructed from a wide array of sources
- structured data files
- tables in Hive
- external databases
- existing RDDs
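- A sketch of building a DataFrame from a structured data file, assuming a SQLContext named sqlContext and a hypothetical people.json:
- val df = sqlContext.read.json("people.json")
- df.show()
- df.printSchema()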
-
Two different methods for converting existing RDDs into DataFrames
-
1. infer the schema of an RDD that contains specific types of objects (uses reflection)
- Scala interface automatically converts an RDD containing case classes into a DataFrame
- case class defines the schema of the table
- names of the arguments to the case class are read using reflection and become the names of the columns
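- A sketch of the reflection-based conversion; Person, the sample rows, and the query are hypothetical, and sqlContext is an existing SQLContext:
- case class Person(name: String, age: Int)
- import sqlContext.implicits._
- val people = sc.parallelize(Seq(Person("Alice", 29), Person("Bob", 31))).toDF()
- people.registerTempTable("people")
- val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 21")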
-
2. construct a schema and then apply it to an existing RDD
- allows you to construct DataFrames when the columns and their types are not known until runtime
- a DataFrame can be created programmatically with three steps
- 1. Create an RDD of Rows from the original RDD
- 2. Create the schema represented by a StructType matching the structure of Rows in the RDD
- 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext
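- A sketch of the three steps, assuming a hypothetical people.txt of "name,age" lines and an existing SQLContext named sqlContext:
- import org.apache.spark.sql.Row
- import org.apache.spark.sql.types.{StructType, StructField, StringType}
- val rowRDD = sc.textFile("people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim))  // 1. RDD of Rows
- val schema = StructType(Seq(StructField("name", StringType, true), StructField("age", StringType, true)))  // 2. the schema
- val peopleDF = sqlContext.createDataFrame(rowRDD, schema)  // 3. apply the schema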
-
Data Sources
- The default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.
- You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source
- Can also use the short names (json, parquet, jdbc) for built-in sources
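- A sketch of the generic load/save API with the default and an explicitly specified source; file names are hypothetical:
- val users = sqlContext.read.load("users.parquet")  // default source (parquet)
- users.select("name").write.save("names.parquet")
- val people = sqlContext.read.format("json").load("people.json")  // short name "json"
- people.write.format("parquet").save("people.parquet")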
-
Apache Hive
- DataFrames can also be saved as persistent tables using the saveAsTable
- saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore
- default saveAsTable will create a “managed table”
- location of the data will be controlled by the metastore
- data deleted automatically when a table is dropped
- Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore
- A DataFrame for a persistent table can be created by calling the table method on a SQLContext with the name of the table
- Users who do not have an existing Hive deployment can still create a HiveContext
- When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory
- Hive Commands
- Common Hive Statements
- Hive Data Types
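- A sketch of working with the Hive metastore; the file and table names are hypothetical:
- import org.apache.spark.sql.hive.HiveContext
- val hiveContext = new HiveContext(sc)  // works even without an existing Hive deployment
- hiveContext.read.json("people.json").write.saveAsTable("people")  // default: a managed table in the metastore
- val people = hiveContext.table("people")  // DataFrame for the persistent table, even after a restart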
-
Parquet Files
- Support for both reading and writing Parquet files that automatically preserves the schema of the original data.
- Partition discovery
- able to discover and infer partitioning information automatically
- By passing path/to/table to either SQLContext.read.parquet or SQLContext.read.load, Spark SQL will automatically extract the partitioning information from the paths
- Schema merging
- Users can start with a simple schema, and gradually add more columns to the schema as needed
- Configuration
- Configuration of Parquet can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.
- Parquet Commands
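- A sketch of reading, writing, and configuring Parquet; paths are hypothetical and people is an existing DataFrame:
- people.write.parquet("people.parquet")  // the schema is preserved
- val parquetDF = sqlContext.read.parquet("people.parquet")
- val merged = sqlContext.read.option("mergeSchema", "true").parquet("data/partitioned_table")  // opt-in schema merging
- sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")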
-
JSON Datasets
- Can automatically infer the schema of a JSON dataset and load it as a DataFrame
- conversion can be done using SQLContext.read.json() on either an RDD of String, or a JSON file
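- A sketch of both JSON entry points; the file name and sample record are hypothetical:
- val people = sqlContext.read.json("people.json")  // one JSON object per line
- people.printSchema()
- val jsonRDD = sc.parallelize(Seq("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""))
- val other = sqlContext.read.json(jsonRDD)  // from an RDD of JSON strings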
-
JDBC To Other Databases
- Spark SQL also includes a data source that can read data from other databases using JDBC
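- A sketch of a JDBC read; the URL and table name are hypothetical, and the JDBC driver must be on the classpath:
- val jdbcDF = sqlContext.read.format("jdbc").option("url", "jdbc:postgresql://host:5432/db").option("dbtable", "schema.tablename").load()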
-
Save Modes
- Save operations can optionally take a SaveMode, which specifies how to handle existing data if present
- WARNING
- save modes do not utilize any locking
- are not atomic
- not safe to have multiple writers attempting to write to the same location
- when performing an Overwrite, the data will be deleted before writing out the new data.
- Save Commands
- Data Types
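- A sketch of passing a SaveMode on write; df and the targets are hypothetical:
- import org.apache.spark.sql.SaveMode
- df.write.mode(SaveMode.Overwrite).parquet("output.parquet")  // existing data at the target is deleted first
- df.write.mode(SaveMode.Append).saveAsTable("people")  // other modes: ErrorIfExists (default), Ignore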
-
SQLContext
- The entry point into all functionality in Spark SQL is the SQLContext class, or one of its descendants
-
HiveContext
- ability to write queries using the more complete HiveQL parser
- access to Hive UDFs
- ability to read data from Hive tables
- do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available
- The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame
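- A sketch of creating the contexts and running SQL programmatically; someDF is a hypothetical existing DataFrame:
- import org.apache.spark.sql.SQLContext
- import org.apache.spark.sql.hive.HiveContext
- val sqlContext = new SQLContext(sc)
- val hiveContext = new HiveContext(sc)  // superset of SQLContext: HiveQL parser, Hive UDFs, Hive tables
- someDF.registerTempTable("people")
- val result = sqlContext.sql("SELECT * FROM people")  // the result comes back as a DataFrame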
-
Performance Tuning
-
Caching Data In Memory
- Spark SQL can cache tables using an in-memory columnar format by calling sqlContext.cacheTable("tableName") or dataFrame.cache()
- You can call sqlContext.uncacheTable("tableName") to remove the table from memory
- Caching Commands
- Common Tuning Commands
- Examples
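- A sketch of the caching calls; df and the registered table name are hypothetical:
- df.registerTempTable("people")
- sqlContext.cacheTable("people")  // in-memory columnar format
- sqlContext.sql("SELECT COUNT(*) FROM people").show()
- sqlContext.uncacheTable("people")  // remove the table from memory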
-
Spark ML
-
Why?
- Make it easier to combine multiple algorithms into a single pipeline, or workflow
-
Main Concepts
-
ML Dataset
-
- Spark ML uses the DataFrame from Spark SQL as a dataset, which can hold a variety of data types. E.g., a dataset could have different columns storing text, feature vectors, true labels, and predictions
- DataFrame supports many basic and structured types. In addition to the types listed in the Spark SQL guide, DataFrame can use ML Vector types.
- A DataFrame can be created either implicitly or explicitly from a regular RDD
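- A sketch of an ML dataset as a DataFrame with label and feature-vector columns; the sample rows are made up and sqlContext is an existing SQLContext:
- import org.apache.spark.mllib.linalg.Vectors
- val training = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(0.0, 1.1, 0.1)), (0.0, Vectors.dense(2.0, 1.0, -1.0)))).toDF("label", "features")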
-
ML Algorithms
-
Properties of ML Algorithms
- Estimator
- An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a dataset and produces a model.
- An Estimator implements a method fit() which accepts a DataFrame and produces a Transformer.
- A learning algorithm such as Logistic Regression is an Estimator, and calling fit() trains a Logistic Regression Model.
- The LogisticRegressionModel produced by the learning algorithm is a Transformer.
- Pipeline (all pipelines are estimators)
- A Pipeline is specified as a sequence of stages, and each stage of a pipeline is either a Transformer or an Estimator.
- Param
- All Transformers and Estimators now share a common API for specifying parameters.
- Each instance of a Transformer or Estimator has a unique ID, which is useful in specifying parameters
- Stages are run in order, and the input dataset is modified as it passes through each stage
- For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the dataset.
- Example of a pipeline
- Stages of Pipeline
- Stage 1
- The Pipeline.fit() method is called on the original dataset which has raw text documents and labels
- The Tokenizer.transform() method splits the raw text documents into words, adding a new column with words into the dataset.
- Stage 2
- The HashingTF.transform() method converts the words column into feature vectors, adding a new column with those vectors to the dataset.
- Stage 3
- LogisticRegression is an Estimator, so the Pipeline calls LogisticRegression.fit() to produce a LogisticRegressionModel.
- A Pipeline’s stages are specified as an ordered array
- Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
- Transformer
- A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
- A Transformer implements a method transform() which converts one DataFrame into another, generally by appending one or more columns
- Example functionality
- A learning model might take a dataset, read the column containing feature vectors, predict the label for each feature vector, append the labels as a new column, and output the updated dataset
- A feature transformer might take a dataset, read a column (e.g., text), convert it into a new column (e.g., feature vectors), append the new column to the dataset, and output the updated dataset.
- PipelineModel (all pipelineModels are transformers)
- For Transformer stages, the transform() method is called on the dataset
- Example of pipelineModel
- Stages are run in order, and the input dataset is modified as it passes through each stage
- The PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have become Transformers.
- Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
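- A sketch of the text-classification pipeline described above; training and test are hypothetical DataFrames with "text" (and, for training, "label") columns:
- import org.apache.spark.ml.Pipeline
- import org.apache.spark.ml.classification.LogisticRegression
- import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
- val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
- val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
- val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
- val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))  // ordered array of stages
- val model = pipeline.fit(training)  // Pipeline (an Estimator) produces a PipelineModel (a Transformer)
- val predictions = model.transform(test)  // test data goes through the identical feature processing steps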
-
Parameters
- A Param is a named parameter with self-contained documentation. A ParamMap is a set of (parameter, value) pairs.
- Two main ways to pass parameters to an algorithm (see the sketch below):
- 1. Set parameters for an instance. E.g., if lr is an instance of LogisticRegression, one could call lr.setMaxIter(10) to make lr.fit() use at most 10 iterations.
- 2. Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap will override parameters previously specified via setter methods.
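- A sketch of both ways of passing parameters; training is a hypothetical DataFrame with "label" and "features" columns:
- import org.apache.spark.ml.param.ParamMap
- import org.apache.spark.ml.classification.LogisticRegression
- val lr = new LogisticRegression()
- lr.setMaxIter(10).setRegParam(0.01)  // 1. setter methods on the instance
- val paramMap = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.1)  // 2. a ParamMap overrides the setters
- val model = lr.fit(training, paramMap)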
-
Examples
-
General
- Examp_Estimator_Transformer_Param.scala
- Example_Pipeline.scala
- Algorithm Guides
- Spark EC2