package dst.ass3.event.impl;

import dst.ass3.event.IEventProcessingEnvironment;
import dst.ass3.event.model.domain.IUploadEventInfo;
import dst.ass3.event.model.domain.RequestType;
import dst.ass3.event.model.events.*;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventProcessingEnvironment implements IEventProcessingEnvironment {
    DataStream<LifecycleEvent> lifecycleEventStream;
    private Time time;
    private SinkFunction<LifecycleEvent> lifecycleEventStreamSink;
    private SinkFunction<UploadDuration> uploadDurationStreamSink;
    private SinkFunction<AverageUploadDuration> averageUploadDurationStreamSink;
    private SinkFunction<UploadTimeoutWarning> uploadTimeoutWarningStreamSink;
    private SinkFunction<UploadFailedWarning> uploadFailedWarningStreamSink;
    private SinkFunction<Alert> alertStreamSink;

    @Override
    public void initialize(StreamExecutionEnvironment env) {
        DataStreamSource<IUploadEventInfo> input = env.addSource(new EventSourceFunction());

        lifecycleEventStream = input.filter(new FilterFunction<IUploadEventInfo>() {
            @Override
            public boolean filter(IUploadEventInfo value) {
                return value.getRequestType() != RequestType.OTHER;
            }
        }).map(new MapFunction<IUploadEventInfo, LifecycleEvent>() {
            @Override
            public LifecycleEvent map(IUploadEventInfo iUploadEventInfo) throws Exception {
                return new LifecycleEvent(iUploadEventInfo);
            }
        });

        lifecycleEventStream.addSink(lifecycleEventStreamSink);


    }

    @Override
    public void setUploadDurationTimeout(Time time) {
        this.time = time;
    }

    @Override
    public void setLifecycleEventStreamSink(SinkFunction<LifecycleEvent> sink) {
        lifecycleEventStreamSink = sink;
    }

    @Override
    public void setUploadDurationStreamSink(SinkFunction<UploadDuration> sink) {
        uploadDurationStreamSink = sink;
    }

    @Override
    public void setAverageUploadDurationStreamSink(SinkFunction<AverageUploadDuration> sink) {
        averageUploadDurationStreamSink = sink;
    }

    @Override
    public void setUploadTimeoutWarningStreamSink(SinkFunction<UploadTimeoutWarning> sink) {
        uploadTimeoutWarningStreamSink = sink;
    }

    @Override
    public void setUploadFailedWarningStreamSink(SinkFunction<UploadFailedWarning> sink) {
        uploadFailedWarningStreamSink = sink;
    }

    @Override
    public void setAlertStreamSink(SinkFunction<Alert> sink) {
        alertStreamSink = sink;
    }
}
