]> git.somenet.org - pub/jan/dst18.git/blob - ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java
Add template for assignment 3
[pub/jan/dst18.git] / ass3-event / src / test / java / dst / ass3 / event / tests / Ass3_3_1Test.java
1 package dst.ass3.event.tests;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6
7 import java.io.ByteArrayOutputStream;
8 import java.io.NotSerializableException;
9 import java.io.ObjectOutputStream;
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
14
15 import dst.ass3.event.dto.UploadEventInfoDTO;
16 import dst.ass3.event.model.domain.IUploadEventInfo;
17 import dst.ass3.event.model.domain.RequestType;
18 import org.apache.flink.configuration.Configuration;
19 import org.apache.flink.streaming.api.functions.source.SourceFunction;
20 import org.apache.flink.streaming.api.watermark.Watermark;
21 import org.junit.Before;
22 import org.junit.Rule;
23 import org.junit.Test;
24 import org.junit.rules.Timeout;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import dst.ass3.event.Ass3EventTestBase;
29 import dst.ass3.event.EventProcessingFactory;
30 import dst.ass3.event.IEventSourceFunction;
31 import dst.ass3.event.model.domain.UploadState;
32
33 public class Ass3_3_1Test extends Ass3EventTestBase {
34
35     private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_1Test.class);
36
37     @Rule
38     public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
39
40     private IEventSourceFunction sourceFunction;
41
42     @Before
43     public void setUp() throws Exception {
44         sourceFunction = EventProcessingFactory.createEventSourceFunction();
45         assertNotNull("EventProcessingFactory#createEventSourceFunction() not implemented", sourceFunction);
46     }
47
48     @Test
49     public void open_shouldConnectToSubscriber() throws Exception {
50         assertEquals(
51                 "IEventSourceFunction should not be connected upon construction",
52                 0, publisher.getConnectedClientCount()
53         );
54
55         sourceFunction.open(new Configuration());
56         publisher.waitForClients();
57
58         assertEquals(
59                 "Expected IEventSourceFunction to connect to publisher after open is called",
60                 1, publisher.getConnectedClientCount()
61         );
62     }
63
64     @Test
65     public void run_shouldCollectPublishedEvents() throws Exception {
66         sourceFunction.open(new Configuration());
67         publisher.waitForClients();
68
69         Future<List<IUploadEventInfo>> result = executor.submit(() -> {
70             MockContext<IUploadEventInfo> ctx = new MockContext<>();
71             LOG.info("Running IEventSourceFunction with MockContext");
72             sourceFunction.run(ctx);
73             LOG.info("Done running IEventSourceFunction, returning collected events");
74             return ctx.collected;
75         });
76
77         publisher.publish(new UploadEventInfoDTO(1L, 0L, UploadState.QUEUED, "s1", RequestType.VIDEO));
78         publisher.publish(new UploadEventInfoDTO(2L, 0L, UploadState.QUEUED, "s2", RequestType.VIDEO));
79
80         sleep(1000);
81
82         LOG.info("Calling cancel on SourceFunction");
83         sourceFunction.cancel();
84
85         LOG.info("Dropping subscriber connections");
86         publisher.dropClients();
87
88         LOG.info("Calling close on SourceFunction");
89         sourceFunction.close();
90
91         List<IUploadEventInfo> collected = result.get();
92         assertEquals(2, collected.size());
93
94         IUploadEventInfo e0 = collected.get(0);
95         IUploadEventInfo e1 = collected.get(1);
96
97         assertEquals(1L, e0.getRequestId(), 0);
98         assertEquals(2L, e1.getRequestId(), 0);
99     }
100
101     @Test
102     public void shouldBeSerializable() throws Exception {
103         try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
104             out.writeObject(sourceFunction);
105             out.flush();
106         } catch (NotSerializableException e) {
107             fail("Implementation of IEventSourceFunction is not serializable");
108         }
109     }
110
111     private static class MockContext<T> implements SourceFunction.SourceContext<T> {
112
113         private final Object checkpointLock = new Object();
114
115         private List<T> collected = new ArrayList<>();
116
117         public List<T> getCollected() {
118             return collected;
119         }
120
121         @Override
122         public void collect(T element) {
123             collected.add(element);
124         }
125
126         @Override
127         public void collectWithTimestamp(T element, long timestamp) {
128             collected.add(element);
129         }
130
131         @Override
132         public void emitWatermark(Watermark mark) {
133
134         }
135
136         @Override
137         public void markAsTemporarilyIdle() {
138             
139         }
140
141         @Override
142         public Object getCheckpointLock() {
143             return checkpointLock;
144         }
145
146         @Override
147         public void close() {
148
149         }
150     }
151
152 }