From 83964325e8d3e96f2893800a7b5994d74abc2f2d Mon Sep 17 00:00:00 2001
From: Jan Vales <jan@jvales.net>
Date: Wed, 6 Jun 2018 17:11:06 +0200
Subject: [PATCH] [3.3] first part.
---
.../ass3/event/EventProcessingFactory.java | 9 ++-
.../impl/EventProcessingEnvironment.java | 80 +++++++++++++++++++
.../ass3/event/impl/EventSourceFunction.java | 63 +++++++++++++++
3 files changed, 148 insertions(+), 4 deletions(-)
create mode 100644 ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java
create mode 100644 ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java
diff --git a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java
index 6a8c668..2bddf92 100644
--- a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java
+++ b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java
@@ -1,16 +1,17 @@
package dst.ass3.event;
+import dst.ass3.event.impl.EventProcessingEnvironment;
+import dst.ass3.event.impl.EventSourceFunction;
+
/**
* Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances.
*/
public class EventProcessingFactory {
public static IEventProcessingEnvironment createEventProcessingEnvironment() {
- // TODO
- return null;
+ return new EventProcessingEnvironment();
}
public static IEventSourceFunction createEventSourceFunction() {
- // TODO
- return null;
+ return new EventSourceFunction();
}
}
diff --git a/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java b/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java
new file mode 100644
index 0000000..02daf72
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java
@@ -0,0 +1,80 @@
+package dst.ass3.event.impl;
+
+import dst.ass3.event.IEventProcessingEnvironment;
+import dst.ass3.event.model.domain.IUploadEventInfo;
+import dst.ass3.event.model.domain.RequestType;
+import dst.ass3.event.model.events.*;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+public class EventProcessingEnvironment implements IEventProcessingEnvironment {
+ DataStream<LifecycleEvent> lifecycleEventStream;
+ private Time time;
+ private SinkFunction<LifecycleEvent> lifecycleEventStreamSink;
+ private SinkFunction<UploadDuration> uploadDurationStreamSink;
+ private SinkFunction<AverageUploadDuration> averageUploadDurationStreamSink;
+ private SinkFunction<UploadTimeoutWarning> uploadTimeoutWarningStreamSink;
+ private SinkFunction<UploadFailedWarning> uploadFailedWarningStreamSink;
+ private SinkFunction<Alert> alertStreamSink;
+
+ @Override
+ public void initialize(StreamExecutionEnvironment env) {
+ DataStreamSource<IUploadEventInfo> input = env.addSource(new EventSourceFunction());
+
+ lifecycleEventStream = input.filter(new FilterFunction<IUploadEventInfo>() {
+ @Override
+ public boolean filter(IUploadEventInfo value) {
+ return value.getRequestType() != RequestType.OTHER;
+ }
+ }).map(new MapFunction<IUploadEventInfo, LifecycleEvent>() {
+ @Override
+ public LifecycleEvent map(IUploadEventInfo iUploadEventInfo) throws Exception {
+ return new LifecycleEvent(iUploadEventInfo);
+ }
+ });
+
+ lifecycleEventStream.addSink(lifecycleEventStreamSink);
+
+
+ }
+
+ @Override
+ public void setUploadDurationTimeout(Time time) {
+ this.time = time;
+ }
+
+ @Override
+ public void setLifecycleEventStreamSink(SinkFunction<LifecycleEvent> sink) {
+ lifecycleEventStreamSink = sink;
+ }
+
+ @Override
+ public void setUploadDurationStreamSink(SinkFunction<UploadDuration> sink) {
+ uploadDurationStreamSink = sink;
+ }
+
+ @Override
+ public void setAverageUploadDurationStreamSink(SinkFunction<AverageUploadDuration> sink) {
+ averageUploadDurationStreamSink = sink;
+ }
+
+ @Override
+ public void setUploadTimeoutWarningStreamSink(SinkFunction<UploadTimeoutWarning> sink) {
+ uploadTimeoutWarningStreamSink = sink;
+ }
+
+ @Override
+ public void setUploadFailedWarningStreamSink(SinkFunction<UploadFailedWarning> sink) {
+ uploadFailedWarningStreamSink = sink;
+ }
+
+ @Override
+ public void setAlertStreamSink(SinkFunction<Alert> sink) {
+ alertStreamSink = sink;
+ }
+}
diff --git a/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java b/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java
new file mode 100644
index 0000000..587a5f7
--- /dev/null
+++ b/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java
@@ -0,0 +1,63 @@
+package dst.ass3.event.impl;
+
+import dst.ass3.event.Constants;
+import dst.ass3.event.EventSubscriber;
+import dst.ass3.event.IEventSourceFunction;
+import dst.ass3.event.model.domain.IUploadEventInfo;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.net.InetSocketAddress;
+
+public class EventSourceFunction implements IEventSourceFunction {
+ private EventSubscriber es;
+ private boolean isRunning;
+ private RuntimeContext runtimeContext;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ es = EventSubscriber.subscribe(new InetSocketAddress(Constants.EVENT_PUBLISHER_PORT));
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (es == null) return;
+ cancel();
+ es.close();
+ es = null;
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return runtimeContext;
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext runtimeContext) {
+ this.runtimeContext = runtimeContext;
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ return null;
+ }
+
+ @Override
+ public void run(SourceContext<IUploadEventInfo> ctx) throws Exception {
+ isRunning = true;
+ while (isRunning) {
+ IUploadEventInfo event = es.receive();
+ if (event == null) {
+ isRunning = false;
+ continue;
+ }
+ ctx.collectWithTimestamp(event, event.getTimestamp());
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+}
--
2.47.3