From 2b089e349b3ef20e5ebfdd33743b7e505ca6badd Mon Sep 17 00:00:00 2001
From: Jan Vales <jan@jvales.net>
Date: Tue, 5 Jun 2018 04:07:40 +0200
Subject: [PATCH] [3.1] seems to work.

---
 .../dst/ass3/messaging/impl/QueueManager.java |  3 +-
 .../ass3/messaging/impl/WorkloadMonitor.java  | 75 ++++++++++++++++++-
 2 files changed, 75 insertions(+), 3 deletions(-)

diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
index 4713ce8..dd87553 100644
--- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
@@ -1,5 +1,6 @@
 package dst.ass3.messaging.impl;
 
+import com.rabbitmq.client.BuiltinExchangeType;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
@@ -21,7 +22,7 @@ public class QueueManager implements IQueueManager {
         try {
             Connection conn = factory.newConnection();
             Channel channel = conn.createChannel();
-            channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true);
+            channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
 
             for (String qname : Constants.WORK_QUEUES) {
                 channel.queueDeclare(qname, true, false, false, null);
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
index 6175db2..b9b8c9a 100644
--- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
@@ -1,18 +1,69 @@
 package dst.ass3.messaging.impl;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.*;
 import com.rabbitmq.http.client.Client;
 import dst.ass3.messaging.Constants;
 import dst.ass3.messaging.IWorkloadMonitor;
 import dst.ass3.messaging.RequestType;
+import dst.ass3.messaging.WorkerResponse;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 
 public class WorkloadMonitor implements IWorkloadMonitor {
+    Connection conn;
+    Channel channel;
+    String monitorQName;
+    Consumer consumer;
+    HashMap<String, LinkedList<WorkerResponse>> workerRespMap = new HashMap<>();
+
+    public WorkloadMonitor() {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost(Constants.RMQ_HOST);
+        factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
+        factory.setUsername(Constants.RMQ_USER);
+        factory.setPassword(Constants.RMQ_PASSWORD);
+
+        try {
+            conn = factory.newConnection();
+            channel = conn.createChannel();
+            monitorQName = channel.queueDeclare().getQueue();
+            channel.queueBind(monitorQName, Constants.TOPIC_EXCHANGE, "requests.*");
+
+            consumer = new DefaultConsumer(channel) {
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+                    ObjectMapper mapper = new ObjectMapper();
+                    String message = new String(body, "UTF-8");
+                    WorkerResponse resp = mapper.readValue(message, WorkerResponse.class);
+
+                    LinkedList<WorkerResponse> msgList;
+                    if (!workerRespMap.containsKey(envelope.getRoutingKey())) {
+                        msgList = new LinkedList<WorkerResponse>();
+                    } else {
+                        msgList = workerRespMap.get(envelope.getRoutingKey());
+                    }
+                    msgList.addFirst(resp);
+                    while (msgList.size() > 10) {
+                        msgList.removeLast();
+                    }
+                    workerRespMap.put(envelope.getRoutingKey(), msgList);
+                }
+            };
+            channel.basicConsume(monitorQName, true, consumer);
+
+
+        } catch (IOException | TimeoutException e) {
+            throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+        }
+    }
+
     @Override
     public Map<RequestType, Long> getRequestCount() {
         HashMap<RequestType, Long> ret = new HashMap<>();
@@ -43,11 +94,31 @@ public class WorkloadMonitor implements IWorkloadMonitor {
 
     @Override
     public Map<RequestType, Double> getAverageProcessingTime() {
-        return null;
+        HashMap<RequestType, Double> ret = new HashMap<>();
+        ret.put(RequestType.DOCUMENT, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_DOCUMENT));
+        ret.put(RequestType.QUIZ, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_QUIZ));
+        ret.put(RequestType.VIDEO, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_VIDEO));
+        return ret;
+    }
+
+    public Double calcAverageProcessingTimePerRoutingKey(String routingKey) {
+        if (workerRespMap.containsKey(routingKey)) {
+            long ptime = 0;
+            for (WorkerResponse resp : workerRespMap.get(routingKey)) {
+                ptime += resp.getProcessingTime();
+            }
+            return (double) ptime / (workerRespMap.get(routingKey)).size();
+        }
+        return 0.0;
     }
 
     @Override
     public void close() throws IOException {
-
+        channel.queueDelete(monitorQName);
+        try {
+            channel.close();
+        } catch (TimeoutException ignored) {
+        }
+        conn.close();
     }
 }
-- 
2.43.0