git.somenet.org - pub/jan/sbc.git/blob - src/main/java/at/ac/tuwien/sbc/valesriegler/xvsm/WaiterXVSM.java
Solve some space concurrency bugs
[pub/jan/sbc.git] / src / main / java / at / ac / tuwien / sbc / valesriegler / xvsm / WaiterXVSM.java
1 package at.ac.tuwien.sbc.valesriegler.xvsm;
2
3 import java.io.Serializable;
4 import java.net.URI;
5 import java.util.Arrays;
6 import java.util.Collections;
7 import java.util.Iterator;
8 import java.util.List;
9
10 import org.mozartspaces.capi3.AnyCoordinator;
11 import org.mozartspaces.capi3.FifoCoordinator;
12 import org.mozartspaces.capi3.LindaCoordinator;
13 import org.mozartspaces.core.ContainerReference;
14 import org.mozartspaces.core.MzsConstants;
15 import org.mozartspaces.core.MzsConstants.RequestTimeout;
16 import org.mozartspaces.core.MzsCoreException;
17 import org.mozartspaces.core.TransactionException;
18 import org.mozartspaces.core.TransactionReference;
19 import org.mozartspaces.notifications.Notification;
20 import org.mozartspaces.notifications.NotificationListener;
21 import org.mozartspaces.notifications.Operation;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import at.ac.tuwien.sbc.valesriegler.common.Util;
26 import at.ac.tuwien.sbc.valesriegler.types.GroupData;
27 import at.ac.tuwien.sbc.valesriegler.types.GroupState;
28 import at.ac.tuwien.sbc.valesriegler.types.Order;
29 import at.ac.tuwien.sbc.valesriegler.types.OrderStatus;
30 import at.ac.tuwien.sbc.valesriegler.types.Pizza;
31 import at.ac.tuwien.sbc.valesriegler.types.Table;
32
33 public class WaiterXVSM extends AbstractXVSMConnector {
34         private static final Logger log = LoggerFactory.getLogger(WaiterXVSM.class);
35         private final int waiterId;
36
37         public WaiterXVSM(int waiterId) {
38                 super();
39
40                 this.waiterId = waiterId;
41
42                 freeTablesContainer = useContainer(Util.FREE_TABLES);
43                 assignTableContainer = useContainer(Util.ASSIGN_TABLE);
44                 takeOrderContainer = useContainer(Util.TAKE_ORDER);
45                 orderTakenContainer = useContainer(Util.ORDER_TAKEN);
46                 preparePizzasContainer = useContainer(Util.PREPARE_PIZZAS);
47                 orderCompleteContainer = useContainer(Util.ORDER_COMPLETE);
48                 deliverPizzasContainer = useContainer(Util.DELIVER_PIZZAS);
49                 paymentRequestContainer = useContainer(Util.PAYMENT_REQUEST);
50                 paymentDoneContainer = useContainer(Util.PAYMENT_DONE);
51                 tableAssignedContainer = useContainer(Util.TABLE_ASSIGNED);
52         }
53
54         public void listenForFreeTable() {
55                 SpaceListener listener = new SpaceListenerImpl(capi, freeTablesContainer, false) {
56
57                         @Override
58                         void onEntriesWritten(List<? extends Serializable> entries) throws Exception {
59
60                                 List<Table> tables = castEntries(entries);
61
62                                 for (Table table : tables) {
63
64                                         TransactionReference tx = capi.createTransaction(
65                                                         Util.SPACE_TRANSACTION_TIMEOUT,
66                                                         URI.create(Util.SERVER_ADDR));
67
68                                         // Acquire a lock for the free table in the
69                                         // FreeTableContainer
70                                         int id = table.getId();
71
72                                         Table tableTemplate = new Table(id);
73                                         try {
74                                                 Table lockedFreeTable = takeMatchingEntity(tableTemplate,
75                                                                 freeTablesContainer, tx, RequestTimeout.DEFAULT,
76                                                                 String.format("There was no free table found with id %d", id));
77
78
79                                                 GroupData groupTemplate = new GroupData();
80                                                 GroupData lockedGroup = takeMatchingEntity(groupTemplate,
81                                                                 assignTableContainer, tx, RequestTimeout.DEFAULT,
82                                                                 "There is no group waiting for a table at the moment");
83
84                                                 assignGroupToTable(lockedGroup, lockedFreeTable, tx);
85                                         } catch (Exception e) {
86 //                                              log.info(e.getMessage());
87                                         }
88                                 }
89                         }
90                 };
91
92                 createNotification(listener, freeTablesContainer);
93         }
94
95
96
97         public void listenForNewGuests() {
98                 SpaceListener listener = new SpaceListenerImpl(capi, assignTableContainer) {
99                         
100                         @Override
101                         void onEntriesWritten(List<? extends Serializable> entries)
102                                         throws Exception {
103                                 log.info("New guest groups have arrived");
104
105                                 List<GroupData> groups = castEntries(entries);
106
107                                 for (GroupData group : groups) {
108                                         
109                                         TransactionReference tx = capi.createTransaction(
110                                                 Util.SPACE_TRANSACTION_TIMEOUT,
111                                                 URI.create(Util.SERVER_ADDR));
112
113                                         // Acquire a lock for the group in the
114                                         // AssignTableContainer
115                                         String groupNotFound = String.format("Group with id %d was already assigned a table by another waiter!", group.getId());
116
117                                         try {
118                                                 GroupData lockedGroup = takeMatchingEntity(
119                                                                 new GroupData(group.getId()),
120                                                                 assignTableContainer, tx,
121                                                                 RequestTimeout.DEFAULT, groupNotFound);
122                                                 // Acquire a lock for one free table in the
123                                                 // TablesContainer
124                                                 String noFreeTable = String.format("No free table for group with id %d could be found", group.getId());
125                                                 Table lockedFreeTable = takeMatchingEntity(new Table(null), freeTablesContainer, tx, RequestTimeout.DEFAULT,
126                                                                 noFreeTable);
127
128                                                 assignGroupToTable(lockedGroup, lockedFreeTable, tx);
129                                         } catch (Exception e) {
130 //                                              log.info(e.getMessage());
131                                         }
132                                 }
133                         }
134                 };
135                 
136                 createNotification(listener, assignTableContainer);
137         }
138
139         public void listenForPaymentRequest() {
140                 SpaceListener paymentListener = new SpaceListenerImpl(capi, paymentRequestContainer) {
141                         
142                         @Override
143                         void onEntriesWritten(List<? extends Serializable> entries)
144                                         throws Exception {
145
146                                 List<GroupData> groups = castEntries(entries);
147                                 
148                                 for (GroupData groupData : groups) {
149                                         TransactionReference tx = capi.createTransaction(
150                                                         Util.SPACE_TRANSACTION_TIMEOUT,
151                                                         URI.create(Util.SERVER_ADDR));
152                                         GroupData entity = new GroupData(groupData.getId());
153
154                                         // Acquire the lock so that another waiter can't do the same
155                                         // thing!
156                                         String paymentRequestTakenByOtherWaiter = String.format(
157                                                         "The payment request for group %d was already taken by an other waiter!",
158                                                         groupData.getId());
159                                         try {
160                                                 takeMatchingEntity(entity, paymentRequestContainer, tx, RequestTimeout.DEFAULT, paymentRequestTakenByOtherWaiter);
161
162                                                 groupData.setPayingWaiter(waiterId);
163
164                                                 sendItemsToContainer(Arrays.asList(groupData), paymentDoneContainer, RequestTimeout.ZERO, tx);
165
166                                                 capi.commitTransaction(tx);
167                                         } catch (Exception e) {
168 //                                              log.info(e.getMessage());
169                                         }
170                                 }
171                         }
172                 };
173
174                 createNotification(paymentListener, paymentRequestContainer);
175         }
176
177         public void listenForOrderRequests() {
178                 SpaceListener ordersListener = new SpaceListenerImpl(capi, takeOrderContainer) {
179                         
180                         @Override
181                         void onEntriesWritten(List<? extends Serializable> entries)
182                                         throws Exception {
183
184                                 List<GroupData> groups = castEntries(entries);
185                 
186                                 for (GroupData groupData : groups) {
187
188                                         TransactionReference tx = capi.createTransaction(
189                                                         Util.SPACE_TRANSACTION_TIMEOUT,
190                                                         URI.create(Util.SERVER_ADDR));
191                                         GroupData entity = new GroupData(groupData.getId());
192                                         entity.setState(GroupState.SITTING);
193
194                                         try {
195                                                 // Acquire the lock so that another waiter can't do the same thing!
196                                                 String orderTakenByOtherWaiter = String.format(
197                                                                 "The order for group %d was already taken by an other waiter!",
198                                                                 groupData.getId());
199                                                 takeMatchingEntity(entity, takeOrderContainer, tx, RequestTimeout.DEFAULT, orderTakenByOtherWaiter);
200
201                                                 groupData.setOrderWaiter(waiterId);
202                                                 groupData.setState(GroupState.ORDERED);
203                                                 Order order = groupData.getOrder();
204                                                 order.setStatus(OrderStatus.ORDERED);
205
206                                                 
207                                                 // send the order as a whole to the space
208                                                 sendItemsToContainer(Arrays.asList(groupData),
209                                                                 orderTakenContainer, RequestTimeout.ZERO, tx);
210                                                 sendItemsToContainer(order.getOrderedPizzas(),
211                                                                 preparePizzasContainer, RequestTimeout.ZERO, tx);
212                                                 capi.commitTransaction(tx);
213
214                                                 log.info("Waiter has taken order from group {}",
215                                                                 groupData.getId());
216                                         } catch (Exception e) {
217 //                                              log.info(e.getMessage());
218                                         }
219                                 } 
220                         }
221                 };
222                 
223                 createNotification(ordersListener, takeOrderContainer);
224         }
225
226         public void listenForPreparedPizzas() {
227                 /**
228                  * A waiter gets informed when a new pizza is complete. He takes the
229                  * orderId of the pizza and looks up the corresponding order from which
230                  * he gets the number of necessary pizzas of the order. He then tries to
231                  * fetch all pizzas with the corresponding orderId and compares the
232                  * number of those pizzas with the number of necessary pizzas. If all
233                  * pizzas of an order are complete he then delivers them!
234                  */
235                 SpaceListener preparedPizzasListener = new SpaceListenerImpl(capi, deliverPizzasContainer) {
236                                         
237                         @Override
238                         void onEntriesWritten(List<? extends Serializable> entries)
239                                         throws Exception {
240
241                                 List<Pizza> pizzas = castEntries(entries);
242                 
243                                 for (Pizza pizza : pizzas) {
244                                         int orderId = pizza.getOrderId();
245
246                                         TransactionReference tx = capi.createTransaction(
247                                                         Util.SPACE_TRANSACTION_TIMEOUT,
248                                                         URI.create(Util.SERVER_ADDR));
249
250                                         try {
251                                                 GroupData entity = new GroupData();
252                                                 entity.setState(null);
253                                                 Order order = new Order();
254                                                 order.setId(orderId);
255                                                 entity.setOrder(order);
256
257                                                 GroupData groupData = takeMatchingEntity(entity,
258                                                                 orderTakenContainer, tx, RequestTimeout.DEFAULT,
259                                                                 "Another waiter just checks if the order is complete");
260                                                 int groupId = groupData.getId();
261                                                 int numberOfPizzas = groupData.getOrder().getNumberOfPizzas();
262
263                                                 Pizza pizzaTemplate = new Pizza();
264                                                 pizzaTemplate.setOrderId(orderId);
265
266                                                 List<Pizza> pizzasOfOrder = takeMatchingEntities(
267                                                                 pizzaTemplate, deliverPizzasContainer, tx,
268                                                                 RequestTimeout.DEFAULT,
269                                                                 "Cannot take the pizzas from the deliverPizzasContainer");
270
271                                                 if (pizzasOfOrder.size() == numberOfPizzas) {
272                                                         GroupData group = new GroupData();
273                                                         group.setServingWaiter(waiterId);
274                                                         Order completeOrder = new Order();
275                                                         completeOrder.setId(orderId);
276                                                         completeOrder.setGroupId(groupId);
277                                                         group.setOrder(completeOrder);
278                                                         sendItemsToContainer(Arrays.asList(group),
279                                                                         orderCompleteContainer, RequestTimeout.DEFAULT,
280                                                                         tx);
281                                                         capi.commitTransaction(tx);
282                                                 } else {
283                                                         log.info("Not yet all pizzas prepared! Order with id "
284                                                                         + orderId + " has " + numberOfPizzas
285                                                                         + " pizzas, but only " + pizzasOfOrder.size()
286                                                                         + " pizzas are ready by now!");
287                                                         capi.rollbackTransaction(tx);
288                                                 }
289                                         } catch (NullPointerException e) {
290                                                 
291                                         } catch (Exception e) {
292                                                 capi.rollbackTransaction(tx);
293                                         }
294                                 }
295                         }
296                 };
297
298                 createNotification(preparedPizzasListener, deliverPizzasContainer);
299         }
300
301         private void assignGroupToTable(GroupData lockedGroup,
302                         Table lockedFreeTable, TransactionReference tx)
303                         throws MzsCoreException {
304                 // The new group sits down at the table
305                 lockedFreeTable.setGroupId(lockedGroup.getId());
306
307                 // The new group now wants to order
308                 lockedGroup.setState(GroupState.SITTING);
309                 lockedGroup.setTable(lockedFreeTable);
310                 lockedGroup.setTableWaiter(waiterId);
311
312                 sendItemsToContainer(Arrays.asList(lockedFreeTable),
313                                 tableAssignedContainer, RequestTimeout.ZERO, tx);
314                 sendItemsToContainer(Arrays.asList(lockedGroup), takeOrderContainer,
315                                 RequestTimeout.ZERO, tx);
316
317                 try {
318                         capi.commitTransaction(tx);
319                         log.info("Assigned table to group with groupId {}",
320                                         lockedGroup.getId());
321                 } catch (Exception e) {
322                         log.info("Assigning a table to group with groupId {} failed",
323                                         lockedGroup.getId());
324                         log.info(e.getMessage());
325                 }
326         }
327
328 }