]> git.somenet.org - pub/jan/adbs.git/blob - ex2/spark/Exercise5_SparkInScala.ipynb
[ex3.2] Mongodb challenge FTW.
[pub/jan/adbs.git] / ex2 / spark / Exercise5_SparkInScala.ipynb
1 {
2  "cells": [
3   {
4    "cell_type": "markdown",
5    "metadata": {},
6    "source": [
7     "# Exercise 5 - Spark in Scala _[4 points]_\n",
8     "\n",
9     "In this exercise you have to solve the tasks given below. \n"
10    ]
11   },
12   {
13    "cell_type": "markdown",
14    "metadata": {},
15    "source": [
16     "## a) Elementary RDD functions \n",
17     "\n",
18     "(Brushing up on the basics of functional programming in Scala)\n",
19     "\n",
20     "\n"
21    ]
22   },
23   {
24    "cell_type": "markdown",
25    "metadata": {},
26    "source": [
27     "####  You are given a list of the first 20 Fibonacci numbers. "
28    ]
29   },
30   {
31    "cell_type": "code",
32    "execution_count": null,
33    "metadata": {},
34    "outputs": [],
35    "source": [
36     "val fibs20 = sc.parallelize(Seq(0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181)) // first 20 Fibonacci numbers as an RDD[Int]"
37    ]
38   },
39   {
40    "cell_type": "markdown",
41    "metadata": {},
42    "source": [
43     "####  Produce a list that contains all even numbers from the list 'fibs20':"
44    ]
45   },
46   {
47    "cell_type": "code",
48    "execution_count": null,
49    "metadata": {},
50    "outputs": [],
51    "source": [
52     "val evenFibs20 = fibs20.filter(_ % 2 == 0).collect() // keep only the even entries"
53    ]
54   },
55   {
56    "cell_type": "markdown",
57    "metadata": {},
58    "source": [
59     "####  Compute the average value of the list 'fibs20':"
60    ]
61   },
62   {
63    "cell_type": "code",
64    "execution_count": null,
65    "metadata": {},
66    "outputs": [],
67    "source": [
68     "val avg1map = fibs20.map(n => n -> 1) // pair every value with a count of 1\n",
69     "val avg1fold = avg1map.fold((0, 0)) { case ((sumL, cntL), (sumR, cntR)) => (sumL + sumR, cntL + cntR) } // (sum, count)\n",
70     "val avg1 = { val (total, n) = avg1fold; total / n.toFloat } // floating-point division, not integer division"
71    ]
72   },
73   {
74    "cell_type": "markdown",
75    "metadata": {},
76    "source": [
77     "####  Produce a list that shows, for each element of the list 'fibs20', its absolute difference from the average:"
78    ]
79   },
80   {
81    "cell_type": "code",
82    "execution_count": null,
83    "metadata": {},
84    "outputs": [],
85    "source": [
86     "val avgDiff = fibs20.map(n => math.abs(n - avg1)).collect() // |value - average| per element"
87    ]
88   },
89   {
90    "cell_type": "markdown",
91    "metadata": {},
92    "source": [
93     "####  You are given a random list of words"
94    ]
95   },
96   {
97    "cell_type": "code",
98    "execution_count": null,
99    "metadata": {},
100    "outputs": [],
101    "source": [
102     "val words = sc.parallelize(Seq(\"automaton\", \"language\", \"logic\", \"closure\")) // sample words as an RDD[String]"
103    ]
104   },
105   {
106    "cell_type": "markdown",
107    "metadata": {},
108    "source": [
109     "####  Furthermore, we define a function that maps a word to its list of permutations"
110    ]
111   },
112   {
113    "cell_type": "code",
114    "execution_count": null,
115    "metadata": {},
116    "outputs": [],
117    "source": [
118     "def permutate(word: String): List[String] = { val arrangements = word.permutations; arrangements.toList } // all distinct permutations of the word's characters"
119    ]
120   },
121   {
122    "cell_type": "markdown",
123    "metadata": {},
124    "source": [
125     "####  Produce a single list containing all permutations of elements from the list 'words':"
126    ]
127   },
128   {
129    "cell_type": "code",
130    "execution_count": null,
131    "metadata": {},
132    "outputs": [],
133    "source": [
134     "val wordlist = words.flatMap(word => permutate(word)).collect() // flatten the per-word permutation lists into one array"
135    ]
136   },
137   {
138    "cell_type": "markdown",
139    "metadata": {},
140    "source": [
141     "## b) From SQL to Dataframe (and back again)\n",
142     "\n",
143     "#### Find for each of the Spark SQL queries an equivalent one that only uses the Dataframe API (or vice versa)\n"
144    ]
145   },
146   {
147    "cell_type": "code",
148    "execution_count": null,
149    "metadata": {},
150    "outputs": [],
151    "source": [
152     "val dataPath = \"file:///home/adbs/2019S/shared/diamonds.csv\"\n",
153     "val diamonds = spark.read                 // CSV with a header row\n",
154     "  .option(\"header\",\"true\")\n",
155     "  .option(\"inferSchema\", \"true\")           // let Spark infer the column types\n",
156     "  .csv(dataPath)\n",
157     "diamonds.createOrReplaceTempView(\"diamonds\")\n",
158     "\n",
159     "val articlesDF = spark.read.json(\"file:///home/adbs/2019S/shared/spark/nytarticles\")\n",
160     "val commentsDF = spark.read.json(\"file:///home/adbs/2019S/shared/spark/nytcomments\")\n",
161     "articlesDF.createOrReplaceTempView(\"articles\")   // register both frames for Spark SQL\n",
162     "commentsDF.createOrReplaceTempView(\"comments\")\n",
163     "// Expose the same data through the RDD API\n",
164     "val articlesRDD = articlesDF.rdd\n",
165     "val commentsRDD = commentsDF.rdd"
166    ]
167   },
168   {
169    "cell_type": "markdown",
170    "metadata": {},
171    "source": [
172     "#### Query 1: Transform the given Spark SQL query into the Dataframe API"
173    ]
174   },
175   {
176    "cell_type": "code",
177    "execution_count": null,
178    "metadata": {},
179    "outputs": [],
180    "source": [
181     "val query1 = spark.sql(\"SELECT COUNT(*) FROM articles WHERE sectionName='Politics'\") // given SQL query\n",
182     "query1.show() // print the result rows\n",
183     "query1.explain(false) // print the physical plan only"
184    ]
185   },
186   {
187    "cell_type": "code",
188    "execution_count": null,
189    "metadata": {},
190    "outputs": [],
191    "source": [
192     "// DataFrame API counterpart of query1:\n",
193     "// restrict the articles to the 'Politics' section, then count the remaining rows.\n",
194     "articlesDF.filter(articlesDF(\"sectionName\") === \"Politics\").count()"
195    ]
196   },
197   {
198    "cell_type": "markdown",
199    "metadata": {},
200    "source": [
201     "#### Query 2: Transform the given Dataframe API query into Spark SQL"
202    ]
203   },
204   {
205    "cell_type": "code",
206    "execution_count": null,
207    "metadata": {},
208    "outputs": [],
209    "source": [
210     "val query2 = articlesDF.groupBy(\"sectionName\").count() // given DataFrame query: articles per section\n",
211     "query2.show(truncate = false) // do not truncate long cell values\n",
212     "query2.explain(false) // print the physical plan only"
213    ]
214   },
215   {
216    "cell_type": "code",
217    "execution_count": null,
218    "metadata": {},
219    "outputs": [],
220    "source": [
221     "val query2sql = spark.sql(\"SELECT sectionName, COUNT(*) AS `count` FROM articles GROUP BY sectionName\") // alias so the schema matches query2's 'count' column\n",
222     "query2sql.show(false)\n",
223     "query2sql.explain()"
224    ]
225   },
226   {
227    "cell_type": "markdown",
228    "metadata": {},
229    "source": [
230     "#### Query 3: Transform the given Spark SQL query into the Dataframe API"
231    ]
232   },
233   {
234    "cell_type": "code",
235    "execution_count": null,
236    "metadata": {},
237    "outputs": [],
238    "source": [
239     "val query3 = spark.sql( // given SQL query: number of comments per headline\n",
240     "  \"SELECT a.headline, COUNT(c.commentID) AS numComments FROM articles a, comments c WHERE a.articleID = c.articleID GROUP BY a.headline\")\n",
241     "query3.show(truncate = false) // 'false' turns off truncation of row entries\n",
242     "query3.explain(false) // print the physical plan only"
243    ]
244   },
245   {
246    "cell_type": "code",
247    "execution_count": null,
248    "metadata": {},
249    "outputs": [],
250    "source": [
251     "val query3df = articlesDF.join(commentsDF, articlesDF.col(\"articleID\") === commentsDF.col(\"articleID\")).groupBy(articlesDF.col(\"headline\")).agg(count(commentsDF.col(\"commentID\")).as(\"numComments\")) // inner join on articleID, then count comment IDs per headline exactly as query3 does"
252    ]
253   },
254   {
255    "cell_type": "markdown",
256    "metadata": {},
257    "source": [
258     "#### Query 4: Transform the given Spark SQL query into the Dataframe API"
259    ]
260   },
261   {
262    "cell_type": "code",
263    "execution_count": null,
264    "metadata": {},
265    "outputs": [],
266    "source": [
267     "val query4 = spark.sql(\" SELECT headline, byline, pubDate FROM articles WHERE headline RLIKE \\\"2016\\\" \") // given SQL query\n",
268     "query4.show(truncate = false) // do not truncate long cell values\n",
269     "query4.explain(false) // print the physical plan only"
270    ]
271   },
272   {
273    "cell_type": "code",
274    "execution_count": null,
275    "metadata": {},
276    "outputs": [],
277    "source": [
278     "val query4df = articlesDF.where(articlesDF(\"headline\").rlike(\"2016\")).select(\"headline\", \"byline\", \"pubDate\") // regex filter on the headline, then project the three columns"
279    ]
280   },
281   {
282    "cell_type": "markdown",
283    "metadata": {},
284    "source": [
285     "#### Query 5: Transform the given Dataframe API query into Spark SQL"
286    ]
287   },
288   {
289    "cell_type": "code",
290    "execution_count": null,
291    "metadata": {},
292    "outputs": [],
293    "source": [
294     "val query5 = articlesDF // given DataFrame query\n",
295     "      .join(commentsDF, articlesDF.col(\"articleID\") === commentsDF.col(\"articleID\")) // inner join on the shared key\n",
296     "      .select(explode(articlesDF.col(\"keywords\")).alias(\"singleKeyWords\")) // one output row per keyword\n",
297     "      .groupBy(\"singleKeyWords\")\n",
298     "      .agg(count(\"singleKeyWords\").alias(\"number\")) // occurrences per keyword\n",
299     "      .orderBy(desc(\"number\")) // most frequent first\n",
300     "query5.show(truncate = false)\n",
301     "query5.explain(false)"
302    ]
303   },
304   {
305    "cell_type": "markdown",
306    "metadata": {},
307    "source": [
308     "Note here that \"explode\" is a Spark SQL function that turns a tuple with a column containing a collection of objects into multiple tuples, each with a single value from this collection. More information here: https://spark.apache.org/docs/2.3.0/api/sql/index.html#explode"
309    ]
310   },
311   {
312    "cell_type": "code",
313    "execution_count": null,
314    "metadata": {},
315    "outputs": [],
316    "source": [
317     "val query5sql = spark.sql(\"SELECT singleKeyWords, COUNT(singleKeyWords) AS number FROM (SELECT EXPLODE(keywords) AS singleKeyWords FROM articles JOIN comments ON articles.articleID = comments.articleID) GROUP BY singleKeyWords ORDER BY number DESC\") // COUNT(column) skips nulls, matching count(\"singleKeyWords\") in query5"
318    ]
319   },
320   {
321    "cell_type": "markdown",
322    "metadata": {},
323    "source": [
324     "### For All Queries Above: \n",
325     "#### Analyze the plans (.explain() ) and compare performance (using the Spark Web UI). Try to reason about any major differences in the logical plans (if there are any)."
326    ]
327   },
328   {
329    "cell_type": "markdown",
330    "metadata": {},
331    "source": [
332     "## c) Wide and Narrow Dependencies\n",
333     "\n",
334     "#### Look at the Dataframe queries given as part of b) or for which you wrote the Dataframe version.\n",
335     "\n",
336     "#### Use the Spark Internal Web UI to analyse the dependencies and stages of the queries, and try to determine which commands on which Dataframes are executed as wide dependencies and which as narrow dependencies. \n"
337    ]
338   },
339   {
340    "cell_type": "code",
341    "execution_count": null,
342    "metadata": {},
343    "outputs": [],
344    "source": []
345   }
346  ],
347  "metadata": {
348   "kernelspec": {
349    "display_name": "Scala (spylon-kernel)",
350    "language": "scala",
351    "name": "spylon-kernel"
352   },
353   "language_info": {
354    "codemirror_mode": "text/x-scala",
355    "file_extension": ".scala",
356    "help_links": [
357     {
358      "text": "MetaKernel Magics",
359      "url": "https://github.com/calysto/metakernel/blob/master/metakernel/magics/README.md"
360     }
361    ],
362    "mimetype": "text/x-scala",
363    "name": "scala",
364    "pygments_lexer": "scala",
365    "version": "0.4.1"
366   }
367  },
368  "nbformat": 4,
369  "nbformat_minor": 2
370 }