1 package at.ac.tuwien.sbc.valesriegler.xvsm;
3 import java.io.Serializable;
5 import java.util.ArrayList;
6 import java.util.Arrays;
9 import java.util.concurrent.atomic.AtomicLong;
11 import org.mozartspaces.capi3.Coordinator;
12 import org.mozartspaces.capi3.CountNotMetException;
13 import org.mozartspaces.capi3.FifoCoordinator;
14 import org.mozartspaces.capi3.LindaCoordinator;
15 import org.mozartspaces.capi3.LindaCoordinator.LindaSelector;
16 import org.mozartspaces.core.Capi;
17 import org.mozartspaces.core.CapiUtil;
18 import org.mozartspaces.core.ContainerReference;
19 import org.mozartspaces.core.DefaultMzsCore;
20 import org.mozartspaces.core.Entry;
21 import org.mozartspaces.core.MzsConstants;
22 import org.mozartspaces.core.MzsCore;
23 import org.mozartspaces.core.MzsCoreException;
24 import org.mozartspaces.core.MzsTimeoutException;
25 import org.mozartspaces.core.TransactionReference;
26 import org.mozartspaces.notifications.NotificationManager;
27 import org.mozartspaces.notifications.Operation;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import at.ac.tuwien.sbc.valesriegler.common.HasId;
32 import at.ac.tuwien.sbc.valesriegler.common.Util;
33 import at.ac.tuwien.sbc.valesriegler.types.GroupData;
35 public abstract class AbstractXVSMConnector {
36 public static Object lockObject = new Object();
37 public static AtomicLong timeOflastOperation = new AtomicLong(new Date().getTime());
39 private static final Logger log = LoggerFactory.getLogger(AbstractXVSMConnector.class);
41 protected ContainerReference tableAssignedContainer;
42 protected ContainerReference assignTableContainer;
43 protected ContainerReference takeOrderContainer;
44 protected ContainerReference orderTakenContainer;
45 protected ContainerReference preparePizzasContainer;
46 protected ContainerReference deliverPizzasContainer;
47 protected ContainerReference paymentRequestContainer;
48 protected ContainerReference freeTablesContainer;
49 protected ContainerReference pizzaInProgressContainer;
50 protected ContainerReference orderCompleteContainer;
51 protected ContainerReference isEatingContainer;
52 protected ContainerReference paymentDoneContainer;
53 protected ContainerReference infoContainer;
55 protected NotificationManager notificationMgr;
57 public AbstractXVSMConnector() {
58 initSpaceCommunication();
61 public void initSpaceCommunication() {
63 MzsCore core = DefaultMzsCore.newInstanceWithoutSpace();
64 capi = new Capi(core);
65 notificationMgr = new NotificationManager(core);
66 } catch (Exception e) {
67 log.error("Space connection could not be established! Have you started the Space Server?");
68 handleSpaceErrorAndTerminate(e);
72 protected ContainerReference useContainer(String containerName) {
74 return CapiUtil.lookupOrCreateContainer(containerName, URI.create(Util.SERVER_ADDR), createCoordinators(new FifoCoordinator(), new LindaCoordinator(false)), null, capi);
75 } catch (MzsCoreException e) {
76 handleSpaceErrorAndTerminate(e);
79 throw new RuntimeException("Could not Create container " + containerName);
82 protected ContainerReference useContainer(String containerName, List<Coordinator> coordinators) {
84 return CapiUtil.lookupOrCreateContainer(containerName, URI.create(Util.SERVER_ADDR), coordinators, null, capi);
85 } catch (MzsCoreException e) {
86 handleSpaceErrorAndTerminate(e);
89 throw new RuntimeException("Could not Create container " + containerName);
92 protected List<Coordinator> createCoordinators(Coordinator... coordinator) {
93 return Arrays.asList(coordinator);
96 protected void handleSpaceErrorAndTerminate(Exception e) {
97 log.error(e.getMessage());
102 protected void createNotification(SpaceListener listener,
103 ContainerReference cref) {
104 listener.startHandlingAbsenceOfNotifications();
106 notificationMgr.createNotification(cref, listener, Operation.WRITE);
107 } catch (Exception e) {
108 handleSpaceErrorAndTerminate(e);
112 protected <T extends Serializable> void sendItemsToContainer(
113 List<T> items, ContainerReference cref, long timeout, TransactionReference tx) {
116 List<Entry> entries = new ArrayList<>();
117 for (Serializable item : items) {
118 entries.add(new Entry(item));
120 capi.write(entries, cref, timeout, tx);
121 } catch (Exception e) {
122 log.info(e.getMessage());
127 @SuppressWarnings("unchecked")
128 protected <T extends Serializable> T takeMatchingEntity(
129 T entity, ContainerReference ref, TransactionReference tx, long timeout, String errorMsg)
130 throws MzsCoreException {
132 LindaSelector sel = LindaCoordinator.newSelector(entity, 1);
134 ArrayList<Serializable> entities = capi.take(ref, sel, timeout, tx);
136 return (T) CapiUtil.getSingleEntry(entities);
137 } catch (CountNotMetException e) {
138 capi.rollbackTransaction(tx);
140 throw new EntityNotFoundByTemplate(errorMsg);
141 } catch(MzsTimeoutException e) {
142 capi.rollbackTransaction(tx);
144 throw new EntityNotFoundByTemplate(errorMsg);
148 protected <T extends Serializable> List<T> takeMatchingEntities(
149 T entity, ContainerReference ref, TransactionReference tx, long timeout, String errorMsg)
150 throws MzsCoreException {
152 LindaSelector sel = LindaCoordinator.newSelector(entity, MzsConstants.Selecting.COUNT_MAX);
154 ArrayList<Serializable> entities = capi.take(ref, sel, timeout, tx);
156 return (List<T>) entities;
157 } catch (CountNotMetException e) {
158 capi.rollbackTransaction(tx);
160 throw new EntityNotFoundByTemplate(errorMsg);
161 } catch(MzsTimeoutException e) {
162 capi.rollbackTransaction(tx);
164 throw new EntityNotFoundByTemplate(errorMsg);
168 protected <T extends Serializable> List<T> castEntries(List<? extends Serializable> entries) {
169 List<T> newList = new ArrayList<T>();
170 if(entries.size() == 0) return newList;
172 Serializable firstEntry = entries.get(0);
173 if (firstEntry instanceof Entry) {
175 List<Entry> newEntries = (List<Entry>) entries;
176 for (Entry entry : newEntries) {
177 newList.add((T) entry.getValue());
181 return (List<T>) entries;
186 protected <T extends HasId> T getSingleEntity(final List<T> entities) {
187 if(entities.size() != 1) {
188 throw new RuntimeException("Only one entity was expected!");
190 return entities.get(0);
193 protected<T extends Serializable> GroupData getSingleGroup(final List<T> entities) {
194 List<GroupData> groups = castEntries(entities);
195 if(groups.size() != 1) {
196 throw new RuntimeException("Only one group was expected!");
198 return groups.get(0);