1 package dst.ass3.event.tests;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
7 import java.io.ByteArrayOutputStream;
8 import java.io.NotSerializableException;
9 import java.io.ObjectOutputStream;
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
15 import dst.ass3.event.dto.UploadEventInfoDTO;
16 import dst.ass3.event.model.domain.IUploadEventInfo;
17 import dst.ass3.event.model.domain.RequestType;
18 import org.apache.flink.configuration.Configuration;
19 import org.apache.flink.streaming.api.functions.source.SourceFunction;
20 import org.apache.flink.streaming.api.watermark.Watermark;
21 import org.junit.Before;
22 import org.junit.Rule;
23 import org.junit.Test;
24 import org.junit.rules.Timeout;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 import dst.ass3.event.Ass3EventTestBase;
29 import dst.ass3.event.EventProcessingFactory;
30 import dst.ass3.event.IEventSourceFunction;
31 import dst.ass3.event.model.domain.UploadState;
33 public class Ass3_3_1Test extends Ass3EventTestBase {
35 private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_1Test.class);
38 public Timeout timeout = new Timeout(15, TimeUnit.SECONDS);
40 private IEventSourceFunction sourceFunction;
43 public void setUp() throws Exception {
44 sourceFunction = EventProcessingFactory.createEventSourceFunction();
45 assertNotNull("EventProcessingFactory#createEventSourceFunction() not implemented", sourceFunction);
49 public void open_shouldConnectToSubscriber() throws Exception {
51 "IEventSourceFunction should not be connected upon construction",
52 0, publisher.getConnectedClientCount()
55 sourceFunction.open(new Configuration());
56 publisher.waitForClients();
59 "Expected IEventSourceFunction to connect to publisher after open is called",
60 1, publisher.getConnectedClientCount()
65 public void run_shouldCollectPublishedEvents() throws Exception {
66 sourceFunction.open(new Configuration());
67 publisher.waitForClients();
69 Future<List<IUploadEventInfo>> result = executor.submit(() -> {
70 MockContext<IUploadEventInfo> ctx = new MockContext<>();
71 LOG.info("Running IEventSourceFunction with MockContext");
72 sourceFunction.run(ctx);
73 LOG.info("Done running IEventSourceFunction, returning collected events");
77 publisher.publish(new UploadEventInfoDTO(1L, 0L, UploadState.QUEUED, "s1", RequestType.VIDEO));
78 publisher.publish(new UploadEventInfoDTO(2L, 0L, UploadState.QUEUED, "s2", RequestType.VIDEO));
82 LOG.info("Calling cancel on SourceFunction");
83 sourceFunction.cancel();
85 LOG.info("Dropping subscriber connections");
86 publisher.dropClients();
88 LOG.info("Calling close on SourceFunction");
89 sourceFunction.close();
91 List<IUploadEventInfo> collected = result.get();
92 assertEquals(2, collected.size());
94 IUploadEventInfo e0 = collected.get(0);
95 IUploadEventInfo e1 = collected.get(1);
97 assertEquals(1L, e0.getRequestId(), 0);
98 assertEquals(2L, e1.getRequestId(), 0);
102 public void shouldBeSerializable() throws Exception {
103 try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
104 out.writeObject(sourceFunction);
106 } catch (NotSerializableException e) {
107 fail("Implementation of IEventSourceFunction is not serializable");
111 private static class MockContext<T> implements SourceFunction.SourceContext<T> {
113 private final Object checkpointLock = new Object();
115 private List<T> collected = new ArrayList<>();
117 public List<T> getCollected() {
122 public void collect(T element) {
123 collected.add(element);
127 public void collectWithTimestamp(T element, long timestamp) {
128 collected.add(element);
132 public void emitWatermark(Watermark mark) {
137 public void markAsTemporarilyIdle() {
142 public Object getCheckpointLock() {
143 return checkpointLock;
147 public void close() {