From 5fe09d5e593ca3572a3334df7d2fa46ba55c5b82 Mon Sep 17 00:00:00 2001
From: Jan Vales
Date: Wed, 15 May 2019 01:09:20 +0200
Subject: [PATCH] ex2.1a complete

---
 ex2/main_1.tex                        | 10 +++
 ex2/mapreduce/CSVSplitter.java        | 57 +++++++++++++++++
 ex2/mapreduce/SomeMapReduce_ex1a.java | 60 ++++++++++++++++++
 ex2/mapreduce/SomeMapReduce_wc.java   | 55 ++++++++++++++++
 ex2/mapreduce/TextPair.java           | 90 +++++++++++++++++++++++++++
 ex2/mapreduce/build_run.sh            | 12 ++++
 6 files changed, 284 insertions(+)
 create mode 100644 ex2/mapreduce/CSVSplitter.java
 create mode 100644 ex2/mapreduce/SomeMapReduce_ex1a.java
 create mode 100644 ex2/mapreduce/SomeMapReduce_wc.java
 create mode 100644 ex2/mapreduce/TextPair.java
 create mode 100755 ex2/mapreduce/build_run.sh

diff --git a/ex2/main_1.tex b/ex2/main_1.tex
index 0141af9..09dc2eb 100644
--- a/ex2/main_1.tex
+++ b/ex2/main_1.tex
@@ -1 +1,11 @@
 %ex2.1
