1 package dst.ass3.event.tests;
3 import dst.ass3.event.EventProcessingTestBase;
4 import dst.ass3.event.dto.UploadEventInfoDTO;
5 import dst.ass3.event.model.domain.RequestType;
6 import dst.ass3.event.model.events.*;
7 import org.apache.flink.api.common.JobExecutionResult;
8 import org.apache.flink.streaming.api.windowing.time.Time;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.Future;
16 import java.util.function.BiConsumer;
17 import java.util.function.Consumer;
19 import static dst.ass3.event.model.domain.UploadState.*;
20 import static dst.ass3.event.model.domain.UploadState.QUEUED;
21 import static dst.ass3.event.model.domain.UploadState.UPLOADED;
22 import static org.hamcrest.Matchers.instanceOf;
23 import static org.junit.Assert.*;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertThat;
27 public class Ass3_3_4Test extends EventProcessingTestBase {
29 private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_4Test.class);
32 public void multipleUploadFailures_triggerAlert() throws Exception {
33 // checks that the window works correctly
34 // expects UploadFailedWarning stream to work correctly
36 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
38 LOG.info("Waiting for subscribers to connect");
39 publisher.waitForClients();
41 Consumer<Long> causeWarning = (eventId) -> {
42 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
43 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
44 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
45 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
46 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
47 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
50 // all of these events will fail
51 publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT));
52 publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT));
53 publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT));
54 publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s1", RequestType.DOCUMENT));
55 publisher.publish(new UploadEventInfoDTO(5L, now(), QUEUED, "s1", RequestType.DOCUMENT));
56 publisher.publish(new UploadEventInfoDTO(6L, now(), QUEUED, "s1", RequestType.DOCUMENT));
59 causeWarning.accept(1L);
60 causeWarning.accept(2L);
61 causeWarning.accept(3L);
63 LOG.info("Collecting alert for first three warnings");
64 assertNotNull(alerts.take());
66 LOG.info("Checking that the fourth warning does not trigger an alert");
67 causeWarning.accept(4L);
68 assertNull(alerts.poll(500));
70 LOG.info("Checking that the fifth warning does not trigger an alert");
71 causeWarning.accept(5L);
72 assertNull(alerts.poll(500));
74 LOG.info("Checking that the sixth warning triggered an alert");
75 causeWarning.accept(6L);
76 assertNotNull(alerts.take());
78 publisher.dropClients();
83 public void uploadFailuresAndTimeouts_triggerAlert() throws Exception {
84 // checks whether keying works correctly, and whether Warning streams are unioned correctly
85 // expects both warning streams to work correctly
87 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e ->
88 e.setUploadDurationTimeout(Time.seconds(3))
91 LOG.info("Waiting for subscribers to connect");
92 publisher.waitForClients();
94 BiConsumer<Long, String> causeWarning = (eventId, server) -> {
95 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
96 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
97 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
98 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
99 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
100 publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
103 publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e1 will fail
104 publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e2 will fail
105 publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e3 will time out
106 publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s2", RequestType.DOCUMENT)); // s2 e4 will fail
109 causeWarning.accept(1L, "s1");
112 causeWarning.accept(2L, "s1");
115 causeWarning.accept(4L, "s2");
118 LOG.info("Checking that no alert has been issued yet");
119 assertNull(alerts.poll(500));
121 // make sure the other events don't time out
122 publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // s1 e1 will fail
123 publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // s1 e2 will fail
124 publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s2", RequestType.DOCUMENT)); // s2 e4 will fail
126 sleep(4000); // waiting for e3 to time out
128 publisher.dropClients();
129 flinkExecution.get();
131 LOG.info("Collecting Alert event");
132 Alert alert = alerts.take();
133 assertEquals("Expected only a single alert", 0, alerts.get().size());
135 assertEquals("s1", alert.getServer());
136 assertEquals("An alert should comprise three warnings", 3, alert.getWarnings().size());
138 Warning w0 = alert.getWarnings().get(0);
139 Warning w1 = alert.getWarnings().get(1);
140 Warning w2 = alert.getWarnings().get(2);
142 assertThat(w0, instanceOf(UploadFailedWarning.class));
143 assertThat(w1, instanceOf(UploadFailedWarning.class));
144 assertThat(w2, instanceOf(UploadTimeoutWarning.class));
146 assertEquals("s1", w0.getServer());
147 assertEquals("s1", w1.getServer());
148 assertEquals("s1", w2.getServer());
152 public void averageUploadDurationWindow_worksCorrectly() throws Exception {
153 // makes sure the event is triggered at the correct instant
155 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
157 LOG.info("Waiting for subscribers to connect");
158 publisher.waitForClients();
161 publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT));
162 publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT));
163 publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT));
164 publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s1", RequestType.DOCUMENT));
165 publisher.publish(new UploadEventInfoDTO(5L, now(), QUEUED, "s1", RequestType.DOCUMENT));
166 publisher.publish(new UploadEventInfoDTO(6L, now(), QUEUED, "s1", RequestType.DOCUMENT));
167 publisher.publish(new UploadEventInfoDTO(7L, now(), QUEUED, "s2", RequestType.DOCUMENT));
170 publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
171 publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
172 publisher.publish(new UploadEventInfoDTO(3L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
173 publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
174 publisher.publish(new UploadEventInfoDTO(5L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
175 publisher.publish(new UploadEventInfoDTO(6L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
176 publisher.publish(new UploadEventInfoDTO(7L, now(), UPLOADING, "s2", RequestType.DOCUMENT));
179 publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
180 publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
181 publisher.publish(new UploadEventInfoDTO(3L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
182 publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
184 // the first four events should not trigger anything
185 LOG.info("Checking AverageUploadDuration events after the first four UPLOADED events of s1");
186 assertNull(averageUploadDurations.poll(500));
188 // this event is from a different server
189 publisher.publish(new UploadEventInfoDTO(7L, now(), UPLOADED, "s2", RequestType.DOCUMENT));
190 LOG.info("Checking AverageUploadDuration events after the first UPLOADED event of s2");
191 assertNull(averageUploadDurations.poll(500));
193 // fifth event in s1 triggers the window operation
194 publisher.publish(new UploadEventInfoDTO(5L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
195 LOG.info("Collecting AverageUploadDuration event for s1");
196 AverageUploadDuration event = averageUploadDurations.take();
197 assertNotNull(event);
198 assertEquals("s1", event.getServer());
200 // should be in a new window and therefore not trigger
201 publisher.publish(new UploadEventInfoDTO(6L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
202 LOG.info("Checking AverageUploadDuration events after the sixth UPLOADED event of s1");
203 assertNull(averageUploadDurations.poll(500));
205 publisher.dropClients();
207 flinkExecution.get();
211 public void averageUploadDurations_areCalculatedCorrectly() throws Exception {
212 // makes sure the keying works properly and that the calculation is done from UploadDuration events
213 // requires UploadDuration events to be calculated correctly
215 Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
217 LOG.info("Waiting for subscribers to connect");
218 publisher.waitForClients();
220 List<UploadDuration> s1Durations = new ArrayList<>(5);
221 List<UploadDuration> s2Durations = new ArrayList<>(5);
224 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
225 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));
226 publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, QUEUED, "s1", RequestType.DOCUMENT));
227 publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, QUEUED, "s1", RequestType.DOCUMENT));
228 publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, QUEUED, "s1", RequestType.DOCUMENT));
231 publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, QUEUED, "s2", RequestType.DOCUMENT));
232 publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, QUEUED, "s2", RequestType.DOCUMENT));
233 publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, QUEUED, "s2", RequestType.DOCUMENT));
234 publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, QUEUED, "s2", RequestType.DOCUMENT));
235 publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, QUEUED, "s2", RequestType.DOCUMENT));
238 publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, UPLOADING, "s2", RequestType.DOCUMENT));
239 publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, UPLOADING, "s2", RequestType.DOCUMENT));
240 publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, UPLOADING, "s2", RequestType.DOCUMENT));
241 publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, UPLOADING, "s2", RequestType.DOCUMENT));
242 publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, UPLOADING, "s2", RequestType.DOCUMENT));
245 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
246 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
247 publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, UPLOADING, "s1", RequestType.DOCUMENT));
248 publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, UPLOADING, "s1", RequestType.DOCUMENT));
249 publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, UPLOADING, "s1", RequestType.DOCUMENT));
251 publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, UPLOADED, "s2", RequestType.DOCUMENT));
252 publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, UPLOADED, "s2", RequestType.DOCUMENT));
253 publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, UPLOADED, "s2", RequestType.DOCUMENT));
255 LOG.info("Collecting UploadDuration events 1,2,3 for s2");
256 s2Durations.addAll(uploadDurations.take(3));
259 publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADED, "s1", RequestType.DOCUMENT));
260 publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADED, "s1", RequestType.DOCUMENT));
261 publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, UPLOADED, "s1", RequestType.DOCUMENT));
263 LOG.info("Collecting UploadDuration events 1,2,3 for s1");
264 s1Durations.addAll(uploadDurations.take(3));
266 publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, UPLOADED, "s2", RequestType.DOCUMENT));
267 publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, UPLOADED, "s2", RequestType.DOCUMENT));
269 LOG.info("Collecting UploadDuration events 4,5 for s2");
270 s2Durations.addAll(uploadDurations.take(2));
273 publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, UPLOADED, "s1", RequestType.DOCUMENT));
274 publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, UPLOADED, "s1", RequestType.DOCUMENT));
276 LOG.info("Collecting UploadDuration events 4,5 for s1");
277 s1Durations.addAll(uploadDurations.take(2));
279 LOG.info("Collecting AverageUploadDuration event for s2");
280 AverageUploadDuration e0 = averageUploadDurations.take(); // s2
282 LOG.info("Collecting AverageUploadDuration event for s1");
283 AverageUploadDuration e1 = averageUploadDurations.take(); // s1
285 assertEquals("Expected calculation for s2 to have been triggered first", e0.getServer(), "s2");
286 assertEquals("Expected calculation for s1 to have been triggered second", e1.getServer(), "s1");
288 assertEquals("Wrong number of UploadDuration events for s1", 5, s1Durations.size());
289 assertEquals("Wrong number of UploadDuration events for s2", 5, s2Durations.size());
291 double s1Avg = s1Durations.stream().mapToLong(UploadDuration::getDuration).average().orElse(-1);
292 double s2Avg = s2Durations.stream().mapToLong(UploadDuration::getDuration).average().orElse(-1);
294 assertEquals("Average duration was not calculated from UploadDuration events correctly", e0.getDuration(), s2Avg, 100);
295 assertEquals("Average duration was not calculated from UploadDuration events correctly", e1.getDuration(), s1Avg, 100);
297 publisher.dropClients();
298 flinkExecution.get();