From 83964325e8d3e96f2893800a7b5994d74abc2f2d Mon Sep 17 00:00:00 2001 From: Jan Vales Date: Wed, 6 Jun 2018 17:11:06 +0200 Subject: [PATCH] [3.3] first part. --- .../ass3/event/EventProcessingFactory.java | 9 ++- .../impl/EventProcessingEnvironment.java | 80 +++++++++++++++++++ .../ass3/event/impl/EventSourceFunction.java | 63 +++++++++++++++ 3 files changed, 148 insertions(+), 4 deletions(-) create mode 100644 ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java diff --git a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java index 6a8c668..2bddf92 100644 --- a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java +++ b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java @@ -1,16 +1,17 @@ package dst.ass3.event; +import dst.ass3.event.impl.EventProcessingEnvironment; +import dst.ass3.event.impl.EventSourceFunction; + /** * Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances. */ public class EventProcessingFactory { public static IEventProcessingEnvironment createEventProcessingEnvironment() { - // TODO - return null; + return new EventProcessingEnvironment(); } public static IEventSourceFunction createEventSourceFunction() { - // TODO - return null; + return new EventSourceFunction(); } } diff --git a/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java b/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java new file mode 100644 index 0000000..02daf72 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/impl/EventProcessingEnvironment.java @@ -0,0 +1,80 @@ +package dst.ass3.event.impl; + +import dst.ass3.event.IEventProcessingEnvironment; +import dst.ass3.event.model.domain.IUploadEventInfo; +import dst.ass3.event.model.domain.RequestType; +import dst.ass3.event.model.events.*; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.windowing.time.Time; + +public class EventProcessingEnvironment implements IEventProcessingEnvironment { + DataStream lifecycleEventStream; + private Time time; + private SinkFunction lifecycleEventStreamSink; + private SinkFunction uploadDurationStreamSink; + private SinkFunction averageUploadDurationStreamSink; + private SinkFunction uploadTimeoutWarningStreamSink; + private SinkFunction uploadFailedWarningStreamSink; + private SinkFunction alertStreamSink; + + @Override + public void initialize(StreamExecutionEnvironment env) { + DataStreamSource input = env.addSource(new EventSourceFunction()); + + lifecycleEventStream = input.filter(new FilterFunction() { + @Override + public boolean filter(IUploadEventInfo value) { + return value.getRequestType() != RequestType.OTHER; + } + }).map(new MapFunction() { + @Override + public LifecycleEvent map(IUploadEventInfo iUploadEventInfo) throws Exception { + return new LifecycleEvent(iUploadEventInfo); + } + }); + + lifecycleEventStream.addSink(lifecycleEventStreamSink); + + + } + + @Override + public void setUploadDurationTimeout(Time time) { + this.time = time; + } + + @Override + public void setLifecycleEventStreamSink(SinkFunction sink) { + lifecycleEventStreamSink = sink; + } + + @Override + public void setUploadDurationStreamSink(SinkFunction sink) { + uploadDurationStreamSink = sink; + } + + @Override + public void setAverageUploadDurationStreamSink(SinkFunction sink) { + averageUploadDurationStreamSink = sink; + } + + @Override + public void setUploadTimeoutWarningStreamSink(SinkFunction sink) { + uploadTimeoutWarningStreamSink = sink; + } + + @Override + public void setUploadFailedWarningStreamSink(SinkFunction sink) { + uploadFailedWarningStreamSink = sink; + } + + @Override + public void setAlertStreamSink(SinkFunction sink) { + alertStreamSink = sink; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java b/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java new file mode 100644 index 0000000..587a5f7 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/impl/EventSourceFunction.java @@ -0,0 +1,63 @@ +package dst.ass3.event.impl; + +import dst.ass3.event.Constants; +import dst.ass3.event.EventSubscriber; +import dst.ass3.event.IEventSourceFunction; +import dst.ass3.event.model.domain.IUploadEventInfo; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; + +import java.net.InetSocketAddress; + +public class EventSourceFunction implements IEventSourceFunction { + private EventSubscriber es; + private boolean isRunning; + private RuntimeContext runtimeContext; + + @Override + public void open(Configuration parameters) throws Exception { + es = EventSubscriber.subscribe(new InetSocketAddress(Constants.EVENT_PUBLISHER_PORT)); + } + + @Override + public void close() throws Exception { + if (es == null) return; + cancel(); + es.close(); + es = null; + } + + @Override + public RuntimeContext getRuntimeContext() { + return runtimeContext; + } + + @Override + public void setRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + return null; + } + + @Override + public void run(SourceContext ctx) throws Exception { + isRunning = true; + while (isRunning) { + IUploadEventInfo event = es.receive(); + if (event == null) { + isRunning = false; + continue; + } + ctx.collectWithTimestamp(event, event.getTimestamp()); + } + } + + @Override + public void cancel() { + isRunning = false; + } +} -- 2.43.0