# Exercise Session 8 - Old Exam Questions

## Spark

##### For more information on Spark functions (e.g. those that create RDDs, such as ``textFile``), use ``help``:

In [None]:
import json

from pyspark import SparkContext, RDD
from jsoniq import RumbleSession

spark = RumbleSession \
    .builder \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
help(sc.textFile)

### Movies Dataset

We will use a dataset of movies stored in JSON Lines format (one JSON object per line), including their genres, actors, and other information. Every movie has all of the following attributes:

- `name`: string containing the name of the movie.
- `genres`: string containing the comma-delimited genres of the movie.
- `year`: integer with the release year of the movie.
- `votes`: integer with the number of votes for the movie.
- `rating`: string containing the rating of the movie (convert it with `float()` before doing arithmetic).
- `actors`: an array representing a set of actors, where each element is a dictionary with the following key-value pairs:
    - `name`: string with the name of the actor.
    - `birth`: integer with the year of birth of the actor.
    - `death`: integer with the year of death of the actor.
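The attribute list above can be written down as a type check. The sketch below is plain Python, not part of the exercise: `check_movie` is a hypothetical helper, and the sample record is made up for illustration (it is not taken from `movies.jsonl`). It also shows why `genres` needs splitting and `rating` needs `float()`.

```python
import json

def check_movie(movie: dict) -> bool:
    """Hypothetical helper: True if `movie` has the attributes and types listed above."""
    if not isinstance(movie.get("name"), str) or not isinstance(movie.get("genres"), str):
        return False
    if not isinstance(movie.get("year"), int) or not isinstance(movie.get("votes"), int):
        return False
    if not isinstance(movie.get("rating"), str):  # rating is stored as a string
        return False
    actors = movie.get("actors")
    if not isinstance(actors, list):
        return False
    for actor in actors:
        if not isinstance(actor.get("name"), str):
            return False
        if not isinstance(actor.get("birth"), int) or not isinstance(actor.get("death"), int):
            return False
    return True

# Made-up JSON Lines record in the shape described above (not real data)
line = ('{"name": "Example Movie", "genres": "Comedy,Drama", "year": 1999, "votes": 250, '
        '"rating": "7.5", "actors": [{"name": "Actor A", "birth": 1950, "death": 2010}]}')
movie = json.loads(line)
print(check_movie(movie))            # True
print(movie["genres"].split(","))    # ['Comedy', 'Drama']
print(float(movie["rating"]) + 1)    # 8.5
```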

You can use the RDD interface or the DataFrame interface below.

### RDD interface

Run the code below to load the JSON Lines file as an RDD of Python dictionaries:

In [None]:
# Read movies.jsonl into at least 10 partitions and parse each line into a dict
movies: RDD = sc.textFile("movies.jsonl", 10).map(json.loads)
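In `sc.textFile("movies.jsonl", 10)`, each element of the resulting RDD is one line of the file, and the second argument is the minimum number of partitions; `.map(json.loads)` then parses every line independently into a dictionary. The plain-Python sketch below imitates that pipeline on two made-up lines. The round-robin `split_into_partitions` helper is hypothetical and only illustrates that parsing happens per partition; Spark's real input splitting works differently.

```python
import json

# Two made-up JSON Lines records (illustrative only)
text = (
    '{"name": "Movie A", "year": 1950, "rating": "6.0"}\n'
    '{"name": "Movie B", "year": 1999, "rating": "8.0"}\n'
)

def split_into_partitions(lines, num_partitions):
    """Hypothetical helper: deal lines round-robin into partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for i, line in enumerate(lines):
        partitions[i % num_partitions].append(line)
    return partitions

lines = text.splitlines()                   # like textFile: one element per line
partitions = split_into_partitions(lines, 2)
# like .map(json.loads): each line is parsed on its own, partition by partition
parsed = [[json.loads(line) for line in part] for part in partitions]
records = [record for part in parsed for record in part]
print(records[0]["name"], type(records[0]).__name__)  # Movie A dict
```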

Inspect the dataset by printing the first row:

In [None]:
movies.take(1)

Below are some examples of Spark queries on the Movies dataset.

In [None]:
def helper(s):
    return s['name']

# A named function works like a lambda: movies.map(helper).take(3) returns the same names
movies.map(lambda s: s["name"]).take(3)

In [None]:
movies.filter(lambda s: s["name"].startswith("K")).take(1)  # safe even for an empty name

In [None]:
movies.flatMap(lambda s: s["actors"]).take(3)
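`flatMap` differs from `map` in that each actor becomes its own RDD element, instead of every movie yielding one list of actors. The plain-Python sketch below mirrors both operations with list comprehensions on made-up records (`sample_movies` is illustrative, not the real dataset). Note that an actor who plays in several movies appears several times, which is why the solutions call `distinct()`.

```python
# Made-up movies with actor lists (illustrative only)
sample_movies = [
    {"name": "Movie A", "actors": [{"name": "X"}, {"name": "Y"}]},
    {"name": "Movie B", "actors": [{"name": "Y"}]},
    {"name": "Movie C", "actors": [{"name": "Z"}]},
]

# map: exactly one output element per input element (here, one list per movie)
mapped = [m["actors"] for m in sample_movies]
# flatMap: the function returns an iterable and each of its items becomes an element
flat_mapped = [actor for m in sample_movies for actor in m["actors"]]

print(len(mapped))                        # 3
print(len(flat_mapped))                   # 4
print([a["name"] for a in flat_mapped])   # ['X', 'Y', 'Y', 'Z']
```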

In [None]:
movies.count()

In [None]:
movies_kv = movies.map(lambda v: (v['name'], float(v['rating'])))  # (name, rating) pairs
movies_num = movies.map(lambda v: float(v['rating']))  # ratings as floats

# Common transformations (each returns a new RDD)
movies.filter(lambda v: int(v['year']) > 1960) # f: v -> bool
movies.map(lambda v: float(v['rating'])) # f: v -> v'
movies.flatMap(lambda v: v['actors']) # f: v -> [v', v', ...]
movies.map(lambda v: v['name']).distinct() # elements must be hashable (the raw dicts are not)
movies.sortBy(lambda v: int(v['year'])) # f: v -> sort key

# Common kv-transformations
movies_kv.reduceByKey(lambda a, b: a + b)  # f must be commutative and associative
movies_kv.sortByKey(ascending=False)
movies_kv.groupByKey()
movies_kv.mapValues(lambda v: v + 1) # f: v -> v'

# Common actions (each returns a result to the driver)
movies_kv.collect()
movies_kv.count()
movies_kv.take(5)
movies_num.takeOrdered(2) # 2 smallest elements
movies_num.top(2) # 2 largest elements
movies_num.count()
movies_num.sum()
movies_num.min()
movies_num.mean()
movies_num.reduce(lambda a, b: a + b) # f must be commutative and associative

# Common kv-actions
movies_kv.countByKey()
movies_kv.lookup("American Pie")
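`reduceByKey` and `reduce` need a commutative and associative function because Spark combines values inside each partition first and then merges the partial results, so the grouping of operations depends on how the data is split. The plain-Python sketch below imitates those two phases; `reduce_by_key` is a hypothetical helper, not a Spark API, and the pairs are made up. Addition gives the same answer for any split, while subtraction (not associative) does not.

```python
from operator import add, sub

def reduce_by_key(partitions, f):
    """Hypothetical helper imitating RDD.reduceByKey on lists of (key, value) pairs."""
    # Phase 1: combine the values of each key within every partition
    partials = []
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = f(local[k], v) if k in local else v
        partials.append(local)
    # Phase 2: merge the per-partition results key by key
    merged = {}
    for local in partials:
        for k, v in local.items():
            merged[k] = f(merged[k], v) if k in merged else v
    return merged

pairs = [("a", 1), ("b", 2), ("a", 3), ("a", 4)]
one_partition = [pairs]
two_partitions = [pairs[:2], pairs[2:]]

print(reduce_by_key(one_partition, add), reduce_by_key(two_partitions, add))
# {'a': 8, 'b': 2} {'a': 8, 'b': 2}
print(reduce_by_key(one_partition, sub), reduce_by_key(two_partitions, sub))
# {'a': -6, 'b': 2} {'a': 2, 'b': 2}
```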

##### For more information on RDD transformations (e.g. ``filter``), use ``help``:

In [None]:
# help(movies.filter) works too; `?` is the IPython/Jupyter shortcut
movies.filter?

In [None]:
movies.take(1)

Now it's your turn: you can write all your queries in new cells below. Feel free to add as many cells as needed.

In [None]:
# Snooze Copilot. ;)
# What is the number of movies that were released after the year 1960?


In [None]:
# What is the average rating of all movies in the dataset?


In [None]:
# What is the year of death of the actor who was born in the earliest year?


In [None]:
# What is the total number of votes for all the movies with more than 200 votes?


### Solutions

In [None]:
movies.take(1)

In [None]:
# What is the number of movies that were released after the year 1960?
movies.filter(lambda v: int(v['year']) > 1960).count()

In [None]:
# What is the average rating of all movies in the dataset?
movies.map(lambda v: float(v['rating'])).mean()
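`mean()` must combine partial results from all partitions. The sketch below shows the usual technique, under the assumption of made-up ratings split over two partitions, and is not Spark's exact implementation: each partition is summarized as a `(sum, count)` pair, the pairs are merged with an associative function, and the division happens only at the end. Averaging the per-partition averages instead gives a wrong result when partitions differ in size.

```python
from functools import reduce

# Made-up ratings split over two partitions; ratings are strings as in the dataset
rating_partitions = [["7.0", "8.0", "9.0"], ["5.0"]]

def to_pair(part):
    """Summarize one partition as (sum, count)."""
    values = [float(r) for r in part]
    return (sum(values), len(values))

def merge(p, q):
    """Associative and commutative merge of two (sum, count) pairs."""
    return (p[0] + q[0], p[1] + q[1])

total, count = reduce(merge, map(to_pair, rating_partitions))
correct_mean = total / count
# Averaging the per-partition averages ignores the partition sizes
naive_mean = sum(to_pair(p)[0] / to_pair(p)[1] for p in rating_partitions) / len(rating_partitions)
print(correct_mean, naive_mean)  # 7.25 6.5
```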

In [None]:
# What is the year of death of the actor who was born in the earliest year?
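# Approach 1: key actors by birth year and look at the first few entries;
# taking 3 instead of 1 reveals ties for the earliest birth year
v1 = (movies
    .flatMap(lambda v: v['actors'])
    .map(lambda v: (int(v['birth']), (int(v['death']), v['name'])))
    .distinct()  # the same actor can appear in several movies
    .sortByKey()
    .take(3)
)
print(v1)  # list of (birth, (death, name)); without ties the answer is v1[0][1][0]
# Approach 2: min with a key function returns the whole actor dictionary
v2 = (movies
    .flatMap(lambda v: v['actors'])
    .min(key=lambda v: int(v['birth']))
)
print(v2['death'])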
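The first approach takes three results instead of one so that ties on the earliest birth year become visible, whereas `min` returns just one of the tied actors. Below is a plain-Python sketch of a tie-aware variant on made-up actor records (`sample_actors` is illustrative only): deduplicate the actors through hashable tuples, find the minimum birth year, then report the death year of every actor born that year. On an RDD the same idea would take two passes: compute the minimum birth year, then `filter` on it.

```python
# Made-up actor records as flatMap would produce them (illustrative only);
# the same actor can appear in several movies, hence the duplicate
sample_actors = [
    {"name": "Actor A", "birth": 1890, "death": 1960},
    {"name": "Actor B", "birth": 1890, "death": 1975},
    {"name": "Actor C", "birth": 1920, "death": 2000},
    {"name": "Actor A", "birth": 1890, "death": 1960},
]

# Deduplicate via tuples, since dicts are not hashable
unique = {(a["name"], a["birth"], a["death"]) for a in sample_actors}
earliest = min(birth for _, birth, _ in unique)
answers = sorted((name, death) for name, birth, death in unique if birth == earliest)
print(earliest, answers)  # 1890 [('Actor A', 1960), ('Actor B', 1975)]
```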


In [None]:
# What is the total number of votes for all the movies with more than 200 votes?
v1 = movies.filter(lambda v: int(v['votes']) > 200).map(lambda v: int(v['votes'])).sum()
print(v1)
# Readability++
v2 = movies.map(lambda v: int(v['votes'])).filter(lambda v: v > 200).sum()
print(v2)
# Just for fun: filter and map in a single flatMap
v3 = movies.flatMap(lambda v: [int(v['votes'])] if (int(v['votes']) > 200) else []).sum()
print(v3)
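The three solutions agree because the predicate only looks at the vote count, so filtering and projecting can be done in either order, and a `flatMap` that returns a one-element list (keep) or an empty list (drop) acts as a combined filter and map. The plain-Python sketch below mirrors each formulation with generator expressions over made-up vote counts; the boundary value 200 is excluded by the strict `> 200`.

```python
# Made-up vote counts (illustrative only); 200 itself must be excluded by "> 200"
sample_movies = [{"votes": 150}, {"votes": 300}, {"votes": 201}, {"votes": 200}]

# Filter on the movie, then project its votes (like v1)
total_filter_first = sum(int(m["votes"]) for m in sample_movies if int(m["votes"]) > 200)
# Project the votes first, then filter on the number (like v2)
total_map_first = sum(v for v in (int(m["votes"]) for m in sample_movies) if v > 200)
# Emit a one-element list to keep a value or an empty list to drop it (like v3)
total_flatmap = sum(
    x for m in sample_movies for x in ([int(m["votes"])] if int(m["votes"]) > 200 else [])
)
print(total_filter_first, total_map_first, total_flatmap)  # 501 501 501
```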