1 package dst.ass3.event.tests;
3 import dst.ass3.event.EventProcessingTestBase;
4 import dst.ass3.event.dto.UploadEventInfoDTO;
5 import dst.ass3.event.model.domain.RequestType;
6 import dst.ass3.event.model.events.LifecycleEvent;
7 import org.apache.flink.api.common.JobExecutionResult;
8 import org.hamcrest.Matchers;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
13 import java.util.concurrent.Future;
15 import static dst.ass3.event.model.domain.UploadState.QUEUED;
16 import static dst.ass3.event.model.domain.UploadState.UPLOADED;
17 import static dst.ass3.event.model.domain.UploadState.UPLOADING;
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertNull;
20 import static org.junit.Assert.assertThat;
22 public class Ass3_3_2Test extends EventProcessingTestBase {
// SLF4J logger for this test class; the test method uses it to log progress messages at INFO level.
24 private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_2Test.class);
27 public void lifecycleEventStream_worksCorrectly() throws Exception {
28 // run flink in new thread
29 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
31 // wait until a subscriber connects
32 LOG.info("Waiting for subscribers to connect");
33 publisher.waitForClients();
37 long then = System.currentTimeMillis();
39 // causes LifecycleEvent 0
40 LOG.info("Publishing e0");
41 publisher.publish(new UploadEventInfoDTO(0L, then, QUEUED, "s1", RequestType.VIDEO));
43 LOG.info("Collecting LifecycleEvent for e0");
44 event = lifecycleEvents.take();
45 LOG.info("Round-trip took {}ms", System.currentTimeMillis() - then);
47 assertEquals("Event ID not set correctly", 0L, event.getRequestId());
48 assertEquals("State not set correctly", QUEUED, event.getState());
49 assertEquals("Request type not set correctly", RequestType.VIDEO, event.getRequestType());
50 assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.greaterThanOrEqualTo(then));
51 assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.lessThanOrEqualTo(System.currentTimeMillis()));
54 LOG.info("Publishing e1, should be filtered");
55 publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.OTHER));
57 assertNull("Events of type OTHER should be filtered", lifecycleEvents.poll(500));
59 // causes LifecycleEvent 1
60 LOG.info("Publishing e2");
61 publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
63 LOG.info("Collecting LifecycleEvent for e2");
64 event = lifecycleEvents.take();
65 assertEquals(2L, event.getRequestId());
66 assertEquals(UPLOADING, event.getState());
69 LOG.info("Publishing e3, should be filtered");
70 publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s2", RequestType.OTHER));
71 assertNull("Events of type OTHER should be filtered", lifecycleEvents.poll(500));
73 // causes LifecycleEvent 2
74 LOG.info("Publishing e4");
75 publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s1", RequestType.QUIZ));
77 LOG.info("Collecting LifecycleEvent for e4");
78 event = lifecycleEvents.take();
79 assertEquals(4L, event.getRequestId());
80 assertEquals(UPLOADED, event.getState());
82 // disconnect subscribers
83 publisher.dropClients();
85 // wait for execution to end