package dst.ass3.event.tests;

import dst.ass3.event.EventProcessingTestBase;
import dst.ass3.event.dto.UploadEventInfoDTO;
import dst.ass3.event.model.domain.RequestType;
import dst.ass3.event.model.events.LifecycleEvent;
import dst.ass3.event.model.events.UploadDuration;
import dst.ass3.event.model.events.UploadFailedWarning;
import dst.ass3.event.model.events.UploadTimeoutWarning;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import static dst.ass3.event.model.domain.UploadState.*;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;

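/**
 * Tests for the upload event processing job: upload duration calculation for
 * finished uploads, tolerance of the CEP rule towards interleaved state changes,
 * timeout warnings for uploads that never finish, and warnings after repeated
 * upload failures.
 */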
public class Ass3_3_3Test extends EventProcessingTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_3Test.class);

    @Test
    public void uploadDurations_areCalculatedCorrectly() throws Exception {
        // tests whether duration calculation and stream keying work correctly
        // expects the LifecycleEvent stream to work correctly

        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        LifecycleEvent e1Start;
        LifecycleEvent e2Start;
        LifecycleEvent e1End;
        LifecycleEvent e2End;

        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT)); // 1 starts before 2
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, QUEUED, "s1", RequestType.DOCUMENT)); // 3 never finishes

        LOG.info("Waiting for LifecycleEvent for event 1 being QUEUED");
        e1Start = lifecycleEvents.take();
        LOG.info("Waiting for LifecycleEvent for event 2 being QUEUED");
        e2Start = lifecycleEvents.take();
        LOG.info("Waiting for LifecycleEvent for event 3 being QUEUED");
        lifecycleEvents.take();

        sleep(500);
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));

        LOG.info("Waiting for LifecycleEvent for event 1 being UPLOADING");
        lifecycleEvents.take();
        LOG.info("Waiting for LifecycleEvent for event 2 being UPLOADING");
        lifecycleEvents.take();

        sleep(500); // event 2 took ~1000ms
        publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // 2 finishes before 1

        LOG.info("Waiting for LifecycleEvent for event 2 being UPLOADED");
        e2End = lifecycleEvents.take();

        sleep(500); // event 1 took ~1500ms
        publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT));

        LOG.info("Waiting for LifecycleEvent for event 1 being UPLOADED");
        e1End = lifecycleEvents.take();

        LOG.info("Collecting UploadDuration event for event 2");
        UploadDuration d0 = uploadDurations.take();

        LOG.info("Collecting UploadDuration event for event 1");
        UploadDuration d1 = uploadDurations.take();

        assertEquals("Expected event 2 to finish first", 2L, d0.getEventId()); // event 2 finished before 1
        assertEquals("Expected event 1 to finish last", 1L, d1.getEventId());

        assertThat("Expected UploadDuration to be > 0", d0.getDuration(), greaterThan(0L));
        assertThat("Expected UploadDuration to be > 0", d1.getDuration(), greaterThan(0L));

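        // the emitted duration should match the gap between the QUEUED and UPLOADED
        // LifecycleEvent timestamps of the same upload (the assertions allow a delta of 100)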
        assertEquals("UploadDuration was not calculated from LifecycleEvents correctly",
                e2End.getTimestamp() - e2Start.getTimestamp(), d0.getDuration(), 100);

        assertEquals("UploadDuration was not calculated from LifecycleEvents correctly",
                e1End.getTimestamp() - e1Start.getTimestamp(), d1.getDuration(), 100);

        publisher.dropClients();
        flinkExecution.get();
    }

    @Test
    public void durationsWithInterleavedEvents_areCalculatedCorrectly() throws Exception {
        // tests whether the CEP rule is tolerant of multiple state changes between QUEUED and UPLOADED
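        // event 1 alternates between UPLOADING and UPLOAD_FAILED before it is UPLOADED,
        // event 2 never finishes, so exactly one UploadDuration is expected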

        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e ->
                e.setUploadDurationTimeout(Time.seconds(1))
        );

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT)); // never finishes

        // change state several times (tests another aspect of the CEP rule)
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADED, "s1", RequestType.DOCUMENT));

        publisher.dropClients();
        flinkExecution.get();

        List<UploadDuration> result = new ArrayList<>(uploadDurations.get());
        assertEquals("Expected one event to have finished", 1, result.size());

        UploadDuration d0 = result.get(0);
        assertEquals("Expected event 1 to have finished", 1L, d0.getEventId());
    }

    @Test
    public void timeoutWarnings_areEmittedCorrectly() throws Exception {
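        // neither upload ever reaches UPLOADED, so the 1 second upload duration timeout
        // should produce a warning for both of them, in the order they were queued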
        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync(e -> {
            e.setUploadDurationTimeout(Time.seconds(1));
        });

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // never finishes

        sleep(100);
        publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // never finishes

        // confounding event
        sleep(100);
        publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADING, "s1", RequestType.DOCUMENT));

        sleep(1500); // wait for the events to time out

        publisher.dropClients();

        LOG.info("Waiting for Flink execution to end");
        flinkExecution.get();

        LOG.info("Collecting timeout warning for event 1");
        UploadTimeoutWarning w1 = uploadTimeoutWarnings.take();

        LOG.info("Collecting timeout warning for event 2");
        UploadTimeoutWarning w2 = uploadTimeoutWarnings.take();

        assertEquals("Expected event 1 to time out first", 1L, w1.getEventId());
        assertEquals("Expected event 2 to time out second", 2L, w2.getEventId());
    }

    @Test
    public void uploadFailedWarnings_areEmittedCorrectly() throws Exception {
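        // a warning is only expected once an upload has failed three times; the first two
        // UPLOAD_FAILED events per upload must not yet trigger an UploadFailedWarning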
        Future<JobExecutionResult> flinkExecution = initAndExecuteAsync();

        LOG.info("Waiting for subscribers to connect");
        publisher.waitForClients();

        publisher.publish(0, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT));
        publisher.publish(0, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT));

        // event 1 fail #1
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        // event 2 fail #1
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        // event 1 fail #2
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        // event 2 fail #2 and then success
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADED, "s1", RequestType.DOCUMENT));

        LOG.info("Checking that no UploadFailedWarning was issued yet");
        assertNull(uploadFailedWarnings.poll(500));

        LOG.info("Triggering third failure");
        // event 1 fail #3
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT));
        publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT));

        LOG.info("Waiting for UploadFailedWarning for event 1");
        UploadFailedWarning warning = uploadFailedWarnings.take();
        assertEquals(1L, warning.getEventId());
        assertEquals("s1", warning.getServer());

        publisher.dropClients();
        flinkExecution.get();
    }
}