]> git.somenet.org - pub/jan/sbc.git/blob - src/main/java/at/ac/tuwien/sbc/valesriegler/xvsm/LoadBalancerXVSM.java
[XVSM] Add presentation
[pub/jan/sbc.git] / src / main / java / at / ac / tuwien / sbc / valesriegler / xvsm / LoadBalancerXVSM.java
1 package at.ac.tuwien.sbc.valesriegler.xvsm;
2
3
4 import at.ac.tuwien.sbc.valesriegler.common.Util;
5 import at.ac.tuwien.sbc.valesriegler.types.DeliveryGroupData;
6 import at.ac.tuwien.sbc.valesriegler.types.DeliveryStatus;
7 import at.ac.tuwien.sbc.valesriegler.types.PizzaOrder;
8 import at.ac.tuwien.sbc.valesriegler.xvsm.loadbalancer.PizzeriaState;
9 import at.ac.tuwien.sbc.valesriegler.xvsm.loadbalancer.PizzeriaStatus;
10 import at.ac.tuwien.sbc.valesriegler.xvsm.spacehelpers.SpaceAction;
11 import org.mozartspaces.capi3.AnyCoordinator;
12 import org.mozartspaces.core.ContainerReference;
13 import org.mozartspaces.core.MzsConstants;
14 import org.mozartspaces.core.MzsCoreException;
15 import org.mozartspaces.core.TransactionReference;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.io.Serializable;
20 import java.net.URI;
21 import java.util.*;
22
23 public class LoadBalancerXVSM extends AbstractXVSMConnector {
24     private static final Logger log = LoggerFactory.getLogger(LoadBalancerXVSM.class);
25     private static final int FREQUENCY = 1500;
26
27     private int loadBalancerId;
28     private Set<Integer> pizzeriaIdentifiers;
29     private Map<Integer, PizzeriaState> pizzeriaStates = Collections.synchronizedMap(new HashMap<Integer, PizzeriaState>());
30
31     public LoadBalancerXVSM(int id, int port) {
32         super(port);
33
34         this.loadBalancerId = id;
35         this.groupAgentInfoContainer = useContainerOfSpaceWithPort(Util.GROUP_AGENT_INFO, Util.GROUP_AGENT_PORT);
36     }
37
38     /**
39      * The Load balancer listens for new pizzerias on the Group Agent Info container
40      */
41     public void listenForPizzerias() {
42         getDefaultBuilder("listenForPizzerias").setLookaround(true).setCref(groupAgentInfoContainer).setSpaceAction(new SpaceAction() {
43             @Override
44             public void onEntriesWritten(List<? extends Serializable> entries) throws Exception {
45
46                 final List<PizzeriaRegistration> pizzeriaRegistrations = castEntries(entries);
47                 for (PizzeriaRegistration registration : pizzeriaRegistrations) {
48                     final int pizzeriaId = registration.pizzeriaSpacePort;
49                     if (pizzeriaStates.get(pizzeriaId) == null) {
50                         final PizzeriaState pizzeriaState = new PizzeriaState(pizzeriaId);
51                         pizzeriaStates.put(pizzeriaId, pizzeriaState);
52                         listenToPizzeria(pizzeriaId);
53                     }
54                 }
55
56             }
57         }).createSpaceListenerImpl();
58     }
59
60     /**
61      * Whenever a new pizzeria emerges a timer is started which periodically checks the delivery load
62      * of the pizzeria and contemplates moving deliveries.
63      */
64     private void listenToPizzeria(int pizzeriaId) {
65         log.warn("Start listening to pizzeria {}", pizzeriaId);
66
67         Timer timer = new Timer();
68
69         timer.schedule(new LoadCheck(useContainerOfSpaceWithPort(Util.DELIVERY_ORDER_TAKEN, pizzeriaId), pizzeriaId), 100, FREQUENCY);
70
71     }
72
73     private class LoadCheck extends TimerTask {
74         public static final int DELIVERIES_NUMBER_THRESHOLD = 20;
75         public static final double DELIVERY_FACTOR_THRESHOLD = 0.2;
76         public static final int INTERVAL_FACTOR = 2;
77
78         private final ContainerReference container;
79         private final int pizzeriaId;
80
81         public LoadCheck(ContainerReference containerReference, int pizzeriaId) {
82             this.container = containerReference;
83             this.pizzeriaId = pizzeriaId;
84         }
85
86         @Override
87         public void run() {
88             log.info("Start running Check for pizzeriaId {}", pizzeriaId);
89             synchronized (pizzeriaStates) {
90                 try {
91                     List<DeliveryGroupData> deliveries = null;
92                     final PizzeriaState pizzeriaState = pizzeriaStates.get(pizzeriaId);
93                     try {
94                         deliveries = castEntries(capi.read(container, AnyCoordinator.newSelector(MzsConstants.Selecting.COUNT_MAX), MzsConstants.RequestTimeout.DEFAULT, null));
95                     } catch (Exception e) {
96                         log.warn("Exception while reading deliveries from Pizzeria {}", pizzeriaId);
97                         e.printStackTrace();
98                         pizzeriaState.setStatus(PizzeriaStatus.OFFLINE);
99                         return;
100                     }
101                     pizzeriaState.setStatus(PizzeriaStatus.ONLINE);
102
103                     final long lastUpdatedTimestamp = pizzeriaState.getLastUpdateTime();
104                     final long now = new Date().getTime();
105                     final long interval = now - lastUpdatedTimestamp;
106                     final int currentNumberDeliveries = deliveries.size();
107
108                     // there has to be a stored value for the pizzeria and it should be current enough
109                     if (lastUpdatedTimestamp != 0 && interval < FREQUENCY * INTERVAL_FACTOR && currentNumberDeliveries > DELIVERIES_NUMBER_THRESHOLD) {
110                         log.info("Pizzeria is handled...");
111                         final int id = getPizzeriaWithLessWaitingDeliveries(currentNumberDeliveries);
112                         if (id != 0) {
113                             final PizzeriaState otherPizzeria = pizzeriaStates.get(id);
114                             final int numberDeliveriesOtherPizzeria = otherPizzeria.getNumberDeliveries();
115
116                             log.info("This pizzeria has {} deliveries", currentNumberDeliveries);
117                             log.info("Other pizzeria {} has {} deliveries", id, numberDeliveriesOtherPizzeria);
118
119                             final int deliveryDifference = currentNumberDeliveries - (numberDeliveriesOtherPizzeria + currentNumberDeliveries) / INTERVAL_FACTOR;
120
121                             moveDeliveries(deliveryDifference, deliveries, pizzeriaId, id);
122
123                             log.info(String.format("Move %d deliveries from pizzeria %d to pizzeria %d", deliveryDifference, pizzeriaId, id));
124                         } else {
125                             log.info("No pizzeria with less deliveries found!");
126                         }
127                     }
128
129                     pizzeriaState.setNumberDeliveries(currentNumberDeliveries);
130                     pizzeriaState.setLastUpdateTime(now);
131
132
133                 } catch (Exception e) {
134                     e.printStackTrace();
135                 }
136             }
137         }
138
139         private void moveDeliveries(int deliveryDifference, List<DeliveryGroupData> deliveries, int pizzeriaId, int otherPizzeriaId) {
140             int movedDelivieries = 0;
141             final ContainerReference sourceOrderTakenContainer = useContainerOfSpaceWithPort(Util.DELIVERY_ORDER_TAKEN, pizzeriaId);
142             final ContainerReference sourceDeliveryGroupContainer = useContainerOfSpaceWithPort(Util.PIZZERIA_DELIVERY, pizzeriaId);
143             final ContainerReference sourcePizzaContainer = useContainerOfSpaceWithPort(Util.PREPARE_DELIVERY_PIZZAS, pizzeriaId);
144             final ContainerReference targetPhoneCallsContainer = useContainerOfSpaceWithPort(Util.PHONE_CALLS, otherPizzeriaId);
145             final ContainerReference targetDeliveryGroupContainer = useContainerOfSpaceWithPort(Util.PIZZERIA_DELIVERY, otherPizzeriaId);
146
147             /**
148              * Take deliveryDifference times a delivery group from the source containers and put them into
149              * the containers of the target pizzeria space
150              */
151             for (DeliveryGroupData delivery : deliveries) {
152                 if (movedDelivieries >= deliveryDifference) return;
153
154                 try {
155
156                     final TransactionReference tx = capi.createTransaction(
157                             Util.SPACE_TRANSACTION_TIMEOUT,
158                             URI.create(String.format(Util.SERVER_ADDR, pizzeriaId)));
159                     final String errorMsg = "Cannot move the delivery as I can't take it!";
160                     DeliveryGroupData group = null;
161                     try {
162                         // Take any delivery from the source OrderTakenContainer
163                         DeliveryGroupData template = new DeliveryGroupData();
164                         template.setDeliveryStatus(null);
165                         group = takeMatchingEntity(template, sourceOrderTakenContainer, tx, MzsConstants.RequestTimeout.DEFAULT, errorMsg);
166                     } catch (Exception e) {
167                         log.warn(e.getMessage());
168                         continue;
169                     }
170                     try {
171                         // Take the delivery with the right id from the source DeliveryGroupContainer
172                         final DeliveryGroupData template = new DeliveryGroupData();
173                         template.setDeliveryStatus(null);
174                         template.setId(group.getId());
175                         List<DeliveryGroupData> groups = takeMatchingEntities(template, sourceDeliveryGroupContainer, tx, MzsConstants.RequestTimeout.DEFAULT, "Cannot take from sourceDeliveryGroupContainer");
176                         group = groups.get(0);
177                     } catch (Exception e) {
178                         log.warn(e.getMessage());
179                         continue;
180                     }
181
182                     try {
183                         // Take the pizzas from the container
184                         final PizzaOrder pizza = new PizzaOrder();
185                         pizza.setOrderId(group.getOrder().getId());
186                         takeMatchingEntities(pizza, sourcePizzaContainer, tx, MzsConstants.RequestTimeout.DEFAULT, "Cannot take from sourcePizzaContainer");
187                     } catch (MzsCoreException e) {
188                         log.warn(e.getMessage());
189                         continue;
190                     }
191
192                     group.setOriginalPizzeriaId(String.valueOf(pizzeriaId));
193                     group.setPizzeriaId(String.valueOf(otherPizzeriaId));
194                     group.setWaiterIdOfOrder(null);
195                     group.setLoadBalancerId(loadBalancerId);
196                     group.setDeliveryStatus(DeliveryStatus.MOVED);
197
198                     // Inform the source pizzeria that the delivery was moved
199                     sendItemsToContainer(Arrays.asList(group), sourceDeliveryGroupContainer, MzsConstants.RequestTimeout.DEFAULT, tx);
200                     capi.commitTransaction(tx);
201
202                     final TransactionReference otherTx = capi.createTransaction(
203                             Util.SPACE_TRANSACTION_TIMEOUT,
204                             URI.create(String.format(Util.SERVER_ADDR, otherPizzeriaId)));
205
206                     group.setDeliveryStatus(DeliveryStatus.START);
207
208                     // Send the deliveries to the other pizzeria
209                     sendItemsToContainer(Arrays.asList(group), targetPhoneCallsContainer, MzsConstants.RequestTimeout.DEFAULT, otherTx);
210                     sendItemsToContainer(Arrays.asList(group), targetDeliveryGroupContainer, MzsConstants.RequestTimeout.DEFAULT, otherTx);
211
212                     movedDelivieries++;
213
214                     capi.commitTransaction(otherTx);
215
216
217                 } catch (NullPointerException e) {
218                     // strange npe from space
219                 } catch (Exception e) {
220                     log.warn("Exception in moveDeliveries: {}", e.getMessage());
221                     e.printStackTrace();
222                 }
223
224             }
225         }
226
227         /**
228          * Returns the port of a pizzeria for which it is true: (numberOfDeliveries+currentNumberDeliveries* DELIVERY_FACTOR_THRESHOLD)<currentNumberDeliveries.
229          * If there is no such pizzeria available the method return 0.
230          */
231         private int getPizzeriaWithLessWaitingDeliveries(int currentNumberDeliveries) {
232             final Collection<PizzeriaState> states = pizzeriaStates.values();
233             final long now = new Date().getTime();
234             for (PizzeriaState state : states) {
235                 final long lastUpdateTime = state.getLastUpdateTime();
236                 final long interval = now - lastUpdateTime;
237                 final boolean differenceDeliveries = (state.getNumberDeliveries() + currentNumberDeliveries * DELIVERY_FACTOR_THRESHOLD) < currentNumberDeliveries;
238                 if (differenceDeliveries && state.getStatus() != PizzeriaStatus.OFFLINE && lastUpdateTime != 0 && interval < FREQUENCY * INTERVAL_FACTOR) {
239                     return state.getId();
240                 }
241             }
242
243             return 0;
244         }
245     }
246 }