From d216161862b40937cbe3837a4d3eca991f2d789d Mon Sep 17 00:00:00 2001
From: Jan Vales <jan@jvales.net>
Date: Tue, 5 Jun 2018 03:08:00 +0200
Subject: [PATCH] [3.1] messaging. need refactoring?

---
 .../ass3/messaging/impl/MessagingFactory.java |  9 +--
 .../dst/ass3/messaging/impl/QueueManager.java | 70 +++++++++++++++++++
 .../ass3/messaging/impl/RequestGateway.java   | 65 +++++++++++++++++
 .../ass3/messaging/impl/WorkloadMonitor.java  | 53 ++++++++++++++
 4 files changed, 191 insertions(+), 6 deletions(-)
 create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
 create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java
 create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java

diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java
index 4ecc2c4..450724b 100644
--- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java
@@ -9,20 +9,17 @@ public class MessagingFactory implements IMessagingFactory {
 
     @Override
     public IQueueManager createQueueManager() {
-        // TODO
-        return null;
+        return new QueueManager();
     }
 
     @Override
     public IRequestGateway createRequestGateway() {
-        // TODO
-        return null;
+        return new RequestGateway();
     }
 
     @Override
     public IWorkloadMonitor createWorkloadMonitor() {
-        // TODO
-        return null;
+        return new WorkloadMonitor();
     }
 
     @Override
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
new file mode 100644
index 0000000..4c34145
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
@@ -0,0 +1,70 @@
+package dst.ass3.messaging.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import dst.ass3.messaging.Constants;
+import dst.ass3.messaging.IQueueManager;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+public class QueueManager implements IQueueManager {
+    @Override
+    public void setUp() {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost(Constants.RMQ_HOST);
+        factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
+        factory.setUsername(Constants.RMQ_USER);
+        factory.setPassword(Constants.RMQ_PASSWORD);
+
+        try {
+            Connection conn = factory.newConnection();
+            Channel channel = conn.createChannel();
+            channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true);
+
+            for (String qname : Constants.WORK_QUEUES) {
+                channel.queueDeclare(qname, true, false, false, null);
+            }
+
+            //some manual work needed: routing keys not loopable :(
+            channel.queueBind(Constants.QUEUE_DOCUMENT, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_DOCUMENT);
+            channel.queueBind(Constants.QUEUE_QUIZ, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_QUIZ);
+            channel.queueBind(Constants.QUEUE_VIDEO, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_VIDEO);
+
+            channel.close();
+            conn.close();
+        } catch (IOException | TimeoutException e) {
+            throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+        }
+    }
+
+    @Override
+    public void tearDown() {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost(Constants.RMQ_HOST);
+        factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
+        factory.setUsername(Constants.RMQ_USER);
+        factory.setPassword(Constants.RMQ_PASSWORD);
+
+        try {
+            Connection conn = factory.newConnection();
+            Channel channel = conn.createChannel();
+            channel.exchangeDelete(Constants.TOPIC_EXCHANGE);
+
+            for (String qname : Constants.WORK_QUEUES) {
+                channel.queueDelete(qname);
+            }
+
+            channel.close();
+            conn.close();
+        } catch (IOException | TimeoutException e) {
+            throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        tearDown(); // TODO: unsure if needed.
+    }
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java
new file mode 100644
index 0000000..13b435b
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java
@@ -0,0 +1,65 @@
+package dst.ass3.messaging.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import dst.ass3.messaging.Constants;
+import dst.ass3.messaging.IRequestGateway;
+import dst.ass3.messaging.UploadRequest;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+public class RequestGateway implements IRequestGateway {
+    Connection conn;
+    Channel channel;
+
+    public RequestGateway() {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost(Constants.RMQ_HOST);
+        factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
+        factory.setUsername(Constants.RMQ_USER);
+        factory.setPassword(Constants.RMQ_PASSWORD);
+
+        try {
+            conn = factory.newConnection();
+            channel = conn.createChannel();
+        } catch (IOException | TimeoutException e) {
+            throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+        }
+    }
+
+    @Override
+    public void uploadRequest(UploadRequest request) {
+        String routingKey = null;
+        switch (request.getType()) {
+            case DOCUMENT:
+                routingKey = Constants.ROUTING_KEY_DOCUMENT;
+                break;
+            case QUIZ:
+                routingKey = Constants.ROUTING_KEY_QUIZ;
+                break;
+            case VIDEO:
+                routingKey = Constants.ROUTING_KEY_VIDEO;
+                break;
+        }
+
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            channel.basicPublish(Constants.TOPIC_EXCHANGE, routingKey, false, false, MessageProperties.BASIC, mapper.writeValueAsBytes(request));
+        } catch (IOException e) {
+            e.printStackTrace(); // TODO: may not be irrecoverable!
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            channel.close();
+        } catch (TimeoutException ignored) {
+        }
+        conn.close();
+    }
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
new file mode 100644
index 0000000..6175db2
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
@@ -0,0 +1,53 @@
+package dst.ass3.messaging.impl;
+
+import com.rabbitmq.http.client.Client;
+import dst.ass3.messaging.Constants;
+import dst.ass3.messaging.IWorkloadMonitor;
+import dst.ass3.messaging.RequestType;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class WorkloadMonitor implements IWorkloadMonitor {
+    @Override
+    public Map<RequestType, Long> getRequestCount() {
+        HashMap<RequestType, Long> ret = new HashMap<>();
+        try {
+            Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
+            ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getMessagesReady());
+            ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getMessagesReady());
+            ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getMessagesReady());
+        } catch (MalformedURLException | URISyntaxException e) {
+            throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+        }
+        return ret;
+    }
+
+    @Override
+    public Map<RequestType, Long> getWorkerCount() {
+        HashMap<RequestType, Long> ret = new HashMap<>();
+        try {
+            Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
+            ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getConsumerCount());
+            ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getConsumerCount());
+            ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getConsumerCount());
+        } catch (MalformedURLException | URISyntaxException e) {
+            throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+        }
+        return ret;
+    }
+
+    @Override
+    public Map<RequestType, Double> getAverageProcessingTime() {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
-- 
2.43.0