# Exercise 5 - Spark in Scala _[4 points]_

In this exercise you have to solve the tasks given below. 


## a) Elementary RDD functions 

(Brushing up on the basics of functional programming in Scala)




####  You are given a list of the first 20 Fibonacci numbers.

In [None]:
val fibs20 = sc.parallelize(List( 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181))  
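As a quick sanity check, the literal list above can be regenerated in plain Scala (no Spark needed) by stepping the Fibonacci recurrence from the pair (0, 1). This is only a sketch; `fibsLocal` and `literal` are hypothetical names that are not part of the exercise.

```scala
// Hypothetical helper, not part of the exercise: regenerate the first 20 Fibonacci numbers
// by repeatedly stepping the pair (F(n), F(n+1)) to (F(n+1), F(n) + F(n+1)).
val fibsLocal: List[Int] =
  Iterator.iterate((0, 1)) { case (a, b) => (b, a + b) }
    .map(_._1) // keep the first component, F(n)
    .take(20)
    .toList

// The literal list that is parallelized into `fibs20`.
val literal = List(0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181)

println(fibsLocal == literal) // true
```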

####  Produce a list that contains all even numbers from the list 'fibs20':

In [None]:
val evenFibs20 = fibs20.filter(x => (x % 2 == 0)).collect()
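The filter keeps every element for which `x % 2 == 0`. A known property of the Fibonacci sequence, that F(n) is even exactly when n is divisible by 3, gives an independent check of the expected result. The sketch below applies the same predicate to a local `List` instead of the RDD, so it runs without `sc`; the names `fibs`, `evens` and `byIndex` are illustrative only.

```scala
// Local copy of the data (plain Scala List, not an RDD).
val fibs = List(0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181)

// Same predicate as the RDD version.
val evens = fibs.filter(x => x % 2 == 0)

// Cross-check: keep F(n) for every index n that is a multiple of 3.
val byIndex = fibs.zipWithIndex.collect { case (f, n) if n % 3 == 0 => f }

println(evens)            // List(0, 2, 8, 34, 144, 610, 2584)
println(evens == byIndex) // true
```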

####  Compute the average value of the list 'fibs20':

In [None]:
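// Pair each element with a count of 1, then add both components: (sum, count)
val avg1map = fibs20.map(x => (x, 1))
// (0, 0) is the neutral element; pass it as an explicit tuple instead of relying on auto-tupling
val avg1fold = avg1map.fold((0, 0))((x, y) => (x._1 + y._1, x._2 + y._2))
val avg1 = avg1fold._1 / avg1fold._2.toDouble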
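`RDD.fold` applies the zero value once per partition and once more when merging the per-partition results, so the zero value must be neutral for the operation. `(0, 0)` is neutral for component-wise addition, which is why the average is correct regardless of how `fibs20` is partitioned. The sketch below models this with plain Scala collections: `grouped(7)` stands in for three hypothetical partitions, and `SumCount`, `combine` and `partitionedFold` are illustrative names, not Spark API.

```scala
// Local copy of the data; Long keeps the running sum safe from overflow.
val fibs: List[Long] = List(0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181)

type SumCount = (Long, Int)

// Component-wise addition of (sum, count) pairs, like the lambda passed to fold above.
def combine(x: SumCount, y: SumCount): SumCount = (x._1 + y._1, x._2 + y._2)

// Hypothetical model of RDD.fold: fold every "partition" starting from `zero`,
// then fold the partial results, again starting from `zero`.
def partitionedFold(data: List[Long], partSize: Int, zero: SumCount): SumCount =
  data.map(x => (x, 1)).grouped(partSize).map(_.fold(zero)(combine)).fold(zero)(combine)

val (sum, count) = partitionedFold(fibs, 7, (0L, 0))
val avg = sum / count.toDouble
println(avg) // 547.25

// A non-neutral zero is added once per partition and once more at the merge:
// 20 elements in partitions of 7 give 3 partitions, so the count is off by 4.
println(partitionedFold(fibs, 7, (0L, 1))._2) // 24
```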

####  Produce a list that shows, for each element of the list 'fibs20', its absolute difference from the average:

In [None]:
val avgDiff = fibs20.map(x => (x - avg1).abs).collect()

####  You are given an arbitrary list of words:

In [None]:
val words = sc.parallelize(List("automaton", "language", "logic","closure"))

####  Furthermore, we define a function that maps a word to its list of permutations

In [None]:
def permutate (word:String) = word.permutations.toList

####  Produce a single list containing all permutations of elements from the list 'words':

In [None]:
// flatMap = map followed by flatten: one list containing the permutations of every word
val wordlist = words.flatMap(word => permutate(word)).collect()
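`flatMap(f)` is equivalent to `map(f)` followed by flattening, so mapping each word to its permutations and then flattening yields the same single list as one `flatMap` call. Note also that Scala's `permutations` yields only *distinct* permutations, so a word of length n with repeated letters produces n!/(k1!·k2!·…) results rather than n!. The plain-Scala sketch below checks both points on a local `List`; `factorial` and `distinctPermutationCount` are hypothetical helpers.

```scala
val words = List("automaton", "language", "logic", "closure")
def permutate(word: String): List[String] = word.permutations.toList

// map + flatten and flatMap produce the same single list.
val viaMap = words.map(permutate).flatten
val viaFlatMap = words.flatMap(permutate)

def factorial(n: Int): BigInt = (1 to n).map(BigInt(_)).product

// n! divided by k! for every letter that occurs k times in the word.
def distinctPermutationCount(word: String): BigInt =
  factorial(word.length) / word.groupBy(identity).values.map(g => factorial(g.length)).product

println(viaMap == viaFlatMap)                 // true
println(words.map(distinctPermutationCount)) // List(45360, 10080, 120, 5040)
println(viaFlatMap.size)                      // 60600
```

For the RDD, this means the collected `wordlist` holds 60600 strings, all of which are shipped to the driver by `collect()`.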

## b) From SQL to Dataframe (and back again)

#### For each of the Spark SQL queries, find an equivalent one that uses only the Dataframe API (or vice versa)


In [None]:
val dataPath = "file:///home/adbs/2019S/shared/diamonds.csv"
val diamonds = spark.read.format("csv")
  .option("header","true")
  .option("inferSchema", "true")
  .load(dataPath)
diamonds.createOrReplaceTempView("diamonds")

val articlesDF = spark.read.format("json").load("file:///home/adbs/2019S/shared/spark/nytarticles")
val commentsDF = spark.read.json("file:///home/adbs/2019S/shared/spark/nytcomments")
articlesDF.createOrReplaceTempView("articles")
commentsDF.createOrReplaceTempView("comments")
// Create RDD view into dataset
val articlesRDD = articlesDF.rdd
val commentsRDD = commentsDF.rdd

#### Query 1: Transform the given Spark SQL query into the Dataframe API

In [None]:
val query1 = spark.sql("SELECT COUNT(*) FROM articles WHERE sectionName='Politics'")
query1.show()
query1.explain()

In [None]:
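import org.apache.spark.sql.functions.{col, count, lit}
// String-expression alternative: articlesDF.where("sectionName = 'Politics'")
// agg(count(lit(1))) mirrors COUNT(*) and keeps the result a DataFrame (a plain .count() returns a Long)
val query1df = articlesDF.where(col("sectionName") === "Politics").agg(count(lit(1)))
query1df.show()
query1df.explain()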

#### Query 2: Transform the given Dataframe API query into Spark SQL

In [None]:
val query2 = articlesDF.groupBy("sectionName").count()
query2.show(false)
query2.explain()

In [None]:
val query2sql = spark.sql("SELECT sectionName, COUNT(*) AS `count` FROM articles GROUP BY sectionName")
query2sql.show(false)
query2sql.explain()
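Both versions group the rows by `sectionName` and emit one row per group together with the group's size; `groupBy(...).count()` names the result column `count`. A plain-Scala model of that semantics, on a few hypothetical section names that are not taken from the NYT dataset:

```scala
// Hypothetical section names, one per article row.
val sectionNames = List("Politics", "Sports", "Politics", "World", "Sports", "Politics")

// GROUP BY sectionName + COUNT(*): one (key, group size) pair per distinct key.
val counts: Map[String, Int] =
  sectionNames.groupBy(identity).map { case (section, rows) => section -> rows.size }

println(counts("Politics")) // 3
```

As in Spark, the model says nothing about the order of the groups; an explicit `ORDER BY` / `orderBy` is needed for a stable order.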

#### Query 3: Transform the given Spark SQL query into the Dataframe API

In [None]:
val query3  = spark.sql(
    "SELECT a.headline, COUNT(c.commentID) AS numComments FROM articles a, comments c WHERE a.articleID = c.articleID GROUP BY a.headline" )
query3.show(false) // 'false' turns off truncation of row entries
query3.explain()

In [None]:
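import org.apache.spark.sql.functions.count
// The comma join with a WHERE condition in the SQL is an inner equi-join
val query3df = articlesDF
  .join(commentsDF, articlesDF("articleID") === commentsDF("articleID"))
  .groupBy(articlesDF("headline"))
  .agg(count(commentsDF("commentID")).as("numComments"))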
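The SQL in Query 3 lists both tables in `FROM` and puts the join condition in `WHERE`. Logically that is a Cartesian product followed by a filter, which yields the same rows as an inner equi-join, and Spark's optimizer normally plans it as a join rather than a cross product. The plain-Scala sketch below checks that equivalence on small hypothetical `Article`/`Comment` records (the case classes and data are assumptions, not the NYT schema) and then counts comments per headline.

```scala
// Hypothetical, heavily simplified records; the real NYT data has many more columns.
case class Article(articleID: String, headline: String)
case class Comment(commentID: Long, articleID: String)

val articles = List(Article("a1", "Election Night"), Article("a2", "Cup Final"), Article("a3", "No Comments Yet"))
val comments = List(Comment(1, "a1"), Comment(2, "a1"), Comment(3, "a2"), Comment(4, "a9"))

// Cartesian product followed by the equality filter (what crossJoin + filter expresses).
val crossFiltered = for { a <- articles; c <- comments if a.articleID == c.articleID } yield (a, c)

// Inner equi-join via a lookup table on the join key (the idea behind a hash join).
val commentsByArticle = comments.groupBy(_.articleID)
val hashJoined = for { a <- articles; c <- commentsByArticle.getOrElse(a.articleID, Nil) } yield (a, c)

// GROUP BY headline, COUNT(commentID); articles without comments drop out of an inner join.
val numComments = hashJoined.groupBy(_._1.headline).map { case (h, pairs) => h -> pairs.size }

// Compare as sets, since join output order is not guaranteed in general.
println(crossFiltered.toSet == hashJoined.toSet) // true
```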

#### Query 4: Transform the given Spark SQL query into the Dataframe API

In [None]:
val query4 = spark.sql(" SELECT headline, byline, pubDate FROM articles WHERE headline RLIKE \"2016\" ")
query4.show(false)
query4.explain()

In [None]:
val query4df = articlesDF.filter(articlesDF.col("headline").rlike("2016")).select(articlesDF.col("headline"), articlesDF.col("byline"), articlesDF.col("pubDate"))
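`rlike` (SQL `RLIKE`) matches when the regular expression is found *anywhere* in the string, the way Java's `Matcher.find` works, rather than requiring the whole headline to match the pattern. The plain-Scala sketch below contrasts the two behaviours on hypothetical headlines, using `java.util.regex` directly instead of Spark.

```scala
import java.util.regex.Pattern

// Hypothetical headlines; not taken from the dataset.
val headlines = List("The 2016 Campaign in Review", "2016", "Looking Ahead to 2017")
val pattern = Pattern.compile("2016")

// Unanchored search: true if "2016" occurs anywhere in the headline.
val found = headlines.filter(h => pattern.matcher(h).find())
// Anchored full match: true only if the entire headline is "2016".
val fullMatch = headlines.filter(h => pattern.matcher(h).matches())

println(found)     // List(The 2016 Campaign in Review, 2016)
println(fullMatch) // List(2016)
```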

#### Query 5: Transform the given Dataframe API query into Spark SQL

In [None]:
val query5 = articlesDF
      .join(commentsDF, articlesDF("articleID") === commentsDF("articleID"))
      .select(explode(articlesDF("keywords")).as("singleKeyWords"))
      .groupBy("singleKeyWords")
      .agg(count("singleKeyWords").as("number"))
      .orderBy(desc("number"))
query5.show(false)
query5.explain()

Note that "explode" is a Spark SQL function that turns a row whose column holds a collection (an array or a map) into multiple rows, one for each element of that collection (one per key-value pair for a map). More information here: https://spark.apache.org/docs/2.3.0/api/sql/index.html#explode
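In collection terms, `explode` behaves like `flatMap` over the array column: a row whose `keywords` array has k elements becomes k rows, and a row with an empty array produces none. Because Query 5 explodes after the join, an article's keywords are counted once per matching comment. The sketch below models the explode, group, count and descending sort with plain Scala on hypothetical keyword arrays (the join itself is omitted and the data is made up).

```scala
// Hypothetical keyword arrays, one per joined (article, comment) row.
val keywordRows: List[List[String]] = List(
  List("Elections", "Senate"),
  List("Elections"),
  List(), // an empty array yields no rows after explode
  List("Senate", "Elections", "Budget")
)

// explode(keywords) AS singleKeyWords: one output row per array element.
val singleKeyWords: List[String] = keywordRows.flatMap(ks => ks)

// GROUP BY singleKeyWords, COUNT(...) AS number, ORDER BY number DESC
// (ties broken by keyword here only to make the output deterministic).
val ranked: List[(String, Int)] =
  singleKeyWords.groupBy(identity).toList
    .map { case (kw, occurrences) => (kw, occurrences.size) }
    .sortBy { case (kw, number) => (-number, kw) }

println(ranked) // List((Elections,3), (Senate,2), (Budget,1))
```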

In [None]:
val query5sql = spark.sql("SELECT singleKeyWords, COUNT(singleKeyWords) AS number FROM (SELECT EXPLODE(a.keywords) AS singleKeyWords FROM articles a JOIN comments c ON a.articleID = c.articleID) kw GROUP BY singleKeyWords ORDER BY number DESC")
query5sql.show(false)

### For All Queries Above:
#### Analyze the plans (`.explain()`) and compare performance using the Spark Web UI. Try to reason about any major differences in the logical plans, if there are any.

## c) Wide and Narrow Dependencies

#### Look at the Dataframe queries given in part b), as well as those for which you wrote the Dataframe version.

#### Use the Spark Web UI to analyze the dependencies and stages of the queries, and try to determine which operations on which Dataframes are executed as wide dependencies and which as narrow dependencies.
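As a conceptual aid, the sketch below models a dataset as a vector of partitions. With a narrow dependency (e.g. `map`, `filter`), each output partition is computed from exactly one parent partition, so such steps can be pipelined within one stage. With a wide dependency (e.g. the hash-partitioning shuffle behind `groupBy`, or a shuffle-based `join`), an output partition may need records from every parent partition, which introduces a stage boundary. Everything here (`Partitions`, `narrowMap`, `shuffleByKey`, `parentsOf`) is a hypothetical toy, not Spark's API; in Spark itself, `rdd.toDebugString` and `rdd.dependencies` expose the real lineage of an RDD (for a Dataframe, via `df.rdd`).

```scala
// Toy model (hypothetical, not Spark's API): a dataset is a vector of partitions.
type Partitions[A] = Vector[Vector[A]]

// Narrow: output partition i is computed from input partition i alone.
def narrowMap[A, B](in: Partitions[A])(f: A => B): Partitions[B] =
  in.map(partition => partition.map(f))

// Target partition of a key under simple hash partitioning.
def targetPartition[K](key: K, numOut: Int): Int = Math.floorMod(key.hashCode, numOut)

// Wide: every output partition collects its keys from all input partitions (a shuffle).
def shuffleByKey[K, V](in: Partitions[(K, V)], numOut: Int): Partitions[(K, V)] =
  Vector.tabulate(numOut) { p =>
    in.flatMap(partition => partition.filter { case (k, _) => targetPartition(k, numOut) == p })
  }

// For each output partition of the shuffle, the set of input partitions it reads from.
def parentsOf[K, V](in: Partitions[(K, V)], numOut: Int): Vector[Set[Int]] =
  Vector.tabulate(numOut) { p =>
    in.indices.filter(i => in(i).exists { case (k, _) => targetPartition(k, numOut) == p }).toSet
  }

val input: Partitions[(Int, String)] = Vector(Vector(1 -> "a", 2 -> "b"), Vector(3 -> "c", 4 -> "d"))

println(narrowMap(input)(_._2).map(_.size)) // Vector(2, 2)
println(parentsOf(input, 2))                 // Vector(Set(0, 1), Set(0, 1))
```

The first result shows that the narrow step keeps the partitioning unchanged; the second shows that each shuffle output partition depends on both input partitions.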
