1 package dst.ass3.messaging.impl;
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.Connection;
5 import com.rabbitmq.client.ConnectionFactory;
6 import dst.ass3.messaging.Constants;
7 import dst.ass3.messaging.IQueueManager;
9 import java.io.IOException;
10 import java.util.concurrent.TimeoutException;
12 public class QueueManager implements IQueueManager {
15 ConnectionFactory factory = new ConnectionFactory();
16 factory.setHost(Constants.RMQ_HOST);
17 factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
18 factory.setUsername(Constants.RMQ_USER);
19 factory.setPassword(Constants.RMQ_PASSWORD);
22 Connection conn = factory.newConnection();
23 Channel channel = conn.createChannel();
24 channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true);
26 for (String qname : Constants.WORK_QUEUES) {
27 channel.queueDeclare(qname, true, false, false, null);
32 } catch (IOException | TimeoutException e) {
33 throw new RuntimeException("Irrecoverable error", e); // Fail horribly
38 public void tearDown() {
39 ConnectionFactory factory = new ConnectionFactory();
40 factory.setHost(Constants.RMQ_HOST);
41 factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
42 factory.setUsername(Constants.RMQ_USER);
43 factory.setPassword(Constants.RMQ_PASSWORD);
46 Connection conn = factory.newConnection();
47 Channel channel = conn.createChannel();
48 channel.exchangeDelete(Constants.TOPIC_EXCHANGE);
50 for (String qname : Constants.WORK_QUEUES) {
51 channel.queueDelete(qname);
56 } catch (IOException | TimeoutException e) {
57 throw new RuntimeException("Irrecoverable error", e); // Fail horribly
62 public void close() throws IOException {
63 tearDown(); // TODO: unsure if needed.