1 package dst.ass3.event.tests;
3 import dst.ass3.event.EventProcessingTestBase;
4 import dst.ass3.event.dto.UploadEventInfoDTO;
5 import dst.ass3.event.model.domain.RequestType;
6 import dst.ass3.event.model.events.LifecycleEvent;
7 import dst.ass3.event.model.events.UploadDuration;
8 import dst.ass3.event.model.events.UploadFailedWarning;
9 import dst.ass3.event.model.events.UploadTimeoutWarning;
10 import org.apache.flink.api.common.JobExecutionResult;
11 import org.apache.flink.streaming.api.windowing.time.Time;
12 import org.junit.Test;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.Future;
20 import static dst.ass3.event.model.domain.UploadState.*;
21 import static org.hamcrest.Matchers.greaterThan;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertThat;
26 public class Ass3_3_3Test extends EventProcessingTestBase {
28 private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_3Test.class);
31 public void uploadDurations_areCalculatedCorrectly() throws Exception {
32 // tests whether duration calculation and stream keying works correctly
33 // expects LifecycleEvent stream to work correctly
35 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
37 LOG.info("Waiting for subscribers to connect");
38 publisher.waitForClients();
40 LifecycleEvent e1Start;
41 LifecycleEvent e2Start;
45 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT)); // 1 starts before 2
46 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));
47 publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, QUEUED, "s1", RequestType.DOCUMENT)); // 3 never finishes
49 LOG.info("Waiting for LifecycleEvent for event 1 being QUEUED");
50 e1Start = lifecycleEvents.take();
51 LOG.info("Waiting for LifecycleEvent for event 2 being QUEUED");
52 e2Start = lifecycleEvents.take();
53 LOG.info("Waiting for LifecycleEvent for event 3 being QUEUED");
54 lifecycleEvents.take();
57 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
58 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
60 LOG.info("Waiting for LifecycleEvent for event 1 being UPLOADING");
61 lifecycleEvents.take();
62 LOG.info("Waiting for LifecycleEvent for event 2 being UPLOADING");
63 lifecycleEvents.take();
65 sleep(500); // event 2 took about ~1000ms
66 publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // 2 finishes before 1
68 LOG.info("Waiting for LifecycleEvent for event 2 being UPLOADED");
69 e2End = lifecycleEvents.take();
71 sleep(500); // event 1 took about ~1500ms
72 publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
74 LOG.info("Waiting for LifecycleEvent for event 1 being UPLOADED");
75 e1End = lifecycleEvents.take();
77 LOG.info("Collecting UploadDuration event for event 2");
78 UploadDuration d0 = uploadDurations.take();
80 LOG.info("Collecting UploadDuration event for event 1");
81 UploadDuration d1 = uploadDurations.take();
83 assertEquals("Expected event 2 to finish first", 2L, d0.getEventId()); // event 2 finished before 1
84 assertEquals("Expected event 1 to finish last", 1L, d1.getEventId());
86 assertThat("Expected UploadDuration to be >= 0", d0.getDuration(), greaterThan(0L));
87 assertThat("Expected UploadDuration to be >= 0", d1.getDuration(), greaterThan(0L));
89 assertEquals("StreamDuartion was not calculated from LifecycleEvents correctly",
90 e2End.getTimestamp() - e2Start.getTimestamp(), d0.getDuration(), 100);
92 assertEquals("StreamDuartion was not calculated from LifecycleEvents correctly",
93 e1End.getTimestamp() - e1Start.getTimestamp(), d1.getDuration(), 100);
95 publisher.dropClients();
100 public void durationsWithInterleavedEvents_areCalculatedCorrectly() throws Exception {
101 // tests whether CEP rule is tolerant towards multiple state changes between QUEUED and UPLOADED
103 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e ->
104 e.setUploadDurationTimeout(Time.seconds(1))
107 LOG.info("Waiting for subscribers to connect");
108 publisher.waitForClients();
110 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
111 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT)); // never finishes
113 // change state several times (tests another aspect of the CEP rule)
114 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
115 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
116 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
117 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
119 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADED, "s1", RequestType.DOCUMENT));
121 publisher.dropClients();
122 flinkExecution.get();
124 List<UploadDuration> result = new ArrayList<>(uploadDurations.get());
125 assertEquals("Expected one event to have finished", 1, result.size());
127 UploadDuration d0 = result.get(0);
128 assertEquals("Expected event 1 to have finished", 1L, d0.getEventId());
132 public void timeoutWarnings_areEmittedCorrectly() throws Exception {
133 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e -> {
134 e.setUploadDurationTimeout(Time.seconds(1));
137 LOG.info("Waiting for subscribers to connect");
138 publisher.waitForClients();
140 publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // never finishes
143 publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // never finishes
147 publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
149 sleep(1500); // wait for event to time out
151 publisher.dropClients();
153 LOG.info("Waiting for Flink execution to end");
154 flinkExecution.get();
156 LOG.info("Collecting timeout warning for event 1");
157 UploadTimeoutWarning w1 = uploadTimeoutWarnings.take();
159 LOG.info("Collecting timeout warning for event 2");
160 UploadTimeoutWarning w2 = uploadTimeoutWarnings.take();
162 assertEquals("Expected event 1 to time out first", 1L, w1.getEventId());
163 assertEquals("Expected event 2 to time out second", 2L, w2.getEventId());
167 public void uploadFailedWarnings_areEmittedCorrectly() throws Exception {
168 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
170 LOG.info("Waiting for subscribers to connect");
171 publisher.waitForClients();
173 publisher.publish(0, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
174 publisher.publish(0, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));
177 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
178 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
181 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
182 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
185 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
186 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
188 // event 2 fail #2 and then success
189 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
190 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
191 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADED, "s1", RequestType.DOCUMENT));
193 LOG.info("Checking that no UploadFailedWarning was issued yet");
194 assertNull(uploadFailedWarnings.poll(500));
196 LOG.info("Triggering third failure");
198 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
199 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
201 LOG.info("Waiting for UploadFailedWarning for event 1");
202 UploadFailedWarning warning = uploadFailedWarnings.take();
203 assertEquals(1L, warning.getEventId());
204 assertEquals("s1", warning.getServer());
206 publisher.dropClients();
207 flinkExecution.get();