git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java
GITOLITE.txt
[pub/jan/dst18.git] / ass3-event / src / main / java / dst / ass3 / event / impl / EventProcessingEnvironment.java
1 package dst.ass3.event.impl;
2
3 import dst.ass3.event.IEventProcessingEnvironment;
4 import dst.ass3.event.model.domain.IUploadEventInfo;
5 import dst.ass3.event.model.domain.RequestType;
6 import dst.ass3.event.model.events.*;
7 import org.apache.flink.api.common.functions.FilterFunction;
8 import org.apache.flink.api.common.functions.MapFunction;
9 import org.apache.flink.streaming.api.datastream.DataStream;
10 import org.apache.flink.streaming.api.datastream.DataStreamSource;
11 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
12 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
13 import org.apache.flink.streaming.api.windowing.time.Time;
14
15 public class EventProcessingEnvironment implements IEventProcessingEnvironment {
16     DataStream<LifecycleEvent> lifecycleEventStream;
17     private Time time;
18     private SinkFunction<LifecycleEvent> lifecycleEventStreamSink;
19     private SinkFunction<UploadDuration> uploadDurationStreamSink;
20     private SinkFunction<AverageUploadDuration> averageUploadDurationStreamSink;
21     private SinkFunction<UploadTimeoutWarning> uploadTimeoutWarningStreamSink;
22     private SinkFunction<UploadFailedWarning> uploadFailedWarningStreamSink;
23     private SinkFunction<Alert> alertStreamSink;
24
25     @Override
26     public void initialize(StreamExecutionEnvironment env) {
27         DataStreamSource<IUploadEventInfo> input = env.addSource(new EventSourceFunction());
28
29         lifecycleEventStream = input.filter(new FilterFunction<IUploadEventInfo>() {
30             @Override
31             public boolean filter(IUploadEventInfo value) {
32                 return value.getRequestType() != RequestType.OTHER;
33             }
34         }).map(new MapFunction<IUploadEventInfo, LifecycleEvent>() {
35             @Override
36             public LifecycleEvent map(IUploadEventInfo iUploadEventInfo) throws Exception {
37                 return new LifecycleEvent(iUploadEventInfo);
38             }
39         });
40
41         lifecycleEventStream.addSink(lifecycleEventStreamSink);
42
43
44     }
45
46     @Override
47     public void setUploadDurationTimeout(Time time) {
48         this.time = time;
49     }
50
51     @Override
52     public void setLifecycleEventStreamSink(SinkFunction<LifecycleEvent> sink) {
53         lifecycleEventStreamSink = sink;
54     }
55
56     @Override
57     public void setUploadDurationStreamSink(SinkFunction<UploadDuration> sink) {
58         uploadDurationStreamSink = sink;
59     }
60
61     @Override
62     public void setAverageUploadDurationStreamSink(SinkFunction<AverageUploadDuration> sink) {
63         averageUploadDurationStreamSink = sink;
64     }
65
66     @Override
67     public void setUploadTimeoutWarningStreamSink(SinkFunction<UploadTimeoutWarning> sink) {
68         uploadTimeoutWarningStreamSink = sink;
69     }
70
71     @Override
72     public void setUploadFailedWarningStreamSink(SinkFunction<UploadFailedWarning> sink) {
73         uploadFailedWarningStreamSink = sink;
74     }
75
76     @Override
77     public void setAlertStreamSink(SinkFunction<Alert> sink) {
78         alertStreamSink = sink;
79     }
80 }