git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java
Add template for assignment 3
[pub/jan/dst18.git] / ass3-event / src / test / java / dst / ass3 / event / tests / Ass3_3_4Test.java
1 package dst.ass3.event.tests;
2
3 import dst.ass3.event.EventProcessingTestBase;
4 import dst.ass3.event.dto.UploadEventInfoDTO;
5 import dst.ass3.event.model.domain.RequestType;
6 import dst.ass3.event.model.events.*;
7 import org.apache.flink.api.common.JobExecutionResult;
8 import org.apache.flink.streaming.api.windowing.time.Time;
9 import org.junit.Test;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.Future;
16 import java.util.function.BiConsumer;
17 import java.util.function.Consumer;
18
19 import static dst.ass3.event.model.domain.UploadState.*;
20 import static dst.ass3.event.model.domain.UploadState.QUEUED;
21 import static dst.ass3.event.model.domain.UploadState.UPLOADED;
22 import static org.hamcrest.Matchers.instanceOf;
23 import static org.junit.Assert.*;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertThat;
26
27 public class Ass3_3_4Test extends EventProcessingTestBase {
28
29     private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_4Test.class);
30
31     @Test
32     public void multipleUploadFailures_triggerAlert() throws Exception {
33         // checks that the window works correctly
34         // expects UploadFailedWarning stream to work correctly
35
36         Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
37
38         LOG.info("Waiting for subscribers to connect");
39         publisher.waitForClients();
40
41         Consumer<Long> causeWarning = (eventId) -> {
42             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
43             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
44             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
45             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
46             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT));
47             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
48         };
49
50         // all of these events will fail
51         publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT));
52         publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT));
53         publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT));
54         publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s1", RequestType.DOCUMENT));
55         publisher.publish(new UploadEventInfoDTO(5L, now(), QUEUED, "s1", RequestType.DOCUMENT));
56         publisher.publish(new UploadEventInfoDTO(6L, now(), QUEUED, "s1", RequestType.DOCUMENT));
57
58         // warning 3 warnings
59         causeWarning.accept(1L);
60         causeWarning.accept(2L);
61         causeWarning.accept(3L);
62
63         LOG.info("Collecting alert for first three warnings");
64         assertNotNull(alerts.take());
65
66         LOG.info("Checking that the fourth warning does not trigger an alert");
67         causeWarning.accept(4L);
68         assertNull(alerts.poll(500));
69
70         LOG.info("Checking that the fifth warning does not trigger an alert");
71         causeWarning.accept(5L);
72         assertNull(alerts.poll(500));
73
74         LOG.info("Checking that the sixth warning triggered an alert");
75         causeWarning.accept(6L);
76         assertNotNull(alerts.take());
77
78         publisher.dropClients();
79         flinkExecution.get();
80     }
81
82     @Test
83     public void uploadFailuresAndTimeouts_triggerAlert() throws Exception {
84         // checks whether keying works correctly, and whether Warning streams are unioned correctly
85         // expects both warning streams to work correctly
86
87         Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e ->
88                 e.setUploadDurationTimeout(Time.seconds(3))
89         );
90
91         LOG.info("Waiting for subscribers to connect");
92         publisher.waitForClients();
93
94         BiConsumer<Long, String> causeWarning = (eventId, server) -> {
95             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
96             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
97             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
98             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
99             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT));
100             publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT));
101         };
102
103         publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e1 will fail
104         publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e2 will fail
105         publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e3 will time out
106         publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s2", RequestType.DOCUMENT)); // s2 e4 will fail
107
108         // s1 warning #1
109         causeWarning.accept(1L, "s1");
110
111         // s1 warning #2
112         causeWarning.accept(2L, "s1");
113
114         // s2 warning #1
115         causeWarning.accept(4L, "s2");
116
117
118         LOG.info("Checking that no alert has been issued yet");
119         assertNull(alerts.poll(500));
120
121         // make sure the other events don't time out
122         publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // s1 e1 will fail
123         publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // s1 e2 will fail
124         publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s2", RequestType.DOCUMENT)); // s2 e4 will fail
125
126         sleep(4000); // waiting for e3 to time out
127
128         publisher.dropClients();
129         flinkExecution.get();
130
131         LOG.info("Collecting Alert event");
132         Alert alert = alerts.take();
133         assertEquals("Expected only a single alert", 0, alerts.get().size());
134
135         assertEquals("s1", alert.getServer());
136         assertEquals("An alert should comprise three warnings", 3, alert.getWarnings().size());
137
138         Warning w0 = alert.getWarnings().get(0);
139         Warning w1 = alert.getWarnings().get(1);
140         Warning w2 = alert.getWarnings().get(2);
141
142         assertThat(w0, instanceOf(UploadFailedWarning.class));
143         assertThat(w1, instanceOf(UploadFailedWarning.class));
144         assertThat(w2, instanceOf(UploadTimeoutWarning.class));
145
146         assertEquals("s1", w0.getServer());
147         assertEquals("s1", w1.getServer());
148         assertEquals("s1", w2.getServer());
149     }
150
151     @Test
152     public void averageUploadDurationWindow_worksCorrectly() throws Exception {
153         // makes sure the event is triggered at the correct instant
154
155         Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
156
157         LOG.info("Waiting for subscribers to connect");
158         publisher.waitForClients();
159
160         sleep(250);
161         publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT));
162         publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT));
163         publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT));
164         publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s1", RequestType.DOCUMENT));
165         publisher.publish(new UploadEventInfoDTO(5L, now(), QUEUED, "s1", RequestType.DOCUMENT));
166         publisher.publish(new UploadEventInfoDTO(6L, now(), QUEUED, "s1", RequestType.DOCUMENT));
167         publisher.publish(new UploadEventInfoDTO(7L, now(), QUEUED, "s2", RequestType.DOCUMENT));
168
169         sleep(100);
170         publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
171         publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
172         publisher.publish(new UploadEventInfoDTO(3L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
173         publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
174         publisher.publish(new UploadEventInfoDTO(5L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
175         publisher.publish(new UploadEventInfoDTO(6L, now(), UPLOADING, "s1", RequestType.DOCUMENT));
176         publisher.publish(new UploadEventInfoDTO(7L, now(), UPLOADING, "s2", RequestType.DOCUMENT));
177
178         sleep(100);
179         publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
180         publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
181         publisher.publish(new UploadEventInfoDTO(3L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
182         publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
183
184         // the first four events should not trigger anything
185         LOG.info("Checking AverageUploadDuration events after the first four UPLOADED events of s1");
186         assertNull(averageUploadDurations.poll(500));
187
188         // this event is from a different server
189         publisher.publish(new UploadEventInfoDTO(7L, now(), UPLOADED, "s2", RequestType.DOCUMENT));
190         LOG.info("Checking AverageUploadDuration events after the first UPLOADED event of s2");
191         assertNull(averageUploadDurations.poll(500));
192
193         // fifth event in s1 triggers the window operation
194         publisher.publish(new UploadEventInfoDTO(5L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
195         LOG.info("Collecting AverageUploadDuration event for s1");
196         AverageUploadDuration event = averageUploadDurations.take();
197         assertNotNull(event);
198         assertEquals("s1", event.getServer());
199
200         // should be in a new window and therefore not trigger
201         publisher.publish(new UploadEventInfoDTO(6L, now(), UPLOADED, "s1", RequestType.DOCUMENT));
202         LOG.info("Checking AverageUploadDuration events after the sixth UPLOADED event of s1");
203         assertNull(averageUploadDurations.poll(500));
204
205         publisher.dropClients();
206
207         flinkExecution.get();
208     }
209
210     @Test
211     public void averageUploadDurations_areCalculatedCorrectly() throws Exception {
212         // makes sure the keying works properly and that the calculation is done from UploadDuration events
213         // requires UploadDuration events to be calculated correctly
214
215         Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();
216
217         LOG.info("Waiting for subscribers to connect");
218         publisher.waitForClients();
219
220         List<UploadDuration> s1Durations = new ArrayList<>(5);
221         List<UploadDuration> s2Durations = new ArrayList<>(5);
222
223         sleep(250);
224         publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
225         publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));
226         publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, QUEUED, "s1", RequestType.DOCUMENT));
227         publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, QUEUED, "s1", RequestType.DOCUMENT));
228         publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, QUEUED, "s1", RequestType.DOCUMENT));
229
230         sleep(250);
231         publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, QUEUED, "s2", RequestType.DOCUMENT));
232         publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, QUEUED, "s2", RequestType.DOCUMENT));
233         publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, QUEUED, "s2", RequestType.DOCUMENT));
234         publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, QUEUED, "s2", RequestType.DOCUMENT));
235         publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, QUEUED, "s2", RequestType.DOCUMENT));
236
237         sleep(125);
238         publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, UPLOADING, "s2", RequestType.DOCUMENT));
239         publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, UPLOADING, "s2", RequestType.DOCUMENT));
240         publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, UPLOADING, "s2", RequestType.DOCUMENT));
241         publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, UPLOADING, "s2", RequestType.DOCUMENT));
242         publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, UPLOADING, "s2", RequestType.DOCUMENT));
243
244         sleep(125);
245         publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
246         publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
247         publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, UPLOADING, "s1", RequestType.DOCUMENT));
248         publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, UPLOADING, "s1", RequestType.DOCUMENT));
249         publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, UPLOADING, "s1", RequestType.DOCUMENT));
250
251         publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, UPLOADED, "s2", RequestType.DOCUMENT));
252         publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, UPLOADED, "s2", RequestType.DOCUMENT));
253         publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, UPLOADED, "s2", RequestType.DOCUMENT));
254
255         LOG.info("Collecting UploadDuration events 1,2,3 for s2");
256         s2Durations.addAll(uploadDurations.take(3));
257
258         sleep(500);
259         publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADED, "s1", RequestType.DOCUMENT));
260         publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADED, "s1", RequestType.DOCUMENT));
261         publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, UPLOADED, "s1", RequestType.DOCUMENT));
262
263         LOG.info("Collecting UploadDuration events 1,2,3 for s1");
264         s1Durations.addAll(uploadDurations.take(3));
265
266         publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, UPLOADED, "s2", RequestType.DOCUMENT));
267         publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, UPLOADED, "s2", RequestType.DOCUMENT));
268
269         LOG.info("Collecting UploadDuration events 4,5 for s2");
270         s2Durations.addAll(uploadDurations.take(2));
271
272         sleep(500);
273         publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, UPLOADED, "s1", RequestType.DOCUMENT));
274         publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, UPLOADED, "s1", RequestType.DOCUMENT));
275
276         LOG.info("Collecting UploadDuration events 4,5 for s1");
277         s1Durations.addAll(uploadDurations.take(2));
278
279         LOG.info("Collecting AverageUploadDuration event for s2");
280         AverageUploadDuration e0 = averageUploadDurations.take(); // s2
281
282         LOG.info("Collecting AverageUploadDuration event for s1");
283         AverageUploadDuration e1 = averageUploadDurations.take(); // s1
284
285         assertEquals("Expected calculation for s2 to have been triggered first", e0.getServer(), "s2");
286         assertEquals("Expected calculation for s1 to have been triggered second", e1.getServer(), "s1");
287
288         assertEquals("Wrong number of UploadDuration events for s1", 5, s1Durations.size());
289         assertEquals("Wrong number of UploadDuration events for s2", 5, s2Durations.size());
290
291         double s1Avg = s1Durations.stream().mapToLong(UploadDuration::getDuration).average().orElse(-1);
292         double s2Avg = s2Durations.stream().mapToLong(UploadDuration::getDuration).average().orElse(-1);
293
294         assertEquals("Average duration was not calculated from UploadDuration events correctly", e0.getDuration(), s2Avg, 100);
295         assertEquals("Average duration was not calculated from UploadDuration events correctly", e1.getDuration(), s1Avg, 100);
296
297         publisher.dropClients();
298         flinkExecution.get();
299     }
300 }