1 package at.ac.tuwien.sbc.valesriegler.xvsm;
4 import at.ac.tuwien.sbc.valesriegler.common.Util;
5 import at.ac.tuwien.sbc.valesriegler.types.DeliveryGroupData;
6 import at.ac.tuwien.sbc.valesriegler.types.DeliveryStatus;
7 import at.ac.tuwien.sbc.valesriegler.types.PizzaOrder;
8 import at.ac.tuwien.sbc.valesriegler.xvsm.loadbalancer.PizzeriaState;
9 import at.ac.tuwien.sbc.valesriegler.xvsm.loadbalancer.PizzeriaStatus;
10 import at.ac.tuwien.sbc.valesriegler.xvsm.spacehelpers.SpaceAction;
11 import org.mozartspaces.capi3.AnyCoordinator;
12 import org.mozartspaces.core.ContainerReference;
13 import org.mozartspaces.core.MzsConstants;
14 import org.mozartspaces.core.MzsCoreException;
15 import org.mozartspaces.core.TransactionReference;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
19 import java.io.Serializable;
// Load balancer connector: keeps one PizzeriaState per known pizzeria in pizzeriaStates and
// starts a periodic LoadCheck per pizzeria that may move waiting deliveries to another pizzeria.
23 public class LoadBalancerXVSM extends AbstractXVSMConnector {
24 private static final Logger log = LoggerFactory.getLogger(LoadBalancerXVSM.class);
// Period between two load checks of the same pizzeria. It is passed to Timer.schedule as the
// period (milliseconds) and, multiplied by LoadCheck.INTERVAL_FACTOR, bounds how old a stored
// pizzeria state may be before it is ignored.
25 private static final int FREQUENCY = 1500;
// Identifier of this load balancer; written onto every delivery group it moves.
27 private int loadBalancerId;
// NOTE(review): not referenced anywhere in the visible part of this file - confirm it is still needed.
28 private Set<Integer> pizzeriaIdentifiers;
// Last observed state per pizzeria, keyed by pizzeria id (the pizzeria's space port).
// The map is a synchronizedMap; LoadCheck additionally locks on the map object itself for
// the whole duration of a check, which is the same monitor the wrapper uses internally.
29 private Map<Integer, PizzeriaState> pizzeriaStates = Collections.synchronizedMap(new HashMap<Integer, PizzeriaState>());
// Creates the load balancer connector.
//   id   - identifier of this load balancer, stored in loadBalancerId
//   port - not used in the lines visible here; presumably forwarded to the superclass
//          constructor - TODO confirm
31 public LoadBalancerXVSM(int id, int port) {
34 this.loadBalancerId = id;
// Container of the group agent space (Util.GROUP_AGENT_PORT) on which listenForPizzerias()
// receives PizzeriaRegistration entries.
35 this.groupAgentInfoContainer = useContainerOfSpaceWithPort(Util.GROUP_AGENT_INFO, Util.GROUP_AGENT_PORT);
39 * The Load balancer listens for new pizzerias on the Group Agent Info container
41 public void listenForPizzerias() {
42 getDefaultBuilder("listenForPizzerias").setLookaround(true).setCref(groupAgentInfoContainer).setSpaceAction(new SpaceAction() {
44 public void onEntriesWritten(List<? extends Serializable> entries) throws Exception {
46 final List<PizzeriaRegistration> pizzeriaRegistrations = castEntries(entries);
47 for (PizzeriaRegistration registration : pizzeriaRegistrations) {
48 final int pizzeriaId = registration.pizzeriaSpacePort;
49 if (pizzeriaStates.get(pizzeriaId) == null) {
50 final PizzeriaState pizzeriaState = new PizzeriaState(pizzeriaId);
51 pizzeriaStates.put(pizzeriaId, pizzeriaState);
52 listenToPizzeria(pizzeriaId);
57 }).createSpaceListenerImpl();
61 * Whenever a new pizzeria emerges a timer is started which periodically checks the delivery load
62 * of the pizzeria and contemplates moving deliveries.
64 private void listenToPizzeria(int pizzeriaId) {
65 log.warn("Start listening to pizzeria {}", pizzeriaId);
67 Timer timer = new Timer();
69 timer.schedule(new LoadCheck(useContainerOfSpaceWithPort(Util.DELIVERY_ORDER_TAKEN, pizzeriaId), pizzeriaId), 100, FREQUENCY);
// Timer task bound to one pizzeria. Each execution reads the pizzeria's deliveries from its
// container, updates the pizzeria's entry in pizzeriaStates and, if the pizzeria holds too many
// deliveries, asks moveDeliveries to hand part of them to a pizzeria with fewer deliveries.
73 private class LoadCheck extends TimerTask {
// A pizzeria is only considered overloaded when it currently has more deliveries than this.
74 public static final int DELIVERIES_NUMBER_THRESHOLD = 20;
// Used by getPizzeriaWithLessWaitingDeliveries: a candidate target qualifies only if its
// delivery count plus this fraction of the source's count is still below the source's count.
75 public static final double DELIVERY_FACTOR_THRESHOLD = 0.2;
// Multiplied with FREQUENCY to bound the accepted age of a stored pizzeria state.
// NOTE(review): the same constant is also used below as the divisor when averaging the two
// delivery counts; that only works because its value is 2 - consider a separate constant.
76 public static final int INTERVAL_FACTOR = 2;
// Container whose entries are read as the pizzeria's current deliveries on every run.
78 private final ContainerReference container;
// Id of the monitored pizzeria; key into pizzeriaStates.
79 private final int pizzeriaId;
// Binds the task to one pizzeria and to the container that is read on every execution.
81 public LoadCheck(ContainerReference containerReference, int pizzeriaId) {
82 this.container = containerReference;
83 this.pizzeriaId = pizzeriaId;
// The statements below form the body of the periodic check; the method header is not part of
// the visible lines (presumably TimerTask.run - confirm).
88 log.info("Start running Check for pizzeriaId {}", pizzeriaId);
// All LoadCheck instances lock the same map, so checks of different pizzerias do not overlap;
// the lock is held across the remote read and across moveDeliveries.
89 synchronized (pizzeriaStates) {
91 List<DeliveryGroupData> deliveries = null;
92 final PizzeriaState pizzeriaState = pizzeriaStates.get(pizzeriaId);
// Read the container's entries with COUNT_MAX (presumably "all entries" - confirm against the
// MozartSpaces API) as the pizzeria's current deliveries.
94 deliveries = castEntries(capi.read(container, AnyCoordinator.newSelector(MzsConstants.Selecting.COUNT_MAX), MzsConstants.RequestTimeout.DEFAULT, null));
95 } catch (Exception e) {
96 log.warn("Exception while reading deliveries from Pizzeria {}", pizzeriaId);
// Appears to belong to the read-failure branch: the pizzeria is flagged OFFLINE, which makes
// getPizzeriaWithLessWaitingDeliveries skip it as a target (closing lines not visible here).
98 pizzeriaState.setStatus(PizzeriaStatus.OFFLINE);
// Reached after the read; flags the pizzeria as ONLINE again.
101 pizzeriaState.setStatus(PizzeriaStatus.ONLINE);
// Age of the previously stored observation for this pizzeria.
103 final long lastUpdatedTimestamp = pizzeriaState.getLastUpdateTime();
104 final long now = new Date().getTime();
105 final long interval = now - lastUpdatedTimestamp;
106 final int currentNumberDeliveries = deliveries.size();
108 // there has to be a stored value for the pizzeria and it should be current enough
109 if (lastUpdatedTimestamp != 0 && interval < FREQUENCY * INTERVAL_FACTOR && currentNumberDeliveries > DELIVERIES_NUMBER_THRESHOLD) {
110 log.info("Pizzeria is handled...");
// Returns the id of a candidate target, or 0 if there is none (per that method's comment).
// NOTE(review): the visible part of that method does not exclude this pizzeria itself; its own
// previously stored count could satisfy the condition - confirm a self-move is impossible.
111 final int id = getPizzeriaWithLessWaitingDeliveries(currentNumberDeliveries);
113 final PizzeriaState otherPizzeria = pizzeriaStates.get(id);
114 final int numberDeliveriesOtherPizzeria = otherPizzeria.getNumberDeliveries();
116 log.info("This pizzeria has {} deliveries", currentNumberDeliveries);
117 log.info("Other pizzeria {} has {} deliveries", id, numberDeliveriesOtherPizzeria);
// Number of deliveries to hand over: this pizzeria's surplus over the (integer) average of
// both counts, so that both end up at roughly the same load.
119 final int deliveryDifference = currentNumberDeliveries - (numberDeliveriesOtherPizzeria + currentNumberDeliveries) / INTERVAL_FACTOR;
121 moveDeliveries(deliveryDifference, deliveries, pizzeriaId, id);
// NOTE(review): logged after moveDeliveries returns and reports the planned number, not the
// number that was actually moved.
123 log.info(String.format("Move %d deliveries from pizzeria %d to pizzeria %d", deliveryDifference, pizzeriaId, id));
125 log.info("No pizzeria with less deliveries found!");
// Store this observation; later checks (of this and of other pizzerias) compare against it.
// NOTE(review): the stored count is the value read before any move took place.
129 pizzeriaState.setNumberDeliveries(currentNumberDeliveries);
130 pizzeriaState.setLastUpdateTime(now);
// Handler body is not visible here.
133 } catch (Exception e) {
// Moves up to deliveryDifference delivery groups from pizzeria pizzeriaId to pizzeria
// otherPizzeriaId. Both ids are also used as the ports of the pizzerias' spaces.
//   deliveryDifference - maximum number of delivery groups to move
//   deliveries         - deliveries read by the caller; in the visible lines only used to
//                        drive the loop, the loop variable itself is not referenced
//   pizzeriaId         - source pizzeria
//   otherPizzeriaId    - target pizzeria
// Failures are logged and not propagated to the caller (see the catch blocks below).
139 private void moveDeliveries(int deliveryDifference, List<DeliveryGroupData> deliveries, int pizzeriaId, int otherPizzeriaId) {
// Counter compared against deliveryDifference; its increment is not part of the visible lines.
140 int movedDelivieries = 0;
// Containers of the source pizzeria's space that entries are taken from ...
141 final ContainerReference sourceOrderTakenContainer = useContainerOfSpaceWithPort(Util.DELIVERY_ORDER_TAKEN, pizzeriaId);
142 final ContainerReference sourceDeliveryGroupContainer = useContainerOfSpaceWithPort(Util.PIZZERIA_DELIVERY, pizzeriaId);
143 final ContainerReference sourcePizzaContainer = useContainerOfSpaceWithPort(Util.PREPARE_DELIVERY_PIZZAS, pizzeriaId);
// ... and containers of the target pizzeria's space that the moved group is written to.
144 final ContainerReference targetPhoneCallsContainer = useContainerOfSpaceWithPort(Util.PHONE_CALLS, otherPizzeriaId);
145 final ContainerReference targetDeliveryGroupContainer = useContainerOfSpaceWithPort(Util.PIZZERIA_DELIVERY, otherPizzeriaId);
148 * Take deliveryDifference times a delivery group from the source containers and put them into
149 * the containers of the target pizzeria space
151 for (DeliveryGroupData delivery : deliveries) {
// Stop as soon as the requested number of groups has been moved.
152 if (movedDelivieries >= deliveryDifference) return;
// One transaction on the SOURCE pizzeria's space per delivery group; all takes below and the
// write of the MOVED marker run inside it.
156 final TransactionReference tx = capi.createTransaction(
157 Util.SPACE_TRANSACTION_TIMEOUT,
158 URI.create(String.format(Util.SERVER_ADDR, pizzeriaId)));
159 final String errorMsg = "Cannot move the delivery as I can't take it!";
160 DeliveryGroupData group = null;
162 // Take any delivery from the source OrderTakenContainer
// Status is cleared on the template, presumably so that it matches deliveries in any status -
// confirm against takeMatchingEntity.
163 DeliveryGroupData template = new DeliveryGroupData();
164 template.setDeliveryStatus(null);
165 group = takeMatchingEntity(template, sourceOrderTakenContainer, tx, MzsConstants.RequestTimeout.DEFAULT, errorMsg);
166 } catch (Exception e) {
// NOTE(review): no rollback of tx is visible in this or the following failure paths; if there
// is none, the transaction stays open until Util.SPACE_TRANSACTION_TIMEOUT - confirm.
167 log.warn(e.getMessage());
171 // Take the delivery with the right id from the source DeliveryGroupContainer
172 final DeliveryGroupData template = new DeliveryGroupData();
173 template.setDeliveryStatus(null);
174 template.setId(group.getId());
175 List<DeliveryGroupData> groups = takeMatchingEntities(template, sourceDeliveryGroupContainer, tx, MzsConstants.RequestTimeout.DEFAULT, "Cannot take from sourceDeliveryGroupContainer");
// From here on the copy taken from the delivery group container is the one that is moved.
// NOTE(review): get(0) assumes at least one match was returned - confirm takeMatchingEntities
// never returns an empty list.
176 group = groups.get(0);
177 } catch (Exception e) {
178 log.warn(e.getMessage());
183 // Take the pizzas from the container
// Removes the pizzas of this order from the source pizzeria; the taken pizzas are not written
// to the target pizzeria in the visible lines.
184 final PizzaOrder pizza = new PizzaOrder();
185 pizza.setOrderId(group.getOrder().getId());
186 takeMatchingEntities(pizza, sourcePizzaContainer, tx, MzsConstants.RequestTimeout.DEFAULT, "Cannot take from sourcePizzaContainer");
187 } catch (MzsCoreException e) {
188 log.warn(e.getMessage());
// Re-label the group: remember where it came from, assign it to the target pizzeria, clear
// the waiter, record this load balancer and mark it as MOVED.
192 group.setOriginalPizzeriaId(String.valueOf(pizzeriaId));
193 group.setPizzeriaId(String.valueOf(otherPizzeriaId));
194 group.setWaiterIdOfOrder(null);
195 group.setLoadBalancerId(loadBalancerId);
196 group.setDeliveryStatus(DeliveryStatus.MOVED);
198 // Inform the source pizzeria that the delivery was moved
199 sendItemsToContainer(Arrays.asList(group), sourceDeliveryGroupContainer, MzsConstants.RequestTimeout.DEFAULT, tx);
200 capi.commitTransaction(tx);
// Second, independent transaction on the TARGET pizzeria's space.
// NOTE(review): the two transactions are not atomic together; if anything below fails after
// the commit above, the group is already marked MOVED at the source but never arrives.
202 final TransactionReference otherTx = capi.createTransaction(
203 Util.SPACE_TRANSACTION_TIMEOUT,
204 URI.create(String.format(Util.SERVER_ADDR, otherPizzeriaId)));
// The same object is reused; for the target pizzeria it enters with status START.
206 group.setDeliveryStatus(DeliveryStatus.START);
208 // Send the deliveries to the other pizzeria
209 sendItemsToContainer(Arrays.asList(group), targetPhoneCallsContainer, MzsConstants.RequestTimeout.DEFAULT, otherTx);
210 sendItemsToContainer(Arrays.asList(group), targetDeliveryGroupContainer, MzsConstants.RequestTimeout.DEFAULT, otherTx);
214 capi.commitTransaction(otherTx);
// NOTE(review): NullPointerException is swallowed without logging, which can also hide
// programming errors in the code above (e.g. group or group.getOrder() being null).
217 } catch (NullPointerException e) {
218 // strange npe from space
219 } catch (Exception e) {
220 log.warn("Exception in moveDeliveries: {}", e.getMessage());
228 * Returns the port of a pizzeria for which the following holds: (numberOfDeliveries + currentNumberDeliveries * DELIVERY_FACTOR_THRESHOLD) < currentNumberDeliveries.
229 * If there is no such pizzeria available the method returns 0.
231 private int getPizzeriaWithLessWaitingDeliveries(int currentNumberDeliveries) {
232 final Collection<PizzeriaState> states = pizzeriaStates.values();
233 final long now = new Date().getTime();
234 for (PizzeriaState state : states) {
235 final long lastUpdateTime = state.getLastUpdateTime();
236 final long interval = now - lastUpdateTime;
237 final boolean differenceDeliveries = (state.getNumberDeliveries() + currentNumberDeliveries * DELIVERY_FACTOR_THRESHOLD) < currentNumberDeliveries;
238 if (differenceDeliveries && state.getStatus() != PizzeriaStatus.OFFLINE && lastUpdateTime != 0 && interval < FREQUENCY * INTERVAL_FACTOR) {
239 return state.getId();