]> git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java
Add template for assignment 3
[pub/jan/dst18.git] / ass3-event / src / test / java / dst / ass3 / event / tests / Ass3_3_2Test.java
1 package dst.ass3.event.tests;
2
3 import dst.ass3.event.EventProcessingTestBase;
4 import dst.ass3.event.dto.UploadEventInfoDTO;
5 import dst.ass3.event.model.domain.RequestType;
6 import dst.ass3.event.model.events.LifecycleEvent;
7 import org.apache.flink.api.common.JobExecutionResult;
8 import org.hamcrest.Matchers;
9 import org.junit.Test;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import java.util.concurrent.Future;
14
15 import static dst.ass3.event.model.domain.UploadState.QUEUED;
16 import static dst.ass3.event.model.domain.UploadState.UPLOADED;
17 import static dst.ass3.event.model.domain.UploadState.UPLOADING;
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertNull;
20 import static org.junit.Assert.assertThat;
21
22 public class Ass3_3_2Test extends EventProcessingTestBase {
23
24     private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_2Test.class);
25
26     @Test
27     public void lifecycleEventStream_worksCorrectly() throws Exception {
28         // run flink in new thread
29         Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
30
31         // wait until a subscriber connects
32         LOG.info("Waiting for subscribers to connect");
33         publisher.waitForClients();
34
35         LifecycleEvent event;
36
37         long then = System.currentTimeMillis();
38
39         // causes LifecycleEvent 0
40         LOG.info("Publishing e0");
41         publisher.publish(new UploadEventInfoDTO(0L, then, QUEUED, "s1", RequestType.VIDEO));
42
43         LOG.info("Collecting LifecycleEvent for e0");
44         event = lifecycleEvents.take();
45         LOG.info("Round-trip took {}ms", System.currentTimeMillis() - then);
46
47         assertEquals("Event ID not set correctly", 0L, event.getRequestId());
48         assertEquals("State not set correctly", QUEUED, event.getState());
49         assertEquals("Request type not set correctly", RequestType.VIDEO, event.getRequestType());
50         assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.greaterThanOrEqualTo(then));
51         assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.lessThanOrEqualTo(System.currentTimeMillis()));
52
53         // should be filtered
54         LOG.info("Publishing e1, should be filtered");
55         publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.OTHER));
56
57         assertNull("Events of type OTHER should be filtered", lifecycleEvents.poll(500));
58
59         // causes LifecycleEvent 1
60         LOG.info("Publishing e2");
61         publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
62
63         LOG.info("Collecting LifecycleEvent for e2");
64         event = lifecycleEvents.take();
65         assertEquals(2L, event.getRequestId());
66         assertEquals(UPLOADING, event.getState());
67
68         // should be filtered
69         LOG.info("Publishing e3, should be filtered");
70         publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s2", RequestType.OTHER));
71         assertNull("Events of type OTHER should be filtered", lifecycleEvents.poll(500));
72
73         // causes LifecycleEvent 2
74         LOG.info("Publishing e4");
75         publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s1", RequestType.QUIZ));
76
77         LOG.info("Collecting LifecycleEvent for e4");
78         event = lifecycleEvents.take();
79         assertEquals(4L, event.getRequestId());
80         assertEquals(UPLOADED, event.getState());
81
82         // disconnect subscribers
83         publisher.dropClients();
84
85         // wait for execution to end
86         flinkExecution.get();
87     }
88 }