1 package dst.ass3.event;
3 import java.util.concurrent.ExecutorService;
4 import java.util.concurrent.Executors;
5 import java.util.concurrent.TimeUnit;
7 import org.apache.flink.api.common.ExecutionConfig;
8 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
9 import org.apache.flink.streaming.api.TimeCharacteristic;
10 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11 import org.junit.After;
12 import org.junit.Before;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
19 public abstract class Ass3EventTestBase {
21 private static final Logger LOG = LoggerFactory.getLogger(Ass3EventTestBase.class);
24 * Flag to control Flink's sysout logging
26 public static final boolean FLINK_DEBUG = false;
28 protected EventPublisher publisher;
29 protected StreamExecutionEnvironment flink;
30 protected ExecutorService executor;
32 private static EventPublisher previousPublisher;
35 public void setUpResources() throws Exception {
36 executor = Executors.newCachedThreadPool();
38 if (previousPublisher != null) {
39 previousPublisher.close();
42 publisher = createEventPublisher();
43 previousPublisher = publisher;
46 flink = createStreamExecutionEnvironment();
51 public void tearDownResources() throws Exception {
53 MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS);
54 previousPublisher = null;
57 protected EventPublisher createEventPublisher() {
58 return new EventPublisher(Constants.EVENT_PUBLISHER_PORT);
61 protected StreamExecutionEnvironment createStreamExecutionEnvironment() {
62 return StreamExecutionEnvironment.createLocalEnvironment(1);
65 protected void doConfigure(StreamExecutionEnvironment env) {
66 ExecutionConfig config = env.getConfig();
69 config.enableSysoutLogging();
71 config.disableSysoutLogging();
74 flink.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
77 protected static void sleep(long ms) {
80 } catch (InterruptedException e) {
85 protected static long now() {
86 return System.currentTimeMillis();