package dst.ass3.event.tests;

import dst.ass3.event.EventProcessingTestBase;
import dst.ass3.event.dto.UploadEventInfoDTO;
import dst.ass3.event.model.domain.RequestType;
import dst.ass3.event.model.events.LifecycleEvent;
import org.apache.flink.api.common.JobExecutionResult;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Future;

import static dst.ass3.event.model.domain.UploadState.QUEUED;
import static dst.ass3.event.model.domain.UploadState.UPLOADED;
import static dst.ass3.event.model.domain.UploadState.UPLOADING;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;

public class Ass3_3_2Test extends EventProcessingTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_2Test.class);

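    /**
     * End-to-end test of the lifecycle event stream: upload events for VIDEO, DOCUMENT and QUIZ requests
     * should be mapped to {@link LifecycleEvent}s carrying the correct request id, state, request type and
     * timestamp, while events with {@link RequestType#OTHER} should be filtered out.
     */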
    @Test
    public void lifecycleEventStream_worksCorrectly() throws Exception {
        // run the Flink job asynchronously in a new thread
        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();

        // wait until a subscriber connects
        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        LifecycleEvent event;

        long then = System.currentTimeMillis();

        // causes LifecycleEvent 0
        LOG.info("Publishing e0");
        publisher.publish(new UploadEventInfoDTO(0L, then, QUEUED, "s1", RequestType.VIDEO));

        LOG.info("Collecting LifecycleEvent for e0");
        event = lifecycleEvents.take();
        LOG.info("Round-trip took {}ms", System.currentTimeMillis() - then);

        assertEquals("Event ID not set correctly", 0L, event.getRequestId());
        assertEquals("State not set correctly", QUEUED, event.getState());
        assertEquals("Request type not set correctly", RequestType.VIDEO, event.getRequestType());
        assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.greaterThanOrEqualTo(then));
        assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.lessThanOrEqualTo(System.currentTimeMillis()));

        // should be filtered
        LOG.info("Publishing e1, should be filtered");
        publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.OTHER));

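        // poll with a short timeout (500, presumably milliseconds) and expect no LifecycleEvent for e1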
        assertNull("Events of type OTHER should be filtered", lifecycleEvents.poll(500));

        // causes LifecycleEvent 1
        LOG.info("Publishing e2");
        publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADING, "s1", RequestType.DOCUMENT));

        LOG.info("Collecting LifecycleEvent for e2");
        event = lifecycleEvents.take();
        assertEquals("Request ID not set correctly", 2L, event.getRequestId());
        assertEquals("State not set correctly", UPLOADING, event.getState());

        // should be filtered
        LOG.info("Publishing e3, should be filtered");
        publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s2", RequestType.OTHER));
        assertNull("Events of type OTHER should be filtered", lifecycleEvents.poll(500));

        // causes LifecycleEvent 2
        LOG.info("Publishing e4");
        publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s1", RequestType.QUIZ));

        LOG.info("Collecting LifecycleEvent for e4");
        event = lifecycleEvents.take();
        assertEquals("Request ID not set correctly", 4L, event.getRequestId());
        assertEquals("State not set correctly", UPLOADED, event.getState());

        // disconnect subscribers
        publisher.dropClients();

        // wait for execution to end; dropping the clients should let the source, and thus the job, terminate
        flinkExecution.get();
    }
}
