]> git.somenet.org - pub/jan/dst18.git/blob - ass3-messaging/src/main/java/dst/ass3/messaging/impl/QueueManager.java
[3.1] messaging. need refactoring?
[pub/jan/dst18.git] / ass3-messaging / src / main / java / dst / ass3 / messaging / impl / QueueManager.java
1 package dst.ass3.messaging.impl;
2
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.Connection;
5 import com.rabbitmq.client.ConnectionFactory;
6 import dst.ass3.messaging.Constants;
7 import dst.ass3.messaging.IQueueManager;
8
9 import java.io.IOException;
10 import java.util.concurrent.TimeoutException;
11
12 public class QueueManager implements IQueueManager {
13     @Override
14     public void setUp() {
15         ConnectionFactory factory = new ConnectionFactory();
16         factory.setHost(Constants.RMQ_HOST);
17         factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
18         factory.setUsername(Constants.RMQ_USER);
19         factory.setPassword(Constants.RMQ_PASSWORD);
20
21         try {
22             Connection conn = factory.newConnection();
23             Channel channel = conn.createChannel();
24             channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, "direct", true);
25
26             for (String qname : Constants.WORK_QUEUES) {
27                 channel.queueDeclare(qname, true, false, false, null);
28             }
29
30             //some manual work needed: routing keys not loopable :(
31             channel.queueBind(Constants.QUEUE_DOCUMENT, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_DOCUMENT);
32             channel.queueBind(Constants.QUEUE_QUIZ, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_QUIZ);
33             channel.queueBind(Constants.QUEUE_VIDEO, Constants.TOPIC_EXCHANGE, Constants.ROUTING_KEY_VIDEO);
34
35             channel.close();
36             conn.close();
37         } catch (IOException | TimeoutException e) {
38             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
39         }
40     }
41
42     @Override
43     public void tearDown() {
44         ConnectionFactory factory = new ConnectionFactory();
45         factory.setHost(Constants.RMQ_HOST);
46         factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
47         factory.setUsername(Constants.RMQ_USER);
48         factory.setPassword(Constants.RMQ_PASSWORD);
49
50         try {
51             Connection conn = factory.newConnection();
52             Channel channel = conn.createChannel();
53             channel.exchangeDelete(Constants.TOPIC_EXCHANGE);
54
55             for (String qname : Constants.WORK_QUEUES) {
56                 channel.queueDelete(qname);
57             }
58
59             channel.close();
60             conn.close();
61         } catch (IOException | TimeoutException e) {
62             throw new RuntimeException("Irrecoverable error", e); // Fail horribly
63         }
64     }
65
66     @Override
67     public void close() throws IOException {
68         tearDown(); // TODO: unsure if needed.
69     }
70 }