From 29e3c863b8aefc89eb80706b229c1bbd765a7cbd Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Tue, 8 May 2018 17:37:56 +0200 Subject: [PATCH] Add template for assignment 3 --- ass3-elastic/pom.xml | 53 ++++ .../dst/ass3/elastic/ContainerException.java | 22 ++ .../java/dst/ass3/elastic/ContainerInfo.java | 96 ++++++ .../elastic/ContainerNotFoundException.java | 23 ++ .../dst/ass3/elastic/IContainerService.java | 36 +++ .../ass3/elastic/IElasticityController.java | 7 + .../dst/ass3/elastic/IElasticityFactory.java | 10 + .../ass3/elastic/ImageNotFoundException.java | 22 ++ .../ass3/elastic/impl/ElasticityFactory.java | 24 ++ ass3-elastic/src/main/resources/logback.xml | 16 + .../ass3/elastic/ContainerServiceTest.java | 111 +++++++ .../elastic/ElasticityControllerTest.java | 135 ++++++++ ass3-event/pom.xml | 32 ++ .../main/java/dst/ass3/event/Constants.java | 15 + .../ass3/event/EventProcessingFactory.java | 16 + .../java/dst/ass3/event/EventPublisher.java | 173 ++++++++++ .../java/dst/ass3/event/EventSubscriber.java | 178 +++++++++++ .../event/IEventProcessingEnvironment.java | 39 +++ .../dst/ass3/event/IEventSourceFunction.java | 25 ++ .../ass3/event/dto/UploadEventInfoDTO.java | 61 ++++ .../event/model/domain/IUploadEventInfo.java | 11 + .../ass3/event/model/domain/RequestType.java | 8 + .../ass3/event/model/domain/UploadState.java | 8 + .../dst/ass3/event/model/events/Alert.java | 47 +++ .../model/events/AverageUploadDuration.java | 46 +++ .../event/model/events/LifecycleEvent.java | 95 ++++++ .../event/model/events/UploadDuration.java | 57 ++++ .../model/events/UploadFailedWarning.java | 35 ++ .../model/events/UploadTimeoutWarning.java | 35 ++ .../dst/ass3/event/model/events/Warning.java | 35 ++ .../src/main/resources/executionPlan.json | 1 + ass3-event/src/main/resources/logback.xml | 16 + .../dst/ass3/event/Ass3EventTestBase.java | 89 ++++++ .../dst/ass3/event/Ass3EventTestSuite.java | 23 ++ .../ass3/event/EventProcessingTestBase.java | 87 +++++ 
.../java/dst/ass3/event/StaticQueueSink.java | 83 +++++ .../dst/ass3/event/tests/Ass3_3_1Test.java | 152 +++++++++ .../dst/ass3/event/tests/Ass3_3_2Test.java | 88 +++++ .../dst/ass3/event/tests/Ass3_3_3Test.java | 209 ++++++++++++ .../dst/ass3/event/tests/Ass3_3_4Test.java | 300 ++++++++++++++++++ ass3-event/src/test/resources/logback.xml | 16 + ass3-messaging/pom.xml | 41 +++ .../java/dst/ass3/messaging/Constants.java | 37 +++ .../dst/ass3/messaging/IMessagingFactory.java | 21 ++ .../dst/ass3/messaging/IQueueManager.java | 28 ++ .../dst/ass3/messaging/IRequestGateway.java | 22 ++ .../dst/ass3/messaging/IWorkloadMonitor.java | 34 ++ .../java/dst/ass3/messaging/RequestType.java | 7 + .../dst/ass3/messaging/UploadRequest.java | 71 +++++ .../dst/ass3/messaging/WorkerResponse.java | 61 ++++ .../ass3/messaging/impl/MessagingFactory.java | 32 ++ ass3-messaging/src/main/resources/logback.xml | 16 + .../dst/ass3/messaging/RabbitResource.java | 48 +++ .../dst/ass3/messaging/impl/Ass3_1_Suite.java | 13 + .../ass3/messaging/impl/QueueManagerTest.java | 98 ++++++ .../messaging/impl/RequestGatewayTest.java | 119 +++++++ .../messaging/impl/WorkloadMonitorTest.java | 167 ++++++++++ ass3-worker/Dockerfile | 1 + ass3-worker/worker.py | 1 + pom.xml | 87 ++++- 60 files changed, 3428 insertions(+), 11 deletions(-) create mode 100644 ass3-elastic/pom.xml create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java create mode 100644 ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java create mode 100644 
ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java create mode 100644 ass3-elastic/src/main/resources/logback.xml create mode 100644 ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java create mode 100644 ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java create mode 100644 ass3-event/pom.xml create mode 100644 ass3-event/src/main/java/dst/ass3/event/Constants.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/EventPublisher.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/dto/UploadEventInfoDTO.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/domain/IUploadEventInfo.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/domain/RequestType.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/domain/UploadState.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/AverageUploadDuration.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/UploadDuration.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/UploadFailedWarning.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/UploadTimeoutWarning.java create mode 100644 ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java create mode 100644 ass3-event/src/main/resources/executionPlan.json create mode 100644 ass3-event/src/main/resources/logback.xml create mode 
100644 ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java create mode 100644 ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java create mode 100644 ass3-event/src/test/resources/logback.xml create mode 100644 ass3-messaging/pom.xml create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/RequestType.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/UploadRequest.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java create mode 100644 ass3-messaging/src/main/resources/logback.xml create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java create mode 100644 ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java create mode 100644 
ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java create mode 100644 ass3-worker/Dockerfile create mode 100644 ass3-worker/worker.py diff --git a/ass3-elastic/pom.xml b/ass3-elastic/pom.xml new file mode 100644 index 0000000..6d53fad --- /dev/null +++ b/ass3-elastic/pom.xml @@ -0,0 +1,53 @@ + + + + 4.0.0 + + + at.ac.tuwien.infosys.dst + dst + 2018.1-SNAPSHOT + .. + + + ass3-elastic + + DST :: Assignment 3 :: Elasticity + + jar + + + + at.ac.tuwien.infosys.dst + ass3-messaging + ${project.version} + + + + at.ac.tuwien.infosys.dst + ass3-messaging + ${project.version} + test-jar + test + + + + com.github.docker-java + docker-java + + + + org.mockito + mockito-core + test + + + + org.springframework.boot + spring-boot-starter-amqp + test + + + + diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java new file mode 100644 index 0000000..0678fa4 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerException.java @@ -0,0 +1,22 @@ +package dst.ass3.elastic; + +/** + * Exception indicating that the ContainerService encountered an error when performing a task. + */ +public class ContainerException extends Exception { + + public ContainerException() { + } + + public ContainerException(String message) { + super(message); + } + + public ContainerException(String message, Throwable cause) { + super(message, cause); + } + + public ContainerException(Throwable cause) { + super(cause); + } +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java new file mode 100644 index 0000000..1d30517 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerInfo.java @@ -0,0 +1,96 @@ +package dst.ass3.elastic; + +import dst.ass3.messaging.RequestType; + +import java.util.Objects; + +/** + * Value type that represents a container. 
+ */ +public class ContainerInfo { + + /** + * The name of the container image. + */ + private String image; + + /** + * The container ID. + */ + private String containerId; + + /** + * True if the container is running. + */ + private boolean running; + + /** + * If the container is a worker (indicated by the image dst/ass3-worker), then this field should contain the worker + * type. Otherwise it can be null. + */ + private RequestType workerType; + + public String getImage() { + return image; + } + + public void setImage(String image) { + this.image = image; + } + + public String getContainerId() { + return containerId; + } + + public void setContainerId(String containerId) { + this.containerId = containerId; + } + + public boolean isRunning() { + return running; + } + + public void setRunning(boolean running) { + this.running = running; + } + + + public RequestType getWorkerType() { + return workerType; + } + + public void setWorkerType(RequestType workerType) { + this.workerType = workerType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ContainerInfo that = (ContainerInfo) o; + return running == that.running && + Objects.equals(image, that.image) && + Objects.equals(containerId, that.containerId) && + Objects.equals(workerType, that.workerType); + } + + @Override + public int hashCode() { + + return Objects.hash(image, containerId, running, workerType); + } + + @Override + public String toString() { + return "ContainerInfo{" + + "image='" + image + '\'' + + ", containerId='" + containerId + '\'' + + ", running=" + running + + ", workerType='" + workerType + '\'' + + '}'; + } +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java new file mode 100644 index 0000000..797cfd3 --- /dev/null +++ 
b/ass3-elastic/src/main/java/dst/ass3/elastic/ContainerNotFoundException.java @@ -0,0 +1,23 @@ +package dst.ass3.elastic; + +/** + * Indicates that a container could not be found. + */ +public class ContainerNotFoundException extends ContainerException { + + public ContainerNotFoundException() { + } + + public ContainerNotFoundException(String message) { + super(message); + } + + public ContainerNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public ContainerNotFoundException(Throwable cause) { + super(cause); + } + +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java new file mode 100644 index 0000000..1974b42 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IContainerService.java @@ -0,0 +1,36 @@ +package dst.ass3.elastic; + +import dst.ass3.messaging.RequestType; + +import java.util.List; + +public interface IContainerService { + + /** + * Returns a list of all running containers. + * + * @return a list of ContainerInfo objects + * @throws ContainerException if an error occurred when trying to fetch the running containers. + */ + List listContainers() throws ContainerException; + + /** + * Stops the container with the given container ID. + * + * @param containerId ID of the container to stop. + * @throws ContainerNotFoundException if the container to stop is not running + * @throws ContainerException if another error occurred when trying to stop the container + */ + void stopContainer(String containerId) throws ContainerException; + + /** + * Starts a worker for the specific {@link RequestType}. 
+ * + * @param type {@link RequestType} of the worker to start + * @return ContainerInfo of the started container / worker + * @throws ImageNotFoundException if the worker docker image is not available + * @throws ContainerException if another error occurred when trying to start the worker + */ + ContainerInfo startWorker(RequestType type) throws ContainerException; + +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java new file mode 100644 index 0000000..9e35c7e --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityController.java @@ -0,0 +1,7 @@ +package dst.ass3.elastic; + +public interface IElasticityController { + + void adjustWorkers() throws ContainerException; + +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java new file mode 100644 index 0000000..b65674d --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/IElasticityFactory.java @@ -0,0 +1,10 @@ +package dst.ass3.elastic; + +import dst.ass3.messaging.IWorkloadMonitor; + +public interface IElasticityFactory { + IContainerService createContainerService(); + + IElasticityController createElasticityController(IContainerService containerService, + IWorkloadMonitor workloadMonitor); +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java b/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java new file mode 100644 index 0000000..bc659c5 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/ImageNotFoundException.java @@ -0,0 +1,22 @@ +package dst.ass3.elastic; + +/** + * Exception indicating that the image which should be used for a container start is not available. 
+ */ +public class ImageNotFoundException extends ContainerException { + + public ImageNotFoundException() { + } + + public ImageNotFoundException(String message) { + super(message); + } + + public ImageNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public ImageNotFoundException(Throwable cause) { + super(cause); + } +} diff --git a/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java new file mode 100644 index 0000000..64d2cd9 --- /dev/null +++ b/ass3-elastic/src/main/java/dst/ass3/elastic/impl/ElasticityFactory.java @@ -0,0 +1,24 @@ +package dst.ass3.elastic.impl; + +import dst.ass3.elastic.IContainerService; +import dst.ass3.elastic.IElasticityController; +import dst.ass3.elastic.IElasticityFactory; +import dst.ass3.messaging.IWorkloadMonitor; +import dst.ass3.messaging.impl.MessagingFactory; + +public class ElasticityFactory implements IElasticityFactory { + + @Override + public IContainerService createContainerService() { + // TODO + return null; + } + + @Override + public IElasticityController createElasticityController(IContainerService containerService, + IWorkloadMonitor workloadMonitor) { + // TODO + return null; + } + +} diff --git a/ass3-elastic/src/main/resources/logback.xml b/ass3-elastic/src/main/resources/logback.xml new file mode 100644 index 0000000..9a6b351 --- /dev/null +++ b/ass3-elastic/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n + + + + + + + + + + diff --git a/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java b/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java new file mode 100644 index 0000000..10b8342 --- /dev/null +++ b/ass3-elastic/src/test/java/dst/ass3/elastic/ContainerServiceTest.java @@ -0,0 +1,111 @@ +package dst.ass3.elastic; + +import static 
org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.junit.Assert.assertThat; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.github.dockerjava.api.exception.NotFoundException; +import dst.ass3.messaging.RabbitResource; +import dst.ass3.messaging.RequestType; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import dst.ass3.elastic.impl.ElasticityFactory; +import org.springframework.amqp.core.Queue; + +public class ContainerServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ContainerServiceTest.class); + + @Rule + public RabbitResource rabbit = new RabbitResource(); + + @Rule + public Timeout timeout = new Timeout(30, TimeUnit.SECONDS); + + IContainerService containerService; + IElasticityFactory factory; + + @Before + public void setUp() throws Exception { + factory = new ElasticityFactory(); + + containerService = factory.createContainerService(); + + rabbit.getAdmin().declareQueue(new Queue("dst.document")); + rabbit.getAdmin().declareQueue(new Queue("dst.quiz")); + rabbit.getAdmin().declareQueue(new Queue("dst.video")); + } + + @After + public void tearDown() throws Exception { + rabbit.getAdmin().deleteQueue("dst.document"); + rabbit.getAdmin().deleteQueue("dst.quiz"); + rabbit.getAdmin().deleteQueue("dst.video"); + } + + @Test + public void spawnListStop_lifecycleWorks() throws Exception { + List containers = containerService.listContainers(); + assertThat("Please stop all containers before running the test", containers.size(), is(0)); + + ContainerInfo c1 = containerService.startWorker(RequestType.QUIZ); + LOG.info("Started container {}", c1); + + ContainerInfo c2 = containerService.startWorker(RequestType.DOCUMENT); + LOG.info("Started container {}", c2); + + 
ContainerInfo c3 = containerService.startWorker(RequestType.VIDEO); + LOG.info("Started container {}", c3); + + LOG.info("Waiting for containers to boot..."); + Thread.sleep(5000); + + containers = containerService.listContainers(); + + assertThat(containers.size(), is(3)); + + LOG.info("Stopping containers..."); + containerService.stopContainer(c1.getContainerId()); + containerService.stopContainer(c2.getContainerId()); + containerService.stopContainer(c3.getContainerId()); + + Thread.sleep(5000); + + containers = containerService.listContainers(); + assertThat(containers.size(), is(0)); + } + + @Test(expected = ContainerNotFoundException.class) + public void stopNonExistingContainer_throwsException() throws Exception { + containerService.stopContainer("Non-Existing-Id"); + } + + @Test + public void listContainers_containsCompleteInfo() throws Exception { + ContainerInfo c1 = containerService.startWorker(RequestType.QUIZ); + LOG.info("Started container {}", c1); + LOG.info("Waiting for container to boot..."); + Thread.sleep(5000); + List containers = containerService.listContainers(); + ContainerInfo containerInfo = containers.stream() + .filter(c -> c1.getContainerId().equals(c.getContainerId())) + .findFirst().get(); + assertThat(containerInfo, notNullValue()); + assertThat(containerInfo.getImage(), equalTo("dst/ass3-worker")); + assertThat(containerInfo.getWorkerType(), equalTo(RequestType.QUIZ)); + assertThat(containerInfo.isRunning(), is(true)); + LOG.info("Stopping container..."); + containerService.stopContainer(containerInfo.getContainerId()); + } + +} \ No newline at end of file diff --git a/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java b/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java new file mode 100644 index 0000000..d7312dc --- /dev/null +++ b/ass3-elastic/src/test/java/dst/ass3/elastic/ElasticityControllerTest.java @@ -0,0 +1,135 @@ +package dst.ass3.elastic; + +import 
dst.ass3.elastic.impl.ElasticityFactory; +import dst.ass3.messaging.IWorkloadMonitor; +import dst.ass3.messaging.RequestType; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class ElasticityControllerTest { + private static final String WORKER_IMAGE = "dst/ass3-worker"; + + IElasticityFactory factory; + + @Mock + IContainerService containerService; + @Mock + IWorkloadMonitor workloadMonitor; + + IElasticityController elasticityController; + Map processingTimes = new HashMap<>(); + Map workerCount = new HashMap<>(); + Map requestCount = new HashMap<>(); + List containers = new ArrayList<>(); + + @Before + public void setUp() throws Exception { + factory = new ElasticityFactory(); + elasticityController = factory.createElasticityController(containerService, workloadMonitor); + + processingTimes.clear(); + processingTimes.put(RequestType.DOCUMENT, 5000.0); + processingTimes.put(RequestType.VIDEO, 10000.0); + processingTimes.put(RequestType.QUIZ, 2000.0); + when(workloadMonitor.getAverageProcessingTime()).thenReturn(processingTimes); + + workerCount.clear(); + workerCount.put(RequestType.DOCUMENT, 95L); + workerCount.put(RequestType.VIDEO, 88L); + workerCount.put(RequestType.QUIZ, 61L); + when(workloadMonitor.getWorkerCount()).thenReturn(workerCount); + + requestCount.clear(); + requestCount.put(RequestType.DOCUMENT, 600L); + requestCount.put(RequestType.VIDEO, 1000L); + requestCount.put(RequestType.QUIZ, 1000L); + when(workloadMonitor.getRequestCount()).thenReturn(requestCount); + + containers.clear(); + for (int i = 0; i < 95; i++) { + 
containers.add(containerInfo("documentId" + i, WORKER_IMAGE, RequestType.DOCUMENT, true)); + } + for (int i = 0; i < 88; i++) { + containers.add(containerInfo("video" + i, WORKER_IMAGE, RequestType.VIDEO, true)); + } + for (int i = 0; i < 61; i++) { + containers.add(containerInfo("quiz" + i, WORKER_IMAGE, RequestType.QUIZ, true)); + } + when(containerService.listContainers()).thenReturn(containers); + } + + @After + public void tearDown() { + verify(workloadMonitor, atLeast(1)).getWorkerCount(); + verify(workloadMonitor, atLeast(1)).getRequestCount(); + verify(workloadMonitor, atLeast(1)).getAverageProcessingTime(); + } + + @Test + public void notEnoughWorkers_scaleUp() throws Exception { + // remove 10 document workers and 10 quiz workers + List containersToRemove = containers.stream() + .filter(c -> c.getContainerId().startsWith("documentId7") || c.getContainerId().startsWith("quiz1")) + .collect(Collectors.toList()); + containers.removeAll(containersToRemove); + workerCount.put(RequestType.DOCUMENT, 85L); + workerCount.put(RequestType.QUIZ, 51L); + + elasticityController.adjustWorkers(); + + verify(containerService, never()).stopContainer((String) any(String.class)); + verify(containerService, times(15)).startWorker(RequestType.DOCUMENT); + verify(containerService, times(16)).startWorker(RequestType.QUIZ); + verify(containerService, never()).startWorker(RequestType.VIDEO); + verify(containerService, never()).listContainers(); + } + + @Test + public void tooManyWorkers_scaleDown() throws Exception { + // add 20 more, some should be stopped + for (int i = 0; i < 20; i++) { + containers.add(containerInfo("quiz1" + i, WORKER_IMAGE, RequestType.QUIZ, true)); + } + workerCount.put(RequestType.QUIZ, 81L); + + elasticityController.adjustWorkers(); + + verify(containerService, times(14)).stopContainer(contains("quiz")); + verify(containerService, never()).stopContainer(contains("video")); + verify(containerService, never()).stopContainer(contains("document")); + 
verify(containerService, never()).startWorker((RequestType) any(RequestType.class)); + verify(containerService, times(1)).listContainers(); + } + + @Test + public void justEnoughWorkers_doNotScale() throws Exception { + elasticityController.adjustWorkers(); + verify(containerService, never()).startWorker((RequestType) any(RequestType.class)); + verify(containerService, never()).stopContainer((String) any()); + verify(containerService, never()).listContainers(); + } + + private ContainerInfo containerInfo(String id, String image, RequestType workerType, boolean running) { + ContainerInfo info = new ContainerInfo(); + info.setContainerId(id); + info.setImage(image); + info.setWorkerType(workerType); + info.setRunning(running); + return info; + } + +} \ No newline at end of file diff --git a/ass3-event/pom.xml b/ass3-event/pom.xml new file mode 100644 index 0000000..feb7e0a --- /dev/null +++ b/ass3-event/pom.xml @@ -0,0 +1,32 @@ + + + + 4.0.0 + + + at.ac.tuwien.infosys.dst + dst + 2018.1-SNAPSHOT + .. + + + ass3-event + + jar + + DST :: Assignment 3 :: Event Stream Processing + + + + org.apache.flink + flink-streaming-java_2.11 + + + org.apache.flink + flink-cep_2.11 + + + + diff --git a/ass3-event/src/main/java/dst/ass3/event/Constants.java b/ass3-event/src/main/java/dst/ass3/event/Constants.java new file mode 100644 index 0000000..d06492a --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/Constants.java @@ -0,0 +1,15 @@ +package dst.ass3.event; + +/** + * Constants. + */ +public final class Constants { + + /** + * The TCP port of the {@link EventPublisher}. 
+ */ + public static final int EVENT_PUBLISHER_PORT = 1338; + + private Constants() { + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java new file mode 100644 index 0000000..6a8c668 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/EventProcessingFactory.java @@ -0,0 +1,16 @@ +package dst.ass3.event; + +/** + * Creates your {@link IEventProcessingEnvironment} and {@link IEventSourceFunction} implementation instances. + */ +public class EventProcessingFactory { + public static IEventProcessingEnvironment createEventProcessingEnvironment() { + // TODO + return null; + } + + public static IEventSourceFunction createEventSourceFunction() { + // TODO + return null; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java b/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java new file mode 100644 index 0000000..87a7048 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/EventPublisher.java @@ -0,0 +1,173 @@ +package dst.ass3.event; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import dst.ass3.event.model.domain.IUploadEventInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.group.ChannelGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.group.DefaultChannelGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder; + +/** + * An EventPublisher accepts incoming TCP socket connections on a given port and is able to broadcast {@link IUploadEventInfo} + * objects to these clients. + */ +public class EventPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class); + + private final Object clientChannelMonitor = new Object(); + + private final int port; + private final AtomicBoolean closed; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private ChannelGroup clientChannels; + + public EventPublisher(int port) { + this.port = port; + this.closed = new AtomicBoolean(false); + } + + public int getPort() { + return port; + } + + /** + * Broadcast an event to all listening channels. Does nothing if no clients are connected. + * + * @param event the event to publish + * @throws IllegalStateException if the publisher hasn't been started yet or has been closed + */ + public void publish(IUploadEventInfo event) { + if (clientChannels == null || closed.get()) { + throw new IllegalStateException(); + } + + clientChannels.writeAndFlush(event).syncUninterruptibly(); + + // wait a bit for event to propagate + try { + Thread.sleep(10); + } catch (InterruptedException e) {} + } + + /** + * Like {@link #publish(IUploadEventInfo)} but waits for a given number of milliseconds and then passes the current system + * time to a factory function. 
+ * + * @param delay the delay in ms + * @param provider the provider + */ + public void publish(long delay, Function provider) { + if (delay > 0) { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + } + } + publish(provider.apply(System.currentTimeMillis())); + } + + /** + * This method blocks if no clients are connected, and is notified as soon as a client connects. If clients are + * connected, the method returns immediately. + */ + public void waitForClients() { + if (clientChannels.isEmpty()) { + LOG.debug("Waiting for clients to connect..."); + synchronized (clientChannelMonitor) { + try { + clientChannelMonitor.wait(); + } catch (InterruptedException e) { + LOG.debug("Interrupted while waiting on client connections", e); + } + } + } + } + + public int getConnectedClientCount() { + if (clientChannels == null || closed.get()) { + throw new IllegalStateException(); + } + return clientChannels.size(); + } + + /** + * Closes all active client connections. + */ + public void dropClients() { + if (clientChannels == null || closed.get()) { + throw new IllegalStateException(); + } + clientChannels.close().syncUninterruptibly().group().clear(); + } + + /** + * Start the server and accept incoming connections. Will call {@link #close()} if an error occurs during + * connection. + */ + public void start() { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + clientChannels = new DefaultChannelGroup(workerGroup.next()); + + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ClientChannelInitializer()); + + // Bind and start to accept incoming connections + ChannelFuture f = b.bind(port).addListener(future -> { + if (!future.isSuccess()) { + LOG.error("Error while binding socket"); + close(); + } + }).syncUninterruptibly(); + LOG.info("Accepting connections on {}", f.channel()); + } + + /** + * Closes all channels and resources. 
+ */ + public void close() { + if (closed.compareAndSet(false, true)) { + LOG.info("Shutting down event loops"); + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + } + } + + private class ClientChannelInitializer extends ChannelInitializer { + @Override + public void initChannel(SocketChannel ch) throws Exception { + LOG.info("Initializing client channel {}", ch); + clientChannels.add(ch); + + ch.pipeline() + .addFirst(new ObjectEncoder()) + .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader()))); + + synchronized (clientChannelMonitor) { + clientChannelMonitor.notifyAll(); + } + } + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java b/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java new file mode 100644 index 0000000..0525d7b --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/EventSubscriber.java @@ -0,0 +1,178 @@ +package dst.ass3.event; + +import java.lang.reflect.Proxy; +import java.net.SocketAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import dst.ass3.event.model.domain.IUploadEventInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ClassResolvers; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.serialization.ObjectEncoder; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future; + +/** + * An EventSubscriber receives IUploadEventInfo objects through a netty SocketChannel. Create and connect an + * EventSubscriber using {@link #subscribe(SocketAddress)}. To receive events, call {@link #receive()}. + */ +public class EventSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(EventSubscriber.class); + + private static final IUploadEventInfo POISON_PILL = (IUploadEventInfo) Proxy.newProxyInstance( + IUploadEventInfo.class.getClassLoader(), new Class[]{IUploadEventInfo.class}, (p, m, a) -> null); + + private final SocketAddress publisherAddress; + + private final BlockingQueue queue; + + private volatile boolean closed; + + private Channel channel; + private EventLoopGroup loop; + + private EventSubscriber(SocketAddress publisherAddress) { + this.publisherAddress = publisherAddress; + this.queue = new LinkedBlockingQueue<>(); + } + + /** + * Blocks to receive the next IUploadEventInfo published into the channel. Returns {@code null} if the underlying + * channel has been closed or the thread was interrupted. 
+ * + * @return the next IUploadEventInfo object + * @throws IllegalStateException thrown if the previous call returned null and the channel was closed + */ + public IUploadEventInfo receive() throws IllegalStateException { + synchronized (queue) { + if (closed && queue.isEmpty()) { + throw new IllegalStateException(); + } + } + + IUploadEventInfo event; + try { + event = queue.take(); + + if (event == POISON_PILL) { + return null; + } else { + return event; + } + } catch (InterruptedException e) { + return null; + } + } + + private Future start() { + loop = new NioEventLoopGroup(); + + channel = new Bootstrap() + .group(loop) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new EventSubscriberHandler()) + .connect(publisherAddress) // ChannelFuture + .addListener(future -> { + if (!future.isSuccess()) { + LOG.error("Error while connecting"); + close(); + } + }) + .syncUninterruptibly() + .channel(); + + LOG.info("Connected to channel {}", channel); + + return loop.submit(() -> { + try { + channel.closeFuture().sync(); + } catch (InterruptedException e) { + // noop + } finally { + close(); + } + }); + } + + /** + * Closes all resources and threads used by the EventSubscriber. + */ + public void close() { + try { + if (loop != null) { + synchronized (queue) { + if (!loop.isShutdown() && !loop.isTerminated() && !loop.isShuttingDown()) { + LOG.info("Shutting down event loop"); + loop.shutdownGracefully(); + } + } + } + } finally { + synchronized (queue) { + if (!closed) { + LOG.debug("Adding poison pill to queue"); + closed = true; + queue.add(POISON_PILL); + } + } + } + } + + /** + * Creates a new EventSubscriber that connects to given SocketAddress. 
+ * + * @param address the socket address + * @return a new EventSubscriber + */ + public static EventSubscriber subscribe(SocketAddress address) { + EventSubscriber eventSubscriber = new EventSubscriber(address); + eventSubscriber.start(); + return eventSubscriber; + } + + private class EventSubscriberHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ctx.read(); + + if (!(msg instanceof IUploadEventInfo)) { + LOG.error("Unknown message type received {}", msg); + return; + } + + synchronized (queue) { + if (!closed) { + queue.add((IUploadEventInfo) msg); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.error("EventSubscriberHandler caught an exception", cause); + ctx.close(); + close(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.pipeline() + .addFirst(new ObjectEncoder()) + .addFirst(new ObjectDecoder(ClassResolvers.cacheDisabled(ClassLoader.getSystemClassLoader()))); + } + + } +} + diff --git a/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java b/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java new file mode 100644 index 0000000..a1ffd0a --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/IEventProcessingEnvironment.java @@ -0,0 +1,39 @@ +package dst.ass3.event; + +import dst.ass3.event.model.events.*; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.windowing.time.Time; + +/** + * This class should be used to implement the event processing steps as described in the assignment. The test classes + * will inject SinkFunctions, create a new StreamExecutionEnvironment and then call {@link + * #initialize(StreamExecutionEnvironment)}. 
+ */ +public interface IEventProcessingEnvironment { + + /** + * Initializes the event processing graph on the {@link StreamExecutionEnvironment}. This function is called + * after all sinks have been set. + */ + void initialize(StreamExecutionEnvironment env); + + /** + * Sets the timeout limit of a streaming event. + * + * @param time the timeout limit + */ + void setUploadDurationTimeout(Time time); + + void setLifecycleEventStreamSink(SinkFunction sink); + + void setUploadDurationStreamSink(SinkFunction sink); + + void setAverageUploadDurationStreamSink(SinkFunction sink); + + void setUploadTimeoutWarningStreamSink(SinkFunction sink); + + void setUploadFailedWarningStreamSink(SinkFunction sink); + + void setAlertStreamSink(SinkFunction sink); +} diff --git a/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java b/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java new file mode 100644 index 0000000..18589dd --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/IEventSourceFunction.java @@ -0,0 +1,25 @@ +package dst.ass3.event; + +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import dst.ass3.event.model.domain.IUploadEventInfo; + +/** + * A RichFunction & SourceFunction for IUploadEventInfo objects. 
+ */ +public interface IEventSourceFunction extends RichFunction, SourceFunction { + + @Override + void open(Configuration parameters) throws Exception; + + @Override + void close() throws Exception; + + @Override + void run(SourceContext ctx) throws Exception; + + @Override + void cancel(); +} diff --git a/ass3-event/src/main/java/dst/ass3/event/dto/UploadEventInfoDTO.java b/ass3-event/src/main/java/dst/ass3/event/dto/UploadEventInfoDTO.java new file mode 100644 index 0000000..7465969 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/dto/UploadEventInfoDTO.java @@ -0,0 +1,61 @@ +package dst.ass3.event.dto; + +import java.io.Serializable; + +import dst.ass3.event.model.domain.IUploadEventInfo; +import dst.ass3.event.model.domain.RequestType; +import dst.ass3.event.model.domain.UploadState; + +public class UploadEventInfoDTO implements Serializable, IUploadEventInfo { + private static final long serialVersionUID = 4134104076758220138L; + + private Long requestId; + private Long timestamp; + private UploadState state; + private String server; + private RequestType requestType; + + public UploadEventInfoDTO(Long requestId, Long timestamp, UploadState state, String server, RequestType requestType) { + this.requestId = requestId; + this.timestamp = timestamp; + this.state = state; + this.server = server; + this.requestType = requestType; + } + + @Override + public Long getRequestId() { + return requestId; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public UploadState getState() { + return state; + } + + @Override + public String getServer() { + return server; + } + + @Override + public RequestType getRequestType() { + return requestType; + } + + @Override + public String toString() { + return "UploadEventInfoDTO{" + + "requestId=" + requestId + + ", timestamp=" + timestamp + + ", state=" + state + + ", server='" + server + '\'' + + ", requestType=" + requestType + + '}'; + } +} diff --git 
a/ass3-event/src/main/java/dst/ass3/event/model/domain/IUploadEventInfo.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/IUploadEventInfo.java new file mode 100644 index 0000000..822e6ad --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/IUploadEventInfo.java @@ -0,0 +1,11 @@ +package dst.ass3.event.model.domain; + +public interface IUploadEventInfo { + + Long getRequestId(); + Long getTimestamp(); + UploadState getState(); + + String getServer(); + RequestType getRequestType(); +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/domain/RequestType.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/RequestType.java new file mode 100644 index 0000000..ea7107a --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/RequestType.java @@ -0,0 +1,8 @@ +package dst.ass3.event.model.domain; + +public enum RequestType { + VIDEO, + QUIZ, + DOCUMENT, + OTHER +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/domain/UploadState.java b/ass3-event/src/main/java/dst/ass3/event/model/domain/UploadState.java new file mode 100644 index 0000000..b8f7fbc --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/domain/UploadState.java @@ -0,0 +1,8 @@ +package dst.ass3.event.model.domain; + +public enum UploadState { + QUEUED, + UPLOADING, + UPLOAD_FAILED, + UPLOADED +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java b/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java new file mode 100644 index 0000000..be6e883 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/Alert.java @@ -0,0 +1,47 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; +import java.util.List; + +/** + * A system alert that aggregates several warnings. 
+ */ +public class Alert implements Serializable { + + private static final long serialVersionUID = -4561132671849230635L; + + private String server; + private List warnings; + + public Alert() { + } + + public Alert(String server, List warnings) { + this.server = server; + this.warnings = warnings; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } + + public List getWarnings() { + return warnings; + } + + public void setWarnings(List warnings) { + this.warnings = warnings; + } + + @Override + public String toString() { + return "Alert{" + + "server='" + server + '\'' + + ", warnings=" + warnings + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/AverageUploadDuration.java b/ass3-event/src/main/java/dst/ass3/event/model/events/AverageUploadDuration.java new file mode 100644 index 0000000..ababf24 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/AverageUploadDuration.java @@ -0,0 +1,46 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; + +/** + * The average of several {@link UploadDuration} values. 
+ */ +public class AverageUploadDuration implements Serializable { + + private static final long serialVersionUID = -3767582104941550250L; + + private String server; + private double duration; + + public AverageUploadDuration() { + } + + public AverageUploadDuration(String server, double duration) { + this.server = server; + this.duration = duration; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } + + public double getDuration() { + return duration; + } + + public void setDuration(double duration) { + this.duration = duration; + } + + @Override + public String toString() { + return "AverageUploadDuration{" + + "server='" + server + '\'' + + ", duration=" + duration + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java b/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java new file mode 100644 index 0000000..3a203bc --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/LifecycleEvent.java @@ -0,0 +1,95 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; + +import dst.ass3.event.model.domain.IUploadEventInfo; +import dst.ass3.event.model.domain.RequestType; +import dst.ass3.event.model.domain.UploadState; + +/** + * Indicates a change in the lifecycle state of an IUploadEventInfo. + */ +public class LifecycleEvent implements Serializable { + + private static final long serialVersionUID = 8665269919851487210L; + + /** + * The id of the UploadRequest (the MOOC business concept), as returned by {@link IUploadEventInfo#getRequestId()}. 
+ */ + private long requestId; + + private UploadState state; + private String server; + private RequestType requestType; + + /** + * The instant the event was recorded (unix epoch in milliseconds) + */ + private long timestamp; + + public LifecycleEvent() { + } + + public LifecycleEvent(IUploadEventInfo eventInfo) { + this(eventInfo.getRequestId(), eventInfo.getState(), eventInfo.getServer(), eventInfo.getRequestType(), eventInfo.getTimestamp()); + } + + public LifecycleEvent(long requestId, UploadState state, String server, RequestType requestType, long timestamp) { + this.requestId = requestId; + this.state = state; + this.server = server; + this.requestType = requestType; + this.timestamp = timestamp; + } + + public long getRequestId() { + return requestId; + } + + public void setRequestId(long requestId) { + this.requestId = requestId; + } + + public UploadState getState() { + return state; + } + + public void setState(UploadState state) { + this.state = state; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } + + public RequestType getRequestType() { + return requestType; + } + + public void setRequestType(RequestType requestType) { + this.requestType = requestType; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String toString() { + return "LifecycleEvent{" + + "requestId=" + requestId + + ", state=" + state + + ", server='" + server + '\'' + + ", requestType=" + requestType + + ", timestamp=" + timestamp + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/UploadDuration.java b/ass3-event/src/main/java/dst/ass3/event/model/events/UploadDuration.java new file mode 100644 index 0000000..a8d44cf --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/UploadDuration.java @@ -0,0 +1,57 @@ +package 
dst.ass3.event.model.events; + +import java.io.Serializable; + +/** + * Indicates the amount of time an IUploadEventInfo took to get from QUEUED to UPLOADED. + */ +public class UploadDuration implements Serializable { + + private static final long serialVersionUID = -6976972381929291369L; + + private long eventId; + private String server; + private long duration; + + public UploadDuration() { + } + + public UploadDuration(long eventId, String server, long duration) { + this.eventId = eventId; + this.server = server; + this.duration = duration; + } + + public long getEventId() { + return eventId; + } + + public void setEventId(long eventId) { + this.eventId = eventId; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } + + public long getDuration() { + return duration; + } + + public void setDuration(long duration) { + this.duration = duration; + } + + @Override + public String toString() { + return "UploadDuration{" + + "eventId=" + eventId + + ", server='" + server + '\'' + + ", duration=" + duration + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/UploadFailedWarning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/UploadFailedWarning.java new file mode 100644 index 0000000..45cacd3 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/UploadFailedWarning.java @@ -0,0 +1,35 @@ +package dst.ass3.event.model.events; + +/** + * Indicates that an upload has probably failed. 
+ */ +public class UploadFailedWarning extends Warning { + + private static final long serialVersionUID = -9120187311385112769L; + + private long eventId; + + public UploadFailedWarning() { + super(""); + } + + public UploadFailedWarning(long eventId, String server) { + super(server); + this.eventId = eventId; + } + + public long getEventId() { + return eventId; + } + + public void setEventId(long eventId) { + this.eventId = eventId; + } + + @Override + public String toString() { + return "UploadFailedWarning{" + + "eventId=" + eventId + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/UploadTimeoutWarning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/UploadTimeoutWarning.java new file mode 100644 index 0000000..57d6248 --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/UploadTimeoutWarning.java @@ -0,0 +1,35 @@ +package dst.ass3.event.model.events; + +/** + * Warning that indicates that an upload event has not reached the lifecycle state UPLOADED within a given time frame. 
+ */ +public class UploadTimeoutWarning extends Warning { + + private static final long serialVersionUID = 7955599732178947649L; + + private long eventId; + + public UploadTimeoutWarning() { + super(""); + } + + public UploadTimeoutWarning(long eventId, String server) { + super(server); + this.eventId = eventId; + } + + public long getEventId() { + return eventId; + } + + public void setEventId(long eventId) { + this.eventId = eventId; + } + + @Override + public String toString() { + return "UploadTimeoutWarning{" + + "eventId=" + eventId + + '}'; + } +} diff --git a/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java b/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java new file mode 100644 index 0000000..b643bda --- /dev/null +++ b/ass3-event/src/main/java/dst/ass3/event/model/events/Warning.java @@ -0,0 +1,35 @@ +package dst.ass3.event.model.events; + +import java.io.Serializable; + +/** + * Base class for server warnings. + */ +public abstract class Warning implements Serializable { + + private static final long serialVersionUID = 273266717303711974L; + + private String server; + + public Warning() { + } + + public Warning(String server) { + this.server = server; + } + + public String getServer() { + return server; + } + + public void setServer(String server) { + this.server = server; + } + + @Override + public String toString() { + return "Warning{" + + "server='" + server + '\'' + + '}'; + } +} diff --git a/ass3-event/src/main/resources/executionPlan.json b/ass3-event/src/main/resources/executionPlan.json new file mode 100644 index 0000000..afd691b --- /dev/null +++ b/ass3-event/src/main/resources/executionPlan.json @@ -0,0 +1 @@ +// TODO: add the data from the execution plan export diff --git a/ass3-event/src/main/resources/logback.xml b/ass3-event/src/main/resources/logback.xml new file mode 100644 index 0000000..e423131 --- /dev/null +++ b/ass3-event/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{yyyy-MM-dd 
HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n + + + + + + + + + + diff --git a/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java new file mode 100644 index 0000000..93d7d53 --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestBase.java @@ -0,0 +1,89 @@ +package dst.ass3.event; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ass3EventTestBase. + */ +public abstract class Ass3EventTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3EventTestBase.class); + + /** + * Flag to control Flink's sysout logging + */ + public static final boolean FLINK_DEBUG = false; + + protected EventPublisher publisher; + protected StreamExecutionEnvironment flink; + protected ExecutorService executor; + + private static EventPublisher previousPublisher; + + @Before + public void setUpResources() throws Exception { + executor = Executors.newCachedThreadPool(); + + if (previousPublisher != null) { + previousPublisher.close(); + } + + publisher = createEventPublisher(); + previousPublisher = publisher; + publisher.start(); + + flink = createStreamExecutionEnvironment(); + doConfigure(flink); + } + + @After + public void tearDownResources() throws Exception { + publisher.close(); + MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS); + previousPublisher = null; + } + + protected EventPublisher createEventPublisher() { + return new 
EventPublisher(Constants.EVENT_PUBLISHER_PORT); + } + + protected StreamExecutionEnvironment createStreamExecutionEnvironment() { + return StreamExecutionEnvironment.createLocalEnvironment(1); + } + + protected void doConfigure(StreamExecutionEnvironment env) { + ExecutionConfig config = env.getConfig(); + + if (FLINK_DEBUG) { + config.enableSysoutLogging(); + } else { + config.disableSysoutLogging(); + } + + // configure the environment passed as parameter, not the field (they happen + // to be the same at the call site, but ignoring the parameter is misleading) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + } + + protected static void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // ignore + } + } + + protected static long now() { + return System.currentTimeMillis(); + } + +} diff --git a/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java new file mode 100644 index 0000000..21d8fe7 --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/Ass3EventTestSuite.java @@ -0,0 +1,23 @@ +package dst.ass3.event; + +import dst.ass3.event.tests.Ass3_3_2Test; +import dst.ass3.event.tests.Ass3_3_3Test; +import dst.ass3.event.tests.Ass3_3_4Test; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.junit.runners.Suite.SuiteClasses; + +import dst.ass3.event.tests.Ass3_3_1Test; + +/** + * Ass3EventTestSuite.
+ */ +@RunWith(Suite.class) +@SuiteClasses({ + Ass3_3_1Test.class, + Ass3_3_2Test.class, + Ass3_3_3Test.class, + Ass3_3_4Test.class +}) +public class Ass3EventTestSuite { +} diff --git a/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java b/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java new file mode 100644 index 0000000..ad8eb59 --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/EventProcessingTestBase.java @@ -0,0 +1,87 @@ +package dst.ass3.event; + +import static org.junit.Assert.assertNotNull; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import dst.ass3.event.model.events.*; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * EventProcessingTestBase. 
+ */ +public class EventProcessingTestBase extends Ass3EventTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(EventProcessingTestBase.class); + + @Rule + public Timeout timeout = new Timeout(15, TimeUnit.SECONDS); + + private IEventProcessingEnvironment epe; + + protected StaticQueueSink lifecycleEvents; + protected StaticQueueSink uploadDurations; + protected StaticQueueSink averageUploadDurations; + protected StaticQueueSink uploadTimeoutWarnings; + protected StaticQueueSink uploadFailedWarnings; + protected StaticQueueSink alerts; + + @Before + public void setUpEnvironment() throws Exception { + epe = EventProcessingFactory.createEventProcessingEnvironment(); + + assertNotNull("#createEventProcessingEnvironment() not implemented", epe); + + lifecycleEvents = new StaticQueueSink<>("lifecycleEvents"); + uploadDurations = new StaticQueueSink<>("uploadDurations"); + averageUploadDurations = new StaticQueueSink<>("averageUploadDurations"); + uploadTimeoutWarnings = new StaticQueueSink<>("uploadTimeoutWarnings"); + uploadFailedWarnings = new StaticQueueSink<>("uploadFailedWarnings"); + alerts = new StaticQueueSink<>("alerts"); + + epe.setLifecycleEventStreamSink(lifecycleEvents); + epe.setUploadDurationStreamSink(uploadDurations); + epe.setAverageUploadDurationStreamSink(averageUploadDurations); + epe.setUploadTimeoutWarningStreamSink(uploadTimeoutWarnings); + epe.setUploadFailedWarningStreamSink(uploadFailedWarnings); + epe.setAlertStreamSink(alerts); + epe.setUploadDurationTimeout(Time.seconds(15)); + } + + public JobExecutionResult initAndExecute() throws Exception { + return initAndExecute(null); + } + + public JobExecutionResult initAndExecute(Consumer initializer) throws Exception { + if (initializer != null) { + initializer.accept(epe); + } + LOG.info("Initializing StreamExecutionEnvironment with {}", epe); + epe.initialize(flink); + LOG.info("Executing flink {}", flink); + return flink.execute(); + } + + public Future 
initAndExecuteAsync(Consumer initializer) { + return executor.submit(() -> initAndExecute(initializer)); + } + + public Future initAndExecuteAsync() { + return executor.submit(() -> initAndExecute()); + } + + @After + public void tearDownCollectors() throws Exception { + StaticQueueSink.clearAll(); + } + +} diff --git a/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java b/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java new file mode 100644 index 0000000..805cd8f --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/StaticQueueSink.java @@ -0,0 +1,83 @@ +package dst.ass3.event; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +/** + * A SinkFunction that collects objects into a queue located in a shared global state. Each collector accesses a + * specific key in the shared state. 
+ * + * @param the sink input type + */ +public class StaticQueueSink implements SinkFunction { + + private static final long serialVersionUID = -3965500756295835669L; + + private static Map> state = new ConcurrentHashMap<>(); + + private String key; + + public StaticQueueSink(String key) { + this.key = key; + } + + @Override + public void invoke(T value, Context context) throws Exception { + get().add(value); + } + + public void clear() { + get().clear(); + } + + public List take(int n) { + List list = new ArrayList<>(n); + + for (int i = 0; i < n; i++) { + try { + list.add(get().take()); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while accessing queue", e); + } + } + + return list; + } + + public T take() { + try { + return get().take(); + } catch (InterruptedException e) { + return null; + } + } + + public T poll(long ms) { + try { + return get().poll(ms, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return null; + } + } + + public synchronized BlockingQueue get() { + return get(key); + } + + @SuppressWarnings("unchecked") + private static BlockingQueue get(String key) { + return (BlockingQueue) state.computeIfAbsent(key, k -> new LinkedBlockingQueue<>()); + } + + public static void clearAll() { + state.clear(); + } +} diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java new file mode 100644 index 0000000..053decd --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_1Test.java @@ -0,0 +1,152 @@ +package dst.ass3.event.tests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.io.ByteArrayOutputStream; +import java.io.NotSerializableException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + 
+import dst.ass3.event.dto.UploadEventInfoDTO; +import dst.ass3.event.model.domain.IUploadEventInfo; +import dst.ass3.event.model.domain.RequestType; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import dst.ass3.event.Ass3EventTestBase; +import dst.ass3.event.EventProcessingFactory; +import dst.ass3.event.IEventSourceFunction; +import dst.ass3.event.model.domain.UploadState; + +public class Ass3_3_1Test extends Ass3EventTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_1Test.class); + + @Rule + public Timeout timeout = new Timeout(15, TimeUnit.SECONDS); + + private IEventSourceFunction sourceFunction; + + @Before + public void setUp() throws Exception { + sourceFunction = EventProcessingFactory.createEventSourceFunction(); + assertNotNull("EventProcessingFactory#createEventSourceFunction() not implemented", sourceFunction); + } + + @Test + public void open_shouldConnectToSubscriber() throws Exception { + assertEquals( + "IEventSourceFunction should not be connected upon construction", + 0, publisher.getConnectedClientCount() + ); + + sourceFunction.open(new Configuration()); + publisher.waitForClients(); + + assertEquals( + "Expected IEventSourceFunction to connect to publisher after open is called", + 1, publisher.getConnectedClientCount() + ); + } + + @Test + public void run_shouldCollectPublishedEvents() throws Exception { + sourceFunction.open(new Configuration()); + publisher.waitForClients(); + + Future> result = executor.submit(() -> { + MockContext ctx = new MockContext<>(); + LOG.info("Running IEventSourceFunction with MockContext"); + sourceFunction.run(ctx); + LOG.info("Done running IEventSourceFunction, returning 
collected events"); + return ctx.collected; + }); + + publisher.publish(new UploadEventInfoDTO(1L, 0L, UploadState.QUEUED, "s1", RequestType.VIDEO)); + publisher.publish(new UploadEventInfoDTO(2L, 0L, UploadState.QUEUED, "s2", RequestType.VIDEO)); + + sleep(1000); + + LOG.info("Calling cancel on SourceFunction"); + sourceFunction.cancel(); + + LOG.info("Dropping subscriber connections"); + publisher.dropClients(); + + LOG.info("Calling close on SourceFunction"); + sourceFunction.close(); + + List collected = result.get(); + assertEquals(2, collected.size()); + + IUploadEventInfo e0 = collected.get(0); + IUploadEventInfo e1 = collected.get(1); + + assertEquals(1L, e0.getRequestId(), 0); + assertEquals(2L, e1.getRequestId(), 0); + } + + @Test + public void shouldBeSerializable() throws Exception { + try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) { + out.writeObject(sourceFunction); + out.flush(); + } catch (NotSerializableException e) { + fail("Implementation of IEventSourceFunction is not serializable"); + } + } + + private static class MockContext implements SourceFunction.SourceContext { + + private final Object checkpointLock = new Object(); + + private List collected = new ArrayList<>(); + + public List getCollected() { + return collected; + } + + @Override + public void collect(T element) { + collected.add(element); + } + + @Override + public void collectWithTimestamp(T element, long timestamp) { + collected.add(element); + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public void markAsTemporarilyIdle() { + + } + + @Override + public Object getCheckpointLock() { + return checkpointLock; + } + + @Override + public void close() { + + } + } + +} diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java new file mode 100644 index 0000000..52a4535 --- /dev/null +++ 
b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_2Test.java @@ -0,0 +1,88 @@ +package dst.ass3.event.tests; + +import dst.ass3.event.EventProcessingTestBase; +import dst.ass3.event.dto.UploadEventInfoDTO; +import dst.ass3.event.model.domain.RequestType; +import dst.ass3.event.model.events.LifecycleEvent; +import org.apache.flink.api.common.JobExecutionResult; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Future; + +import static dst.ass3.event.model.domain.UploadState.QUEUED; +import static dst.ass3.event.model.domain.UploadState.UPLOADED; +import static dst.ass3.event.model.domain.UploadState.UPLOADING; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +public class Ass3_3_2Test extends EventProcessingTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_2Test.class); + + @Test + public void lifecycleEventStream_worksCorrectly() throws Exception { + // run flink in new thread + Future flinkExecution = initAndExecuteAsync(); + + // wait until a subscriber connects + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + LifecycleEvent event; + + long then = System.currentTimeMillis(); + + // causes LifecycleEvent 0 + LOG.info("Publishing e0"); + publisher.publish(new UploadEventInfoDTO(0L, then, QUEUED, "s1", RequestType.VIDEO)); + + LOG.info("Collecting LifecycleEvent for e0"); + event = lifecycleEvents.take(); + LOG.info("Round-trip took {}ms", System.currentTimeMillis() - then); + + assertEquals("Event ID not set correctly", 0L, event.getRequestId()); + assertEquals("State not set correctly", QUEUED, event.getState()); + assertEquals("Request type not set correctly", RequestType.VIDEO, event.getRequestType()); + assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.greaterThanOrEqualTo(then)); + 
assertThat("Timestamp not set correctly", event.getTimestamp(), Matchers.lessThanOrEqualTo(System.currentTimeMillis())); + + // should be filtered + LOG.info("Publishing e1, should be filtered"); + publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.OTHER)); + + assertNull("Events of type OTHER should be filtered", lifecycleEvents.poll(500)); + + // causes LifecycleEvent 1 + LOG.info("Publishing e2"); + publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADING, "s1", RequestType.DOCUMENT)); + + LOG.info("Collecting LifecycleEvent for e2"); + event = lifecycleEvents.take(); + assertEquals(2L, event.getRequestId()); + assertEquals(UPLOADING, event.getState()); + + // should be filtered + LOG.info("Publishing e3, should be filtered"); + publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s2", RequestType.OTHER)); + assertNull("Events of type OTHER should be filtered", lifecycleEvents.poll(500)); + + // causes LifecycleEvent 2 + LOG.info("Publishing e4"); + publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s1", RequestType.QUIZ)); + + LOG.info("Collecting LifecycleEvent for e4"); + event = lifecycleEvents.take(); + assertEquals(4L, event.getRequestId()); + assertEquals(UPLOADED, event.getState()); + + // disconnect subscribers + publisher.dropClients(); + + // wait for execution to end + flinkExecution.get(); + } +} diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java new file mode 100644 index 0000000..5b742e4 --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_3Test.java @@ -0,0 +1,209 @@ +package dst.ass3.event.tests; + +import dst.ass3.event.EventProcessingTestBase; +import dst.ass3.event.dto.UploadEventInfoDTO; +import dst.ass3.event.model.domain.RequestType; +import dst.ass3.event.model.events.LifecycleEvent; +import dst.ass3.event.model.events.UploadDuration; +import 
dst.ass3.event.model.events.UploadFailedWarning; +import dst.ass3.event.model.events.UploadTimeoutWarning; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; + +import static dst.ass3.event.model.domain.UploadState.*; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +public class Ass3_3_3Test extends EventProcessingTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_3Test.class); + + @Test + public void uploadDurations_areCalculatedCorrectly() throws Exception { + // tests whether duration calculation and stream keying works correctly + // expects LifecycleEvent stream to work correctly + + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + LifecycleEvent e1Start; + LifecycleEvent e2Start; + LifecycleEvent e1End; + LifecycleEvent e2End; + + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT)); // 1 starts before 2 + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, QUEUED, "s1", RequestType.DOCUMENT)); // 3 never finishes + + LOG.info("Waiting for LifecycleEvent for event 1 being QUEUED"); + e1Start = lifecycleEvents.take(); + LOG.info("Waiting for LifecycleEvent for event 2 being QUEUED"); + e2Start = lifecycleEvents.take(); + LOG.info("Waiting for LifecycleEvent for event 3 being QUEUED"); + lifecycleEvents.take(); + + sleep(500); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + 
publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + + LOG.info("Waiting for LifecycleEvent for event 1 being UPLOADING"); + lifecycleEvents.take(); + LOG.info("Waiting for LifecycleEvent for event 2 being UPLOADING"); + lifecycleEvents.take(); + + sleep(500); // event 2 took about ~1000ms + publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // 2 finishes before 1 + + LOG.info("Waiting for LifecycleEvent for event 2 being UPLOADED"); + e2End = lifecycleEvents.take(); + + sleep(500); // event 1 took about ~1500ms + publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); + + LOG.info("Waiting for LifecycleEvent for event 1 being UPLOADED"); + e1End = lifecycleEvents.take(); + + LOG.info("Collecting UploadDuration event for event 2"); + UploadDuration d0 = uploadDurations.take(); + + LOG.info("Collecting UploadDuration event for event 1"); + UploadDuration d1 = uploadDurations.take(); + + assertEquals("Expected event 2 to finish first", 2L, d0.getEventId()); // event 2 finished before 1 + assertEquals("Expected event 1 to finish last", 1L, d1.getEventId()); + + assertThat("Expected UploadDuration to be >= 0", d0.getDuration(), greaterThan(0L)); + assertThat("Expected UploadDuration to be >= 0", d1.getDuration(), greaterThan(0L)); + + assertEquals("StreamDuartion was not calculated from LifecycleEvents correctly", + e2End.getTimestamp() - e2Start.getTimestamp(), d0.getDuration(), 100); + + assertEquals("StreamDuartion was not calculated from LifecycleEvents correctly", + e1End.getTimestamp() - e1Start.getTimestamp(), d1.getDuration(), 100); + + publisher.dropClients(); + flinkExecution.get(); + } + + @Test + public void durationsWithInterleavedEvents_areCalculatedCorrectly() throws Exception { + // tests whether CEP rule is tolerant towards multiple state changes between QUEUED and UPLOADED + + Future flinkExecution = initAndExecuteAsync(e -> + 
e.setUploadDurationTimeout(Time.seconds(1)) + ); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT)); // never finishes + + // change state several times (tests another aspect of the CEP rule) + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADED, "s1", RequestType.DOCUMENT)); + + publisher.dropClients(); + flinkExecution.get(); + + List result = new ArrayList<>(uploadDurations.get()); + assertEquals("Expected one event to have finished", 1, result.size()); + + UploadDuration d0 = result.get(0); + assertEquals("Expected event 1 to have finished", 1L, d0.getEventId()); + } + + @Test + public void timeoutWarnings_areEmittedCorrectly() throws Exception { + Future flinkExecution = initAndExecuteAsync(e -> { + e.setUploadDurationTimeout(Time.seconds(1)); + }); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // never finishes + + sleep(100); + publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // never finishes + + // confounding event + sleep(100); + publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADING, "s1", RequestType.DOCUMENT)); + + sleep(1500); // wait for event to time out + + publisher.dropClients(); + + LOG.info("Waiting for Flink execution to end"); + 
flinkExecution.get(); + + LOG.info("Collecting timeout warning for event 1"); + UploadTimeoutWarning w1 = uploadTimeoutWarnings.take(); + + LOG.info("Collecting timeout warning for event 2"); + UploadTimeoutWarning w2 = uploadTimeoutWarnings.take(); + + assertEquals("Expected event 1 to time out first", 1L, w1.getEventId()); + assertEquals("Expected event 2 to time out second", 2L, w2.getEventId()); + } + + @Test + public void uploadFailedWarnings_areEmittedCorrectly() throws Exception { + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + publisher.publish(0, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(0, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT)); + + // event 1 fail #1 + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + + // event 2 fail #1 + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + + // event 1 fail #2 + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + + // event 2 fail #2 and then success + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADED, "s1", RequestType.DOCUMENT)); + + LOG.info("Checking that no UploadFailedWarning was issued yet"); + assertNull(uploadFailedWarnings.poll(500)); + + LOG.info("Triggering third failure"); + // 
event 1 fail #3 + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + + LOG.info("Waiting for UploadFailedWarning for event 1"); + UploadFailedWarning warning = uploadFailedWarnings.take(); + assertEquals(1L, warning.getEventId()); + assertEquals("s1", warning.getServer()); + + publisher.dropClients(); + flinkExecution.get(); + } +} diff --git a/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java new file mode 100644 index 0000000..6be551d --- /dev/null +++ b/ass3-event/src/test/java/dst/ass3/event/tests/Ass3_3_4Test.java @@ -0,0 +1,300 @@ +package dst.ass3.event.tests; + +import dst.ass3.event.EventProcessingTestBase; +import dst.ass3.event.dto.UploadEventInfoDTO; +import dst.ass3.event.model.domain.RequestType; +import dst.ass3.event.model.events.*; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static dst.ass3.event.model.domain.UploadState.*; +import static dst.ass3.event.model.domain.UploadState.QUEUED; +import static dst.ass3.event.model.domain.UploadState.UPLOADED; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +public class Ass3_3_4Test extends EventProcessingTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(Ass3_3_4Test.class); + + @Test + public void multipleUploadFailures_triggerAlert() throws Exception { + // checks that the window works correctly + // 
expects UploadFailedWarning stream to work correctly + + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + Consumer causeWarning = (eventId) -> { + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, "s1", RequestType.DOCUMENT)); + }; + + // all of these events will fail + publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(5L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(6L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + + // warning 3 warnings + causeWarning.accept(1L); + causeWarning.accept(2L); + causeWarning.accept(3L); + + LOG.info("Collecting alert for first three warnings"); + assertNotNull(alerts.take()); + + LOG.info("Checking that the fourth warning does not trigger an alert"); + causeWarning.accept(4L); + assertNull(alerts.poll(500)); + + LOG.info("Checking that the fifth warning does not trigger an alert"); + causeWarning.accept(5L); + assertNull(alerts.poll(500)); + + LOG.info("Checking that the sixth warning triggered an alert"); + 
causeWarning.accept(6L); + assertNotNull(alerts.take()); + + publisher.dropClients(); + flinkExecution.get(); + } + + @Test + public void uploadFailuresAndTimeouts_triggerAlert() throws Exception { + // checks whether keying works correctly, and whether Warning streams are unioned correctly + // expects both warning streams to work correctly + + Future flinkExecution = initAndExecuteAsync(e -> + e.setUploadDurationTimeout(Time.seconds(3)) + ); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + BiConsumer causeWarning = (eventId, server) -> { + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOADING, server, RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(eventId, t, UPLOAD_FAILED, server, RequestType.DOCUMENT)); + }; + + publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e1 will fail + publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e2 will fail + publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT)); // s1 e3 will time out + publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s2", RequestType.DOCUMENT)); // s2 e4 will fail + + // s1 warning #1 + causeWarning.accept(1L, "s1"); + + // s1 warning #2 + causeWarning.accept(2L, "s1"); + + // s2 warning #1 + causeWarning.accept(4L, "s2"); + + + LOG.info("Checking that no alert has been issued yet"); + assertNull(alerts.poll(500)); + + // make sure the other events don't time out + 
publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // s1 e1 will fail + publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); // s1 e2 will fail + publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s2", RequestType.DOCUMENT)); // s2 e4 will fail + + sleep(4000); // waiting for e3 to time out + + publisher.dropClients(); + flinkExecution.get(); + + LOG.info("Collecting Alert event"); + Alert alert = alerts.take(); + assertEquals("Expected only a single alert", 0, alerts.get().size()); + + assertEquals("s1", alert.getServer()); + assertEquals("An alert should comprise three warnings", 3, alert.getWarnings().size()); + + Warning w0 = alert.getWarnings().get(0); + Warning w1 = alert.getWarnings().get(1); + Warning w2 = alert.getWarnings().get(2); + + assertThat(w0, instanceOf(UploadFailedWarning.class)); + assertThat(w1, instanceOf(UploadFailedWarning.class)); + assertThat(w2, instanceOf(UploadTimeoutWarning.class)); + + assertEquals("s1", w0.getServer()); + assertEquals("s1", w1.getServer()); + assertEquals("s1", w2.getServer()); + } + + @Test + public void averageUploadDurationWindow_worksCorrectly() throws Exception { + // makes sure the event is triggered at the correct instant + + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + sleep(250); + publisher.publish(new UploadEventInfoDTO(1L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(2L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(3L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(4L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(5L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(6L, now(), QUEUED, "s1", RequestType.DOCUMENT)); + 
publisher.publish(new UploadEventInfoDTO(7L, now(), QUEUED, "s2", RequestType.DOCUMENT)); + + sleep(100); + publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(3L, now(), UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(5L, now(), UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(6L, now(), UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(7L, now(), UPLOADING, "s2", RequestType.DOCUMENT)); + + sleep(100); + publisher.publish(new UploadEventInfoDTO(1L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(2L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(3L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); + publisher.publish(new UploadEventInfoDTO(4L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); + + // the first four events should not trigger anything + LOG.info("Checking AverageUploadDuration events after the first four UPLOADED events of s1"); + assertNull(averageUploadDurations.poll(500)); + + // this event is from a different server + publisher.publish(new UploadEventInfoDTO(7L, now(), UPLOADED, "s2", RequestType.DOCUMENT)); + LOG.info("Checking AverageUploadDuration events after the first UPLOADED event of s2"); + assertNull(averageUploadDurations.poll(500)); + + // fifth event in s1 triggers the window operation + publisher.publish(new UploadEventInfoDTO(5L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); + LOG.info("Collecting AverageUploadDuration event for s1"); + AverageUploadDuration event = averageUploadDurations.take(); + assertNotNull(event); + assertEquals("s1", event.getServer()); + + // should be in 
a new window and therefore not trigger + publisher.publish(new UploadEventInfoDTO(6L, now(), UPLOADED, "s1", RequestType.DOCUMENT)); + LOG.info("Checking AverageUploadDuration events after the sixth UPLOADED event of s1"); + assertNull(averageUploadDurations.poll(500)); + + publisher.dropClients(); + + flinkExecution.get(); + } + + @Test + public void averageUploadDurations_areCalculatedCorrectly() throws Exception { + // makes sure the keying works properly and that the calculation is done from UploadDuration events + // requires UploadDuration events to be calculated correctly + + Future flinkExecution = initAndExecuteAsync(); + + LOG.info("Waiting for subscribers to connect"); + publisher.waitForClients(); + + List s1Durations = new ArrayList<>(5); + List s2Durations = new ArrayList<>(5); + + sleep(250); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, QUEUED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, QUEUED, "s1", RequestType.DOCUMENT)); + + sleep(250); + publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, QUEUED, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, QUEUED, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, QUEUED, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, QUEUED, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, QUEUED, "s2", RequestType.DOCUMENT)); + + sleep(125); + publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, UPLOADING, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, UPLOADING, "s2", 
RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, UPLOADING, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, UPLOADING, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, UPLOADING, "s2", RequestType.DOCUMENT)); + + sleep(125); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, UPLOADING, "s1", RequestType.DOCUMENT)); + + publisher.publish(5, t -> new UploadEventInfoDTO(6L, t, UPLOADED, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(7L, t, UPLOADED, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(8L, t, UPLOADED, "s2", RequestType.DOCUMENT)); + + LOG.info("Collecting UploadDuration events 1,2,3 for s2"); + s2Durations.addAll(uploadDurations.take(3)); + + sleep(500); + publisher.publish(5, t -> new UploadEventInfoDTO(1L, t, UPLOADED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(2L, t, UPLOADED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(3L, t, UPLOADED, "s1", RequestType.DOCUMENT)); + + LOG.info("Collecting UploadDuration events 1,2,3 for s1"); + s1Durations.addAll(uploadDurations.take(3)); + + publisher.publish(5, t -> new UploadEventInfoDTO(9L, t, UPLOADED, "s2", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(10L, t, UPLOADED, "s2", RequestType.DOCUMENT)); + + LOG.info("Collecting UploadDuration events 4,5 for s2"); + s2Durations.addAll(uploadDurations.take(2)); + + sleep(500); + 
publisher.publish(5, t -> new UploadEventInfoDTO(4L, t, UPLOADED, "s1", RequestType.DOCUMENT)); + publisher.publish(5, t -> new UploadEventInfoDTO(5L, t, UPLOADED, "s1", RequestType.DOCUMENT)); + + LOG.info("Collecting UploadDuration events 4,5 for s1"); + s1Durations.addAll(uploadDurations.take(2)); + + LOG.info("Collecting AverageUploadDuration event for s2"); + AverageUploadDuration e0 = averageUploadDurations.take(); // s2 + + LOG.info("Collecting AverageUploadDuration event for s1"); + AverageUploadDuration e1 = averageUploadDurations.take(); // s1 + + assertEquals("Expected calculation for s2 to have been triggered first", e0.getServer(), "s2"); + assertEquals("Expected calculation for s1 to have been triggered second", e1.getServer(), "s1"); + + assertEquals("Wrong number of UploadDuration events for s1", 5, s1Durations.size()); + assertEquals("Wrong number of UploadDuration events for s2", 5, s2Durations.size()); + + double s1Avg = s1Durations.stream().mapToLong(UploadDuration::getDuration).average().orElse(-1); + double s2Avg = s2Durations.stream().mapToLong(UploadDuration::getDuration).average().orElse(-1); + + assertEquals("Average duration was not calculated from UploadDuration events correctly", e0.getDuration(), s2Avg, 100); + assertEquals("Average duration was not calculated from UploadDuration events correctly", e1.getDuration(), s1Avg, 100); + + publisher.dropClients(); + flinkExecution.get(); + } +} diff --git a/ass3-event/src/test/resources/logback.xml b/ass3-event/src/test/resources/logback.xml new file mode 100644 index 0000000..aa08bf5 --- /dev/null +++ b/ass3-event/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n + + + + + + + + + + diff --git a/ass3-messaging/pom.xml b/ass3-messaging/pom.xml new file mode 100644 index 0000000..7626049 --- /dev/null +++ b/ass3-messaging/pom.xml @@ -0,0 +1,41 @@ + + + + 4.0.0 + + + at.ac.tuwien.infosys.dst + 
dst + 2018.1-SNAPSHOT + .. + + + ass3-messaging + + DST :: Assignment 3 :: Messaging + + jar + + + + com.rabbitmq + amqp-client + + + com.rabbitmq + http-client + + + com.fasterxml.jackson.core + jackson-databind + + + + org.springframework.boot + spring-boot-starter-amqp + test + + + + diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java b/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java new file mode 100644 index 0000000..1340c4f --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/Constants.java @@ -0,0 +1,37 @@ +package dst.ass3.messaging; + +/** + * Contains several constants related to the RabbitMQ infrastructure and expected names for queues, exchanges and + * routing keys. + */ +public final class Constants { + + public static final String RMQ_HOST = "192.168.99.99"; + public static final String RMQ_PORT = "5672"; + public static final String RMQ_VHOST = "/"; + public static final String RMQ_USER = "dst"; + public static final String RMQ_PASSWORD = "dst"; + + public static final String RMQ_API_PORT = "15672"; + public static final String RMQ_API_URL = "http://" + RMQ_HOST + ":" + RMQ_API_PORT + "/api/"; + + public static final String QUEUE_QUIZ = "dst.quiz"; + public static final String QUEUE_DOCUMENT = "dst.document"; + public static final String QUEUE_VIDEO = "dst.video"; + + public static final String[] WORK_QUEUES = { + QUEUE_QUIZ, + QUEUE_DOCUMENT, + QUEUE_VIDEO + }; + + public static final String TOPIC_EXCHANGE = "dst.workers"; + + public static final String ROUTING_KEY_QUIZ = "requests.quiz"; + public static final String ROUTING_KEY_DOCUMENT = "requests.document"; + public static final String ROUTING_KEY_VIDEO = "requests.video"; + + private Constants() { + // util class + } +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java new file mode 100644 index 0000000..d0ee0e1 --- /dev/null +++ 
b/ass3-messaging/src/main/java/dst/ass3/messaging/IMessagingFactory.java
@@ -0,0 +1,21 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IMessagingFactory extends Closeable {
+
+    IQueueManager createQueueManager();
+
+    IRequestGateway createRequestGateway();
+
+    IWorkloadMonitor createWorkloadMonitor();
+
+    /**
+     * Closes any resource the factory may create.
+     *
+     * @throws IOException propagated exceptions
+     */
+    @Override
+    void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java
new file mode 100644
index 0000000..542d0cb
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IQueueManager.java
@@ -0,0 +1,28 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Responsible for creating and tearing down all RabbitMQ queues or exchanges necessary for running the system.
+ */
+public interface IQueueManager extends Closeable {
+
+    /**
+     * Initializes all queues or topic exchanges necessary for running the system.
+     */
+    void setUp();
+
+    /**
+     * Removes all queues or topic exchanges associated with the system.
+     */
+    void tearDown();
+
+    /**
+     * Closes underlying connection or resources, if any.
+     *
+     * @throws IOException propagated exceptions
+     */
+    @Override
+    void close() throws IOException;
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java
new file mode 100644
index 0000000..786ce99
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IRequestGateway.java
@@ -0,0 +1,22 @@
+package dst.ass3.messaging;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IRequestGateway extends Closeable {
+
+    /**
+     * Serializes and routes a request to the correct queue.
+ * + * @param request the request + */ + void uploadRequest(UploadRequest request); + + /** + * Closes any resources that may have been initialized (connections, channels, etc.) + * + * @throws IOException propagated exceptions + */ + @Override + void close() throws IOException; +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java new file mode 100644 index 0000000..5cfc224 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/IWorkloadMonitor.java @@ -0,0 +1,34 @@ +package dst.ass3.messaging; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; + +public interface IWorkloadMonitor extends Closeable { + + /** + * Returns for each request type the amount of waiting requests. + * + * @return a map + */ + Map getRequestCount(); + + /** + * Returns the amount of workers for each request type. This can be deduced from the amount of consumers to each + * queue. + * + * @return a map + */ + Map getWorkerCount(); + + /** + * Returns for each request type the average processing time of the last 10 recorded requests. The data comes from + * subscriptions to the respective topics. 
+ * + * @return a map + */ + Map getAverageProcessingTime(); + + @Override + void close() throws IOException; +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/RequestType.java b/ass3-messaging/src/main/java/dst/ass3/messaging/RequestType.java new file mode 100644 index 0000000..7aa21b5 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/RequestType.java @@ -0,0 +1,7 @@ +package dst.ass3.messaging; + +public enum RequestType { + VIDEO, + QUIZ, + DOCUMENT +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/UploadRequest.java b/ass3-messaging/src/main/java/dst/ass3/messaging/UploadRequest.java new file mode 100644 index 0000000..c9227fd --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/UploadRequest.java @@ -0,0 +1,71 @@ +package dst.ass3.messaging; + +import java.util.Objects; + +public class UploadRequest { + + private String id; + private RequestType type; + private String urn; + + public UploadRequest() { + } + + public UploadRequest(String id, RequestType type, String urn) { + this.id = id; + this.type = type; + this.urn = urn; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public RequestType getType() { + return type; + } + + public void setType(RequestType type) { + this.type = type; + } + + public String getUrn() { + return urn; + } + + public void setUrn(String urn) { + this.urn = urn; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UploadRequest that = (UploadRequest) o; + return Objects.equals(id, that.id) && + Objects.equals(type, that.type) && + Objects.equals(urn, that.urn); + } + + @Override + public int hashCode() { + return Objects.hash(id, type, urn); + } + + @Override + public String toString() { + return "UploadRequest{" + + "id='" + id + '\'' + + ", type='" + type + '\'' + + ", urn='" + urn + '\'' + + '}'; + } +} 
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java b/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java new file mode 100644 index 0000000..ce89cd1 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/WorkerResponse.java @@ -0,0 +1,61 @@ +package dst.ass3.messaging; + +import java.util.Objects; + +/** + * Message sent by a worker after it is finished processing a request. + */ +public class WorkerResponse { + + /** + * The ID of the original {@link UploadRequest}. + */ + private String requestId; + + /** + * The time it took to process the request (in milliseconds). + */ + private Long processingTime; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public Long getProcessingTime() { + return processingTime; + } + + public void setProcessingTime(Long processingTime) { + this.processingTime = processingTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerResponse that = (WorkerResponse) o; + return Objects.equals(requestId, that.requestId) && + Objects.equals(processingTime, that.processingTime); + } + + @Override + public int hashCode() { + return Objects.hash(requestId, processingTime); + } + + @Override + public String toString() { + return "WorkerResponse{" + + "requestId='" + requestId + '\'' + + ", processingTime=" + processingTime + + '}'; + } +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java new file mode 100644 index 0000000..4ecc2c4 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java @@ -0,0 +1,32 @@ +package dst.ass3.messaging.impl; + +import dst.ass3.messaging.IMessagingFactory; +import dst.ass3.messaging.IQueueManager; 
+import dst.ass3.messaging.IRequestGateway; +import dst.ass3.messaging.IWorkloadMonitor; + +public class MessagingFactory implements IMessagingFactory { + + @Override + public IQueueManager createQueueManager() { + // TODO + return null; + } + + @Override + public IRequestGateway createRequestGateway() { + // TODO + return null; + } + + @Override + public IWorkloadMonitor createWorkloadMonitor() { + // TODO + return null; + } + + @Override + public void close() { + // implement if needed + } +} diff --git a/ass3-messaging/src/main/resources/logback.xml b/ass3-messaging/src/main/resources/logback.xml new file mode 100644 index 0000000..9a6b351 --- /dev/null +++ b/ass3-messaging/src/main/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} - %highlight(%5p) [%12.12thread] %cyan(%-40.40logger{39}): %m%n + + + + + + + + + + diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java b/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java new file mode 100644 index 0000000..f1e125e --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/RabbitResource.java @@ -0,0 +1,48 @@ +package dst.ass3.messaging; + +import org.junit.rules.ExternalResource; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitManagementTemplate; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +public class RabbitResource extends ExternalResource { + + private RabbitAdmin admin; + private RabbitManagementTemplate manager; + private CachingConnectionFactory connectionFactory; + private RabbitTemplate client; + + @Override + protected void before() throws Throwable { + manager = new RabbitManagementTemplate(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD); + + connectionFactory = new CachingConnectionFactory(Constants.RMQ_HOST); + 
connectionFactory.setUsername(Constants.RMQ_USER); + connectionFactory.setPassword(Constants.RMQ_PASSWORD); + + client = new RabbitTemplate(connectionFactory); + admin = new RabbitAdmin(connectionFactory); + } + + @Override + protected void after() { + connectionFactory.destroy(); + } + + public RabbitManagementTemplate getManager() { + return manager; + } + + public RabbitTemplate getClient() { + return client; + } + + public RabbitAdmin getAdmin() { + return admin; + } + + public CachingConnectionFactory getConnectionFactory() { + return connectionFactory; + } +} diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java new file mode 100644 index 0000000..03a6159 --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/Ass3_1_Suite.java @@ -0,0 +1,13 @@ +package dst.ass3.messaging.impl; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + QueueManagerTest.class, + RequestGatewayTest.class, + WorkloadMonitorTest.class +}) +public class Ass3_1_Suite { +} diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java new file mode 100644 index 0000000..c0bbd08 --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/QueueManagerTest.java @@ -0,0 +1,98 @@ +package dst.ass3.messaging.impl; + +import static dst.ass3.messaging.Constants.RMQ_VHOST; +import static dst.ass3.messaging.Constants.TOPIC_EXCHANGE; +import static dst.ass3.messaging.Constants.WORK_QUEUES; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.junit.After; +import org.junit.Before; +import 
org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.springframework.amqp.core.Exchange; +import org.springframework.amqp.core.Queue; + +import dst.ass3.messaging.IMessagingFactory; +import dst.ass3.messaging.IQueueManager; +import dst.ass3.messaging.RabbitResource; + +public class QueueManagerTest { + + @Rule + public RabbitResource rabbit = new RabbitResource(); + + @Rule + public Timeout timeout = new Timeout(10, TimeUnit.SECONDS); + + private IMessagingFactory factory = new MessagingFactory(); + private IQueueManager queueManager; + + @Before + public void setUp() throws Exception { + factory = new MessagingFactory(); + queueManager = factory.createQueueManager(); + } + + @After + public void tearDown() throws Exception { + try { + queueManager.close(); + } catch (IOException e) { + // ignore + } + } + + @Test + public void setUp_createsQueues() throws Exception { + queueManager.setUp(); + + try { + List queues = rabbit.getManager().getQueues(); + assertThat(queues.size(), not(0)); + + // make sure all work queues exist + List queueNames = queues.stream().map(Queue::getName).collect(Collectors.toList()); + Arrays.stream(WORK_QUEUES) + .forEach(wq -> assertThat(queueNames, hasItem(wq))); + } finally { + queueManager.tearDown(); + } + } + + @Test + public void setUp_createsExchange() throws Exception { + queueManager.setUp(); + try { + Exchange exchange = rabbit.getManager().getExchange(RMQ_VHOST, TOPIC_EXCHANGE); + assertThat(exchange, notNullValue()); + } finally { + queueManager.tearDown(); + } + } + + @Test + public void tearDown_removesQueues() throws Exception { + queueManager.setUp(); + queueManager.tearDown(); + List queues = rabbit.getManager().getQueues(); + List queueNames = queues.stream().map(Queue::getName).collect(Collectors.toList()); + Arrays.stream(WORK_QUEUES) + .forEach(wq -> assertThat(queueNames, not(hasItem(wq)))); + } + + @Test + public void tearDown_removesExchange() throws Exception { + 
queueManager.setUp(); + queueManager.tearDown(); + Exchange exchange = rabbit.getManager().getExchange(RMQ_VHOST, TOPIC_EXCHANGE); + assertThat(exchange, nullValue()); + } +} \ No newline at end of file diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java new file mode 100644 index 0000000..c497181 --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/RequestGatewayTest.java @@ -0,0 +1,119 @@ +package dst.ass3.messaging.impl; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; + +import dst.ass3.messaging.Constants; +import dst.ass3.messaging.IMessagingFactory; +import dst.ass3.messaging.IQueueManager; +import dst.ass3.messaging.IRequestGateway; +import dst.ass3.messaging.RabbitResource; +import dst.ass3.messaging.RequestType; +import dst.ass3.messaging.UploadRequest; + +public class RequestGatewayTest { + + private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class); + + @Rule + public RabbitResource rabbit = new RabbitResource(); + + @Rule + public Timeout timeout = new Timeout(10, TimeUnit.SECONDS); + + private IMessagingFactory factory; + private IQueueManager queueManager; + private IRequestGateway requestGateway; + + @Before + public void setUp() throws Exception { + factory = new MessagingFactory(); + queueManager = factory.createQueueManager(); + requestGateway = factory.createRequestGateway(); + + queueManager.setUp(); + } + + @After + public void tearDown() throws Exception { 
+ queueManager.tearDown(); + + requestGateway.close(); + queueManager.close(); + factory.close(); + } + + @Test + public void uploadRequest_routesRequestsToCorrectQueues() throws Exception { + UploadRequest r1 = new UploadRequest("id1", RequestType.DOCUMENT, "urn:material:document:1"); + UploadRequest r2 = new UploadRequest("id2", RequestType.DOCUMENT, "urn:material:document:2"); + UploadRequest r3 = new UploadRequest("id3", RequestType.VIDEO, "urn:material:video:1"); + + LOG.info("Sending request {}", r1); + requestGateway.uploadRequest(r1); + LOG.info("Sending request {}", r2); + requestGateway.uploadRequest(r2); + LOG.info("Sending request {}", r3); + requestGateway.uploadRequest(r3); + + LOG.info("Taking request from queue {}", Constants.QUEUE_DOCUMENT); + Message m1 = rabbit.getClient().receive(Constants.QUEUE_DOCUMENT, 1000); + assertThat(m1, notNullValue()); + + LOG.info("Taking request from queue {}", Constants.QUEUE_DOCUMENT); + Message m2 = rabbit.getClient().receive(Constants.QUEUE_DOCUMENT, 1000); + assertThat(m2, notNullValue()); + + LOG.info("Taking request from queue {}", Constants.QUEUE_VIDEO); + Message m3 = rabbit.getClient().receive(Constants.QUEUE_VIDEO, 1000); + assertThat(m3, notNullValue()); + + assertThat("Expected queue to be empty as no request for that type were issued", + rabbit.getClient().receive(Constants.QUEUE_QUIZ, 1000), nullValue()); + } + + @Test + public void uploadRequest_serializesIntoJsonFormat() throws Exception { + UploadRequest r1 = new UploadRequest("id1", RequestType.QUIZ, "urn:material:quiz:1"); + UploadRequest r2 = new UploadRequest("id2", RequestType.DOCUMENT, "urn:material:document:1"); + UploadRequest r3 = new UploadRequest("id3", RequestType.VIDEO, "urn:material:video:1"); + + LOG.info("Sending request {}", r1); + requestGateway.uploadRequest(r1); + LOG.info("Sending request {}", r2); + requestGateway.uploadRequest(r2); + LOG.info("Sending request {}", r3); + requestGateway.uploadRequest(r3); + + LOG.info("Taking 
request from queue {}", Constants.QUEUE_QUIZ); + Message m1 = rabbit.getClient().receive(Constants.QUEUE_QUIZ, 1000); + assertThat(m1, notNullValue()); + assertThat(new String(m1.getBody()), + equalTo("{\"id\":\"id1\",\"type\":\"QUIZ\",\"urn\":\"urn:material:quiz:1\"}")); + + LOG.info("Taking request from queue {}", Constants.QUEUE_DOCUMENT); + Message m2 = rabbit.getClient().receive(Constants.QUEUE_DOCUMENT, 1000); + assertThat(m2, notNullValue()); + assertThat(new String(m2.getBody()), + equalTo("{\"id\":\"id2\",\"type\":\"DOCUMENT\",\"urn\":\"urn:material:document:1\"}")); + + LOG.info("Taking request from queue {}", Constants.QUEUE_VIDEO); + Message m3 = rabbit.getClient().receive(Constants.QUEUE_VIDEO, 1000); + assertThat(m3, notNullValue()); + assertThat(new String(m3.getBody()), + equalTo("{\"id\":\"id3\",\"type\":\"VIDEO\",\"urn\":\"urn:material:video:1\"}")); + } +} \ No newline at end of file diff --git a/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java new file mode 100644 index 0000000..c0cd9bf --- /dev/null +++ b/ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java @@ -0,0 +1,167 @@ +package dst.ass3.messaging.impl; + +import dst.ass3.messaging.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.context.Lifecycle; + +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static dst.ass3.messaging.Constants.TOPIC_EXCHANGE; +import static dst.ass3.messaging.Constants.WORK_QUEUES; +import static org.hamcrest.CoreMatchers.*; +import static 
org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertThat; + +public class WorkloadMonitorTest { + + private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class); + + @Rule + public RabbitResource rabbit = new RabbitResource(); + + @Rule + public Timeout timeout = new Timeout(30, TimeUnit.SECONDS); + + private IMessagingFactory factory; + private IQueueManager queueManager; + private IRequestGateway requestGateway; + private IWorkloadMonitor workloadMonitor; + + @Before + public void setUp() throws Exception { + factory = new MessagingFactory(); + queueManager = factory.createQueueManager(); + requestGateway = factory.createRequestGateway(); + + queueManager.setUp(); + + workloadMonitor = factory.createWorkloadMonitor(); + } + + @After + public void tearDown() throws Exception { + queueManager.tearDown(); + + requestGateway.close(); + queueManager.close(); + factory.close(); + } + + @Test + public void getRequestCount_returnsCorrectCount() throws Exception { + try { + Map countForType = new HashMap<>(); + for (RequestType type : RequestType.values()) { + countForType.put(type, ThreadLocalRandom.current().nextLong(10, 20 + 1)); + for (long i = 0; i < countForType.get(type); i++) { + UploadRequest request = new UploadRequest("id" + i, type, + "urn:material:" + type.name().toLowerCase() + i); + LOG.info("Sending request {}", request); + requestGateway.uploadRequest(request); + } + } + + // wait for the messages to be processed by rabbit + Thread.sleep(2000); + + assertThat(workloadMonitor.getRequestCount(), equalTo(countForType)); + } finally { + workloadMonitor.close(); + } + } + + @Test + public void multipleWorkloadMonitors_uniqueQueueForEachMonitor() throws Exception { + try (IWorkloadMonitor workloadMonitor2 = factory.createWorkloadMonitor(); + IWorkloadMonitor workloadMonitor3 = factory.createWorkloadMonitor();) { + long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> 
!Arrays.asList(WORK_QUEUES).contains(q.getName())).count(); + assertThat(nonWorkQueues, greaterThanOrEqualTo(3L)); + } finally { + workloadMonitor.close(); + } + } + + @Test + public void getAverageProcessingTime_correctAverageTime() throws Exception { + try { + Map avgTimes = new HashMap<>(); + for (RequestType type : RequestType.values()) { + long count = ThreadLocalRandom.current().nextLong(15, 25); + long typeTime = 0; + for (long i = 0; i < count; i++) { + long requestTime = ThreadLocalRandom.current().nextLong(1000, 20000 + 1); + if (count - i <= 10) { + typeTime += requestTime; + } + + String body = String.format("{\"requestId\": \"%s\", \"processingTime\": \"%d\"}", UUID.randomUUID(), requestTime); + LOG.info("Sending request {}", body); + rabbit.getClient().convertAndSend(TOPIC_EXCHANGE, "requests." + type.toString().toLowerCase(), body); + } + avgTimes.put(type, ((double)typeTime / 10)); + } + + // wait for the messages to be processed by rabbit + Thread.sleep(2000); + + assertThat(workloadMonitor.getAverageProcessingTime(), equalTo(avgTimes)); + } finally { + workloadMonitor.close(); + } + } + + @Test + public void getWorkerCount_returnsCorrectCount() throws Exception { + try { + // spawn a random number of consumers + Map> consumersForType = new HashMap<>(); + Map consumerCountForType = new HashMap<>(); + for (RequestType type : RequestType.values()) { + List consumers = new ArrayList<>(); + consumersForType.put(type, consumers); + consumerCountForType.put(type, ThreadLocalRandom.current().nextLong(10, 20 + 1)); + for (long i = 0; i < consumerCountForType.get(type); i++) { + consumers.add(spawnConsumer("dst." 
+ type.toString().toLowerCase())); + } + } + + // wait for rabbit to get to know the new consumers + Thread.sleep(2000); + + Map workerCount = workloadMonitor.getWorkerCount(); + + // stop all consumers + consumersForType.entrySet().stream().map(Map.Entry::getValue).flatMap(Collection::stream).forEach(Lifecycle::stop); + + assertThat(workerCount, equalTo(consumerCountForType)); + } finally { + workloadMonitor.close(); + } + } + + private MessageListenerContainer spawnConsumer(String queue) { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbit.getConnectionFactory()); + container.addQueueNames(queue); + container.start(); + return container; + } + + @Test + public void close_removesQueues() throws Exception { + workloadMonitor.close(); + + List queues = rabbit.getManager().getQueues(); + long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count(); + assertThat(nonWorkQueues, is(0L)); + } +} \ No newline at end of file diff --git a/ass3-worker/Dockerfile b/ass3-worker/Dockerfile new file mode 100644 index 0000000..f87f5c1 --- /dev/null +++ b/ass3-worker/Dockerfile @@ -0,0 +1 @@ +# TODO \ No newline at end of file diff --git a/ass3-worker/worker.py b/ass3-worker/worker.py new file mode 100644 index 0000000..f87f5c1 --- /dev/null +++ b/ass3-worker/worker.py @@ -0,0 +1 @@ +# TODO \ No newline at end of file diff --git a/pom.xml b/pom.xml index eb160e3..f8490c5 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ at.ac.tuwien.infosys.dst dst - 2018.1 + 2018.1-SNAPSHOT DST :: Parent @@ -189,6 +189,12 @@ ${hamcrest.version} test + + org.mockito + mockito-core + ${mockito.version} + test + @@ -327,6 +333,52 @@ ${spring-boot.version} + + + org.apache.flink + flink-streaming-java_2.11 + ${flink.version} + + + org.apache.flink + flink-cep_2.11 + ${flink.version} + + + com.github.docker-java + docker-java + ${docker-api-client.version} + + + com.rabbitmq + amqp-client + 
${rabbitmq-client.version} + + + com.rabbitmq + http-client + ${rabbitmq-http.version} + + + org.springframework.boot + spring-boot-starter-amqp + ${spring-boot.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + @@ -335,16 +387,8 @@ all - ass1-jpa - ass1-doc - ass1-kv - ass2-service/api - ass2-service/auth-client - ass2-service/auth - ass2-service/courseplan - ass2-service/facade - ass2-aop - ass2-di + ass3-messaging + ass3-event @@ -440,6 +484,18 @@ + + ass3-event + + ass3-event + + + + ass3-messaging + + ass3-messaging + + @@ -460,6 +516,7 @@ 3.7 4.12 1.3 + 2.18.3 5.2.13.Final 5.3.6.Final @@ -477,6 +534,14 @@ 1.10.1 0.5.1 1.5.0.Final + + 1.4.2 + 3.0.14 + 1.5.1 + 1.4 + 1.3.1.RELEASE + 4.0.3 + 2.9.5 -- 2.43.0