]> git.somenet.org - pub/jan/adbs.git/blob - ex2/mapreduce/SomeMapReduce_wc.java
ex2.1a complete
[pub/jan/adbs.git] / ex2 / mapreduce / SomeMapReduce_wc.java
1 // Created as a template for  Advanced Database Systems 2019
2
3 import java.io.IOException;
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.fs.Path;
6 import org.apache.hadoop.io.IntWritable;
7 import org.apache.hadoop.io.LongWritable;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14
15 public class SomeMapReduce_wc {
16     public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
17         public static IntWritable one = new IntWritable(1);
18
19         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
20             String[] result = value.toString().split("\\W+");
21             for (String s : result) {
22                 context.write(new Text(s.toLowerCase()), one);
23             }
24         }
25     }
26     public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
27         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
28             int sum = 0;
29             for (IntWritable val : values) {
30                 sum += val.get();
31             }
32             context.write(key, new IntWritable(sum));
33         }
34     }
35     public static void main(String[] args) throws Exception {
36         Configuration conf1 = new Configuration();
37         conf1.set("mapreduce.output.textoutputformat.separator",",");  // This ensures that output is comma separated
38         Job job = Job.getInstance(conf1);
39         job.setJarByClass(SomeMapReduce_wc.class);
40         job.setOutputKeyClass(Text.class);
41         job.setOutputValueClass(IntWritable.class);
42         job.setMapperClass(MyMapper.class);
43         job.setReducerClass(MyReducer.class);
44         job.setCombinerClass(MyReducer.class); // To allow the reducer to be used as a Combiner too
45 //      job.setNumReduceTasks(8);     // Uncomment this to run the job with more than one Reduce tasks. Depending on the system, this may produce a speedup.
46         FileInputFormat.setInputPaths(job, new Path(args[0]));
47         FileOutputFormat.setOutputPath(job, new Path(args[1]));
48         boolean status = job.waitForCompletion(true);
49         if (status) {
50             System.exit(0);
51         } else {
52             System.exit(1);
53         }
54     }
55 }