1 package dst.ass3.event.impl;
3 import dst.ass3.event.IEventProcessingEnvironment;
4 import dst.ass3.event.model.domain.IUploadEventInfo;
5 import dst.ass3.event.model.domain.RequestType;
6 import dst.ass3.event.model.events.*;
7 import org.apache.flink.api.common.functions.FilterFunction;
8 import org.apache.flink.api.common.functions.MapFunction;
9 import org.apache.flink.streaming.api.datastream.DataStream;
10 import org.apache.flink.streaming.api.datastream.DataStreamSource;
11 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
12 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
13 import org.apache.flink.streaming.api.windowing.time.Time;
15 public class EventProcessingEnvironment implements IEventProcessingEnvironment {
16 DataStream<LifecycleEvent> lifecycleEventStream;
18 private SinkFunction<LifecycleEvent> lifecycleEventStreamSink;
19 private SinkFunction<UploadDuration> uploadDurationStreamSink;
20 private SinkFunction<AverageUploadDuration> averageUploadDurationStreamSink;
21 private SinkFunction<UploadTimeoutWarning> uploadTimeoutWarningStreamSink;
22 private SinkFunction<UploadFailedWarning> uploadFailedWarningStreamSink;
23 private SinkFunction<Alert> alertStreamSink;
26 public void initialize(StreamExecutionEnvironment env) {
27 DataStreamSource<IUploadEventInfo> input = env.addSource(new EventSourceFunction());
29 lifecycleEventStream = input.filter(new FilterFunction<IUploadEventInfo>() {
31 public boolean filter(IUploadEventInfo value) {
32 return value.getRequestType() != RequestType.OTHER;
34 }).map(new MapFunction<IUploadEventInfo, LifecycleEvent>() {
36 public LifecycleEvent map(IUploadEventInfo iUploadEventInfo) throws Exception {
37 return new LifecycleEvent(iUploadEventInfo);
41 lifecycleEventStream.addSink(lifecycleEventStreamSink);
47 public void setUploadDurationTimeout(Time time) {
52 public void setLifecycleEventStreamSink(SinkFunction<LifecycleEvent> sink) {
53 lifecycleEventStreamSink = sink;
57 public void setUploadDurationStreamSink(SinkFunction<UploadDuration> sink) {
58 uploadDurationStreamSink = sink;
62 public void setAverageUploadDurationStreamSink(SinkFunction<AverageUploadDuration> sink) {
63 averageUploadDurationStreamSink = sink;
67 public void setUploadTimeoutWarningStreamSink(SinkFunction<UploadTimeoutWarning> sink) {
68 uploadTimeoutWarningStreamSink = sink;
72 public void setUploadFailedWarningStreamSink(SinkFunction<UploadFailedWarning> sink) {
73 uploadFailedWarningStreamSink = sink;
77 public void setAlertStreamSink(SinkFunction<Alert> sink) {
78 alertStreamSink = sink;