From 2b089e349b3ef20e5ebfdd33743b7e505ca6badd Mon Sep 17 00:00:00 2001
From: Jan Vales <jan@jvales.net>
Date: Tue, 5 Jun 2018 04:07:40 +0200
Subject: [PATCH] [3.1] seems to work.
---
.../dst/ass3/messaging/impl/QueueManager.java | 3 +-
.../ass3/messaging/impl/WorkloadMonitor.java | 75 ++++++++++++++++++-
2 files changed, 75 insertions(+), 3 deletions(-)
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
index 4713ce8..dd87553 100644
--- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
@@ -1,5 +1,6 @@
package dst.ass3.messaging.impl;
+import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
@@ -21,7 +22,7 @@ public class QueueManager implements IQueueManager {
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
- channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true);
+ channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
for (String qname : Constants.WORK_QUEUES) {
channel.queueDeclare(qname, true, false, false, null);
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
index 6175db2..b9b8c9a 100644
--- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
@@ -1,18 +1,69 @@
package dst.ass3.messaging.impl;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.*;
import com.rabbitmq.http.client.Client;
import dst.ass3.messaging.Constants;
import dst.ass3.messaging.IWorkloadMonitor;
import dst.ass3.messaging.RequestType;
+import dst.ass3.messaging.WorkerResponse;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
public class WorkloadMonitor implements IWorkloadMonitor {
+ Connection conn;
+ Channel channel;
+ String monitorQName;
+ Consumer consumer;
+ HashMap<String, LinkedList<WorkerResponse>> workerRespMap = new HashMap<>();
+
+ public WorkloadMonitor() {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setHost(Constants.RMQ_HOST);
+ factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
+ factory.setUsername(Constants.RMQ_USER);
+ factory.setPassword(Constants.RMQ_PASSWORD);
+
+ try {
+ conn = factory.newConnection();
+ channel = conn.createChannel();
+ monitorQName = channel.queueDeclare().getQueue();
+ channel.queueBind(monitorQName, Constants.TOPIC_EXCHANGE, "requests.*");
+
+ consumer = new DefaultConsumer(channel) {
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ String message = new String(body, "UTF-8");
+ WorkerResponse resp = mapper.readValue(message, WorkerResponse.class);
+
+ LinkedList<WorkerResponse> msgList;
+ if (!workerRespMap.containsKey(envelope.getRoutingKey())) {
+ msgList = new LinkedList<WorkerResponse>();
+ } else {
+ msgList = workerRespMap.get(envelope.getRoutingKey());
+ }
+ msgList.addFirst(resp);
+ while (msgList.size() > 10) {
+ msgList.removeLast();
+ }
+ workerRespMap.put(envelope.getRoutingKey(), msgList);
+ }
+ };
+ channel.basicConsume(monitorQName, true, consumer);
+
+
+ } catch (IOException | TimeoutException e) {
+ throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+ }
+ }
+
@Override
public Map<RequestType, Long> getRequestCount() {
HashMap<RequestType, Long> ret = new HashMap<>();
@@ -43,11 +94,31 @@ public class WorkloadMonitor implements IWorkloadMonitor {
@Override
public Map<RequestType, Double> getAverageProcessingTime() {
- return null;
+ HashMap<RequestType, Double> ret = new HashMap<>();
+ ret.put(RequestType.DOCUMENT, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_DOCUMENT));
+ ret.put(RequestType.QUIZ, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_QUIZ));
+ ret.put(RequestType.VIDEO, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_VIDEO));
+ return ret;
+ }
+
+ public Double calcAverageProcessingTimePerRoutingKey(String routingKey) {
+ if (workerRespMap.containsKey(routingKey)) {
+ long ptime = 0;
+ for (WorkerResponse resp : workerRespMap.get(routingKey)) {
+ ptime += resp.getProcessingTime();
+ }
+ return (double) ptime / (workerRespMap.get(routingKey)).size();
+ }
+ return 0.0;
}
@Override
public void close() throws IOException {
-
+ channel.queueDelete(monitorQName);
+ try {
+ channel.close();
+ } catch (TimeoutException ignored) {
+ }
+ conn.close();
}
}
--
2.47.3