package dst.ass3.messaging.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import com.rabbitmq.http.client.Client;
import dst.ass3.messaging.Constants;
import dst.ass3.messaging.IWorkloadMonitor;
import dst.ass3.messaging.RequestType;
import dst.ass3.messaging.WorkerResponse;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeoutException;


/**
 * {@link IWorkloadMonitor} implementation backed by RabbitMQ.
 *
 * <p>On construction the monitor opens an AMQP connection, declares a server-named queue, binds it
 * to {@link Constants#TOPIC_EXCHANGE} with the pattern {@code requests.*} and starts consuming
 * {@link WorkerResponse} messages from it. For every routing key it keeps the most recent
 * {@value #MAX_TRACKED_RESPONSES} responses, which are used to compute average processing times.
 * Queue depth and consumer counts are read on demand through the RabbitMQ HTTP management API.
 *
 * <p>Thread-safety: deliveries are handled on threads owned by the RabbitMQ client, while the query
 * methods are called from application threads, so every access to {@link #workerRespMap} is guarded
 * by a private lock.
 */
public class WorkloadMonitor implements IWorkloadMonitor {
    /** Number of most recent responses retained per routing key. */
    private static final int MAX_TRACKED_RESPONSES = 10;

    /** Shared JSON mapper; reused across deliveries instead of being rebuilt per message. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Guards every read and write of {@link #workerRespMap} and of the lists stored in it. */
    private final Object responseLock = new Object();

    Connection conn;
    Channel channel;
    String monitorQName;
    Consumer consumer;
    HashMap<String, LinkedList<WorkerResponse>> workerRespMap = new HashMap<>();

    /**
     * Connects to the broker configured in {@link Constants}, creates the monitoring queue and
     * starts consuming worker responses with automatic acknowledgement.
     *
     * @throws RuntimeException if the connection, channel, queue or consumer cannot be set up; the
     *     connection is closed again before the exception is thrown
     */
    public WorkloadMonitor() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.RMQ_HOST);
        factory.setPort(Integer.parseInt(Constants.RMQ_PORT));
        factory.setUsername(Constants.RMQ_USER);
        factory.setPassword(Constants.RMQ_PASSWORD);

        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            monitorQName = channel.queueDeclare().getQueue();
            channel.queueBind(monitorQName, Constants.TOPIC_EXCHANGE, "requests.*");

            consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Parse the raw bytes directly; no intermediate String is needed.
                    WorkerResponse resp = MAPPER.readValue(body, WorkerResponse.class);
                    recordResponse(envelope.getRoutingKey(), resp);
                }
            };
            channel.basicConsume(monitorQName, true, consumer);
        } catch (IOException | TimeoutException e) {
            // Do not leak a half-initialised connection when setup fails midway.
            closeConnectionAfterFailure(e);
            throw new RuntimeException("Irrecoverable error", e); // Fail horribly
        }
    }

    /**
     * Returns the number of ready (not yet delivered) messages per request type, as reported by
     * the management API.
     *
     * @return a mutable map with one entry for each of DOCUMENT, QUIZ and VIDEO
     * @throws RuntimeException if the management API URL is invalid
     */
    @Override
    public Map<RequestType, Long> getRequestCount() {
        Client api = newApiClient();
        HashMap<RequestType, Long> ret = new HashMap<>();
        ret.put(RequestType.DOCUMENT, api.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getMessagesReady());
        ret.put(RequestType.QUIZ, api.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getMessagesReady());
        ret.put(RequestType.VIDEO, api.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getMessagesReady());
        return ret;
    }

    /**
     * Returns the number of consumers attached to each request queue, as reported by the
     * management API.
     *
     * @return a mutable map with one entry for each of DOCUMENT, QUIZ and VIDEO
     * @throws RuntimeException if the management API URL is invalid
     */
    @Override
    public Map<RequestType, Long> getWorkerCount() {
        Client api = newApiClient();
        HashMap<RequestType, Long> ret = new HashMap<>();
        ret.put(RequestType.DOCUMENT, api.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_DOCUMENT).getConsumerCount());
        ret.put(RequestType.QUIZ, api.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_QUIZ).getConsumerCount());
        ret.put(RequestType.VIDEO, api.getQueue(Constants.RMQ_VHOST, Constants.QUEUE_VIDEO).getConsumerCount());
        return ret;
    }

    /**
     * Returns the average processing time per request type over the most recently received
     * responses (at most {@value #MAX_TRACKED_RESPONSES} per type).
     *
     * @return a mutable map with one entry for each of DOCUMENT, QUIZ and VIDEO; the value is
     *     {@code 0.0} for a type that has not produced any response yet
     */
    @Override
    public Map<RequestType, Double> getAverageProcessingTime() {
        HashMap<RequestType, Double> ret = new HashMap<>();
        ret.put(RequestType.DOCUMENT, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_DOCUMENT));
        ret.put(RequestType.QUIZ, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_QUIZ));
        ret.put(RequestType.VIDEO, calcAverageProcessingTimePerRoutingKey(Constants.ROUTING_KEY_VIDEO));
        return ret;
    }

    /**
     * Computes the arithmetic mean of the processing times recorded for one routing key.
     *
     * @param routingKey routing key the responses were delivered with
     * @return the mean processing time, or {@code 0.0} if nothing has been recorded for the key
     */
    public Double calcAverageProcessingTimePerRoutingKey(String routingKey) {
        synchronized (responseLock) {
            LinkedList<WorkerResponse> recent = workerRespMap.get(routingKey);
            if (recent == null || recent.isEmpty()) {
                return 0.0;
            }
            long total = 0;
            for (WorkerResponse resp : recent) {
                total += resp.getProcessingTime();
            }
            return (double) total / recent.size();
        }
    }

    /**
     * Deletes the monitoring queue and closes the channel and the connection.
     *
     * <p>Each step runs even if an earlier one fails, and calling this method on an already
     * closed monitor does nothing.
     *
     * @throws IOException if deleting the queue or closing the channel or connection fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (channel.isOpen()) {
                try {
                    channel.queueDelete(monitorQName);
                } finally {
                    try {
                        channel.close();
                    } catch (TimeoutException ignored) {
                        // Best effort: closing the connection below tears the channel down anyway.
                    }
                }
            }
        } finally {
            if (conn.isOpen()) {
                conn.close();
            }
        }
    }

    /** Stores a response as the newest entry for its routing key and trims the history window. */
    private void recordResponse(String routingKey, WorkerResponse resp) {
        synchronized (responseLock) {
            LinkedList<WorkerResponse> recent = workerRespMap.get(routingKey);
            if (recent == null) {
                recent = new LinkedList<WorkerResponse>();
                workerRespMap.put(routingKey, recent);
            }
            recent.addFirst(resp);
            while (recent.size() > MAX_TRACKED_RESPONSES) {
                recent.removeLast();
            }
        }
    }

    /** Creates a management API client, converting configuration errors to unchecked exceptions. */
    private Client newApiClient() {
        try {
            return new Client(Constants.RMQ_API_URL, Constants.RMQ_USER, Constants.RMQ_PASSWORD);
        } catch (MalformedURLException | URISyntaxException e) {
            throw new RuntimeException("Irrecoverable error", e); // Fail horribly
        }
    }

    /** Closes the connection after a failed setup, attaching any close failure to the cause. */
    private void closeConnectionAfterFailure(Exception cause) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (IOException | RuntimeException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }
}
