git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java
[3.1] refactored queues - removed manual bindings.
[pub/jan/dst18.git] / ass3-event / src / test / java / dst / ass3 / event / EventProcessingTestBase.java
1 package dst.ass3.event;
2
3 import static org.junit.Assert.assertNotNull;
4
5 import java.util.concurrent.Future;
6 import java.util.concurrent.TimeUnit;
7 import java.util.function.Consumer;
8
9 import dst.ass3.event.model.events.*;
10 import org.apache.flink.api.common.JobExecutionResult;
11 import org.apache.flink.streaming.api.windowing.time.Time;
12 import org.junit.After;
13 import org.junit.Before;
14 import org.junit.Rule;
15 import org.junit.rules.Timeout;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * EventProcessingTestBase.
21  */
22 public class EventProcessingTestBase extends Ass3EventTestBase {
23
24     private static final Logger LOG = LoggerFactory.getLogger(EventProcessingTestBase.class);
25
26     @Rule
27     public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
28
29     private IEventProcessingEnvironment epe;
30
31     protected StaticQueueSink<LifecycleEvent> lifecycleEvents;
32     protected StaticQueueSink<UploadDuration> uploadDurations;
33     protected StaticQueueSink<AverageUploadDuration> averageUploadDurations;
34     protected StaticQueueSink<UploadTimeoutWarning> uploadTimeoutWarnings;
35     protected StaticQueueSink<UploadFailedWarning> uploadFailedWarnings;
36     protected StaticQueueSink<Alert> alerts;
37
38     @Before
39     public void setUpEnvironment() throws Exception {
40         epe = EventProcessingFactory.createEventProcessingEnvironment();
41
42         assertNotNull("#createEventProcessingEnvironment() not implemented", epe);
43
44         lifecycleEvents = new StaticQueueSink<>("lifecycleEvents");
45         uploadDurations = new StaticQueueSink<>("uploadDurations");
46         averageUploadDurations = new StaticQueueSink<>("averageUploadDurations");
47         uploadTimeoutWarnings = new StaticQueueSink<>("uploadTimeoutWarnings");
48         uploadFailedWarnings = new StaticQueueSink<>("uploadFailedWarnings");
49         alerts = new StaticQueueSink<>("alerts");
50
51         epe.setLifecycleEventStreamSink(lifecycleEvents);
52         epe.setUploadDurationStreamSink(uploadDurations);
53         epe.setAverageUploadDurationStreamSink(averageUploadDurations);
54         epe.setUploadTimeoutWarningStreamSink(uploadTimeoutWarnings);
55         epe.setUploadFailedWarningStreamSink(uploadFailedWarnings);
56         epe.setAlertStreamSink(alerts);
57         epe.setUploadDurationTimeout(Time.seconds(15));
58     }
59
60     public JobExecutionResult initAndExecute() throws Exception {
61         return initAndExecute(null);
62     }
63
64     public JobExecutionResult initAndExecute(Consumer<IEventProcessingEnvironment> initializer) throws Exception {
65         if (initializer != null) {
66             initializer.accept(epe);
67         }
68         LOG.info("Initializing StreamExecutionEnvironment with {}", epe);
69         epe.initialize(flink);
70         LOG.info("Executing flink {}", flink);
71         return flink.execute();
72     }
73
74     public Future<JobExecutionResult> initAndExecuteAsync(Consumer<IEventProcessingEnvironment> initializer) {
75         return executor.submit(() -> initAndExecute(initializer));
76     }
77
78     public Future<JobExecutionResult> initAndExecuteAsync() {
79         return executor.submit(() -> initAndExecute());
80     }
81
82     @After
83     public void tearDownCollectors() throws Exception {
84         StaticQueueSink.clearAll();
85     }
86
87 }