-
Notifications
You must be signed in to change notification settings - Fork 0
/
UniqueListener.java
83 lines (69 loc) · 3.07 KB
/
UniqueListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
//import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class MusicTrack
{
public static class MusicMapper extends Mapper<Object,Text,Text,Text>
{
public void map(Object key,Text value,Context context) throws IOException,InterruptedException
{
String[] tokens=value.toString().split("\\|");
String trackid = /*"1";*/tokens[1];
String others = tokens[0]+"\t"+tokens[2]+"\t"+tokens[3]+"\t"+tokens[4];
context.write(new Text(trackid),new Text(others));
}
}
public static class MusicReduceer extends Reducer<Text,Text,Text,Text>
{
public void reduce(Text Key,Iterable<Text> value,Context context) throws IOException,InterruptedException
{
Set<Integer> userIdSet = new HashSet<Integer>();
int shared = 0;
int radio =0;
int skip= 0;
int listen=0;
for(Text val:value)
{
String[] valTokens = val.toString().split("\t");
int sh = Integer.parseInt(valTokens[1]);
int ra = Integer.parseInt(valTokens[2]);
int sk = Integer.parseInt(valTokens[3]);
shared = shared+sh;
radio=radio+ra;
skip=skip+sk;
listen = shared + radio;
int cus = Integer.parseInt(valTokens[0]);
userIdSet.add(cus);
}
IntWritable size = new IntWritable(userIdSet.size());
context.write(new Text(Key),new Text("customerId- "+size+"\t"+"Shared- "+shared+"\t"+"Radio- "+radio+"\t"+"Skipped- "+skip+"\t"+"Listen- "+listen));
}
}
public static void main(String args[]) throws Exception
{
Configuration conf=new Configuration();
Job job=new Job(conf,"MusicTrack");
job.setNumReduceTasks(1);
job.setJarByClass(MusicTrack.class);
job.setMapperClass(MusicMapper.class);
job.setReducerClass(MusicReduceer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path outputpath= new Path(args[1]);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
outputpath.getFileSystem(conf).delete(outputpath,true);
System.exit(job.waitForCompletion(true)?0:1);
}
}