]> git.somenet.org - pub/jan/dst18.git/blob - ass3-messaging/src/test/java/dst/ass3/messaging/impl/WorkloadMonitorTest.java
Add template for assignment 3
[pub/jan/dst18.git] / ass3-messaging / src / test / java / dst / ass3 / messaging / impl / WorkloadMonitorTest.java
1 package dst.ass3.messaging.impl;
2
3 import dst.ass3.messaging.*;
4 import org.junit.After;
5 import org.junit.Before;
6 import org.junit.Rule;
7 import org.junit.Test;
8 import org.junit.rules.Timeout;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
12 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
13 import org.springframework.context.Lifecycle;
14
15 import java.util.*;
16 import java.util.concurrent.ThreadLocalRandom;
17 import java.util.concurrent.TimeUnit;
18
19 import static dst.ass3.messaging.Constants.TOPIC_EXCHANGE;
20 import static dst.ass3.messaging.Constants.WORK_QUEUES;
21 import static org.hamcrest.CoreMatchers.*;
22 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
23 import static org.junit.Assert.assertThat;
24
25 public class WorkloadMonitorTest {
26
27     private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class);
28
29     @Rule
30     public RabbitResource rabbit = new RabbitResource();
31
32     @Rule
33     public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
34
35     private IMessagingFactory factory;
36     private IQueueManager queueManager;
37     private IRequestGateway requestGateway;
38     private IWorkloadMonitor workloadMonitor;
39
40     @Before
41     public void setUp() throws Exception {
42         factory = new MessagingFactory();
43         queueManager = factory.createQueueManager();
44         requestGateway = factory.createRequestGateway();
45
46         queueManager.setUp();
47
48         workloadMonitor = factory.createWorkloadMonitor();
49     }
50
51     @After
52     public void tearDown() throws Exception {
53         queueManager.tearDown();
54
55         requestGateway.close();
56         queueManager.close();
57         factory.close();
58     }
59
60     @Test
61     public void getRequestCount_returnsCorrectCount() throws Exception {
62         try {
63             Map<RequestType, Long> countForType = new HashMap<>();
64             for (RequestType type : RequestType.values()) {
65                 countForType.put(type, ThreadLocalRandom.current().nextLong(10, 20 + 1));
66                 for (long i = 0; i < countForType.get(type); i++) {
67                     UploadRequest request = new UploadRequest("id" + i, type,
68                             "urn:material:" + type.name().toLowerCase() + i);
69                     LOG.info("Sending request {}", request);
70                     requestGateway.uploadRequest(request);
71                 }
72             }
73
74             // wait for the messages to be processed by rabbit
75             Thread.sleep(2000);
76
77             assertThat(workloadMonitor.getRequestCount(), equalTo(countForType));
78         } finally {
79             workloadMonitor.close();
80         }
81     }
82
83     @Test
84     public void multipleWorkloadMonitors_uniqueQueueForEachMonitor() throws Exception {
85         try (IWorkloadMonitor workloadMonitor2 = factory.createWorkloadMonitor();
86              IWorkloadMonitor workloadMonitor3 = factory.createWorkloadMonitor();) {
87             long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
88             assertThat(nonWorkQueues, greaterThanOrEqualTo(3L));
89         } finally {
90             workloadMonitor.close();
91         }
92     }
93
94     @Test
95     public void getAverageProcessingTime_correctAverageTime() throws Exception {
96         try {
97             Map<RequestType, Double> avgTimes = new HashMap<>();
98             for (RequestType type : RequestType.values()) {
99                 long count = ThreadLocalRandom.current().nextLong(15, 25);
100                 long typeTime = 0;
101                 for (long i = 0; i < count; i++) {
102                     long requestTime = ThreadLocalRandom.current().nextLong(1000, 20000 + 1);
103                     if (count - i <= 10) {
104                         typeTime += requestTime;
105                     }
106
107                     String body = String.format("{\"requestId\": \"%s\", \"processingTime\": \"%d\"}", UUID.randomUUID(), requestTime);
108                     LOG.info("Sending request {}", body);
109                     rabbit.getClient().convertAndSend(TOPIC_EXCHANGE, "requests." + type.toString().toLowerCase(), body);
110                 }
111                 avgTimes.put(type, ((double)typeTime / 10));
112             }
113
114             // wait for the messages to be processed by rabbit
115             Thread.sleep(2000);
116
117             assertThat(workloadMonitor.getAverageProcessingTime(), equalTo(avgTimes));
118         } finally {
119             workloadMonitor.close();
120         }
121     }
122
123     @Test
124     public void getWorkerCount_returnsCorrectCount() throws Exception {
125         try {
126             // spawn a random number of consumers
127             Map<RequestType, Collection<MessageListenerContainer>> consumersForType = new HashMap<>();
128             Map<RequestType, Long> consumerCountForType = new HashMap<>();
129             for (RequestType type : RequestType.values()) {
130                 List<MessageListenerContainer> consumers = new ArrayList<>();
131                 consumersForType.put(type, consumers);
132                 consumerCountForType.put(type, ThreadLocalRandom.current().nextLong(10, 20 + 1));
133                 for (long i = 0; i < consumerCountForType.get(type); i++) {
134                     consumers.add(spawnConsumer("dst." + type.toString().toLowerCase()));
135                 }
136             }
137
138             // wait for rabbit to get to know the new consumers
139             Thread.sleep(2000);
140
141             Map<RequestType, Long> workerCount = workloadMonitor.getWorkerCount();
142
143             // stop all consumers
144             consumersForType.entrySet().stream().map(Map.Entry::getValue).flatMap(Collection::stream).forEach(Lifecycle::stop);
145
146             assertThat(workerCount, equalTo(consumerCountForType));
147         } finally {
148             workloadMonitor.close();
149         }
150     }
151
152     private MessageListenerContainer spawnConsumer(String queue) {
153         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbit.getConnectionFactory());
154         container.addQueueNames(queue);
155         container.start();
156         return container;
157     }
158
159     @Test
160     public void close_removesQueues() throws Exception {
161         workloadMonitor.close();
162
163         List<org.springframework.amqp.core.Queue> queues = rabbit.getManager().getQueues();
164         long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
165         assertThat(nonWorkQueues, is(0L));
166     }
167 }