From edcba2be73c36ceef476eb7cd552ad4cfa5c0a00 Mon Sep 17 00:00:00 2001 From: Jan Vales Date: Wed, 15 May 2019 04:15:52 +0200 Subject: [PATCH] ex2.1b done. --- ex2/main_1.tex | 10 ++- ex2/mapreduce/SomeMapReduce_ex1a.java | 18 +++-- ex2/mapreduce/SomeMapReduce_ex2a.java | 99 +++++++++++++++++++++++++++ ex2/mapreduce/build_run.sh | 9 ++- 4 files changed, 122 insertions(+), 14 deletions(-) create mode 100644 ex2/mapreduce/SomeMapReduce_ex2a.java diff --git a/ex2/main_1.tex b/ex2/main_1.tex index 09dc2eb..fc3cd8e 100644 --- a/ex2/main_1.tex +++ b/ex2/main_1.tex @@ -4,8 +4,16 @@ Map 53 53 Reduce 1 1 \begin{enumerate}[label=(\alph*)] - \item http://localhost:19888/jobhistory/job/job\_1557406089646\_5204\\ + \item\begin{verbatim}rsync -vaPp --delete ~/gitstuff/adbs/ex2/mapreduce/ e726236f@lbd.zserv.tuwien.ac.at:mapreduce/; ssh -t e726236f@lbd.zserv.tuwien.ac.at "cd mapreduce; ./build_run.sh '_ex1a' '/user/adbs/2019S/shared/seattle-checkouts-by-title/checkouts-by-title.csv'" + \end{verbatim}\\ + http://localhost:19888/jobhistory/job/job\_1557406089646\_5204\\ Tasks: Map: 53\\ Reduce: 1\\ + + \item\begin{verbatim}rsync -vaPp --delete ~/gitstuff/adbs/ex2/mapreduce/ e726236f@lbd.zserv.tuwien.ac.at:mapreduce/; ssh -t e726236f@lbd.zserv.tuwien.ac.at "cd mapreduce; ./build_run.sh '_ex2a' '/user/adbs/2019S/shared/seattle-checkouts-by-title/checkouts-by-title.csv' '/user/adbs/2019S/shared/seattle-library-collection-inventory/library-collection-inventory.csv'" + \end{verbatim}\\ + http://localhost:19888/jobhistory/job/job\_1557406089646\_5309\\ + Map: 109\\ + Reduce: 1\\ \end{enumerate} diff --git a/ex2/mapreduce/SomeMapReduce_ex1a.java b/ex2/mapreduce/SomeMapReduce_ex1a.java index eb315bd..c016f5a 100644 --- a/ex2/mapreduce/SomeMapReduce_ex1a.java +++ b/ex2/mapreduce/SomeMapReduce_ex1a.java @@ -10,8 +10,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SomeMapReduce_ex1a { public static class MyMapper extends Mapper { - public static IntWritable one 
= new IntWritable(1); - public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] result = CSVSplitter.split(value.toString()); try{ @@ -20,7 +18,7 @@ public class SomeMapReduce_ex1a { } } public static class MyReducer extends Reducer { - public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { TextPair max = null; for (TextPair val : values) { if (max == null){ @@ -38,18 +36,18 @@ public class SomeMapReduce_ex1a { } } public static void main(String[] args) throws Exception { - Configuration conf1 = new Configuration(); - conf1.set("mapreduce.output.textoutputformat.separator",","); // This ensures that output is comma separated - Job job = Job.getInstance(conf1); - job.setJarByClass(SomeMapReduce_ex1a.class); + Configuration conf1 = new Configuration(); + conf1.set("mapreduce.output.textoutputformat.separator",","); // This ensures that output is comma separated + Job job = Job.getInstance(conf1); + job.setJarByClass(SomeMapReduce_ex1a.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TextPair.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setCombinerClass(MyReducer.class); // To allow the reducer to be used as a Combiner too -// job.setNumReduceTasks(8); // Uncomment this to run the job with more than one Reduce tasks. Depending on the system, this may produce a speedup. - FileInputFormat.setInputPaths(job, new Path(args[0])); - FileOutputFormat.setOutputPath(job, new Path(args[1])); +// job.setNumReduceTasks(8); // Uncomment this to run the job with more than one Reduce tasks. Depending on the system, this may produce a speedup. 
+ FileInputFormat.setInputPaths(job, new Path(args[1])); + FileOutputFormat.setOutputPath(job, new Path(args[0])); boolean status = job.waitForCompletion(true); if (status) { System.exit(0); diff --git a/ex2/mapreduce/SomeMapReduce_ex2a.java b/ex2/mapreduce/SomeMapReduce_ex2a.java new file mode 100644 index 0000000..5b0e446 --- /dev/null +++ b/ex2/mapreduce/SomeMapReduce_ex2a.java @@ -0,0 +1,99 @@ +// Created as a template for Advanced Database Systems 2019 + +import java.io.*; +import java.util.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +import org.apache.commons.logging.*; + + +public class SomeMapReduce_ex2a { + public static class TextArrayWritable extends ArrayWritable { + public TextArrayWritable(){ + super(Text.class); + } + public TextArrayWritable(String[] strings) { + super(Text.class); + Text[] texts = new Text[strings.length]; + for (int i = 0; i < strings.length; i++) { + texts[i] = new Text(strings[i]); + } + set(texts); + } + } + + public static class MyMapper extends Mapper { + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + String[] result = CSVSplitter.split(value.toString()); + + try{ + // format: key: author. vals: mappertype, title, checkouts + context.write(new Text(result[7]), new TextArrayWritable(new String[]{"MyMapper", result[6].toString(), Integer.toString(Integer.parseInt(result[5]))})); + }catch(NumberFormatException e){} // not an integer (csv header line) or no value. + } + } + public static class MyOtherMapper extends Mapper { + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + String[] result = CSVSplitter.split(value.toString()); + // format: key: author. 
vals: mappertype, title, pubyear, subjects, itemlocation + context.write(new Text(result[2]), new TextArrayWritable(new String[]{"MyOtherMapper", result[1].toString(), result[4].toString(), result[6].toString(), result[10].toString()})); + } + } + + public static class MyReducer extends Reducer { + private static final Log LOG = LogFactory.getLog(MyReducer.class); + + public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + HashMap hm = new HashMap(); + String[] max = new String[]{"none", "title", "0"}; + + for (TextArrayWritable val : values) { + String[] vals = val.toStrings(); + if ("MyOtherMapper".equals(vals[0])){ + hm.put(key.toString()+"_"+vals[1], vals); + }else{ + if (max == null){ + max = vals; + continue; + } + if (Integer.parseInt(max[2]) < Integer.parseInt(vals[2])){ + max = vals; + continue; + } + } + } + + if(Integer.parseInt(max[2]) > 0 && hm.containsKey(key.toString()+"_"+max[1])) { + String[] additionalData = hm.get(key.toString()+"_"+max[1]); + // format: key: author. vals: title, pubyear, subjects, itemlocation + context.write(key, new Text(max[1]+","+additionalData[1]+","+additionalData[2]+","+additionalData[3])); + } + } + } + public static void main(String[] args) throws Exception { + Configuration conf1 = new Configuration(); + conf1.set("mapreduce.output.textoutputformat.separator",","); // This ensures that output is comma separated + Job job = Job.getInstance(conf1); + job.setJarByClass(SomeMapReduce_ex2a.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(TextArrayWritable.class); + job.setReducerClass(MyReducer.class); +// job.setNumReduceTasks(8); // Uncomment this to run the job with more than one Reduce tasks. Depending on the system, this may produce a speedup. 
+ MultipleInputs.addInputPath(job, new Path(args[2]), TextInputFormat.class, MyOtherMapper.class); + MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MyMapper.class); + FileOutputFormat.setOutputPath(job, new Path(args[0])); + boolean status = job.waitForCompletion(true); + if (status) { + System.exit(0); + } else { + System.exit(1); + } + } +} diff --git a/ex2/mapreduce/build_run.sh b/ex2/mapreduce/build_run.sh index 79145c1..040cc2f 100755 --- a/ex2/mapreduce/build_run.sh +++ b/ex2/mapreduce/build_run.sh @@ -2,11 +2,14 @@ . /etc/profile +RUN_PARAM=$1 +shift 1 + hdfs dfs -rm -r -f -skipTrash output-dir .staging .Trash -hadoop com.sun.tools.javac.Main "SomeMapReduce$1.java" TextPair.java CSVSplitter.java -jar cf "SomeMapReduce$1.jar" *.class -hadoop jar "SomeMapReduce$1.jar" "SomeMapReduce$1" "$2" output-dir +hadoop com.sun.tools.javac.Main "SomeMapReduce$RUN_PARAM.java" TextPair.java CSVSplitter.java +jar cf "SomeMapReduce$RUN_PARAM.jar" *.class +hadoop jar "SomeMapReduce$RUN_PARAM.jar" "SomeMapReduce$RUN_PARAM" output-dir "$@" hdfs dfs -copyToLocal output-dir output-dir hdfs dfs -rm -r -f -skipTrash output-dir .staging .Trash -- 2.43.0