1 package dst.ass3.messaging.impl;
3 import com.rabbitmq.client.BuiltinExchangeType;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6 import com.rabbitmq.client.ConnectionFactory;
7 import dst.ass3.messaging.Constants;
8 import dst.ass3.messaging.IQueueManager;
10 import java.io.IOException;
11 import java.util.concurrent.TimeoutException;
13 public class QueueManager implements IQueueManager {
16 ConnectionFactory factory = new ConnectionFactory();
17 factory.setHost(Constants.RMQ_HOST);
18 factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
19 factory.setUsername(Constants.RMQ_USER);
20 factory.setPassword(Constants.RMQ_PASSWORD);
23 Connection conn = factory.newConnection();
24 Channel channel = conn.createChannel();
25 channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
27 for (String qname : Constants.WORK_QUEUES) {
28 channel.queueDeclare(qname, true, false, false, null);
33 } catch (IOException | TimeoutException e) {
34 throw new RuntimeException("Irrecoverable error", e); // Fail horribly
39 public void tearDown() {
40 ConnectionFactory factory = new ConnectionFactory();
41 factory.setHost(Constants.RMQ_HOST);
42 factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
43 factory.setUsername(Constants.RMQ_USER);
44 factory.setPassword(Constants.RMQ_PASSWORD);
47 Connection conn = factory.newConnection();
48 Channel channel = conn.createChannel();
49 channel.exchangeDelete(Constants.TOPIC_EXCHANGE);
51 for (String qname : Constants.WORK_QUEUES) {
52 channel.queueDelete(qname);
57 } catch (IOException | TimeoutException e) {
58 throw new RuntimeException("Irrecoverable error", e); // Fail horribly
63 public void close() throws IOException {
64 tearDown(); // TODO: unsure if needed.