git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java
[3.3] first part.
[pub/jan/dst18.git] / ass3-event / src / test / java / dst / ass3 / event / Ass3EventTestBase.java
1 package dst.ass3.event;
2
3 import java.util.concurrent.ExecutorService;
4 import java.util.concurrent.Executors;
5 import java.util.concurrent.TimeUnit;
6
7 import org.apache.flink.api.common.ExecutionConfig;
8 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
9 import org.apache.flink.streaming.api.TimeCharacteristic;
10 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11 import org.junit.After;
12 import org.junit.Before;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16 /**
17  * Ass3EventTestBase.
18  */
19 public abstract class Ass3EventTestBase {
20
21     private static final Logger LOG = LoggerFactory.getLogger(Ass3EventTestBase.class);
22
23     /**
24      * Flag to control Flink's sysout logging
25      */
26     public static final boolean FLINK_DEBUG = false;
27
28     protected EventPublisher publisher;
29     protected StreamExecutionEnvironment flink;
30     protected ExecutorService executor;
31
32     private static EventPublisher previousPublisher;
33
34     @Before
35     public void setUpResources() throws Exception {
36         executor = Executors.newCachedThreadPool();
37
38         if (previousPublisher != null) {
39             previousPublisher.close();
40         }
41
42         publisher = createEventPublisher();
43         previousPublisher = publisher;
44         publisher.start();
45
46         flink = createStreamExecutionEnvironment();
47         doConfigure(flink);
48     }
49
50     @After
51     public void tearDownResources() throws Exception {
52         publisher.close();
53         MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS);
54         previousPublisher = null;
55     }
56
57     protected EventPublisher createEventPublisher() {
58         return new EventPublisher(Constants.EVENT_PUBLISHER_PORT);
59     }
60
61     protected StreamExecutionEnvironment createStreamExecutionEnvironment() {
62         return StreamExecutionEnvironment.createLocalEnvironment(1);
63     }
64
65     protected void doConfigure(StreamExecutionEnvironment env) {
66         ExecutionConfig config = env.getConfig();
67
68         if (FLINK_DEBUG) {
69             config.enableSysoutLogging();
70         } else {
71             config.disableSysoutLogging();
72         }
73
74         flink.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
75     }
76
77     protected static void sleep(long ms) {
78         try {
79             Thread.sleep(ms);
80         } catch (InterruptedException e) {
81             // ignore
82         }
83     }
84
85     protected static long now() {
86         return System.currentTimeMillis();
87     }
88
89 }