1 package dst.ass3.messaging.impl;
3 import dst.ass3.messaging.*;
4 import org.junit.After;
5 import org.junit.Before;
8 import org.junit.rules.Timeout;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
12 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
13 import org.springframework.context.Lifecycle;
16 import java.util.concurrent.ThreadLocalRandom;
17 import java.util.concurrent.TimeUnit;
19 import static dst.ass3.messaging.Constants.TOPIC_EXCHANGE;
20 import static dst.ass3.messaging.Constants.WORK_QUEUES;
21 import static org.hamcrest.CoreMatchers.*;
22 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
23 import static org.junit.Assert.assertThat;
25 public class WorkloadMonitorTest {
27 private static final Logger LOG = LoggerFactory.getLogger(RequestGatewayTest.class);
30 public RabbitResource rabbit = new RabbitResource();
33 public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
35 private IMessagingFactory factory;
36 private IQueueManager queueManager;
37 private IRequestGateway requestGateway;
38 private IWorkloadMonitor workloadMonitor;
41 public void setUp() throws Exception {
42 factory = new MessagingFactory();
43 queueManager = factory.createQueueManager();
44 requestGateway = factory.createRequestGateway();
48 workloadMonitor = factory.createWorkloadMonitor();
52 public void tearDown() throws Exception {
53 queueManager.tearDown();
55 requestGateway.close();
61 public void getRequestCount_returnsCorrectCount() throws Exception {
63 Map<RequestType, Long> countForType = new HashMap<>();
64 for (RequestType type : RequestType.values()) {
65 countForType.put(type, ThreadLocalRandom.current().nextLong(10, 20 + 1));
66 for (long i = 0; i < countForType.get(type); i++) {
67 UploadRequest request = new UploadRequest("id" + i, type,
68 "urn:material:" + type.name().toLowerCase() + i);
69 LOG.info("Sending request {}", request);
70 requestGateway.uploadRequest(request);
74 // wait for the messages to be processed by rabbit
77 assertThat(workloadMonitor.getRequestCount(), equalTo(countForType));
79 workloadMonitor.close();
84 public void multipleWorkloadMonitors_uniqueQueueForEachMonitor() throws Exception {
85 try (IWorkloadMonitor workloadMonitor2 = factory.createWorkloadMonitor();
86 IWorkloadMonitor workloadMonitor3 = factory.createWorkloadMonitor();) {
87 long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
88 assertThat(nonWorkQueues, greaterThanOrEqualTo(3L));
90 workloadMonitor.close();
95 public void getAverageProcessingTime_correctAverageTime() throws Exception {
97 Map<RequestType, Double> avgTimes = new HashMap<>();
98 for (RequestType type : RequestType.values()) {
99 long count = ThreadLocalRandom.current().nextLong(15, 25);
101 for (long i = 0; i < count; i++) {
102 long requestTime = ThreadLocalRandom.current().nextLong(1000, 20000 + 1);
103 if (count - i <= 10) {
104 typeTime += requestTime;
107 String body = String.format("{\"requestId\": \"%s\", \"processingTime\": \"%d\"}", UUID.randomUUID(), requestTime);
108 LOG.info("Sending request {}", body);
109 rabbit.getClient().convertAndSend(TOPIC_EXCHANGE, "requests." + type.toString().toLowerCase(), body);
111 avgTimes.put(type, ((double)typeTime / 10));
114 // wait for the messages to be processed by rabbit
117 assertThat(workloadMonitor.getAverageProcessingTime(), equalTo(avgTimes));
119 workloadMonitor.close();
124 public void getWorkerCount_returnsCorrectCount() throws Exception {
126 // spawn a random number of consumers
127 Map<RequestType, Collection<MessageListenerContainer>> consumersForType = new HashMap<>();
128 Map<RequestType, Long> consumerCountForType = new HashMap<>();
129 for (RequestType type : RequestType.values()) {
130 List<MessageListenerContainer> consumers = new ArrayList<>();
131 consumersForType.put(type, consumers);
132 consumerCountForType.put(type, ThreadLocalRandom.current().nextLong(10, 20 + 1));
133 for (long i = 0; i < consumerCountForType.get(type); i++) {
134 consumers.add(spawnConsumer("dst." + type.toString().toLowerCase()));
138 // wait for rabbit to get to know the new consumers
141 Map<RequestType, Long> workerCount = workloadMonitor.getWorkerCount();
143 // stop all consumers
144 consumersForType.entrySet().stream().map(Map.Entry::getValue).flatMap(Collection::stream).forEach(Lifecycle::stop);
146 assertThat(workerCount, equalTo(consumerCountForType));
148 workloadMonitor.close();
152 private MessageListenerContainer spawnConsumer(String queue) {
153 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbit.getConnectionFactory());
154 container.addQueueNames(queue);
160 public void close_removesQueues() throws Exception {
161 workloadMonitor.close();
163 List<org.springframework.amqp.core.Queue> queues = rabbit.getManager().getQueues();
164 long nonWorkQueues = rabbit.getManager().getQueues().stream().filter(q -> !Arrays.asList(WORK_QUEUES).contains(q.getName())).count();
165 assertThat(nonWorkQueues, is(0L));