From d216161862b40937cbe3837a4d3eca991f2d789d Mon Sep 17 00:00:00 2001 From: Jan Vales Date: Tue, 5 Jun 2018 03:08:00 +0200 Subject: [PATCH] [3.1] messaging. need refactoring? --- .../ass3/messaging/impl/MessagingFactory.java | 9 +-- .../dst/ass3/messaging/impl/QueueManager.java | 70 +++++++++++++++++++ .../ass3/messaging/impl/RequestGateway.java | 65 +++++++++++++++++ .../ass3/messaging/impl/WorkloadMonitor.java | 53 ++++++++++++++ 4 files changed, 191 insertions(+), 6 deletions(-) create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java create mode 100644 ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java index 4ecc2c4..450724b 100644 --- a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/MessagingFactory.java @@ -9,20 +9,17 @@ public class MessagingFactory implements IMessagingFactory { @Override public IQueueManager createQueueManager() { - // TODO - return null; + return new QueueManager(); } @Override public IRequestGateway createRequestGateway() { - // TODO - return null; + return new RequestGateway(); } @Override public IWorkloadMonitor createWorkloadMonitor() { - // TODO - return null; + return new WorkloadMonitor(); } @Override diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java new file mode 100644 index 0000000..4c34145 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java @@ -0,0 +1,70 @@ +package dst.ass3.messaging.impl; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import dst.ass3.messaging.Constants; +import dst.ass3.messaging.IQueueManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class QueueManager implements IQueueManager { + @Override + public void setUp() { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(Constants.RMQ_HOST); + factory.setPort(Integer.parseInt(Constants.RMQ_PORT)); + factory.setUsername(Constants.RMQ_USER); + factory.setPassword(Constants.RMQ_PASSWORD); + + try { + Connection conn = factory.newConnection(); + Channel channel = conn.createChannel(); + channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true); + + for (String qname : Constants.WORK_QUEUES) { + channel.queueDeclare(qname, true, false, false, null); + } + + //some manual work needed: routing keys not loopable :( + channel.queueBind(Constants.QUEUE_DOCUMENT, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_DOCUMENT); + channel.queueBind(Constants.QUEUE_QUIZ, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_QUIZ); + channel.queueBind(Constants.QUEUE_VIDEO, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_VIDEO); + + channel.close(); + conn.close(); + } catch (IOException | TimeoutException e) { + throw new RuntimeException("Irrecoverable error", e); // Fail horribly + } + } + + @Override + public void tearDown() { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(Constants.RMQ_HOST); + factory.setPort(Integer.parseInt(Constants.RMQ_PORT)); + factory.setUsername(Constants.RMQ_USER); + factory.setPassword(Constants.RMQ_PASSWORD); + + try { + Connection conn = factory.newConnection(); + Channel channel = conn.createChannel(); + channel.exchangeDelete(Constants.TOPIC_EXCHANGE); + + for (String qname : Constants.WORK_QUEUES) { + channel.queueDelete(qname); + } + + channel.close(); + conn.close(); + } catch (IOException | TimeoutException e) { + throw new RuntimeException("Irrecoverable error", e); // Fail horribly + } + } + + @Override + public void close() throws IOException { + tearDown(); // TODO: unsure if needed. + } +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java new file mode 100644 index 0000000..13b435b --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/RequestGateway.java @@ -0,0 +1,65 @@ +package dst.ass3.messaging.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.MessageProperties; +import dst.ass3.messaging.Constants; +import dst.ass3.messaging.IRequestGateway; +import dst.ass3.messaging.UploadRequest; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class RequestGateway implements IRequestGateway { + Connection conn; + Channel channel; + + public RequestGateway() { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(Constants.RMQ_HOST); + factory.setPort(Integer.parseInt(Constants.RMQ_PORT)); + factory.setUsername(Constants.RMQ_USER); + factory.setPassword(Constants.RMQ_PASSWORD); + + try { + conn = factory.newConnection(); + channel = conn.createChannel(); + } catch (IOException | TimeoutException e) { + throw new RuntimeException("Irrecoverable error", e); // Fail horribly + } + } + + @Override + public void uploadRequest(UploadRequest request) { + String routingKey = null; + switch (request.getType()) { + case DOCUMENT: + routingKey = Constants.ROUTING_KEY_DOCUMENT; + break; + case QUIZ: + routingKey = Constants.ROUTING_KEY_QUIZ; + break; + case VIDEO: + routingKey = Constants.ROUTING_KEY_VIDEO; + break; + } + + ObjectMapper mapper = new ObjectMapper(); + try { + channel.basicPublish(Constants.TOPIC_EXCHANGE, routingKey, false, false, MessageProperties.BASIC, mapper.writeValueAsBytes(request)); + } catch (IOException e) { + e.printStackTrace(); // TODO: may not be irrecoverable! + } + } + + @Override + public void close() throws IOException { + try { + channel.close(); + } catch (TimeoutException ignored) { + } + conn.close(); + } +} diff --git a/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java new file mode 100644 index 0000000..6175db2 --- /dev/null +++ b/ass3-messaging/src/main/java/dst/ass3/messaging/impl/WorkloadMonitor.java @@ -0,0 +1,53 @@ +package dst.ass3.messaging.impl; + +import com.rabbitmq.http.client.Client; +import dst.ass3.messaging.Constants; +import dst.ass3.messaging.IWorkloadMonitor; +import dst.ass3.messaging.RequestType; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + + +public class WorkloadMonitor implements IWorkloadMonitor { + @Override + public Map getRequestCount() { + HashMap ret = new HashMap<>(); + try { + Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD); + ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getMessagesReady()); + ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getMessagesReady()); + ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getMessagesReady()); + } catch (MalformedURLException | URISyntaxException e) { + throw new RuntimeException("Irrecoverable error", e); // Fail horribly + } + return ret; + } + + @Override + public Map getWorkerCount() { + HashMap ret = new HashMap<>(); + try { + Client c = new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD); + ret.put(RequestType.DOCUMENT, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getConsumerCount()); + ret.put(RequestType.QUIZ, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getConsumerCount()); + ret.put(RequestType.VIDEO, c.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getConsumerCount()); + } catch (MalformedURLException | URISyntaxException e) { + throw new RuntimeException("Irrecoverable error", e); // Fail horribly + } + return ret; + } + + @Override + public Map getAverageProcessingTime() { + return null; + } + + @Override + public void close() throws IOException { + + } +} -- 2.43.0