]> git.somenet.org - pub/jan/sbc.git/blob - src/main/java/at/ac/tuwien/sbc/valesriegler/xvsm/LoadBalancerXVSM.java
[XVSM] LoadBalancer implemented
[pub/jan/sbc.git] / src / main / java / at / ac / tuwien / sbc / valesriegler / xvsm / LoadBalancerXVSM.java
1 package at.ac.tuwien.sbc.valesriegler.xvsm;
2
3
4 import at.ac.tuwien.sbc.valesriegler.common.Util;
5 import at.ac.tuwien.sbc.valesriegler.types.DeliveryGroupData;
6 import at.ac.tuwien.sbc.valesriegler.types.DeliveryStatus;
7 import at.ac.tuwien.sbc.valesriegler.xvsm.loadbalancer.PizzeriaState;
8 import at.ac.tuwien.sbc.valesriegler.xvsm.loadbalancer.PizzeriaStatus;
9 import at.ac.tuwien.sbc.valesriegler.xvsm.spacehelpers.SpaceAction;
10 import org.mozartspaces.capi3.AnyCoordinator;
11 import org.mozartspaces.core.ContainerReference;
12 import org.mozartspaces.core.MzsConstants;
13 import org.mozartspaces.core.TransactionReference;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import java.io.Serializable;
18 import java.net.URI;
19 import java.util.*;
20
21 public class LoadBalancerXVSM extends AbstractXVSMConnector {
22     private static final Logger log = LoggerFactory.getLogger(LoadBalancerXVSM.class);
23     private static final int FREQUENCY = 1500;
24
25     private int loadBalancerId;
26     private Set<Integer> pizzeriaIdentifiers;
27     private Map<Integer, PizzeriaState> pizzeriaStates = Collections.synchronizedMap(new HashMap<Integer, PizzeriaState>());
28
29     public LoadBalancerXVSM(int id, int port) {
30         super(port);
31
32         this.loadBalancerId = id;
33         this.groupAgentInfoContainer = useContainerOfSpaceWithPort(Util.GROUP_AGENT_INFO, Util.GROUP_AGENT_PORT);
34     }
35
36     /**
37      * The Load balancer listens for new pizzerias on the Group Agent Info container
38      */
39     public void listenForPizzerias() {
40         getDefaultBuilder("listenForPizzerias").setLookaround(true).setCref(groupAgentInfoContainer).setSpaceAction(new SpaceAction() {
41             @Override
42             public void onEntriesWritten(List<? extends Serializable> entries) throws Exception {
43
44                 final List<PizzeriaRegistration> pizzeriaRegistrations = castEntries(entries);
45                 for (PizzeriaRegistration registration : pizzeriaRegistrations) {
46                     final int pizzeriaId = registration.pizzeriaSpacePort;
47                     if (pizzeriaStates.get(pizzeriaId) == null) {
48                         final PizzeriaState pizzeriaState = new PizzeriaState(pizzeriaId);
49                         pizzeriaStates.put(pizzeriaId, pizzeriaState);
50                         listenToPizzeria(pizzeriaId);
51                     }
52                 }
53
54             }
55         }).createSpaceListenerImpl();
56     }
57
58     /**
59      * Whenever a new pizzeria emerges a timer is started which periodically checks the delivery load
60      * of the pizzeria and contemplates moving deliveries.
61      */
62     private void listenToPizzeria(int pizzeriaId) {
63         log.warn("Start listening to pizzeria {}", pizzeriaId);
64
65         Timer timer = new Timer();
66
67         timer.schedule(new LoadCheck(useContainerOfSpaceWithPort(Util.DELIVERY_ORDER_TAKEN, pizzeriaId), pizzeriaId), 100, FREQUENCY);
68
69     }
70
71     private class LoadCheck extends TimerTask {
72         public static final int DELIVERIES_NUMBER_THRESHOLD = 20;
73         public static final double DELIVERY_FACTOR_THRESHOLD = 0.2;
74         public static final int INTERVAL_FACTOR = 2;
75
76         private final ContainerReference container;
77         private final int pizzeriaId;
78
79         public LoadCheck(ContainerReference containerReference, int pizzeriaId) {
80             this.container = containerReference;
81             this.pizzeriaId = pizzeriaId;
82         }
83
84         @Override
85         public void run() {
86             log.info("Start running Check for pizzeriaId {}", pizzeriaId);
87             synchronized (pizzeriaStates) {
88                 try {
89                     List<DeliveryGroupData> deliveries = null;
90                     final PizzeriaState pizzeriaState = pizzeriaStates.get(pizzeriaId);
91                     try {
92                         deliveries = castEntries(capi.read(container, AnyCoordinator.newSelector(MzsConstants.Selecting.COUNT_MAX), MzsConstants.RequestTimeout.DEFAULT, null));
93                     } catch (Exception e) {
94                         log.warn("Exception while reading deliveries from Pizzeria {}", pizzeriaId);
95                         e.printStackTrace();
96                         pizzeriaState.setStatus(PizzeriaStatus.OFFLINE);
97                         return;
98                     }
99                     pizzeriaState.setStatus(PizzeriaStatus.ONLINE);
100
101                     final long lastUpdatedTimestamp = pizzeriaState.getLastUpdateTime();
102                     final long now = new Date().getTime();
103                     final long interval = now - lastUpdatedTimestamp;
104                     final int currentNumberDeliveries = deliveries.size();
105
106                     // there has to be a stored value for the pizzeria and it should be current enough
107                     if (lastUpdatedTimestamp != 0 && interval < FREQUENCY * INTERVAL_FACTOR && currentNumberDeliveries > DELIVERIES_NUMBER_THRESHOLD) {
108                         log.info("Pizzeria is handled...");
109                         final int id = getPizzeriaWithLessWaitingDeliveries(currentNumberDeliveries);
110                         if (id != 0) {
111                             final PizzeriaState otherPizzeria = pizzeriaStates.get(id);
112                             final int numberDeliveriesOtherPizzeria = otherPizzeria.getNumberDeliveries();
113
114                             log.info("This pizzeria has {} deliveries", currentNumberDeliveries);
115                             log.info("Other pizzeria {} has {} deliveries", id, numberDeliveriesOtherPizzeria);
116
117                             final int deliveryDifference = currentNumberDeliveries - (numberDeliveriesOtherPizzeria + currentNumberDeliveries) / INTERVAL_FACTOR;
118
119                             moveDeliveries(deliveryDifference, deliveries, pizzeriaId, id);
120
121                             log.info(String.format("Move %d deliveries from pizzeria %d to pizzeria %d", deliveryDifference, pizzeriaId, id));
122                         } else {
123                             log.info("No pizzeria with less deliveries found!");
124                         }
125                     }
126
127                     pizzeriaState.setNumberDeliveries(currentNumberDeliveries);
128                     pizzeriaState.setLastUpdateTime(now);
129
130
131                 } catch (Exception e) {
132                     e.printStackTrace();
133                 }
134             }
135         }
136
137         private void moveDeliveries(int deliveryDifference, List<DeliveryGroupData> deliveries, int pizzeriaId, int otherPizzeriaId) {
138             int movedDelivieries = 0;
139             final ContainerReference sourceOrderTakenContainer = useContainerOfSpaceWithPort(Util.DELIVERY_ORDER_TAKEN, pizzeriaId);
140             final ContainerReference sourceDeliveryGroupContainer = useContainerOfSpaceWithPort(Util.PIZZERIA_DELIVERY, pizzeriaId);
141             final ContainerReference targetPhoneCallsContainer = useContainerOfSpaceWithPort(Util.PHONE_CALLS, otherPizzeriaId);
142             final ContainerReference targetDeliveryGroupContainer = useContainerOfSpaceWithPort(Util.PIZZERIA_DELIVERY, otherPizzeriaId);
143
144             /**
145              * Take deliveryDifference times a delivery group from the source containers and put them into
146              * the containers of the target pizzeria space
147              */
148             for (DeliveryGroupData delivery : deliveries) {
149                 if (movedDelivieries >= deliveryDifference) return;
150
151                 try {
152
153                     final TransactionReference tx = capi.createTransaction(
154                             Util.SPACE_TRANSACTION_TIMEOUT,
155                             URI.create(String.format(Util.SERVER_ADDR, pizzeriaId)));
156                     final String errorMsg = "Cannot move the delivery as I can't take it!";
157                     DeliveryGroupData group = null;
158                     try {
159                         // Take any delivery from the source OrderTakenContainer
160                         DeliveryGroupData template = new DeliveryGroupData();
161                         template.setDeliveryStatus(null);
162                         group = takeMatchingEntity(template, sourceOrderTakenContainer, tx, MzsConstants.RequestTimeout.DEFAULT, errorMsg);
163                     } catch (Exception e) {
164                         log.warn(e.getMessage());
165                         continue;
166                     }
167                     try {
168                         // Take the delivery with the right id from the source DeliveryGroupContainer
169                         final DeliveryGroupData template = new DeliveryGroupData();
170                         template.setDeliveryStatus(null);
171                         template.setId(group.getId());
172                         List<DeliveryGroupData> groups = takeMatchingEntities(template, sourceDeliveryGroupContainer, tx, MzsConstants.RequestTimeout.DEFAULT, "Cannot take from sourceDeliveryGroupContainer");
173                         group = groups.get(0);
174                     } catch (Exception e) {
175                         log.warn(e.getMessage());
176                         continue;
177                     }
178                     group.setOriginalPizzeriaId(String.valueOf(pizzeriaId));
179                     group.setPizzeriaId(String.valueOf(otherPizzeriaId));
180                     group.setWaiterIdOfOrder(null);
181                     group.setLoadBalancerId(loadBalancerId);
182                     group.setDeliveryStatus(DeliveryStatus.MOVED);
183
184                     // Inform the source pizzeria that the delivery was moved
185                     sendItemsToContainer(Arrays.asList(group), sourceDeliveryGroupContainer, MzsConstants.RequestTimeout.DEFAULT, tx);
186                     capi.commitTransaction(tx);
187
188                     final TransactionReference otherTx = capi.createTransaction(
189                             Util.SPACE_TRANSACTION_TIMEOUT,
190                             URI.create(String.format(Util.SERVER_ADDR, otherPizzeriaId)));
191
192                     group.setDeliveryStatus(DeliveryStatus.START);
193
194                     // Send the deliveries to the other pizzeria
195                     sendItemsToContainer(Arrays.asList(group), targetPhoneCallsContainer, MzsConstants.RequestTimeout.DEFAULT, otherTx);
196                     sendItemsToContainer(Arrays.asList(group), targetDeliveryGroupContainer, MzsConstants.RequestTimeout.DEFAULT, otherTx);
197
198                     movedDelivieries++;
199
200                     capi.commitTransaction(otherTx);
201
202
203                 } catch (NullPointerException e) {
204                     // strange npe from space
205                 } catch (Exception e) {
206                     log.warn("Exception in moveDeliveries: {}", e.getMessage());
207                     e.printStackTrace();
208                 }
209
210             }
211         }
212
213         /**
214          * Returns the port of a pizzeria for which it is true: (numberOfDeliveries+currentNumberDeliveries* DELIVERY_FACTOR_THRESHOLD)<currentNumberDeliveries.
215          * If there is no such pizzeria available the method return 0.
216          */
217         private int getPizzeriaWithLessWaitingDeliveries(int currentNumberDeliveries) {
218             final Collection<PizzeriaState> states = pizzeriaStates.values();
219             final long now = new Date().getTime();
220             for (PizzeriaState state : states) {
221                 final long lastUpdateTime = state.getLastUpdateTime();
222                 final long interval = now - lastUpdateTime;
223                 final boolean differenceDeliveries = (state.getNumberDeliveries() + currentNumberDeliveries * DELIVERY_FACTOR_THRESHOLD) < currentNumberDeliveries;
224                 if (differenceDeliveries && state.getStatus() != PizzeriaStatus.OFFLINE && lastUpdateTime != 0 && interval < FREQUENCY * INTERVAL_FACTOR) {
225                     return state.getId();
226                 }
227             }
228
229             return 0;
230         }
231     }
232 }