Common RDD operations

You can follow along with this section in the Jupyter notebook of RDD operations.

Element-wise transformations

map()

The map() transformation takes in a function and applies it to each element in the RDD, with the result of the function becoming the new value of the corresponding element in the resulting RDD.

# Using map transformation
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print(num)

Output

1
4
9
16

Info: It is useful to note that map()’s return type does not have to be the same as its input type. For example, map() can operate on an RDD of strings and return an RDD of doubles.
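As a minimal sketch of this (reusing the same SparkContext sc as above), map() can turn an RDD of strings into an RDD of floats:

# map() changing the element type: strings in, floats out
readings = sc.parallelize(["1.5", "2.25", "3.0"])
as_floats = readings.map(lambda s: float(s))
print(as_floats.collect())  # [1.5, 2.25, 3.0]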

flatMap()

Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap(). Like map(), flatMap() operates on each element, but returns an iterator with the return values.

# flatMap() in Python, splitting lines into words
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"
# Print the flatMap() output
print(words.collect())

flatMap() “flattens” the iterators returned to it, so that instead of ending up with an RDD of lists we have an RDD of the elements in those lists.

# flatMap() output
['hello', 'world', 'hi']
# map() output
[['hello', 'world'], ['hi']]
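To reproduce this comparison yourself, a short sketch (reusing the lines RDD from above) that prints both results side by side:

# Comparing flatMap() and map() on the same RDD
print(lines.flatMap(lambda line: line.split(" ")).collect())  # ['hello', 'world', 'hi']
print(lines.map(lambda line: line.split(" ")).collect())      # [['hello', 'world'], ['hi']]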

Basic RDD transformations on an RDD containing {1, 2, 3, 3}
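As a quick sketch of the usual single-RDD transformations covered there (map, flatMap, filter, distinct, sample) on such an RDD:

# Basic transformations on an RDD containing {1, 2, 3, 3}
rdd = sc.parallelize([1, 2, 3, 3])
print(rdd.map(lambda x: x + 1).collect())            # [2, 3, 4, 4]
print(rdd.flatMap(lambda x: range(x, 4)).collect())  # [1, 2, 3, 2, 3, 3, 3]
print(rdd.filter(lambda x: x != 1).collect())        # [2, 3, 3]
print(rdd.distinct().collect())                      # [1, 2, 3] (order may vary)
print(rdd.sample(False, 0.5).collect())              # nondeterministic subset of elements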

Two-RDD transformations on RDDs containing {1, 2, 3} and {3, 4, 5}
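A sketch of the standard set-like two-RDD transformations (union, intersection, subtract, cartesian) on those RDDs:

# Two-RDD transformations on {1, 2, 3} and {3, 4, 5}
a = sc.parallelize([1, 2, 3])
b = sc.parallelize([3, 4, 5])
print(a.union(b).collect())         # [1, 2, 3, 3, 4, 5]
print(a.intersection(b).collect())  # [3]
print(a.subtract(b).collect())      # [1, 2] (order may vary)
print(a.cartesian(b).collect())     # [(1, 3), (1, 4), ..., (3, 5)]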

Actions

Basic actions on an RDD containing {1, 2, 3, 3}
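A sketch of the common actions (collect, count, countByValue, take, top, reduce, fold) on that RDD:

# Basic actions on an RDD containing {1, 2, 3, 3}
rdd = sc.parallelize([1, 2, 3, 3])
print(rdd.collect())                    # [1, 2, 3, 3]
print(rdd.count())                      # 4
print(rdd.countByValue())               # {1: 1, 2: 1, 3: 2}
print(rdd.take(2))                      # [1, 2]
print(rdd.top(2))                       # [3, 3]
print(rdd.reduce(lambda x, y: x + y))   # 9
print(rdd.fold(0, lambda x, y: x + y))  # 9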

Refer to the Spark v2.1.0 docs for more RDD operators in PySpark.

Download Jupyter notebook of RDD operations

aggregate()

The aggregate action does not require the return type to be the same type as the RDD. Like with fold, we supply an initial zero value of the type we want to return. Then we provide two functions. The first one is used to combine the elements from our RDD with the accumulator. The second function is needed to merge two accumulators.

.aggregate(initial_value, combine_value_with_accumulator, combine_accumulators)

Consider an RDD of durations for which we would like to compute both the total duration and the number of elements in the RDD (.count()).

# Create an RDD of durations, partitioned into 2 partitions
duration = sc.parallelize([0, 0.1, 0.2, 0.4, 0.], 2)

In partition #1 we have [0, 0.1, 0.2] and in partition #2 we have [0.4, 0]. To compute the total duration and the number of elements in duration, we can use the aggregate function. Alternatively, we could get the total duration with a .reduce(add) action and the number of elements with .count(). However, that requires iterating through the RDD twice. To obtain the (sum, count) equivalent of (.reduce(add), .count()) in a single pass, we use the .aggregate() method.
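For comparison, the two-pass version would look like this; each action triggers its own pass over the data:

# Two separate actions mean two passes over the RDD
from operator import add
total = duration.reduce(add)  # first pass  => 0.7
n = duration.count()          # second pass => 5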

The aggregate method would look something like this:

# .aggregate(initial_value, combine_value_with_accumulator, combine_accumulators)
sum_count = duration.aggregate(
    (0, 0),                                                    # initial value (sum, count)
    lambda acc, value: (acc[0] + value, acc[1] + 1),           # combine a value with the accumulator
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])  # combine two accumulators
)

In partition #1, the aggregate function iterates through each element in the task. When it encounters the first element, 0, the accumulator holds the initial value (0, 0), and the combine-value-with-accumulator function (the first function passed to aggregate()) runs:

lambda acc, value: (acc[0] + value, acc[1] + 1)  # combine a value with the accumulator
# acc starts as the initial value (0, 0) and value is 0 (the first element)
# (lambda (0, 0), 0: (0 + 0, 0 + 1)) => (0, 1)
# for the second element, 0.1:
# (lambda (0, 1), 0.1: (0 + 0.1, 1 + 1)) => (0.1, 2)
# for the third element, 0.2:
# (lambda (0.1, 2), 0.2: (0.1 + 0.2, 2 + 1)) => (0.3, 3)

Similarly, partition #2, which holds [0.4, 0], yields an accumulated (sum, count) of (0.4, 2). The combine-accumulators function (the second function) then merges the per-partition results:

lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])  # combine two accumulators
# this combines acc1 from partition 1 with acc2 from partition 2
# (lambda (0.3, 3), (0.4, 2): (0.3 + 0.4, 3 + 2)) => (0.7, 5)

Thus the combine-accumulators step yields a (sum, count) of (0.7, 5). Using .aggregate(...) means the RDD is iterated through only once, which improves efficiency.
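As a small follow-up (not part of the original walkthrough), the average duration can then be computed from the returned pair:

# Average duration from the (sum, count) pair
total, n = sum_count
print(total / n)  # roughly 0.14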
