1 package at.ac.tuwien.sbc.valesriegler.xvsm;
4 import at.ac.tuwien.sbc.valesriegler.common.Util;
5 import at.ac.tuwien.sbc.valesriegler.types.DeliveryGroupData;
6 import at.ac.tuwien.sbc.valesriegler.types.DeliveryStatus;
7 import at.ac.tuwien.sbc.valesriegler.xvsm.loadbalancer.PizzeriaState;
8 import at.ac.tuwien.sbc.valesriegler.xvsm.loadbalancer.PizzeriaStatus;
9 import at.ac.tuwien.sbc.valesriegler.xvsm.spacehelpers.SpaceAction;
10 import org.mozartspaces.capi3.AnyCoordinator;
11 import org.mozartspaces.core.ContainerReference;
12 import org.mozartspaces.core.MzsConstants;
13 import org.mozartspaces.core.TransactionReference;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
17 import java.io.Serializable;
// Connector used by a load balancer process. It keeps one PizzeriaState per known pizzeria and
// relies on helpers such as useContainerOfSpaceWithPort(...) that are not defined in this class
// (presumably inherited from AbstractXVSMConnector -- confirm).
21 public class LoadBalancerXVSM extends AbstractXVSMConnector {
22 private static final Logger log = LoggerFactory.getLogger(LoadBalancerXVSM.class);
// Period handed to Timer.schedule(...) for every pizzeria's LoadCheck (Timer periods are in
// milliseconds). It is also the base of the staleness window FREQUENCY * INTERVAL_FACTOR.
23 private static final int FREQUENCY = 1500;
// Identifier of this load balancer; it is written onto every delivery group that gets moved.
25 private int loadBalancerId;
// NOTE(review): not referenced on any visible line of this class -- possibly dead; confirm.
26 private Set<Integer> pizzeriaIdentifiers;
// Last known state per pizzeria, keyed by the pizzeria's space port. The map object itself is
// also used as the monitor that the periodic load checks synchronize on.
27 private Map<Integer, PizzeriaState> pizzeriaStates = Collections.synchronizedMap(new HashMap<Integer, PizzeriaState>());
// Stores the load balancer id and resolves the group agent info container on the group agent port.
// NOTE(review): `port` is not used on any visible line; presumably it is passed to the superclass
// constructor on a line that is not visible here -- confirm.
29 public LoadBalancerXVSM(int id, int port) {
32 this.loadBalancerId = id;
33 this.groupAgentInfoContainer = useContainerOfSpaceWithPort(Util.GROUP_AGENT_INFO, Util.GROUP_AGENT_PORT);
37 * The load balancer listens for new pizzeria registrations on the group agent info container.
// Builds a space listener on the group agent info container (with the lookaround flag set) whose
// callback handles written PizzeriaRegistration entries.
// NOTE(review): lookaround presumably also delivers entries that already exist in the container
// when the listener is created -- confirm against the builder's documentation.
39 public void listenForPizzerias() {
40 getDefaultBuilder("listenForPizzerias").setLookaround(true).setCref(groupAgentInfoContainer).setSpaceAction(new SpaceAction() {
// Callback: receives the entries that were written to the observed container.
42 public void onEntriesWritten(List<? extends Serializable> entries) throws Exception {
44 final List<PizzeriaRegistration> pizzeriaRegistrations = castEntries(entries);
45 for (PizzeriaRegistration registration : pizzeriaRegistrations) {
// The pizzeria's space port is used as its identifier everywhere in this class.
46 final int pizzeriaId = registration.pizzeriaSpacePort;
// Only the first registration of a pizzeria creates a state entry and starts a load check;
// later registrations with the same port are ignored.
// NOTE(review): get-then-put is two separate map operations, so this is only race-free if the
// callback is never invoked concurrently -- confirm.
47 if (pizzeriaStates.get(pizzeriaId) == null) {
48 final PizzeriaState pizzeriaState = new PizzeriaState(pizzeriaId);
49 pizzeriaStates.put(pizzeriaId, pizzeriaState);
50 listenToPizzeria(pizzeriaId);
// Ends the anonymous SpaceAction and finishes the builder chain started above.
55 }).createSpaceListenerImpl();
59 * Whenever a new pizzeria emerges, a timer is started which periodically checks the delivery load
60 * of the pizzeria and decides whether deliveries should be moved to another pizzeria.
// Starts a dedicated Timer for the given pizzeria that repeatedly runs a LoadCheck against the
// pizzeria's DELIVERY_ORDER_TAKEN container.
// pizzeriaId: space port of the pizzeria, also used as its identifier.
62 private void listenToPizzeria(int pizzeriaId) {
// NOTE(review): logged at WARN although the message is purely informational.
63 log.warn("Start listening to pizzeria {}", pizzeriaId);
// One Timer per pizzeria; no visible line keeps a reference to it or cancels it.
65 Timer timer = new Timer();
// First execution after 100 ms, then repeated with FREQUENCY ms between executions.
67 timer.schedule(new LoadCheck(useContainerOfSpaceWithPort(Util.DELIVERY_ORDER_TAKEN, pizzeriaId), pizzeriaId), 100, FREQUENCY);
// Periodic task bound to one pizzeria: it reads the pizzeria's pending deliveries and, if the
// pizzeria is overloaded, moves some of them to a less loaded pizzeria.
71 private class LoadCheck extends TimerTask {
// A pizzeria is only considered overloaded when it holds more than this many deliveries.
72 public static final int DELIVERIES_NUMBER_THRESHOLD = 20;
// Fraction of the overloaded pizzeria's delivery count that is added to a candidate's count
// before comparing; a candidate must still be below the overloaded pizzeria's count after that.
73 public static final double DELIVERY_FACTOR_THRESHOLD = 0.2;
// Multiplier on FREQUENCY that defines how old a stored state may be before it counts as stale.
// NOTE(review): the same constant is reused as the divisor when averaging two delivery counts.
74 public static final int INTERVAL_FACTOR = 2;
// Container that is read on every check; the visible caller passes the pizzeria's
// DELIVERY_ORDER_TAKEN container.
76 private final ContainerReference container;
// Space port / identifier of the pizzeria this task watches.
77 private final int pizzeriaId;
// Binds the task to the container to read and to the pizzeria it belongs to.
79 public LoadCheck(ContainerReference containerReference, int pizzeriaId) {
80 this.container = containerReference;
81 this.pizzeriaId = pizzeriaId;
// Body of the periodic check. The method header is not visible here; since LoadCheck extends
// TimerTask this is presumably run() -- confirm.
// Steps: read all deliveries of this pizzeria, update its online/offline status, and if it is
// overloaded and its stored state is fresh, move deliveries to a less loaded pizzeria; finally
// store the current count and timestamp for the next round.
86 log.info("Start running Check for pizzeriaId {}", pizzeriaId);
// All load checks share this monitor. The space read and the move below appear to happen while
// it is held, so checks of different pizzerias would not overlap -- confirm the block extent.
87 synchronized (pizzeriaStates) {
89 List<DeliveryGroupData> deliveries = null;
90 final PizzeriaState pizzeriaState = pizzeriaStates.get(pizzeriaId);
// Reads (does not take) as many entries as are available, outside of any transaction (null).
92 deliveries = castEntries(capi.read(container, AnyCoordinator.newSelector(MzsConstants.Selecting.COUNT_MAX), MzsConstants.RequestTimeout.DEFAULT, null));
93 } catch (Exception e) {
// NOTE(review): the exception object is not passed to the logger, so its cause is lost.
94 log.warn("Exception while reading deliveries from Pizzeria {}", pizzeriaId);
// A failed read marks the pizzeria as OFFLINE. Presumably the method stops here via a line
// that is not visible; otherwise deliveries.size() below would fail on null -- confirm.
96 pizzeriaState.setStatus(PizzeriaStatus.OFFLINE);
// The read succeeded, so the pizzeria is reachable.
99 pizzeriaState.setStatus(PizzeriaStatus.ONLINE);
101 final long lastUpdatedTimestamp = pizzeriaState.getLastUpdateTime();
102 final long now = new Date().getTime();
103 final long interval = now - lastUpdatedTimestamp;
104 final int currentNumberDeliveries = deliveries.size();
106 // there has to be a stored value for the pizzeria and it should be current enough
// Three conditions: a previous check stored a timestamp (non-zero), that timestamp is younger
// than FREQUENCY * INTERVAL_FACTOR ms, and more than DELIVERIES_NUMBER_THRESHOLD deliveries wait.
107 if (lastUpdatedTimestamp != 0 && interval < FREQUENCY * INTERVAL_FACTOR && currentNumberDeliveries > DELIVERIES_NUMBER_THRESHOLD) {
108 log.info("Pizzeria is handled...");
109 final int id = getPizzeriaWithLessWaitingDeliveries(currentNumberDeliveries);
// NOTE(review): the "No pizzeria ..." message further down suggests a guard on `id` sits on a
// line that is not visible here -- confirm before relying on `otherPizzeria` being non-null.
111 final PizzeriaState otherPizzeria = pizzeriaStates.get(id);
112 final int numberDeliveriesOtherPizzeria = otherPizzeria.getNumberDeliveries();
114 log.info("This pizzeria has {} deliveries", currentNumberDeliveries);
115 log.info("Other pizzeria {} has {} deliveries", id, numberDeliveriesOtherPizzeria);
// Number of deliveries to hand over so that both pizzerias end up at the (integer) average of
// their counts; INTERVAL_FACTOR is used here only for its value 2.
117 final int deliveryDifference = currentNumberDeliveries - (numberDeliveriesOtherPizzeria + currentNumberDeliveries) / INTERVAL_FACTOR;
119 moveDeliveries(deliveryDifference, deliveries, pizzeriaId, id);
// Logged after the attempt; the number is the requested amount, moveDeliveries reports nothing
// back about how many deliveries were actually moved.
121 log.info(String.format("Move %d deliveries from pizzeria %d to pizzeria %d", deliveryDifference, pizzeriaId, id));
123 log.info("No pizzeria with less deliveries found!");
// Remember count and time so that the next round (and other pizzerias' checks) can use them.
127 pizzeriaState.setNumberDeliveries(currentNumberDeliveries);
128 pizzeriaState.setLastUpdateTime(now);
// Catch-all for the check; the handler body is not visible here.
131 } catch (Exception e) {
// Moves up to `deliveryDifference` delivery groups from one pizzeria to another.
// Each move uses two separate transactions: one on the source space (take the group, write it
// back marked MOVED) and afterwards one on the target space (write the group with status START).
// deliveryDifference: maximum number of groups to move.
// deliveries:         snapshot read by the caller; used here only to drive the loop.
// pizzeriaId:         space port of the overloaded (source) pizzeria.
// otherPizzeriaId:    space port of the target pizzeria.
137 private void moveDeliveries(int deliveryDifference, List<DeliveryGroupData> deliveries, int pizzeriaId, int otherPizzeriaId) {
// NOTE(review): no visible line increments this counter; presumably that happens on a line that
// is not visible here -- confirm, otherwise the early return below never triggers.
138 int movedDelivieries = 0;
139 final ContainerReference sourceOrderTakenContainer = useContainerOfSpaceWithPort(Util.DELIVERY_ORDER_TAKEN, pizzeriaId);
140 final ContainerReference sourceDeliveryGroupContainer = useContainerOfSpaceWithPort(Util.PIZZERIA_DELIVERY, pizzeriaId);
141 final ContainerReference targetPhoneCallsContainer = useContainerOfSpaceWithPort(Util.PHONE_CALLS, otherPizzeriaId);
142 final ContainerReference targetDeliveryGroupContainer = useContainerOfSpaceWithPort(Util.PIZZERIA_DELIVERY, otherPizzeriaId);
145 * Take deliveryDifference times a delivery group from the source containers and put them into
146 * the containers of the target pizzeria space
// The loop variable `delivery` is not used on any visible line: each iteration takes whichever
// group the space hands out, the list only bounds the number of attempts.
148 for (DeliveryGroupData delivery : deliveries) {
// Stop as soon as the requested number of groups has been moved.
149 if (movedDelivieries >= deliveryDifference) return;
// Transaction on the source pizzeria's space.
153 final TransactionReference tx = capi.createTransaction(
154 Util.SPACE_TRANSACTION_TIMEOUT,
155 URI.create(String.format(Util.SERVER_ADDR, pizzeriaId)));
156 final String errorMsg = "Cannot move the delivery as I can't take it!";
157 DeliveryGroupData group = null;
159 // Take any delivery from the source OrderTakenContainer
// The status is explicitly cleared on the template; presumably null fields act as wildcards in
// the template matching done by takeMatchingEntity -- confirm.
160 DeliveryGroupData template = new DeliveryGroupData();
161 template.setDeliveryStatus(null);
162 group = takeMatchingEntity(template, sourceOrderTakenContainer, tx, MzsConstants.RequestTimeout.DEFAULT, errorMsg);
163 } catch (Exception e) {
// NOTE(review): only the message is logged and no visible line rolls `tx` back, so a failed take
// appears to leave the transaction open until it times out. What happens after the log call
// (return / continue / fall through with group == null) is on a line not visible here.
164 log.warn(e.getMessage());
168 // Take the delivery with the right id from the source DeliveryGroupContainer
169 final DeliveryGroupData template = new DeliveryGroupData();
170 template.setDeliveryStatus(null);
171 template.setId(group.getId());
172 List<DeliveryGroupData> groups = takeMatchingEntities(template, sourceDeliveryGroupContainer, tx, MzsConstants.RequestTimeout.DEFAULT, "Cannot take from sourceDeliveryGroupContainer");
// From here on the copy taken from the delivery group container is the one that gets moved.
173 group = groups.get(0);
174 } catch (Exception e) {
// NOTE(review): same pattern as above -- message-only logging, no visible rollback of `tx`.
175 log.warn(e.getMessage());
// Re-label the group: remember where it came from, assign the new pizzeria, drop the waiter
// assignment and record which load balancer moved it.
178 group.setOriginalPizzeriaId(String.valueOf(pizzeriaId));
179 group.setPizzeriaId(String.valueOf(otherPizzeriaId));
180 group.setWaiterIdOfOrder(null);
181 group.setLoadBalancerId(loadBalancerId);
182 group.setDeliveryStatus(DeliveryStatus.MOVED);
184 // Inform the source pizzeria that the delivery was moved
185 sendItemsToContainer(Arrays.asList(group), sourceDeliveryGroupContainer, MzsConstants.RequestTimeout.DEFAULT, tx);
// NOTE(review): the source side is committed before anything is written to the target space. If
// the second transaction fails, the group stays MOVED at the source without arriving anywhere.
186 capi.commitTransaction(tx);
// Second, independent transaction on the target pizzeria's space.
188 final TransactionReference otherTx = capi.createTransaction(
189 Util.SPACE_TRANSACTION_TIMEOUT,
190 URI.create(String.format(Util.SERVER_ADDR, otherPizzeriaId)));
// The same object is reused with status START for the target pizzeria.
192 group.setDeliveryStatus(DeliveryStatus.START);
194 // Send the deliveries to the other pizzeria
195 sendItemsToContainer(Arrays.asList(group), targetPhoneCallsContainer, MzsConstants.RequestTimeout.DEFAULT, otherTx);
196 sendItemsToContainer(Arrays.asList(group), targetDeliveryGroupContainer, MzsConstants.RequestTimeout.DEFAULT, otherTx);
200 capi.commitTransaction(otherTx);
// NOTE(review): NullPointerExceptions are dropped without any logging. A null `group` after a
// failed take would also surface as an NPE and end up here -- confirm this is intended.
203 } catch (NullPointerException e) {
204 // strange npe from space
205 } catch (Exception e) {
// Only the message is logged; the stack trace is lost.
206 log.warn("Exception in moveDeliveries: {}", e.getMessage());
214 * Returns the port of a pizzeria for which the following holds: (numberOfDeliveries + currentNumberDeliveries * DELIVERY_FACTOR_THRESHOLD) < currentNumberDeliveries.
215 * If no such pizzeria is available, the method returns 0.
// Scans the stored pizzeria states and returns the id of the first one that is clearly less
// loaded than a pizzeria currently holding `currentNumberDeliveries` deliveries.
// The end of this method is not visible here; the result for "no candidate" is therefore not
// shown on a visible line.
217 private int getPizzeriaWithLessWaitingDeliveries(int currentNumberDeliveries) {
// View on the shared synchronized map. Iterating it is only safe while holding the map's monitor;
// the visible caller invokes this method after a synchronized (pizzeriaStates) line -- confirm.
218 final Collection<PizzeriaState> states = pizzeriaStates.values();
219 final long now = new Date().getTime();
// Iteration order follows the backing HashMap, so which qualifying pizzeria comes first is not
// defined by this code.
220 for (PizzeriaState state : states) {
221 final long lastUpdateTime = state.getLastUpdateTime();
222 final long interval = now - lastUpdateTime;
// Candidate's stored count plus DELIVERY_FACTOR_THRESHOLD (20 %) of the caller's count must still
// be below the caller's count; evaluated in floating point because the factor is a double.
223 final boolean differenceDeliveries = (state.getNumberDeliveries() + currentNumberDeliveries * DELIVERY_FACTOR_THRESHOLD) < currentNumberDeliveries;
// Additionally the candidate must not be OFFLINE and must have been updated at least once and
// within the last FREQUENCY * INTERVAL_FACTOR ms.
// NOTE(review): no visible condition excludes the pizzeria that is currently being checked; its
// own count stored by the previous round could satisfy the test -- confirm.
224 if (differenceDeliveries && state.getStatus() != PizzeriaStatus.OFFLINE && lastUpdateTime != 0 && interval < FREQUENCY * INTERVAL_FACTOR) {
225 return state.getId();