From 83964325e8d3e96f2893800a7b5994d74abc2f2d Mon Sep 17 00:00:00 2001
From: Jan Vales <jan@jvales.net>
Date: Wed, 6 Jun 2018 17:11:06 +0200
Subject: [PATCH] [3.3] first part.

---
 .../ass3/event/EventProcessingFactory.java    |  9 ++-
 .../impl/EventProcessingEnvironment.java      | 80 +++++++++++++++++++
 .../ass3/event/impl/EventSourceFunction.java  | 63 +++++++++++++++
 3 files changed, 148 insertions(+), 4 deletions(-)
 create mode 100644 ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java
 create mode 100644 ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java

diff --git a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java
index 6a8c668..2bddf92 100644
--- a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java
+++ b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java
@@ -1,16 +1,17 @@
 package dst.ass3.event;
 
+import dst.ass3.event.impl.EventProcessingEnvironment;
+import dst.ass3.event.impl.EventSourceFunction;
+
 /**
  * Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances.
  */
 public class EventProcessingFactory {
     public static IEventProcessingEnvironment createEventProcessingEnvironment() {
-        // TODO
-        return null;
+        return new EventProcessingEnvironment();
     }
 
     public static IEventSourceFunction createEventSourceFunction() {
-        // TODO
-        return null;
+        return new EventSourceFunction();
     }
 }
diff --git a/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java b/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java
new file mode 100644
index 0000000..02daf72
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java
@@ -0,0 +1,80 @@
+package dst.ass3.event.impl;
+
+import dst.ass3.event.IEventProcessingEnvironment;
+import dst.ass3.event.model.domain.IUploadEventInfo;
+import dst.ass3.event.model.domain.RequestType;
+import dst.ass3.event.model.events.*;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+public class EventProcessingEnvironment implements IEventProcessingEnvironment {
+    DataStream<LifecycleEvent> lifecycleEventStream;
+    private Time time;
+    private SinkFunction<LifecycleEvent> lifecycleEventStreamSink;
+    private SinkFunction<UploadDuration> uploadDurationStreamSink;
+    private SinkFunction<AverageUploadDuration> averageUploadDurationStreamSink;
+    private SinkFunction<UploadTimeoutWarning> uploadTimeoutWarningStreamSink;
+    private SinkFunction<UploadFailedWarning> uploadFailedWarningStreamSink;
+    private SinkFunction<Alert> alertStreamSink;
+
+    @Override
+    public void initialize(StreamExecutionEnvironment env) {
+        DataStreamSource<IUploadEventInfo> input = env.addSource(new EventSourceFunction());
+
+        lifecycleEventStream = input.filter(new FilterFunction<IUploadEventInfo>() {
+            @Override
+            public boolean filter(IUploadEventInfo value) {
+                return value.getRequestType() != RequestType.OTHER;
+            }
+        }).map(new MapFunction<IUploadEventInfo, LifecycleEvent>() {
+            @Override
+            public LifecycleEvent map(IUploadEventInfo iUploadEventInfo) throws Exception {
+                return new LifecycleEvent(iUploadEventInfo);
+            }
+        });
+
+        lifecycleEventStream.addSink(lifecycleEventStreamSink);
+
+
+    }
+
+    @Override
+    public void setUploadDurationTimeout(Time time) {
+        this.time = time;
+    }
+
+    @Override
+    public void setLifecycleEventStreamSink(SinkFunction<LifecycleEvent> sink) {
+        lifecycleEventStreamSink = sink;
+    }
+
+    @Override
+    public void setUploadDurationStreamSink(SinkFunction<UploadDuration> sink) {
+        uploadDurationStreamSink = sink;
+    }
+
+    @Override
+    public void setAverageUploadDurationStreamSink(SinkFunction<AverageUploadDuration> sink) {
+        averageUploadDurationStreamSink = sink;
+    }
+
+    @Override
+    public void setUploadTimeoutWarningStreamSink(SinkFunction<UploadTimeoutWarning> sink) {
+        uploadTimeoutWarningStreamSink = sink;
+    }
+
+    @Override
+    public void setUploadFailedWarningStreamSink(SinkFunction<UploadFailedWarning> sink) {
+        uploadFailedWarningStreamSink = sink;
+    }
+
+    @Override
+    public void setAlertStreamSink(SinkFunction<Alert> sink) {
+        alertStreamSink = sink;
+    }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java b/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java
new file mode 100644
index 0000000..587a5f7
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java
@@ -0,0 +1,63 @@
+package dst.ass3.event.impl;
+
+import dst.ass3.event.Constants;
+import dst.ass3.event.EventSubscriber;
+import dst.ass3.event.IEventSourceFunction;
+import dst.ass3.event.model.domain.IUploadEventInfo;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.net.InetSocketAddress;
+
+public class EventSourceFunction implements IEventSourceFunction {
+    private EventSubscriber es;
+    private boolean isRunning;
+    private RuntimeContext runtimeContext;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        es = EventSubscriber.subscribe(new InetSocketAddress(Constants.EVENT_PUBLISHER_PORT));
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (es == null) return;
+        cancel();
+        es.close();
+        es = null;
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return runtimeContext;
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext runtimeContext) {
+        this.runtimeContext = runtimeContext;
+    }
+
+    @Override
+    public IterationRuntimeContext getIterationRuntimeContext() {
+        return null;
+    }
+
+    @Override
+    public void run(SourceContext<IUploadEventInfo> ctx) throws Exception {
+        isRunning = true;
+        while (isRunning) {
+            IUploadEventInfo event = es.receive();
+            if (event == null) {
+                isRunning = false;
+                continue;
+            }
+            ctx.collectWithTimestamp(event, event.getTimestamp());
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}
-- 
2.43.0