package dst.ass3.event;

import static org.junit.Assert.assertNotNull;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import dst.ass3.event.model.events.*;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * EventProcessingTestBase.
 */
public class EventProcessingTestBase extends Ass3EventTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(EventProcessingTestBase.class);

    @Rule
    public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);

    private IEventProcessingEnvironment epe;

    protected StaticQueueSink<LifecycleEvent> lifecycleEvents;
    protected StaticQueueSink<UploadDuration> uploadDurations;
    protected StaticQueueSink<AverageUploadDuration> averageUploadDurations;
    protected StaticQueueSink<UploadTimeoutWarning> uploadTimeoutWarnings;
    protected StaticQueueSink<UploadFailedWarning> uploadFailedWarnings;
    protected StaticQueueSink<Alert> alerts;

    @Before
    public void setUpEnvironment() throws Exception {
        epe = EventProcessingFactory.createEventProcessingEnvironment();

        assertNotNull("#createEventProcessingEnvironment() not implemented", epe);

        lifecycleEvents = new StaticQueueSink<>("lifecycleEvents");
        uploadDurations = new StaticQueueSink<>("uploadDurations");
        averageUploadDurations = new StaticQueueSink<>("averageUploadDurations");
        uploadTimeoutWarnings = new StaticQueueSink<>("uploadTimeoutWarnings");
        uploadFailedWarnings = new StaticQueueSink<>("uploadFailedWarnings");
        alerts = new StaticQueueSink<>("alerts");

        epe.setLifecycleEventStreamSink(lifecycleEvents);
        epe.setUploadDurationStreamSink(uploadDurations);
        epe.setAverageUploadDurationStreamSink(averageUploadDurations);
        epe.setUploadTimeoutWarningStreamSink(uploadTimeoutWarnings);
        epe.setUploadFailedWarningStreamSink(uploadFailedWarnings);
        epe.setAlertStreamSink(alerts);
        epe.setUploadDurationTimeout(Time.seconds(15));
    }

    public JobExecutionResult initAndExecute() throws Exception {
        return initAndExecute(null);
    }

    public JobExecutionResult initAndExecute(Consumer<IEventProcessingEnvironment> initializer) throws Exception {
        if (initializer != null) {
            initializer.accept(epe);
        }
        LOG.info("Initializing StreamExecutionEnvironment with {}", epe);
        epe.initialize(flink);
        LOG.info("Executing flink {}", flink);
        return flink.execute();
    }

    public Future<JobExecutionResult> initAndExecuteAsync(Consumer<IEventProcessingEnvironment> initializer) {
        return executor.submit(() -> initAndExecute(initializer));
    }

    public Future<JobExecutionResult> initAndExecuteAsync() {
        return executor.submit(() -> initAndExecute());
    }

    @After
    public void tearDownCollectors() throws Exception {
        StaticQueueSink.clearAll();
    }

}
