]> git.somenet.org - pub/jan/adbs.git/blob - ex2/mapreduce/SomeMapReduce_ex2a.java
etwas zu ex2.5 geschrieben.
[pub/jan/adbs.git] / ex2 / mapreduce / SomeMapReduce_ex2a.java
1 // Created as a template for  Advanced Database Systems 2019
2
3 import java.io.*;
4 import java.util.*;
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.io.*;
8 import org.apache.hadoop.mapreduce.*;
9 import org.apache.hadoop.mapreduce.lib.input.*;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11
12 import org.apache.commons.logging.*;
13
14
15 public class SomeMapReduce_ex2a {
16     public static class TextArrayWritable extends ArrayWritable {
17         public TextArrayWritable(){
18             super(Text.class);
19         }
20         public TextArrayWritable(String[] strings) {
21             super(Text.class);
22             Text[] texts = new Text[strings.length];
23             for (int i = 0; i < strings.length; i++) {
24                 texts[i] = new Text(strings[i]);
25             }
26             set(texts);
27         }
28     }
29
30     public static class MyMapper extends Mapper<Object, Text, Text, TextArrayWritable> {
31         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
32             String[] result = CSVSplitter.split(value.toString());
33
34             try{
35                 // format: key: author. vals: mappertype, title, checkouts
36                 context.write(new Text(result[7]), new TextArrayWritable(new String[]{"MyMapper", result[6].toString(), Integer.toString(Integer.parseInt(result[5]))}));
37             }catch(NumberFormatException e){} // not an integer (csv header line) or no value.
38         }
39     }
40     public static class MyOtherMapper extends Mapper<Object, Text, Text, TextArrayWritable> {
41         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
42             String[] result = CSVSplitter.split(value.toString());
43             // format: key: author. vals: mappertype, title, pubyear, subjects, itemlocation
44             context.write(new Text(result[2]), new TextArrayWritable(new String[]{"MyOtherMapper", result[1].toString(), result[4].toString(), result[6].toString(), result[10].toString()}));
45         }
46     }
47
48     public static class MyReducer extends Reducer<Text, TextArrayWritable, Text, Text> {
49         private static final Log LOG = LogFactory.getLog(MyReducer.class);
50
51         public void reduce(Text key, Iterable<TextArrayWritable> values, Context context) throws IOException, InterruptedException {
52             HashMap<String,String[]> hm = new HashMap<String,String[]>();
53             String[] max = new String[]{"none", "title", "0"};
54
55             for (TextArrayWritable val : values) {
56                 String[] vals = val.toStrings();
57                 if ("MyOtherMapper".equals(vals[0])){
58                     hm.put(key.toString()+"_"+vals[1], vals);
59                 }else{
60                     if (max == null){
61                         max = vals;
62                         continue;
63                     }
64                     if (Integer.parseInt(max[2]) < Integer.parseInt(vals[2])){
65                         max = vals;
66                         continue;
67                     }
68                 }
69             }
70
71             if(Integer.parseInt(max[2]) > 0 && hm.containsKey(key.toString()+"_"+max[1])) {
72                 String[] additionalData = hm.get(key.toString()+"_"+max[1]);
73                 // format: key: author. vals: title, pubyear, subjects, itemlocation
74                 context.write(key, new Text(max[1]+","+additionalData[1]+","+additionalData[2]+","+additionalData[3]));
75             }
76         }
77     }
78     public static void main(String[] args) throws Exception {
79         Configuration conf1 = new Configuration();
80         conf1.set("mapreduce.output.textoutputformat.separator",",");  // This ensures that output is comma separated
81         Job job = Job.getInstance(conf1);
82         job.setJarByClass(SomeMapReduce_ex2a.class);
83         job.setOutputKeyClass(Text.class);
84         job.setOutputValueClass(Text.class);
85         job.setMapOutputKeyClass(Text.class);
86         job.setMapOutputValueClass(TextArrayWritable.class);
87         job.setReducerClass(MyReducer.class);
88 //    job.setNumReduceTasks(8);     // Uncomment this to run the job with more than one Reduce tasks. Depending on the system, this may produce a speedup.
89         MultipleInputs.addInputPath(job, new Path(args[2]), TextInputFormat.class, MyOtherMapper.class);
90         MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MyMapper.class);
91         FileOutputFormat.setOutputPath(job, new Path(args[0]));
92         boolean status = job.waitForCompletion(true);
93         if (status) {
94             System.exit(0);
95         } else {
96             System.exit(1);
97         }
98     }
99 }