]> git.somenet.org - pub/jan/dst18.git/blob - ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
[3.1] refactored queues - removed manual bindings.
[pub/jan/dst18.git] / ass3-messaging / src / main / java / dst / ass3 / messaging / impl / QueueManager.java
1 package dst.ass3.messaging.impl;
2
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.Connection;
5 import com.rabbitmq.client.ConnectionFactory;
6 import dst.ass3.messaging.Constants;
7 import dst.ass3.messaging.IQueueManager;
8
9 import java.io.IOException;
10 import java.util.concurrent.TimeoutException;
11
12 public class QueueManager implements IQueueManager {
13     @Override
14     public void setUp() {
15         ConnectionFactory factory = new ConnectionFactory();
16         factory.setHost(Constants.RMQ_HOST);
17         factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
18         factory.setUsername(Constants.RMQ_USER);
19         factory.setPassword(Constants.RMQ_PASSWORD);
20
21         try {
22             Connection conn = factory.newConnection();
23             Channel channel = conn.createChannel();
24             channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true);
25
26             for (String qname : Constants.WORK_QUEUES) {
27                 channel.queueDeclare(qname, true, false, false, null);
28             }
29
30             channel.close();
31             conn.close();
32         } catch (IOException | TimeoutException e) {
33             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
34         }
35     }
36
37     @Override
38     public void tearDown() {
39         ConnectionFactory factory = new ConnectionFactory();
40         factory.setHost(Constants.RMQ_HOST);
41         factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
42         factory.setUsername(Constants.RMQ_USER);
43         factory.setPassword(Constants.RMQ_PASSWORD);
44
45         try {
46             Connection conn = factory.newConnection();
47             Channel channel = conn.createChannel();
48             channel.exchangeDelete(Constants.TOPIC_EXCHANGE);
49
50             for (String qname : Constants.WORK_QUEUES) {
51                 channel.queueDelete(qname);
52             }
53
54             channel.close();
55             conn.close();
56         } catch (IOException | TimeoutException e) {
57             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
58         }
59     }
60
61     @Override
62     public void close() throws IOException {
63         tearDown(); // TODO: unsure if needed.
64     }
65 }