From 2b089e349b3ef20e5ebfdd33743b7e505ca6badd Mon Sep 17 00:00:00 2001 From: Jan Vales Date: Tue, 5 Jun 2018 04:07:40 +0200 Subject: [PATCH] [3.1] seems to work. --- .../dst/ass3/messaging/impl/QueueManager.java | 3 +- .../ass3/messaging/impl/WorkloadMonitor.java | 75 ++++++++++++++++++- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java index 4713ce8..dd87553 100644 --- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java @@ -1,5 +1,6 @@ package dst.ass3.messaging.impl; +import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @@ -21,7 +22,7 @@ public class QueueManager implements IQueueManager { try { Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); - channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true); for (String qname : Constants.WORK_QUEUES) { channel.queueDeclare(qname, true, false, false, null); diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java index 6175db2..b9b8c9a 100644 --- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java @@ -1,18 +1,69 @@ package dst.ass3.messaging.impl; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.*; import com.rabbitmq.http.client.Client; import dst.ass3.messaging.Constants; import dst.ass3.messaging.IWorkloadMonitor; import dst.ass3.messaging.RequestType; +import dst.ass3.messaging.WorkerResponse; import java.io.IOException; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; +import java.util.concurrent.TimeoutException; public class WorkloadMonitor implements IWorkloadMonitor { + Connection conn; + Channel channel; + String monitorQName; + Consumer consumer; + HashMap> workerRespMap = new HashMap<>(); + + public WorkloadMonitor() { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(Constants.RMQ_HOST); + factory.setPort(Integer.parseInt(Constants.RMQ_PORT)); + factory.setUsername(Constants.RMQ_USER); + factory.setPassword(Constants.RMQ_PASSWORD); + + try { + conn = factory.newConnection(); + channel = conn.createChannel(); + monitorQName = channel.queueDeclare().getQueue(); + channel.queueBind(monitorQName, Constants.TOPIC_EXCHANGE, "requests.*"); + + consumer = new DefaultConsumer(channel) { + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + String message = new String(body, "UTF-8"); + WorkerResponse resp = mapper.readValue(message, WorkerResponse.class); + + LinkedList msgList; + if (!workerRespMap.containsKey(envelope.getRoutingKey())) { + msgList = new LinkedList(); + } else { + msgList = workerRespMap.get(envelope.getRoutingKey()); + } + msgList.addFirst(resp); + while (msgList.size() > 10) { + msgList.removeLast(); + } + workerRespMap.put(envelope.getRoutingKey(), msgList); + } + }; + channel.basicConsume(monitorQName, true, consumer); + + + } catch (IOException | TimeoutException e) { + throw new RuntimeException("Irrecoverable error", e); // Fail horribly + } + } + @Override public Map getRequestCount() { HashMap ret = new HashMap<>(); @@ -43,11 +94,31 @@ public class WorkloadMonitor implements IWorkloadMonitor { @Override public Map getAverageProcessingTime() { - return null; + HashMap ret = new HashMap<>(); + ret.put(RequestType.DOCUMENT, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_DOCUMENT)); + ret.put(RequestType.QUIZ, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_QUIZ)); + ret.put(RequestType.VIDEO, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_VIDEO)); + return ret; + } + + public Double calcAverageProcessingTimePerRoutingKey(String routingKey) { + if (workerRespMap.containsKey(routingKey)) { + long ptime = 0; + for (WorkerResponse resp : workerRespMap.get(routingKey)) { + ptime += resp.getProcessingTime(); + } + return (double) ptime / (workerRespMap.get(routingKey)).size(); + } + return 0.0; } @Override public void close() throws IOException { - + channel.queueDelete(monitorQName); + try { + channel.close(); + } catch (TimeoutException ignored) { + } + conn.close(); } } -- 2.43.0