package dst.ass3.event.tests;

import dst.ass3.event.EventProcessingTestBase;
import dst.ass3.event.dto.UploadEventInfoDTO;
import dst.ass3.event.model.domain.RequestType;
import dst.ass3.event.model.events.*;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static dst.ass3.event.model.domain.UploadState.*;
import static dst.ass3.event.model.domain.UploadState.QUEUED;
import static dst.ass3.event.model.domain.UploadState.UPLOADED;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

/**
 * Integration tests for the alerting and average-upload-duration parts of the
 * event-processing job (assignment 3.3.4).
 *
 * <p>Each test starts the Flink job asynchronously, publishes
 * {@link UploadEventInfoDTO} events through the test {@code publisher}, and
 * asserts on the resulting {@code alerts}, {@code uploadDurations} and
 * {@code averageUploadDurations} streams provided by
 * {@link EventProcessingTestBase}.
 */
public class Ass3_3_4Test extends EventProcessingTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_4Test.class);

    @Test
    public void multipleUploadFailures_triggerAlert() throws Exception {
        // checks that the window works correctly
        // expects UploadFailedWarning stream to work correctly

        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        // Publishes three UPLOADING/UPLOAD_FAILED cycles for the given event id,
        // which should produce exactly one UploadFailedWarning for that event.
        Consumer<Long> causeWarning = (eventId) -> {
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
        };

        // all of these events will fail
        publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(5L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(6L, now(), QUEUED, "s1", RequestType.DOCUMENT));

        // cause 3 warnings, which should trigger the first alert
        causeWarning.accept(1L);
        causeWarning.accept(2L);
        causeWarning.accept(3L);

        LOG.info("Collecting alert for first three warnings");
        assertNotNull(alerts.take());

        LOG.info("Checking that the fourth warning does not trigger an alert");
        causeWarning.accept(4L);
        assertNull(alerts.poll(500));

        LOG.info("Checking that the fifth warning does not trigger an alert");
        causeWarning.accept(5L);
        assertNull(alerts.poll(500));

        LOG.info("Checking that the sixth warning triggered an alert");
        causeWarning.accept(6L);
        assertNotNull(alerts.take());

        publisher.dropClients();
        flinkExecution.get();
    }

    @Test
    public void uploadFailuresAndTimeouts_triggerAlert() throws Exception {
        // checks whether keying works correctly, and whether Warning streams are unioned correctly
        // expects both warning streams to work correctly

        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e ->
                e.setUploadDurationTimeout(Time.seconds(3))
        );

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        // Publishes three UPLOADING/UPLOAD_FAILED cycles for the given event id on
        // the given server, producing one UploadFailedWarning keyed by that server.
        BiConsumer<Long, String> causeWarning = (eventId, server) -> {
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
            publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
        };

        publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e1 will fail
        publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e2 will fail
        publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e3 will time out
        publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s2", RequestType.DOCUMENT)); // s2 e4 will fail

        // s1 warning #1
        causeWarning.accept(1L, "s1");

        // s1 warning #2
        causeWarning.accept(2L, "s1");

        // s2 warning #1
        causeWarning.accept(4L, "s2");


        LOG.info("Checking that no alert has been issued yet");
        assertNull(alerts.poll(500));

        // make sure the other events don't time out
        publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // s1 e1 completes
        publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // s1 e2 completes
        publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s2", RequestType.DOCUMENT)); // s2 e4 completes

        sleep(4000); // waiting for e3 to time out

        publisher.dropClients();
        flinkExecution.get();

        LOG.info("Collecting Alert event");
        Alert alert = alerts.take();
        assertEquals("Expected only a single alert", 0, alerts.get().size());

        assertEquals("s1", alert.getServer());
        assertEquals("An alert should comprise three warnings", 3, alert.getWarnings().size());

        Warning w0 = alert.getWarnings().get(0);
        Warning w1 = alert.getWarnings().get(1);
        Warning w2 = alert.getWarnings().get(2);

        // two failure warnings (e1, e2) followed by the timeout warning (e3)
        assertThat(w0, instanceOf(UploadFailedWarning.class));
        assertThat(w1, instanceOf(UploadFailedWarning.class));
        assertThat(w2, instanceOf(UploadTimeoutWarning.class));

        assertEquals("s1", w0.getServer());
        assertEquals("s1", w1.getServer());
        assertEquals("s1", w2.getServer());
    }

    @Test
    public void averageUploadDurationWindow_worksCorrectly() throws Exception {
        // makes sure the event is triggered at the correct instant

        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        sleep(250);
        publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(5L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(6L, now(), QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(7L, now(), QUEUED, "s2", RequestType.DOCUMENT));

        sleep(100);
        publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(3L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(5L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(6L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(7L, now(), UPLOADING, "s2", RequestType.DOCUMENT));

        sleep(100);
        publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(3L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
        publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s1", RequestType.DOCUMENT));

        // the first four events should not trigger anything
        LOG.info("Checking AverageUploadDuration events after the first four UPLOADED events of s1");
        assertNull(averageUploadDurations.poll(500));

        // this event is from a different server
        publisher.publish(new UploadEventInfoDTO(7L, now(), UPLOADED, "s2", RequestType.DOCUMENT));
        LOG.info("Checking AverageUploadDuration events after the first UPLOADED event of s2");
        assertNull(averageUploadDurations.poll(500));

        // fifth event in s1 triggers the window operation
        publisher.publish(new UploadEventInfoDTO(5L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
        LOG.info("Collecting AverageUploadDuration event for s1");
        AverageUploadDuration event = averageUploadDurations.take();
        assertNotNull(event);
        assertEquals("s1", event.getServer());

        // should be in a new window and therefore not trigger
        publisher.publish(new UploadEventInfoDTO(6L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
        LOG.info("Checking AverageUploadDuration events after the sixth UPLOADED event of s1");
        assertNull(averageUploadDurations.poll(500));

        publisher.dropClients();

        flinkExecution.get();
    }

    @Test
    public void averageUploadDurations_areCalculatedCorrectly() throws Exception {
        // makes sure the keying works properly and that the calculation is done from UploadDuration events
        // requires UploadDuration events to be calculated correctly

        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        List<UploadDuration> s1Durations = new ArrayList<>(5);
        List<UploadDuration> s2Durations = new ArrayList<>(5);

        sleep(250);
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, QUEUED, "s1", RequestType.DOCUMENT));

        sleep(250);
        publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, QUEUED, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, QUEUED, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, QUEUED, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, QUEUED, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, QUEUED, "s2", RequestType.DOCUMENT));

        sleep(125);
        publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, UPLOADING, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, UPLOADING, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, UPLOADING, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, UPLOADING, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, UPLOADING, "s2", RequestType.DOCUMENT));

        sleep(125);
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, UPLOADING, "s1", RequestType.DOCUMENT));

        publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, UPLOADED, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, UPLOADED, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, UPLOADED, "s2", RequestType.DOCUMENT));

        LOG.info("Collecting UploadDuration events 1,2,3 for s2");
        s2Durations.addAll(uploadDurations.take(3));

        sleep(500);
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, UPLOADED, "s1", RequestType.DOCUMENT));

        LOG.info("Collecting UploadDuration events 1,2,3 for s1");
        s1Durations.addAll(uploadDurations.take(3));

        publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, UPLOADED, "s2", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, UPLOADED, "s2", RequestType.DOCUMENT));

        LOG.info("Collecting UploadDuration events 4,5 for s2");
        s2Durations.addAll(uploadDurations.take(2));

        sleep(500);
        publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, UPLOADED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, UPLOADED, "s1", RequestType.DOCUMENT));

        LOG.info("Collecting UploadDuration events 4,5 for s1");
        s1Durations.addAll(uploadDurations.take(2));

        LOG.info("Collecting AverageUploadDuration event for s2");
        AverageUploadDuration e0 = averageUploadDurations.take(); // s2

        LOG.info("Collecting AverageUploadDuration event for s1");
        AverageUploadDuration e1 = averageUploadDurations.take(); // s1

        // JUnit assertEquals takes (message, expected, actual) in that order
        assertEquals("Expected calculation for s2 to have been triggered first", "s2", e0.getServer());
        assertEquals("Expected calculation for s1 to have been triggered second", "s1", e1.getServer());

        assertEquals("Wrong number of UploadDuration events for s1", 5, s1Durations.size());
        assertEquals("Wrong number of UploadDuration events for s2", 5, s2Durations.size());

        double s1Avg = s1Durations.stream().mapToLong(UploadDuration::getDuration).average().orElse(-1);
        double s2Avg = s2Durations.stream().mapToLong(UploadDuration::getDuration).average().orElse(-1);

        // compare against the average computed from the observed UploadDuration events
        assertEquals("Average duration was not calculated from UploadDuration events correctly", s2Avg, e0.getDuration(), 100);
        assertEquals("Average duration was not calculated from UploadDuration events correctly", s1Avg, e1.getDuration(), 100);

        publisher.dropClients();
        flinkExecution.get();
    }
}
