From d216161862b40937cbe3837a4d3eca991f2d789d Mon Sep 17 00:00:00 2001
From: Jan Vales <jan@jvales.net>
Date: Tue, 5 Jun 2018 03:08:00 +0200
Subject: [PATCH] [3.1] messaging. need refactoring?
---
.../ass3/messaging/impl/MessagingFactory.java | 9 +--
.../dst/ass3/messaging/impl/QueueManager.java | 70 +++++++++++++++++++
.../ass3/messaging/impl/RequestGateway.java | 65 +++++++++++++++++
.../ass3/messaging/impl/WorkloadMonitor.java | 53 ++++++++++++++
4 files changed, 191 insertions(+), 6 deletions(-)
create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java
create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java
index 4ecc2c4..450724b 100644
--- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java
@@ -9,20 +9,17 @@ public class MessagingFactory implements IMessagingFactory {
@Override
public IQueueManager createQueueManager() {
- // TODO
- return null;
+ return new QueueManager();
}
@Override
public IRequestGateway createRequestGateway() {
- // TODO
- return null;
+ return new RequestGateway();
}
@Override
public IWorkloadMonitor createWorkloadMonitor() {
- // TODO
- return null;
+ return new WorkloadMonitor();
}
@Override
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
new file mode 100644
index 0000000..4c34145
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
@@ -0,0 +1,70 @@
+package dst.ass3.messaging.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import dst.ass3.messaging.Constants;
+import dst.ass3.messaging.IQueueManager;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+public class QueueManager implements IQueueManager {
+ @Override
+ public void setUp() {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setHost(Constants.RMQ_HOST);
+ factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
+ factory.setUsername(Constants.RMQ_USER);
+ factory.setPassword(Constants.RMQ_PASSWORD);
+
+ try {
+ Connection conn = factory.newConnection();
+ Channel channel = conn.createChannel();
+ channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true);
+
+ for (String qname : Constants.WORK_QUEUES) {
+ channel.queueDeclare(qname, true, false, false, null);
+ }
+
+ //some manual work needed: routing keys not loopable :(
+ channel.queueBind(Constants.QUEUE_DOCUMENT, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_DOCUMENT);
+ channel.queueBind(Constants.QUEUE_QUIZ, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_QUIZ);
+ channel.queueBind(Constants.QUEUE_VIDEO, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_VIDEO);
+
+ channel.close();
+ conn.close();
+ } catch (IOException | TimeoutException e) {
+ throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+ }
+ }
+
+ @Override
+ public void tearDown() {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setHost(Constants.RMQ_HOST);
+ factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
+ factory.setUsername(Constants.RMQ_USER);
+ factory.setPassword(Constants.RMQ_PASSWORD);
+
+ try {
+ Connection conn = factory.newConnection();
+ Channel channel = conn.createChannel();
+ channel.exchangeDelete(Constants.TOPIC_EXCHANGE);
+
+ for (String qname : Constants.WORK_QUEUES) {
+ channel.queueDelete(qname);
+ }
+
+ channel.close();
+ conn.close();
+ } catch (IOException | TimeoutException e) {
+ throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ tearDown(); // TODO: unsure if needed.
+ }
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java
new file mode 100644
index 0000000..13b435b
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java
@@ -0,0 +1,65 @@
+package dst.ass3.messaging.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import dst.ass3.messaging.Constants;
+import dst.ass3.messaging.IRequestGateway;
+import dst.ass3.messaging.UploadRequest;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+public class RequestGateway implements IRequestGateway {
+ Connection conn;
+ Channel channel;
+
+ public RequestGateway() {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setHost(Constants.RMQ_HOST);
+ factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
+ factory.setUsername(Constants.RMQ_USER);
+ factory.setPassword(Constants.RMQ_PASSWORD);
+
+ try {
+ conn = factory.newConnection();
+ channel = conn.createChannel();
+ } catch (IOException | TimeoutException e) {
+ throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+ }
+ }
+
+ @Override
+ public void uploadRequest(UploadRequest request) {
+ String routingKey = null;
+ switch (request.getType()) {
+ case DOCUMENT:
+ routingKey = Constants.ROUTING_KEY_DOCUMENT;
+ break;
+ case QUIZ:
+ routingKey = Constants.ROUTING_KEY_QUIZ;
+ break;
+ case VIDEO:
+ routingKey = Constants.ROUTING_KEY_VIDEO;
+ break;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ channel.basicPublish(Constants.TOPIC_EXCHANGE, routingKey, false, false, MessageProperties.BASIC, mapper.writeValueAsBytes(request));
+ } catch (IOException e) {
+ e.printStackTrace(); // TODO: may not be irrecoverable!
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ channel.close();
+ } catch (TimeoutException ignored) {
+ }
+ conn.close();
+ }
+}
diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
new file mode 100644
index 0000000..6175db2
--- /dev/null
+++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java
@@ -0,0 +1,53 @@
+package dst.ass3.messaging.impl;
+
+import com.rabbitmq.http.client.Client;
+import dst.ass3.messaging.Constants;
+import dst.ass3.messaging.IWorkloadMonitor;
+import dst.ass3.messaging.RequestType;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class WorkloadMonitor implements IWorkloadMonitor {
+ @Override
+ public Map<RequestType, Long> getRequestCount() {
+ HashMap<RequestType, Long> ret = new HashMap<>();
+ try {
+ Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
+ ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getMessagesReady());
+ ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getMessagesReady());
+ ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getMessagesReady());
+ } catch (MalformedURLException | URISyntaxException e) {
+ throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+ }
+ return ret;
+ }
+
+ @Override
+ public Map<RequestType, Long> getWorkerCount() {
+ HashMap<RequestType, Long> ret = new HashMap<>();
+ try {
+ Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
+ ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getConsumerCount());
+ ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getConsumerCount());
+ ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getConsumerCount());
+ } catch (MalformedURLException | URISyntaxException e) {
+ throw new RuntimeException("Irrecoverable error", e); // Fail horribly
+ }
+ return ret;
+ }
+
+ @Override
+ public Map<RequestType, Double> getAverageProcessingTime() {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
--
2.47.3