Spark Core

RDD usage examples, org.apache.spark.rdd.RDD

Key-Value RDD, org.apache.spark.rdd.PairRDDFunctions

Ordered RDD, org.apache.spark.rdd.OrderedRDDFunctions

Transformation

map : Return a new RDD by applying a function to all elements of this RDD.

def map[U](f: (T) => U)(implicit arg0: ClassTag[U]): RDD[U]

scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3)))
scala> val rddx = rdd1.map( e => s"${e._1}-${e._2}" )
rddx: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[102] at map at <console>:16
scala> rddx.foreach(println)
b-2
a-1
c-3

filter : Return a new RDD containing only the elements that satisfy a predicate.

def filter(f: (T) => Boolean): RDD[T]

scala> val rddx = rdd1.filter { e => e._2 >= 2 }
scala> rddx.foreach(println)
(c,3)
(b,2)

flatMap : Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

def flatMap[U](f: (T) => TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]

scala> val rdd1 = sc.parallelize("a,b,c" :: "x,y" :: Nil)
scala> val rddx = rdd1.flatMap(e => e.split(","))
scala> rddx.foreach(println)
a
x
y
b
c

mapPartitions : Return a new RDD by applying a function to each partition of this RDD.

def mapPartitions[U]
    (f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)
    (implicit arg0: ClassTag[U]): RDD[U]

scala> val rddx = rdd1 mapPartitions { it => it.map { e => Map(e -> e.length) } }
rddx: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Int]] = MapPartitionsRDD[108] at mapPartitions at <console>:16
scala> rddx.foreach(println)
Map(a,b,c -> 5)
Map(x,y -> 3)
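
Because the function passed to mapPartitions is called once per partition rather than once per element, it also suits per-partition pre-aggregation. A minimal sketch, assuming a spark-shell session where sc is available; the names nums, partitionSums and total, and the explicit partition count of 2, are illustrative and not part of the original example:

```scala
// Assumes a spark-shell session, where `sc` is the SparkContext.
// `nums` and the partition count (2) are illustrative choices.
val nums = sc.parallelize(1 to 10, 2)

// The function runs once per partition and receives an iterator
// over that partition's elements; here it emits a single sum.
val partitionSums = nums.mapPartitions(it => Iterator(it.sum)).collect()

// Adding the per-partition results gives the same total as a plain reduce.
val total = partitionSums.sum
```

Here partitionSums holds one entry per partition, so the driver only has to combine two numbers instead of ten.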

union : Return the union of this RDD and another one.

intersection : Return the intersection of this RDD and another one.

subtract : Return an RDD with the elements from this that are not in other.

distinct : Return a new RDD containing the distinct elements in this RDD.

def union(other: RDD[T]): RDD[T]
def intersection(other: RDD[T]): RDD[T]
def subtract(other: RDD[T]): RDD[T]
def distinct(): RDD[T]

scala> val rdd1 = sc.parallelize(1 :: 2 :: 3 :: Nil)
scala> val rdd2 = sc.parallelize(3 :: 3 :: 4 :: 5 :: Nil)
scala> (rdd1 union rdd2).foreach(print)
1233345
scala> (rdd1 intersection rdd2).foreach(print)
3
scala> (rdd2 subtract rdd1).foreach(print)
45
scala> rdd2.distinct.foreach(print)
453

cartesian : Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in this and b is in other.

zip : Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc.

zipWithIndex : Zips this RDD with its element indices.

def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
def zipWithIndex(): RDD[(T, Long)]

scala> val rdd1 = sc.parallelize('a' :: 'b' :: 'c' :: Nil)
scala> val rdd2 = sc.parallelize(1 :: 2 :: 3 :: Nil)

scala> rdd1.cartesian(rdd2).foreach(print)
(a,3)(a,1)(a,2)(b,1)(b,2)(b,3)(c,1)(c,2)(c,3)

// prerequisite : both RDDs have the same number of partitions and the same number of elements in each partition
scala> rdd1.zip(rdd2).foreach(print)
(a,1)(b,2)(c,3)

scala> rdd1.zipWithIndex.foreach(print)
(a,0)(b,1)(c,2)

groupBy (may shuffle !!!) : Return an RDD of grouped items.

keyBy : Creates tuples of the elements in this RDD by applying f.

sortBy : Return this RDD sorted by the given key function.

def groupBy[K](f: (T) => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def keyBy[K](f: (T) => K): RDD[(K, T)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

scala> val rdd1 = sc.parallelize( ('a',1) :: ('b',2) :: ('c',3) :: ('c',4) :: Nil )
scala> rdd1.groupBy(e => e._1).foreach(println)
(b,CompactBuffer((b,2)))
(a,CompactBuffer((a,1)))
(c,CompactBuffer((c,3), (c,4)))

scala> rdd1.keyBy(e => e._1).foreach(println)
(b,(b,2))
(c,(c,3))
(c,(c,4))
(a,(a,1))

scala> rdd1.sortBy(e => e._2, false).collect.foreach(println)
(c,4)
(c,3)
(b,2)
(a,1)

coalesce : Return a new RDD that is reduced into numPartitions partitions.

repartition : Return a new RDD that has exactly numPartitions partitions.

repartitionAndSortWithinPartitions : Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.

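repartitionAndSortWithinPartitions is defined in org.apache.spark.rdd.OrderedRDDFunctions (see the ordered RDD doc).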

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

scala> val rdd1 = sc.parallelize( for (i <- 1 to 100; j <- 'a' to 'j') yield (i, j) )
scala> rdd1.count
res89: Long = 1000
scala> rdd1.partitions.size
res90: Int = 4

scala> val rddx = rdd1.coalesce(2)
rddx: org.apache.spark.rdd.RDD[(Int, Char)] = CoalescedRDD[185] at coalesce at <console>:16
scala> rddx.partitions.size
res91: Int = 2

scala> val rddx = rdd1.repartition(8)
rddx: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[189] at repartition at <console>:16
scala> rddx.partitions.size
res92: Int = 8
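
The transcript above does not exercise repartitionAndSortWithinPartitions. A minimal sketch, assuming a spark-shell session where sc is available; the names pairs, sorted and layout, and the choice of a HashPartitioner with 2 partitions, are illustrative assumptions:

```scala
import org.apache.spark.HashPartitioner

// Assumes a spark-shell session, where `sc` is the SparkContext.
val pairs = sc.parallelize( (3, "c") :: (1, "a") :: (4, "d") :: (2, "b") :: Nil )

// Route each record to a partition by key hash, then sort by key inside each partition.
val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// glom() turns every partition into an array so the layout can be inspected.
val layout = sorted.glom().collect()
// With Int keys and 2 partitions, even keys land in partition 0 and odd keys in partition 1:
// layout(0) contains (2,b), (4,d) and layout(1) contains (1,a), (3,c)
```

Each partition is sorted on its own; the RDD as a whole is not globally ordered, which is the difference from sortBy.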

Transformation : Key-Value Pairs

org.apache.spark.rdd.PairRDDFunctions

keys : Return an RDD with the keys of each tuple.

values : Return an RDD with the values of each tuple.

mapValues : Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.

def keys: RDD[K]
def values: RDD[V]
def mapValues[U](f: (V) => U): RDD[(K, U)]

scala> val rdd1 = sc.parallelize( ('a',1) :: ('b',2) :: ('c',3) :: Nil )
scala> rdd1.keys.foreach(print)
abc
scala> rdd1.values.foreach(print)
132
scala> rdd1.mapValues(v => v*v).foreach(print)
(a,1)(b,4)(c,9)

join : Return an RDD containing all pairs of elements with matching keys in this and other.

leftOuterJoin : Perform a left outer join of this and other.

rightOuterJoin : Perform a right outer join of this and other.

fullOuterJoin : Perform a full outer join of this and other.

subtractByKey : Return an RDD with the pairs from this whose keys are not in other.

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]

scala> val rdd1 = sc.parallelize( ('a',1) :: ('b',2) :: ('c',3) :: Nil )
scala> val rdd2 = sc.parallelize( ('b',200.1d) :: ('c',300.1d) :: ('d',400.1d) :: Nil )
scala> rdd1.join(rdd2).foreach(println)
(b,(2,200.1))
(c,(3,300.1))

scala> rdd1.leftOuterJoin(rdd2).foreach(println)
(a,(1,None))
(b,(2,Some(200.1)))
(c,(3,Some(300.1)))

scala> rdd1.rightOuterJoin(rdd2).foreach(println)
(c,(Some(3),300.1))
(d,(None,400.1))
(b,(Some(2),200.1))

scala> rdd1.fullOuterJoin(rdd2).foreach(println)
(a,(Some(1),None))
(c,(Some(3),Some(300.1)))
(b,(Some(2),Some(200.1)))
(d,(None,Some(400.1)))

scala> rdd1.subtractByKey(rdd2).foreach(println)
(a,1)
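
The outer joins wrap the side that may be missing in an Option, so downstream code usually has to unwrap it. A minimal sketch of one way to do that, assuming a spark-shell session where sc is available; the names left, right, withDefaults and result, and the default value 0.0, are illustrative assumptions:

```scala
// Assumes a spark-shell session, where `sc` is the SparkContext.
val left  = sc.parallelize( ('a', 1) :: ('b', 2) :: ('c', 3) :: Nil )
val right = sc.parallelize( ('b', 200.1d) :: ('c', 300.1d) :: ('d', 400.1d) :: Nil )

// leftOuterJoin yields (key, (leftValue, Option[rightValue]));
// replace a missing right-hand value with an arbitrary default of 0.0.
val withDefaults = left.leftOuterJoin(right).mapValues { case (l, r) => (l, r.getOrElse(0.0)) }

// collect() and sort on the driver so the order is deterministic.
val result = withDefaults.collect().sortBy(_._1)
// result: Array((a,(1,0.0)), (b,(2,200.1)), (c,(3,300.1)))
```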

groupByKey (May SHUFFLE !!!): Group the values for each key in the RDD into a single sequence.

reduceByKey : Merge the values for each key using an associative reduce function.

aggregateByKey : Aggregate the values of each key, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U’s, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.

cogroup : For each key k in this or other1 or other2, return a resulting RDD that contains a tuple with the list of values for that key in this, other1 and other2.

def groupByKey(): RDD[(K, Iterable[V])]   // MAY SHUFFLE !!!
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

scala> val rdd1 = sc.parallelize( ('a',1) :: ('b',21) :: ('b',22) :: Nil )
scala> rdd1.groupByKey.foreach(println)
(a,CompactBuffer(1))
(b,CompactBuffer(21, 22))

scala> rdd1.reduceByKey( (v1, v2) => v1 + v2 ).foreach(println)
(a,1)
(b,43)

scala> import scala.collection.mutable
scala> val zero = mutable.Set[String]()
scala> val rdd1 = sc.parallelize( 1->"apple" :: 2->"pie" :: 1->"banana" :: 1->"apple" :: 2->"pie" :: Nil )
scala> rdd1.aggregateByKey(zero)( (set, v) => set += v, (set1, set2) => set1 ++= set2 ).foreach(println)
(1,Set(banana, apple))
(2,Set(pie))

scala> val rdd1 = sc.parallelize( 1->'a' :: 2->'b' :: 3->'c' :: Nil )
scala> val rdd2 = sc.parallelize( 1->.01d :: 1->.11d :: 2->.02d :: Nil )
scala> rdd1.cogroup(rdd2).foreach(println)
(3,(CompactBuffer(c),CompactBuffer()))
(1,(CompactBuffer(a),CompactBuffer(0.01, 0.11)))
(2,(CompactBuffer(b),CompactBuffer(0.02)))
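
The aggregateByKey description notes that the result type U may differ from the value type V. A minimal sketch of that case, computing a per-key average with an immutable (sum, count) pair as the zero value; it assumes a spark-shell session where sc is available, and the names scores, sumCount and averages are illustrative:

```scala
// Assumes a spark-shell session, where `sc` is the SparkContext.
val scores = sc.parallelize( ("a", 1) :: ("b", 21) :: ("b", 22) :: Nil )

// U = (Int, Int) holds a running (sum, count); V = Int is the original value type.
val sumCount = scores.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: merge one value into the accumulator
  (x, y)   => (x._1 + y._1, x._2 + y._2)  // combOp: merge two accumulators
)

// Turn each (sum, count) into an average and sort on the driver for a stable order.
val averages = sumCount.mapValues { case (sum, n) => sum.toDouble / n }.collect().sortBy(_._1)
// averages: Array((a,1.0), (b,21.5))
```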

Action

count : Return the number of elements in the RDD.

countByValue : Return the count of each unique value in this RDD as a local map of (value, count) pairs.

first : Return the first element in this RDD.

max : Returns the max of this RDD as defined by the implicit Ordering[T].

min : Returns the min of this RDD as defined by the implicit Ordering[T].

def count(): Long
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
def first(): T
def max()(implicit ord: Ordering[T]): T
def min()(implicit ord: Ordering[T]): T

scala> val rdd1 = sc.parallelize(7 :: -3 :: 3 :: 4 :: Nil)

scala> rdd1.count
res107: Long = 4
scala> rdd1.countByValue
res108: scala.collection.Map[Int,Long] = Map(4 -> 1, -3 -> 1, 7 -> 1, 3 -> 1)
scala> rdd1.first
res109: Int = 7
scala> rdd1.min
res110: Int = -3
scala> rdd1.max
res111: Int = 7

take : Take the first num elements of the RDD.

takeOrdered : Returns the first k (smallest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering.

top : Returns the top k (largest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering.

fold : Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral “zero value”.

reduce : Reduces the elements of this RDD using the specified commutative and associative binary operator.

treeReduce : Reduces the elements of this RDD in a multi-level tree pattern.

treeAggregate : Aggregates the elements of this RDD in a multi-level tree pattern.

def take(num: Int): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def fold(zeroValue: T)(op: (T, T) => T): T
def reduce(f: (T, T) => T): T
def treeReduce(f: (T, T) => T, depth: Int = 2): T
def treeAggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2)(implicit arg0: ClassTag[U]): U

scala> val rdd1 = sc.parallelize(7 :: -3 :: 3 :: 9 :: 4 :: Nil)
scala> rdd1.take(2)
res112: Array[Int] = Array(7, -3)
scala> rdd1.takeOrdered(2)
res113: Array[Int] = Array(-3, 3)
scala> rdd1.top(2)
res114: Array[Int] = Array(9, 7)
scala> rdd1.fold(0)((sum, e) => sum + e)
res115: Int = 20
scala> rdd1.fold(1)((prod, e) => prod * e)
res116: Int = -2268
scala> rdd1.reduce((e1, e2) => e1 + e2)
res118: Int = 20
scala> rdd1.treeReduce(_ + _)
res70: Int = 20
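
treeAggregate is listed above without an example. A minimal sketch that computes a sum and a count in one pass, assuming a spark-shell session where sc is available and a Spark version whose RDD exposes treeAggregate as in the signature above; the names samples, sum, count and mean are illustrative:

```scala
// Assumes a spark-shell session, where `sc` is the SparkContext.
val samples = sc.parallelize(7 :: -3 :: 3 :: 9 :: 4 :: Nil)

// The accumulator is a (sum, count) pair; the default depth of 2 is used.
val (sum, count) = samples.treeAggregate((0, 0))(
  (acc, e) => (acc._1 + e, acc._2 + 1),   // seqOp: fold one element into the accumulator
  (x, y)   => (x._1 + y._1, x._2 + y._2)  // combOp: merge two partial accumulators
)

val mean = sum.toDouble / count  // 20 / 5 = 4.0
```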

Actions of Key-Value Pairs

countByKey : Count the number of elements for each key, collecting the results to a local Map.

lookup : Return the list of values in the RDD for the given key.

def countByKey(): Map[K, Long]
def lookup(key: K): Seq[V]

scala> val rdd1 = sc.parallelize( ('a',1) :: ('b',2) :: ('c',3) :: ('c',4) :: Nil )
scala> rdd1.countByKey
res119: scala.collection.Map[Char,Long] = Map(a -> 1, b -> 1, c -> 2)
scala> rdd1.lookup('c')
res120: Seq[Int] = WrappedArray(3, 4)

Actions of Numeric Types

org.apache.spark.rdd.DoubleRDDFunctions

mean : Compute the mean of this RDD’s elements.

stdev : Compute the standard deviation of this RDD’s elements.

sum : Add up the elements in this RDD.

variance : Compute the variance of this RDD’s elements.

def mean(): Double
def stdev(): Double
def sum(): Double
def variance(): Double

scala> val rdd1 = sc.parallelize( 1 :: 3 :: 5 :: 7 :: 9 :: Nil )
scala> rdd1.mean
res125: Double = 5.0
scala> rdd1.stdev
res126: Double = 2.8284271247461903
scala> rdd1.sum
res127: Double = 25.0
scala> rdd1.variance
res128: Double = 8.0

Caching

cache : Persist this RDD with the default storage level (MEMORY_ONLY).

persist : Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. Local checkpointing is an exception.

unpersist : Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

def cache(): RDD.this.type
def persist(newLevel: StorageLevel): RDD.this.type
def unpersist(blocking: Boolean = true): RDD.this.type

scala> import org.apache.spark.storage.StorageLevel
scala> rdd1.persist(StorageLevel.MEMORY_ONLY)
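
The persist, reuse, unpersist cycle described above might look like the following. This is a minimal sketch, assuming a spark-shell session where sc is available; the names cached, doubledSum and level, and the choice of MEMORY_AND_DISK, are illustrative assumptions:

```scala
import org.apache.spark.storage.StorageLevel

// Assumes a spark-shell session, where `sc` is the SparkContext.
val cached = sc.parallelize(1 to 1000).map(_ * 2)

// Only marks the RDD for persistence; nothing is computed yet.
cached.persist(StorageLevel.MEMORY_AND_DISK)

// The first action computes the partitions and stores them.
cached.count()

// Later actions can read the stored partitions instead of recomputing the map.
val doubledSum = cached.sum()

// The level stays fixed until the RDD is unpersisted.
val level = cached.getStorageLevel

// Drop the stored blocks once the RDD is no longer needed.
cached.unpersist()
```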

Shared Variable

broadcast : Broadcast a read-only variable to the cluster, returning an org.apache.spark.broadcast.Broadcast object for reading it in distributed functions.

def broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T]

case class Car(id: Long, brandId: Int, ownerId: Int)
case class CarDetail(id: Long, brand: String, owner: String)

// Small lookup tables that every task needs; broadcast them once instead of shipping them with each task
val ownerMap = Map(1 -> "Author", 2 -> "Peter", 3 -> "Micky")
val brandMap = Map(11 -> "BMW", 22 -> "Toyota")
val bcOwners = sc.broadcast(ownerMap)
val bcBrands = sc.broadcast(brandMap)
val cars = Car(1, 11, 1) :: Car(2, 22, 2) :: Car(3, 11, 3) :: Nil
val carsRdd = sc.parallelize(cars)
// Resolve the ids through the broadcast values inside the distributed function
val detailsRdd = carsRdd.map { c => CarDetail(c.id, bcBrands.value(c.brandId), bcOwners.value(c.ownerId)) }

scala> detailsRdd.foreach(println)
CarDetail(1,BMW,Author)
CarDetail(2,Toyota,Peter)
CarDetail(3,BMW,Micky)

accumulator : Create an org.apache.spark.Accumulator variable of a given type, with a name for display in the Spark UI.

def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]
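
A minimal sketch of the accumulator signature above, counting malformed records as a side effect of an action. It assumes a spark-shell session where sc is available, on a Spark version that still provides sc.accumulator (the method is deprecated from Spark 2.0 on in favor of sc.longAccumulator). The names lines and badRecords and the sample data are illustrative assumptions:

```scala
// Assumes a spark-shell session, where `sc` is the SparkContext,
// on a Spark version that still offers sc.accumulator.
val lines = sc.parallelize("1" :: "2" :: "x" :: "4" :: Nil)

// Named accumulator, starting at 0; the name is what appears in the Spark UI.
val badRecords = sc.accumulator(0, "badRecords")

// Tasks may only add to the accumulator; they cannot read it.
lines.foreach { s =>
  if (!s.forall(_.isDigit)) badRecords += 1
}

// The value is read on the driver after the action has finished.
val bad = badRecords.value  // 1 (only "x" is not numeric)
```

The update is done inside an action (foreach) rather than a transformation, so a re-executed task does not count the same record twice.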