]> git.somenet.org - pub/jan/dst18.git/blob - ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
GITOLITE.txt
[pub/jan/dst18.git] / ass3-messaging / src / main / java / dst / ass3 / messaging / impl / QueueManager.java
1 package dst.ass3.messaging.impl;
2
3 import com.rabbitmq.client.BuiltinExchangeType;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6 import com.rabbitmq.client.ConnectionFactory;
7 import dst.ass3.messaging.Constants;
8 import dst.ass3.messaging.IQueueManager;
9
10 import java.io.IOException;
11 import java.util.concurrent.TimeoutException;
12
13 public class QueueManager implements IQueueManager {
14     @Override
15     public void setUp() {
16         ConnectionFactory factory = new ConnectionFactory();
17         factory.setHost(Constants.RMQ_HOST);
18         factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
19         factory.setUsername(Constants.RMQ_USER);
20         factory.setPassword(Constants.RMQ_PASSWORD);
21
22         try {
23             Connection conn = factory.newConnection();
24             Channel channel = conn.createChannel();
25             channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
26
27             for (String qname : Constants.WORK_QUEUES) {
28                 channel.queueDeclare(qname, true, false, false, null);
29             }
30
31             channel.close();
32             conn.close();
33         } catch (IOException | TimeoutException e) {
34             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
35         }
36     }
37
38     @Override
39     public void tearDown() {
40         ConnectionFactory factory = new ConnectionFactory();
41         factory.setHost(Constants.RMQ_HOST);
42         factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
43         factory.setUsername(Constants.RMQ_USER);
44         factory.setPassword(Constants.RMQ_PASSWORD);
45
46         try {
47             Connection conn = factory.newConnection();
48             Channel channel = conn.createChannel();
49             channel.exchangeDelete(Constants.TOPIC_EXCHANGE);
50
51             for (String qname : Constants.WORK_QUEUES) {
52                 channel.queueDelete(qname);
53             }
54
55             channel.close();
56             conn.close();
57         } catch (IOException | TimeoutException e) {
58             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
59         }
60     }
61
62     @Override
63     public void close() throws IOException {
64         tearDown(); // TODO: unsure if needed.
65     }
66 }