package dst.ass3.event.tests;

import dst.ass3.event.EventProcessingTestBase;
import dst.ass3.event.dto.UploadEventInfoDTO;
import dst.ass3.event.model.domain.RequestType;
import dst.ass3.event.model.events.LifecycleEvent;
import dst.ass3.event.model.events.UploadDuration;
import dst.ass3.event.model.events.UploadFailedWarning;
import dst.ass3.event.model.events.UploadTimeoutWarning;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import static dst.ass3.event.model.domain.UploadState.*;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;

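/**
 * Tests the upload-monitoring part of the event processing job: upload
 * duration calculation, tolerance of the CEP rule towards interleaved state
 * changes, timeout warnings, and warnings after repeated upload failures.
 */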
public class Ass3_3_3Test extends EventProcessingTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_3Test.class);

    @Test
    public void uploadDurations_areCalculatedCorrectly() throws Exception {
        // tests whether duration calculation and stream keying work correctly
        // relies on the LifecycleEvent stream working correctly
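        // What the assertions below check (a sketch of the assumed semantics,
        // with events keyed by their event id):
        //   duration = timestamp(UPLOADED LifecycleEvent) - timestamp(QUEUED LifecycleEvent)
        // allowing a tolerance of 100 ms.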

        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        LifecycleEvent e1Start;
        LifecycleEvent e2Start;
        LifecycleEvent e1End;
        LifecycleEvent e2End;

        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT)); // 1 starts before 2
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, QUEUED, "s1", RequestType.DOCUMENT)); // 3 never finishes

        LOG.info("Waiting for LifecycleEvent for event 1 being QUEUED");
        e1Start = lifecycleEvents.take();
        LOG.info("Waiting for LifecycleEvent for event 2 being QUEUED");
        e2Start = lifecycleEvents.take();
        LOG.info("Waiting for LifecycleEvent for event 3 being QUEUED");
        lifecycleEvents.take();

        sleep(500);
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));

        LOG.info("Waiting for LifecycleEvent for event 1 being UPLOADING");
        lifecycleEvents.take();
        LOG.info("Waiting for LifecycleEvent for event 2 being UPLOADING");
        lifecycleEvents.take();

        sleep(500); // event 2 took ~1000 ms in total
        publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // 2 finishes before 1

        LOG.info("Waiting for LifecycleEvent for event 2 being UPLOADED");
        e2End = lifecycleEvents.take();

        sleep(500); // event 1 took ~1500 ms in total
        publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT));

        LOG.info("Waiting for LifecycleEvent for event 1 being UPLOADED");
        e1End = lifecycleEvents.take();

        LOG.info("Collecting UploadDuration event for event 2");
        UploadDuration d0 = uploadDurations.take();

        LOG.info("Collecting UploadDuration event for event 1");
        UploadDuration d1 = uploadDurations.take();

        assertEquals("Expected event 2 to finish first", 2L, d0.getEventId()); // event 2 finished before 1
        assertEquals("Expected event 1 to finish last", 1L, d1.getEventId());

        assertThat("Expected UploadDuration to be >= 0", d0.getDuration(), greaterThan(0L));
        assertThat("Expected UploadDuration to be >= 0", d1.getDuration(), greaterThan(0L));

        assertEquals("StreamDuartion was not calculated from LifecycleEvents correctly",
                e2End.getTimestamp() - e2Start.getTimestamp(), d0.getDuration(), 100);

        assertEquals("StreamDuartion was not calculated from LifecycleEvents correctly",
                e1End.getTimestamp() - e1Start.getTimestamp(), d1.getDuration(), 100);

        publisher.dropClients();
        flinkExecution.get();
    }

    @Test
    public void durationsWithInterleavedEvents_areCalculatedCorrectly() throws Exception {
        // tests whether the CEP rule tolerates multiple state changes between QUEUED and UPLOADED
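        // A rule tolerant to interleaved states could look roughly like this in
        // Flink CEP (a sketch only; the implementation under test may differ):
        //   Pattern.<LifecycleEvent>begin("queued")
        //          .where(/* state == QUEUED */)
        //          .followedBy("uploaded")
        //          .where(/* state == UPLOADED */);
        // followedBy() uses relaxed contiguity, so the UPLOADING/UPLOAD_FAILED
        // events published in between do not break the match.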

        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e ->
                e.setUploadDurationTimeout(Time.seconds(1))
        );

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT)); // never finishes

        // change state several times (tests another aspect of the CEP rule)
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADED, "s1", RequestType.DOCUMENT));

        publisher.dropClients();
        flinkExecution.get();

        List<UploadDuration> result = new ArrayList<>(uploadDurations.get());
        assertEquals("Expected one event to have finished", 1, result.size());

        UploadDuration d0 = result.get(0);
        assertEquals("Expected event 1 to have finished", 1L, d0.getEventId());
    }

    @Test
    public void timeoutWarnings_areEmittedCorrectly() throws Exception {
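        // tests that uploads which never reach UPLOADED produce an
        // UploadTimeoutWarning once the configured timeout (1 s here) expires.
        // Assumed mechanism (a sketch, not the verified implementation): the
        // timeout maps to a CEP within() clause, e.g.
        //   pattern.within(Time.seconds(1))
        // with timed-out partial matches emitted as warnings.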
        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e -> {
            e.setUploadDurationTimeout(Time.seconds(1));
        });

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // never finishes

        sleep(100);
        publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // never finishes

        // confounding event: event 1's state change must not reset its timeout
        sleep(100);
        publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADING, "s1", RequestType.DOCUMENT));

        sleep(1500); // wait for both events to time out

        publisher.dropClients();

        LOG.info("Waiting for Flink execution to end");
        flinkExecution.get();

        LOG.info("Collecting timeout warning for event 1");
        UploadTimeoutWarning w1 = uploadTimeoutWarnings.take();

        LOG.info("Collecting timeout warning for event 2");
        UploadTimeoutWarning w2 = uploadTimeoutWarnings.take();

        assertEquals("Expected event 1 to time out first", 1L, w1.getEventId());
        assertEquals("Expected event 2 to time out second", 2L, w2.getEventId());
    }

    @Test
    public void uploadFailedWarnings_areEmittedCorrectly() throws Exception {
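        // tests that a warning is emitted only after the third UPLOAD_FAILED of
        // the same upload; two failures alone must not trigger it. A matching
        // rule could be sketched in Flink CEP as (an assumption, not the
        // verified implementation):
        //   Pattern.<LifecycleEvent>begin("failed")
        //          .where(/* state == UPLOAD_FAILED */)
        //          .times(3)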
        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        publisher.publish(0, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(0, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));

        // event 1 fail #1
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        // event 2 fail #1
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        // event 1 fail #2
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        // event 2 fail #2 and then success
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADED, "s1", RequestType.DOCUMENT));

        LOG.info("Checking that no UploadFailedWarning was issued yet");
        assertNull("Expected no UploadFailedWarning after only two failures", uploadFailedWarnings.poll(500));

        LOG.info("Triggering third failure");
        // event 1 fail #3
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        LOG.info("Waiting for UploadFailedWarning for event 1");
        UploadFailedWarning warning = uploadFailedWarnings.take();
        assertEquals(1L, warning.getEventId());
        assertEquals("s1", warning.getServer());

        publisher.dropClients();
        flinkExecution.get();
    }
}