+
+Map 53 53
+Reduce 1 1
+
+\begin{enumerate}[label=(\alph*)]
+  \item http://localhost:19888/jobhistory/job/job\_1557406089646\_5204\\
+  Tasks:\\
+  Map: 53\\
+  Reduce: 1\\
+\end{enumerate}
diff --git a/ex2/mapreduce/CSVSplitter.java b/ex2/mapreduce/CSVSplitter.java
new file mode 100644
index 0000000..d77e754
--- /dev/null
+++ b/ex2/mapreduce/CSVSplitter.java
@@ -0,0 +1,57 @@
+// Created as a template for Advanced Database Systems 2019
+
+import java.util.ArrayList;
+
+// Minimal CSV line splitter: split() honours double-quoted fields, splitNoQuote() does not.
+public class CSVSplitter {
+    // Split one CSV record on commas; commas inside a double-quoted field do not split,
+    // and a doubled quote ("") inside a quoted field is skipped as an escape.
+    // NOTE(review): quotes are kept in the returned fields (not stripped) — callers rely on that.
+    public static String[] split(String input){
+        ArrayList<String> output = new ArrayList<String>();
+        int start = 0;                  // start index of the field currently being scanned
+        boolean insideQuote = false;
+
+        for (int current = 0; current < input.length(); current++){
+            char c = input.charAt(current);
+            switch (c){
+                case ',':
+                    if (!insideQuote) {
+                        output.add(input.substring(start, current));
+                        start = current + 1;
+                    }
+                    break;
+                case '"':
+                    if (current == start) {
+                        insideQuote = true;   // quote opens the field
+                    } else if ((current < input.length() - 1) && input.charAt(current + 1) == '"'){ // escaped quote
+                        current = current + 1; // skip ahead
+                        continue;
+                    } else if ((current < input.length() - 1) && input.charAt(current + 1) == ',') {
+                        insideQuote = false;  // quote closes the field only directly before a comma
+                    }
+                    break;
+            }
+        }
+        output.add(input.substring(start)); // trailing field (also handles a record with no comma)
+        return output.toArray(new String[output.size()]);
+    }
+
+    //This version does not consider quote when deciding splits
+    public static String[] splitNoQuote(String input){
+        ArrayList<String> output = new ArrayList<String>();
+        int start = 0;
+        boolean insideQuote = false; // never set true here; kept so the loop mirrors split()
+
+        for (int current = 0; current < input.length(); current++){
+            char c = input.charAt(current);
+            switch (c){
+                case ',':
+                    if (!insideQuote) { // always true in this variant
+                        output.add(input.substring(start, current));
+                        start = current + 1;
+                    }
+                    break;
+
+            }
+        }
+        output.add(input.substring(start));
+        return output.toArray(new String[output.size()]);
+    }
+}
diff --git a/ex2/mapreduce/SomeMapReduce_ex1a.java b/ex2/mapreduce/SomeMapReduce_ex1a.java
new file mode 100644
index 0000000..eb315bd
--- /dev/null
+++ b/ex2/mapreduce/SomeMapReduce_ex1a.java
@@ -0,0 +1,60 @@
+// Created as a template for Advanced Database Systems 2019
+
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class SomeMapReduce_ex1a {
+    public static class MyMapper extends Mapper<Object, Text, Text, TextPair> {
+        public static IntWritable one = new IntWritable(1);
+
+        // Emit (column 7) -> (column 6, column 5) per CSV record; column 5 must parse
+        // as an integer. Column meaning depends on the input CSV — TODO confirm schema.
+        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+            String[] result = CSVSplitter.split(value.toString());
+            try {
+                context.write(new Text(result[7]), new TextPair(result[6], Integer.toString(Integer.parseInt(result[5]))));
+            } catch (NumberFormatException e) {} // not an integer (csv header line) or no value.
+ } + } + public static class MyReducer extends Reducer { + public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + TextPair max = null; + for (TextPair val : values) { + if (max == null){ + max = val; + continue; + } + if (Integer.parseInt(max.getSecond().toString()) < Integer.parseInt(val.getSecond().toString())){ + max = val; + continue; + } + } + if(Integer.parseInt(max.getSecond().toString()) > 0) { + context.write(key, max); + } + } + } + public static void main(String[] args) throws Exception { + Configuration conf1 = new Configuration(); + conf1.set("mapreduce.output.textoutputformat.separator",","); // This ensures that output is comma separated + Job job = Job.getInstance(conf1); + job.setJarByClass(SomeMapReduce_ex1a.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(TextPair.class); + job.setMapperClass(MyMapper.class); + job.setReducerClass(MyReducer.class); + job.setCombinerClass(MyReducer.class); // To allow the reducer to be used as a Combiner too +// job.setNumReduceTasks(8); // Uncomment this to run the job with more than one Reduce tasks. Depending on the system, this may produce a speedup. 
+        FileInputFormat.setInputPaths(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        boolean status = job.waitForCompletion(true);
+        if (status) {
+            System.exit(0);
+        } else {
+            System.exit(1);
+        }
+    }
+}
diff --git a/ex2/mapreduce/SomeMapReduce_wc.java b/ex2/mapreduce/SomeMapReduce_wc.java
new file mode 100644
index 0000000..561cb57
--- /dev/null
+++ b/ex2/mapreduce/SomeMapReduce_wc.java
@@ -0,0 +1,55 @@
+// Created as a template for Advanced Database Systems 2019
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+// Classic word count: tokenize on non-word characters, lowercase, sum per word.
+public class SomeMapReduce_wc {
+    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+        public static IntWritable one = new IntWritable(1);
+
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String[] result = value.toString().split("\\W+");
+            for (String s : result) {
+                if (s.isEmpty()) continue; // split("\\W+") yields a leading "" when the line starts with a non-word char
+                context.write(new Text(s.toLowerCase()), one);
+            }
+        }
+    }
+    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
+            int sum = 0;
+            for (IntWritable val : values) {
+                sum += val.get();
+            }
+            context.write(key, new IntWritable(sum));
+        }
+    }
+    public static void main(String[] args) throws Exception {
+        Configuration conf1 = new Configuration();
+        conf1.set("mapreduce.output.textoutputformat.separator",","); // This ensures that output is comma separated
+        Job job = Job.getInstance(conf1);
+        job.setJarByClass(SomeMapReduce_wc.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        job.setMapperClass(MyMapper.class);
+        job.setReducerClass(MyReducer.class);
+        job.setCombinerClass(MyReducer.class); // To allow the reducer to be used as a Combiner too
+//        job.setNumReduceTasks(8); // Uncomment this to run the job with more than one Reduce tasks. Depending on the system, this may produce a speedup.
+        FileInputFormat.setInputPaths(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        boolean status = job.waitForCompletion(true);
+        if (status) {
+            System.exit(0);
+        } else {
+            System.exit(1);
+        }
+    }
+}
diff --git a/ex2/mapreduce/TextPair.java b/ex2/mapreduce/TextPair.java
new file mode 100644
index 0000000..2b0eb88
--- /dev/null
+++ b/ex2/mapreduce/TextPair.java
@@ -0,0 +1,90 @@
+// Originally based on an example from "Hadoop: The Definitive Guide" by Tom White.
+// https://github.com/tomwhite/hadoop-book/blob/master/ch05-io/src/main/java/oldapi/TextPair.java
+// Copyright (C) 2014 Tom White
+//
+// Adapted by Filip Darmanovic and Cem Okulmus
+// Created as a template for Advanced Database Systems 2019
+
+import java.io.*;
+import org.apache.hadoop.io.*;
+
+// Writable pair of Text values, usable as a Hadoop map-output value or key.
+// The type parameter is required: compareTo(TextPair) below overrides the
+// WritableComparable<TextPair> contract and would not compile against the raw type.
+public class TextPair implements WritableComparable<TextPair> {
+
+    private Text first;
+    private Text second;
+
+    public TextPair() {
+        set(new Text(), new Text());
+    }
+
+    // Deep copy — needed because Hadoop reuses Writable instances during reduce.
+    public TextPair(TextPair copy) {
+        set(new Text(copy.getFirst().toString()),
+            new Text(copy.getSecond().toString()));
+    }
+
+    public TextPair(String first, String second) {
+        set(new Text(first), new Text(second));
+    }
+
+    public TextPair(Text first, Text second) {
+        set(first, second);
+    }
+
+    public void set(Text first, Text second) {
+        this.first = first;
+        this.second = second;
+    }
+
+    public void set(String first, String second) {
+        set(new Text(first), new Text(second));
+    }
+
+    public Text getFirst() {
+        return first;
+    }
+
+    public Text getSecond() {
+        return second;
+    }
+
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        first.write(out);
+        second.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        first.readFields(in);  // deserialize in the same order write() serialized
+        second.readFields(in);
+    }
+
+    @Override
+    public int hashCode() {
+        return (1013 * first.hashCode()) ^ (1009 * second.hashCode()); // any large prime numbers should do
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof TextPair) {
+            TextPair tp = (TextPair) o;
+            return first.equals(tp.first) && second.equals(tp.second);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return first + "," + second;
+    }
+
+    @Override
+    public int compareTo(TextPair tp) {
+        int cmp = first.compareTo(tp.first); // order by first, then second — consistent with equals
+        if (cmp != 0) {
+            return cmp;
+        }
+        return second.compareTo(tp.second);
+    }
+}
diff --git a/ex2/mapreduce/build_run.sh b/ex2/mapreduce/build_run.sh
new file mode 100755
index 0000000..79145c1
--- /dev/null
+++ b/ex2/mapreduce/build_run.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+. /etc/profile
+
+hdfs dfs -rm -r -f -skipTrash output-dir .staging .Trash
+
+hadoop com.sun.tools.javac.Main "SomeMapReduce$1.java" TextPair.java CSVSplitter.java
+jar cf "SomeMapReduce$1.jar" *.class
+hadoop jar "SomeMapReduce$1.jar" "SomeMapReduce$1" "$2" output-dir
+
+hdfs dfs -copyToLocal output-dir output-dir
+hdfs dfs -rm -r -f -skipTrash output-dir .staging .Trash
-- 
2.43.0