From 5bfd7b1aae0fc4c34dc1a59ab3bc08a2d0b39c71 Mon Sep 17 00:00:00 2001
From: David Kaufmann
Date: Sun, 12 May 2019 20:11:10 +0200
Subject: [PATCH] update ex2.5

---
 ex2/spark/Exercise5_SparkInScala.ipynb | 111 ++++++++-----------------
 1 file changed, 33 insertions(+), 78 deletions(-)

diff --git a/ex2/spark/Exercise5_SparkInScala.ipynb b/ex2/spark/Exercise5_SparkInScala.ipynb
index 7e3b4e5..11379a5 100644
--- a/ex2/spark/Exercise5_SparkInScala.ipynb
+++ b/ex2/spark/Exercise5_SparkInScala.ipynb
@@ -69,7 +69,7 @@
     }
    ],
    "source": [
-    "val evenFibs20 = fibs20.filter(x => (x % 2 == 0))"
+    "val evenFibs20 = fibs20.filter(x => (x % 2 == 0)).collect()"
    ]
   },
   {
@@ -97,7 +97,7 @@
    "source": [
     "val avg1map = fibs20.map(x => (x, 1))\n",
     "val avg1fold = avg1map.fold (0,0) ((x,y) => (x._1 + y._1, x._2 + y._2))\n",
-    "var avg1 = avg1fold._1 / avg1fold._2.toFloat"
+    "val avg1 = avg1fold._1 / avg1fold._2.toFloat"
    ]
   },
   {
@@ -111,19 +111,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "Intitializing Scala interpreter ..."
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
+   "outputs": [],
    "source": [
-    "val avgDiff = fibs20.map(x => (x - avg1).abs)"
+    "val avgDiff = fibs20.map(x => (x - avg1).abs).collect()"
    ]
   },
   {
@@ -137,17 +127,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "Intitializing Scala interpreter ..."
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
+   "outputs": [],
    "source": [
     "val words = sc.parallelize(List(\"automaton\", \"language\", \"logic\",\"closure\"))"
    ]
@@ -163,17 +143,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "Intitializing Scala interpreter ..."
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
+   "outputs": [],
    "source": [
     "def permutate (word:String) = word.permutations.toList"
    ]
@@ -189,19 +159,9 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "Intitializing Scala interpreter ..."
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
+   "outputs": [],
    "source": [
-    "words.map(word => permutate(word))."
+    "val wordlist = words.map(word => permutate(word)).collect().flatMap(x => x)"
    ]
   },
   {
@@ -217,27 +177,17 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "Intitializing Scala interpreter ..."
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
+   "outputs": [],
    "source": [
-    "val dataPath = \"/home/adbs/2019S/shared/diamonds.csv\"\n",
+    "val dataPath = \"file:///home/adbs/2019S/shared/diamonds.csv\"\n",
     "val diamonds = spark.read.format(\"csv\")\n",
     "  .option(\"header\",\"true\")\n",
     "  .option(\"inferSchema\", \"true\")\n",
     "  .load(dataPath)\n",
     "diamonds.createOrReplaceTempView(\"diamonds\")\n",
     "\n",
-    "val articlesDF = spark.read.format(\"json\").load(\"/home/adbs/2019S/shared/spark/nytarticles\")\n",
-    "val commentsDF = spark.read.json(\"/home/adbs/2019S/shared/spark/nytcomments\")\n",
+    "val articlesDF = spark.read.format(\"json\").load(\"file:///home/adbs/2019S/shared/spark/nytarticles\")\n",
+    "val commentsDF = spark.read.json(\"file:///home/adbs/2019S/shared/spark/nytcomments\")\n",
     "articlesDF.createOrReplaceTempView(\"articles\")\n",
     "commentsDF.createOrReplaceTempView(\"comments\")\n",
     "// Create RDD view into dataset\n",
@@ -256,17 +206,7 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "Intitializing Scala interpreter ..."
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
+   "outputs": [],
    "source": [
     "val query1 = spark.sql(\"SELECT COUNT(*) FROM articles WHERE sectionName='Politics'\")\n",
     "query1.show()\n",
@@ -278,7 +218,11 @@
    "execution_count": null,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "//val query1df = query1.toDF()\n",
+    "//articlesDF.where(\"sectionName='Politics'\").count()\n",
+    "articlesDF.where(articlesDF.col(\"sectionName\") === \"Politics\").count()"
+   ]
   },
   {
    "cell_type": "markdown",
@@ -313,7 +257,11 @@
    "execution_count": null,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "val query2sql = spark.sql(\"SELECT sectionName,COUNT(*) FROM articles GROUP BY sectionName\")\n",
+    "query2sql.show(false)\n",
+    "query2sql.explain()"
+   ]
   },
   {
    "cell_type": "markdown",
@@ -349,7 +297,9 @@
    "execution_count": null,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "val query3df = articlesDF.crossJoin(commentsDF).filter(articlesDF.col(\"articleID\") === commentsDF.col(\"articleID\")).groupBy(articlesDF.col(\"headline\")).agg(org.apache.spark.sql.functions.count(\"headline\"))"
+   ]
   },
   {
    "cell_type": "markdown",
@@ -384,7 +334,9 @@
    "execution_count": null,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "val query4df = articlesDF.filter(articlesDF.col(\"headline\").rlike(\"2016\")).select(articlesDF.col(\"headline\"), articlesDF.col(\"byline\"), articlesDF.col(\"pubDate\"))"
+   ]
   },
   {
    "cell_type": "markdown",
@@ -431,7 +383,10 @@
    "execution_count": null,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "// explode the keywords in a subquery first: a generator cannot be combined with an aggregate in a single SELECT\n",
+    "val query5sql = spark.sql(\"SELECT singleKeyWords, COUNT(*) AS number FROM (SELECT EXPLODE(keywords) AS singleKeyWords FROM articles JOIN comments ON articles.articleID = comments.articleID) exploded GROUP BY singleKeyWords ORDER BY number DESC\")"
+   ]
   },
   {
    "cell_type": "markdown",
--
2.43.0
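
For comparison, query 5 can also be expressed with the DataFrame API instead of Spark SQL. This is a minimal sketch, not part of the patch above: it assumes articlesDF and commentsDF are loaded as in the patched cell and that keywords is an array column on the articles side, so explode() yields one row per keyword.

import org.apache.spark.sql.functions.{col, explode}

// One row per (article, comment) pair after the join, then one row per
// keyword of that pair; counting rows per keyword therefore weights each
// keyword by the number of comments on its articles, as in query5sql.
val query5df = articlesDF
  .join(commentsDF, articlesDF.col("articleID") === commentsDF.col("articleID"))
  .select(explode(articlesDF.col("keywords")).as("singleKeyWords"))
  .groupBy("singleKeyWords")
  .count()
  .orderBy(col("count").desc)
query5df.show()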