]> git.somenet.org - pub/jan/dst18.git/blob - ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
[3.1] seems to work.
[pub/jan/dst18.git] / ass3-messaging / src / main / java / dst / ass3 / messaging / impl / WorkloadMonitor.java
1 package dst.ass3.messaging.impl;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.rabbitmq.client.*;
5 import com.rabbitmq.http.client.Client;
6 import dst.ass3.messaging.Constants;
7 import dst.ass3.messaging.IWorkloadMonitor;
8 import dst.ass3.messaging.RequestType;
9 import dst.ass3.messaging.WorkerResponse;
10
11 import java.io.IOException;
12 import java.net.MalformedURLException;
13 import java.net.URISyntaxException;
14 import java.util.HashMap;
15 import java.util.LinkedList;
16 import java.util.Map;
17 import java.util.concurrent.TimeoutException;
18
19
20 public class WorkloadMonitor implements IWorkloadMonitor {
21     Connection conn;
22     Channel channel;
23     String monitorQName;
24     Consumer consumer;
25     HashMap<String, LinkedList<WorkerResponse>> workerRespMap = new HashMap<>();
26
27     public WorkloadMonitor() {
28         ConnectionFactory factory = new ConnectionFactory();
29         factory.setHost(Constants.RMQ_HOST);
30         factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
31         factory.setUsername(Constants.RMQ_USER);
32         factory.setPassword(Constants.RMQ_PASSWORD);
33
34         try {
35             conn = factory.newConnection();
36             channel = conn.createChannel();
37             monitorQName = channel.queueDeclare().getQueue();
38             channel.queueBind(monitorQName, Constants.TOPIC_EXCHANGE, "requests.*");
39
40             consumer = new DefaultConsumer(channel) {
41                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
42                     ObjectMapper mapper = new ObjectMapper();
43                     String message = new String(body, "UTF-8");
44                     WorkerResponse resp = mapper.readValue(message, WorkerResponse.class);
45
46                     LinkedList<WorkerResponse> msgList;
47                     if (!workerRespMap.containsKey(envelope.getRoutingKey())) {
48                         msgList = new LinkedList<WorkerResponse>();
49                     } else {
50                         msgList = workerRespMap.get(envelope.getRoutingKey());
51                     }
52                     msgList.addFirst(resp);
53                     while (msgList.size() > 10) {
54                         msgList.removeLast();
55                     }
56                     workerRespMap.put(envelope.getRoutingKey(), msgList);
57                 }
58             };
59             channel.basicConsume(monitorQName, true, consumer);
60
61
62         } catch (IOException | TimeoutException e) {
63             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
64         }
65     }
66
67     @Override
68     public Map<RequestType, Long> getRequestCount() {
69         HashMap<RequestType, Long> ret = new HashMap<>();
70         try {
71             Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
72             ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getMessagesReady());
73             ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getMessagesReady());
74             ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getMessagesReady());
75         } catch (MalformedURLException | URISyntaxException e) {
76             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
77         }
78         return ret;
79     }
80
81     @Override
82     public Map<RequestType, Long> getWorkerCount() {
83         HashMap<RequestType, Long> ret = new HashMap<>();
84         try {
85             Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
86             ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getConsumerCount());
87             ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getConsumerCount());
88             ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getConsumerCount());
89         } catch (MalformedURLException | URISyntaxException e) {
90             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
91         }
92         return ret;
93     }
94
95     @Override
96     public Map<RequestType, Double> getAverageProcessingTime() {
97         HashMap<RequestType, Double> ret = new HashMap<>();
98         ret.put(RequestType.DOCUMENT, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_DOCUMENT));
99         ret.put(RequestType.QUIZ, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_QUIZ));
100         ret.put(RequestType.VIDEO, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_VIDEO));
101         return ret;
102     }
103
104     public Double calcAverageProcessingTimePerRoutingKey(String routingKey) {
105         if (workerRespMap.containsKey(routingKey)) {
106             long ptime = 0;
107             for (WorkerResponse resp : workerRespMap.get(routingKey)) {
108                 ptime += resp.getProcessingTime();
109             }
110             return (double) ptime / (workerRespMap.get(routingKey)).size();
111         }
112         return 0.0;
113     }
114
115     @Override
116     public void close() throws IOException {
117         channel.queueDelete(monitorQName);
118         try {
119             channel.close();
120         } catch (TimeoutException ignored) {
121         }
122         conn.close();
123     }
124 }