1. Spark General
    1. Resilient Distributed Datasets
      1. Fault-tolerant collection of elements that can be operated on in parallel
      2. Two ways to create RDDs
        1. 1. Parallelizing an existing collection in your driver program
          1. SparkContext’s parallelize method applied on an existing collection in your driver program
          2. Copied to form a distributed dataset that can be operated on in parallel
          3. Important parameter for parallel collections is the number of partitions to cut the dataset into
          4. Typically you want 2-4 partitions for each CPU in your cluster
          5. Set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)), as in the sketch after this list
        2. 2. Referencing a dataset in an external storage system
          1. local file system, HDFS, Cassandra, HBase, Amazon S3, etc
          2. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat
          3. Text file RDDs can be created using SparkContext’s textFile method
          4. Notes on reading files
          5. If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes
          6. support running on directories, compressed files, and wildcards
          7. example: you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
          8. SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs
          9. For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file
          10. Other Hadoop InputFormats: you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source.
          11. RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects.
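          A minimal Scala sketch of both creation paths; it assumes an existing SparkContext named sc (e.g. from spark-shell), and the paths are illustrative:

            // 1. Parallelize a local collection; the second argument sets the number of partitions.
            val data = Array(1, 2, 3, 4, 5)
            val distData = sc.parallelize(data, 10)

            // 2. Reference an external dataset (directories, wildcards, and compressed files are supported).
            val lines = sc.textFile("/my/directory/*.txt")

            // Read many small files as (filename, content) pairs.
            val files = sc.wholeTextFiles("/my/directory")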
      3. RDD Operations
        1. transformations
          1. Creates a new dataset from an existing one
          2. Example: map is a transformation that passes each dataset element through a function and returns a new RDD representing the results
          3. Each transformed RDD is recomputed each time you run an action on it unless the RDD is made persistent
          4. Common Transformations
          5. Other Common Transformations
        2. actions
          1. Return a value to the driver program after running a computation on the dataset
          2. Example: reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program
          3. Common Actions
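          A small sketch of the map and reduce examples above, assuming lines is an RDD[String] (e.g. from sc.textFile):

            // map is a transformation: it is recorded lazily and returns a new RDD.
            val lineLengths = lines.map(line => line.length)

            // reduce is an action: it triggers the computation and returns a value to the driver.
            val totalLength = lineLengths.reduce((a, b) => a + b)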
        3. Passing Functions to Spark
          1. Two ways to do this:
          2. 1. Passing anonymous functions (scala)
          3. 2. Static methods in a global singleton object
          4. Example: You can define object MyFunctions and then pass MyFunctions.func1, as in the sketch at the end of this list
          5. While it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method.
          6. To avoid this issue, the simplest way is to copy the field into a local variable instead of accessing it externally
          7. Printing elements
          8. To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node
          9. WRONG WAY:
          10. rdd.foreach(println)
          11. rdd.map(println)
          12. On a cluster, println called by the executors writes to each executor’s stdout, not to the driver’s stdout, so nothing appears on the driver
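          A sketch of both points, assuming an RDD[String] named rdd; MyFunctions.func1 is the illustrative singleton mentioned above:

            // Static method in a global singleton object: safe to pass to Spark.
            object MyFunctions {
              def func1(s: String): String = s.toUpperCase
            }
            val mapped = rdd.map(MyFunctions.func1)

            // Correct way to print on the driver: collect (or take) first.
            rdd.collect().foreach(println)   // may exhaust driver memory for large RDDs
            rdd.take(100).foreach(println)   // safer: only a sample is brought to the driver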
        4. Working with Key-Value Pairs
          1. A few special operations are only available on RDDs of key-value pairs
          2. The key-value pair operations are available in the PairRDDFunctions class
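          A short sketch of a key-value operation (reduceByKey, from PairRDDFunctions), assuming lines is an RDD[String]:

            // Build (key, value) pairs, then combine the values per key.
            val pairs = lines.map(line => (line, 1))
            val counts = pairs.reduceByKey((a, b) => a + b)
            counts.sortByKey().collect().foreach(println)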
        5. Shuffle operations
          1. Shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions
          2. Involves copying data across executors and machines, making the shuffle a complex and costly operation that can significantly reduce performance
          3. Can consume significant amounts of heap memory
          4. When the data does not fit in memory, Spark will spill these in-memory tables to disk
          5. Results in additional overhead of disk I/O
          6. Results in increased garbage collection
          7. Generates a large number of intermediate files on disk
          8. Files are not cleaned up from Spark’s temporary storage until Spark is stopped
          9. Operations which can cause a shuffle
          10. repartition operations like repartition and coalesce
          11. 'ByKey' operations (except for counting) like groupByKey and reduceByKey
          12. join operations like cogroup and join
          13. Predictably ordered data following shuffle (increasing efficiency of data operations) can be achieved using:
          14. mapPartitions to sort each partition using, for example, .sorted
          15. repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
          16. sortBy to make a globally ordered RDD
          17. Shuffle behavior can be tuned and configured to optimize performance
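          An illustrative sketch of shuffle-producing operations, assuming an RDD of (String, Int) pairs named counts such as the one above:

            import org.apache.spark.HashPartitioner

            // repartition triggers a full shuffle of the data.
            val repartitioned = counts.repartition(8)

            // Repartition and sort within each partition in a single shuffle.
            val sortedWithin = counts.repartitionAndSortWithinPartitions(new HashPartitioner(8))

            // Produce a globally ordered RDD (also a shuffle).
            val globallyOrdered = counts.sortBy(_._2, ascending = false)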
      4. RDD Persistence
        1. Caching a dataset in memory across operations
          1. Cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it
        2. You can mark an RDD to be persisted using the persist() or cache() methods on it.
        3. Each node stores any partitions of "persistent" data that it computes in memory and reuses them in other actions on that dataset
        4. Persisted RDDs can be stored using a specified storage level
          1. Storage Level Options
          2. Storage levels are meant to provide different trade-offs between memory usage and CPU efficiency.
          3. If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
          4. If the RDD does not fit in memory, try MEMORY_ONLY_SER and select a fast serialization library to make the objects much more space-efficient, yet still reasonably fast to access
          5. Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
          6. Use the replicated storage levels if you want fast fault recovery
          7. In environments with large amounts of memory or multiple applications, the experimental OFF_HEAP mode has several advantages: executors share a single pool of off-heap memory, garbage-collection overhead is reduced, and cached data is not lost if individual executors crash
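          A small sketch of marking RDDs for persistence, assuming an existing RDD[String] named lines:

            import org.apache.spark.storage.StorageLevel

            // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
            lines.cache()

            // Explicit storage level: serialized objects kept in memory only.
            val lengths = lines.map(_.length).persist(StorageLevel.MEMORY_ONLY_SER)

            // Remove from the cache when no longer needed.
            lines.unpersist()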
      5. Shared Variables
  2. Spark SQL and DataFrames
    1. Spark SQL is a Spark module for structured data processing
    2. DataFrames
      1. A DataFrame is a distributed collection of data organized into named columns
      2. DataFrames can be constructed from a wide array of sources
        1. structured data files
        2. tables in Hive
        3. external databases
        4. existing RDDs
      3. Two different methods for converting existing RDDs into DataFrames (both are sketched after this list)
        1. 1. infer the schema of an RDD that contains specific types of objects (uses reflection)
          1. Scala interface automatically converts an RDD containing case classes into a DataFrame
          2. case class defines the schema of the table
          3. names of the arguments to the case class are read using reflection and become the names of the columns
        2. 2. construct a schema and then apply it to an existing RDD
          1. allows you to construct DataFrames when the columns and their types are not known until runtime
          2. a DataFrame can be created programmatically with three steps
          3. 1. Create an RDD of Rows from the original RDD
          4. 2. Create the schema represented by a StructType matching the structure of Rows in the RDD
          5. 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext
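          A condensed Scala sketch of both methods, assuming an existing SQLContext named sqlContext and an RDD[String] of "name,age" lines named peopleRDD (all names are illustrative):

            import org.apache.spark.sql.Row
            import org.apache.spark.sql.types.{StringType, StructField, StructType}

            // Method 1: reflection. The case class defines the schema; its field names become column names.
            case class Person(name: String, age: Int)
            import sqlContext.implicits._
            val peopleDF = peopleRDD.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

            // Method 2: programmatic schema, for when columns are only known at runtime.
            val schema = StructType("name age".split(" ").map(f => StructField(f, StringType, nullable = true)))
            val rowRDD = peopleRDD.map(_.split(",")).map(p => Row(p(0), p(1).trim))
            val peopleDF2 = sqlContext.createDataFrame(rowRDD, schema)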
      4. Data Sources
        1. The default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.
        2. You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source
        3. Can also use the short name (json, parquet, jdbc) for built-in sources; see the sketch below
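        An illustrative load/save sketch using the generic API, assuming a SQLContext named sqlContext; file names are hypothetical:

          // Default data source (parquet unless spark.sql.sources.default says otherwise).
          val df = sqlContext.read.load("users.parquet")
          df.select("name").write.save("names.parquet")

          // Manually specified source, using the short name of a built-in format.
          val people = sqlContext.read.format("json").load("people.json")
          people.write.format("parquet").save("people.parquet")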
        4. Apache Hive
          1. DataFrames can also be saved as persistent tables using the saveAsTable command
          2. saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore
          3. By default, saveAsTable will create a “managed table”
          4. location of the data will be controlled by the metastore
          5. data deleted automatically when a table is dropped
          6. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore
          7. A DataFrame for a persistent table can be created by calling the table method on a SQLContext with the name of the table
          8. Users who do not have an existing Hive deployment can still create a HiveContext
          9. When not configured by the hive-site.xml, the context automatically creates metastore_db and warehouse in the current directory
          10. Hive Commands
          11. Common Hive Statements
          12. Hive Data Types
        5. Parquet Files
          1. Support for both reading and writing Parquet files that automatically preserves the schema of the original data.
          2. Partition discovery
          3. able to discover and infer partitioning information automatically
          4. By passing path/to/table to either SQLContext.read.parquet or SQLContext.read.load, Spark SQL will automatically extract the partitioning information from the paths
          5. Schema merging
          6. Users can start with a simple schema, and gradually add more columns to the schema as needed
          7. Configuration
          8. Configuration of Parquet can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.
          9. Parquet Commands
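          A brief Parquet sketch, assuming sqlContext and a DataFrame named peopleDF; paths are illustrative:

            // Write and read back Parquet; the schema of the original data is preserved.
            peopleDF.write.parquet("people.parquet")
            val parquetDF = sqlContext.read.parquet("people.parquet")

            // Schema merging can be enabled per read or globally (it is a relatively expensive operation).
            val merged = sqlContext.read.option("mergeSchema", "true").parquet("data/partitioned_table")
            sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")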
        6. JSON Datasets
          1. Can automatically infer the schema of a JSON dataset and load it as a DataFrame
          2. conversion can be done using SQLContext.read.json() on either an RDD of String, or a JSON file
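          A small JSON sketch, assuming sqlContext and sc; the path and the inline record are illustrative:

            // From a JSON file with one JSON object per line.
            val people = sqlContext.read.json("people.json")
            people.printSchema()

            // From an RDD[String] where each string is a JSON object.
            val jsonRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
            val otherDF = sqlContext.read.json(jsonRDD)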
        7. JDBC To Other Databases
          1. Spark SQL also includes a data source that can read data from other databases using JDBC
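          An illustrative JDBC read; the URL and table name are placeholders, and the JDBC driver must be on the classpath:

            // Reads a database table over JDBC into a DataFrame.
            val jdbcDF = sqlContext.read.format("jdbc").options(Map(
              "url" -> "jdbc:postgresql:dbserver",
              "dbtable" -> "schema.tablename"
            )).load()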
        8. Save Modes
          1. Save operations can optionally take a SaveMode, which specifies how to handle existing data if present
          2. WARNING
          3. save modes do not utilize any locking
          4. are not atomic
          5. not safe to have multiple writers attempting to write to the same location
          6. when performing an Overwrite, the existing data will be deleted before the new data is written out
          7. Save Commands
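          A short sketch of specifying a SaveMode on write, assuming a DataFrame named df; the path is illustrative:

            import org.apache.spark.sql.SaveMode

            // Overwrite deletes any existing data at the target before writing the new data.
            df.write.mode(SaveMode.Overwrite).parquet("output/people.parquet")

            // The other modes are Append, Ignore, and ErrorIfExists (the default).
            df.write.mode("append").format("parquet").save("output/people.parquet")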
        9. Data Types
    3. SQLContext
      1. The entry point to all functionality in Spark SQL is the SQLContext class, or one of its descendants
      2. HiveContext
        1. ability to write queries using the more complete HiveQL parser
        2. access to Hive UDFs
        3. ability to read data from Hive tables
        4. do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available
      3. The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame
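      A minimal sketch, assuming an existing SparkContext sc; the table name is illustrative and must already be registered:

        import org.apache.spark.sql.SQLContext

        val sqlContext = new SQLContext(sc)
        // Returns a DataFrame; "people" must already be registered as a (temporary) table.
        val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
        teenagers.show()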
    4. Performance Tuning
      1. Caching Data In Memory
        1. Spark SQL can cache tables using an in-memory columnar format by calling sqlContext.cacheTable("tableName") or dataFrame.cache()
        2. You can call sqlContext.uncacheTable("tableName") to remove the table from memory
        3. Caching Commands
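          A caching sketch using the table name from the lines above:

            // Cache the table in the in-memory columnar format, use it, then release it.
            sqlContext.cacheTable("tableName")
            sqlContext.sql("SELECT COUNT(*) FROM tableName").show()
            sqlContext.uncacheTable("tableName")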
      2. Common Tuning Commands
    5. Examples
  3. Spark ML
    1. Why?
      1. Make it easier to combine multiple algorithms into a single pipeline, or workflow
    2. Main Concepts
      1. ML Dataset
        1. Spark ML uses the DataFrame from Spark SQL as a dataset which can hold a variety of data types. E.g., a dataset could have different columns storing text, feature vectors, true labels, and predictions
          1. DataFrame supports many basic and structured types. In addition to the types listed in the Spark SQL guide, DataFrame can use ML Vector types.
          2. A DataFrame can be created either implicitly or explicitly from a regular RDD
      2. ML Algorithms
        1. Properties of ML Algorithms
          1. Estimator
          2. An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a dataset and produces a model.
          3. An Estimator implements a method fit() which accepts a DataFrame and produces a Transformer.
          4. A learning algorithm such as Logistic Regression is an Estimator, and calling fit() trains a Logistic Regression Model.
          5. The LogisticRegressionModel produced by the logistic regression algorithm is itself a Transformer.
          6. Pipeline (all pipelines are estimators)
          7. A Pipeline is specified as a sequence of stages, and each stage of a pipeline is either a Transformer or an Estimator.
          8. Param
          9. All Transformers and Estimators now share a common API for specifying parameters.
          10. Each instance of a Transformer or Estimator has a unique ID, which is useful in specifying parameters
          11. Stages are run in order, and the input dataset is modified as it passes through each stage
          12. For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the dataset.
          13. Example of a pipeline
          14. Stages of Pipeline
          15. Stage 1
          16. The Pipeline.fit() method is called on the original dataset which has raw text documents and labels
          17. The Tokenizer.transform() method splits the raw text documents into words, adding a new column with words into the dataset.
          18. Stage 2
          19. The HashingTF.transform() method converts the words column into feature vectors, adding a new column with those vectors to the dataset.
          20. Stage 3
          21. LogisticRegression is an Estimator, so the Pipeline calls LogisticRegression.fit() to produce a LogisticRegressionModel.
          22. A Pipeline’s stages are specified as an ordered array
          23. Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
          24. Transformer
          25. A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
          26. A Transformer implements a method transform() which converts one DataFrame into another, generally by appending one or more columns
          27. Example functionality
          28. A learning model might take a dataset, read the column containing feature vectors, predict the label for each feature vector, append the labels as a new column, and output the updated dataset
          29. A feature transformer might take a dataset, read a column (e.g., text), convert it into a new column (e.g., feature vectors), append the new column to the dataset, and output the updated dataset.
          30. PipelineModel (all pipelineModels are transformers)
          31. For Transformer stages, the transform() method is called on the dataset
          32. Example of pipelineModel
          33. Stages are run in order, and the input dataset is modified as it passes through each stage
          34. The PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have become Transformers.
          35. Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
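          A condensed sketch of the three-stage pipeline described above, assuming DataFrames named training and test with "text" and "label" columns (names are illustrative):

            import org.apache.spark.ml.Pipeline
            import org.apache.spark.ml.classification.LogisticRegression
            import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

            // Stage 1: Tokenizer (Transformer) splits raw text into words.
            val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
            // Stage 2: HashingTF (Transformer) turns words into feature vectors.
            val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
            // Stage 3: LogisticRegression (Estimator); fit() produces a LogisticRegressionModel.
            val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

            // Stages are an ordered array; fitting the Pipeline yields a PipelineModel (a Transformer).
            val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
            val model = pipeline.fit(training)
            val predictions = model.transform(test)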
        2. Parameters
          1. A Param is a named parameter with self-contained documentation. A ParamMap is a set of (parameter, value) pairs.
          2. Two main ways to pass parameters to an algorithm
          3. 1. Set parameters for an instance. E.g., if lr is an instance of LogisticRegression, one could call lr.setMaxIter(10) to make lr.fit() use at most 10 iterations.
          4. 2. Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap will override parameters previously specified via setter methods.
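          A brief sketch of both ways, assuming a LogisticRegression instance named lr and a training DataFrame named training:

            import org.apache.spark.ml.param.ParamMap

            // 1. Setter methods on the instance.
            lr.setMaxIter(10).setRegParam(0.01)

            // 2. A ParamMap passed to fit(); these values override the setters above.
            val paramMap = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.1)
            val model = lr.fit(training, paramMap)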
    3. Examples
      1. General
        1. Examp_Estimator_Transformer_Param.scala
        2. Example_Pipeline.scala
      2. Algorithm Guides
  4. Spark EC2