1 package dst.ass3.messaging.impl;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.rabbitmq.client.*;
5 import com.rabbitmq.http.client.Client;
6 import dst.ass3.messaging.Constants;
7 import dst.ass3.messaging.IWorkloadMonitor;
8 import dst.ass3.messaging.RequestType;
9 import dst.ass3.messaging.WorkerResponse;
11 import java.io.IOException;
12 import java.net.MalformedURLException;
13 import java.net.URISyntaxException;
14 import java.util.HashMap;
15 import java.util.LinkedList;
17 import java.util.concurrent.TimeoutException;
20 public class WorkloadMonitor implements IWorkloadMonitor {
25 HashMap<String, LinkedList<WorkerResponse>> workerRespMap = new HashMap<>();
27 public WorkloadMonitor() {
28 ConnectionFactory factory = new ConnectionFactory();
29 factory.setHost(Constants.RMQ_HOST);
30 factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
31 factory.setUsername(Constants.RMQ_USER);
32 factory.setPassword(Constants.RMQ_PASSWORD);
35 conn = factory.newConnection();
36 channel = conn.createChannel();
37 monitorQName = channel.queueDeclare().getQueue();
38 channel.queueBind(monitorQName, Constants.TOPIC_EXCHANGE, "requests.*");
40 consumer = new DefaultConsumer(channel) {
41 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
42 ObjectMapper mapper = new ObjectMapper();
43 String message = new String(body, "UTF-8");
44 WorkerResponse resp = mapper.readValue(message, WorkerResponse.class);
46 LinkedList<WorkerResponse> msgList;
47 if (!workerRespMap.containsKey(envelope.getRoutingKey())) {
48 msgList = new LinkedList<WorkerResponse>();
50 msgList = workerRespMap.get(envelope.getRoutingKey());
52 msgList.addFirst(resp);
53 while (msgList.size() > 10) {
56 workerRespMap.put(envelope.getRoutingKey(), msgList);
59 channel.basicConsume(monitorQName, true, consumer);
62 } catch (IOException | TimeoutException e) {
63 throw new RuntimeException("Irrecoverable error", e); // Fail horribly
68 public Map<RequestType, Long> getRequestCount() {
69 HashMap<RequestType, Long> ret = new HashMap<>();
71 Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
72 ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getMessagesReady());
73 ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getMessagesReady());
74 ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getMessagesReady());
75 } catch (MalformedURLException | URISyntaxException e) {
76 throw new RuntimeException("Irrecoverable error", e); // Fail horribly
82 public Map<RequestType, Long> getWorkerCount() {
83 HashMap<RequestType, Long> ret = new HashMap<>();
85 Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
86 ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getConsumerCount());
87 ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getConsumerCount());
88 ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getConsumerCount());
89 } catch (MalformedURLException | URISyntaxException e) {
90 throw new RuntimeException("Irrecoverable error", e); // Fail horribly
96 public Map<RequestType, Double> getAverageProcessingTime() {
97 HashMap<RequestType, Double> ret = new HashMap<>();
98 ret.put(RequestType.DOCUMENT, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_DOCUMENT));
99 ret.put(RequestType.QUIZ, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_QUIZ));
100 ret.put(RequestType.VIDEO, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_VIDEO));
104 public Double calcAverageProcessingTimePerRoutingKey(String routingKey) {
105 if (workerRespMap.containsKey(routingKey)) {
107 for (WorkerResponse resp : workerRespMap.get(routingKey)) {
108 ptime += resp.getProcessingTime();
110 return (double) ptime / (workerRespMap.get(routingKey)).size();
116 public void close() throws IOException {
117 channel.queueDelete(monitorQName);
120 } catch (TimeoutException ignored) {