1 package dst.ass3.event;
3 import static org.junit.Assert.assertNotNull;
5 import java.util.concurrent.Future;
6 import java.util.concurrent.TimeUnit;
7 import java.util.function.Consumer;
9 import dst.ass3.event.model.events.*;
10 import org.apache.flink.api.common.JobExecutionResult;
11 import org.apache.flink.streaming.api.windowing.time.Time;
12 import org.junit.After;
13 import org.junit.Before;
14 import org.junit.Rule;
15 import org.junit.rules.Timeout;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
20 * EventProcessingTestBase.
22 public class EventProcessingTestBase extends Ass3EventTestBase {
24 private static final Logger LOG = LoggerFactory.getLogger(EventProcessingTestBase.class);
27 public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
29 private IEventProcessingEnvironment epe;
31 protected StaticQueueSink<LifecycleEvent> lifecycleEvents;
32 protected StaticQueueSink<UploadDuration> uploadDurations;
33 protected StaticQueueSink<AverageUploadDuration> averageUploadDurations;
34 protected StaticQueueSink<UploadTimeoutWarning> uploadTimeoutWarnings;
35 protected StaticQueueSink<UploadFailedWarning> uploadFailedWarnings;
36 protected StaticQueueSink<Alert> alerts;
39 public void setUpEnvironment() throws Exception {
40 epe = EventProcessingFactory.createEventProcessingEnvironment();
42 assertNotNull("#createEventProcessingEnvironment() not implemented", epe);
44 lifecycleEvents = new StaticQueueSink<>("lifecycleEvents");
45 uploadDurations = new StaticQueueSink<>("uploadDurations");
46 averageUploadDurations = new StaticQueueSink<>("averageUploadDurations");
47 uploadTimeoutWarnings = new StaticQueueSink<>("uploadTimeoutWarnings");
48 uploadFailedWarnings = new StaticQueueSink<>("uploadFailedWarnings");
49 alerts = new StaticQueueSink<>("alerts");
51 epe.setLifecycleEventStreamSink(lifecycleEvents);
52 epe.setUploadDurationStreamSink(uploadDurations);
53 epe.setAverageUploadDurationStreamSink(averageUploadDurations);
54 epe.setUploadTimeoutWarningStreamSink(uploadTimeoutWarnings);
55 epe.setUploadFailedWarningStreamSink(uploadFailedWarnings);
56 epe.setAlertStreamSink(alerts);
57 epe.setUploadDurationTimeout(Time.seconds(15));
60 public JobExecutionResult initAndExecute() throws Exception {
61 return initAndExecute(null);
64 public JobExecutionResult initAndExecute(Consumer<IEventProcessingEnvironment> initializer) throws Exception {
65 if (initializer != null) {
66 initializer.accept(epe);
68 LOG.info("Initializing StreamExecutionEnvironment with {}", epe);
69 epe.initialize(flink);
70 LOG.info("Executing flink {}", flink);
71 return flink.execute();
74 public Future<JobExecutionResult> initAndExecuteAsync(Consumer<IEventProcessingEnvironment> initializer) {
75 return executor.submit(() -> initAndExecute(initializer));
78 public Future<JobExecutionResult> initAndExecuteAsync() {
79 return executor.submit(() -> initAndExecute());
83 public void tearDownCollectors() throws Exception {
84 StaticQueueSink.clearAll();