From 23f19205ba56c7c4f14fe6a7f797e9359c2cd124 Mon Sep 17 00:00:00 2001
From: Jan Vales <jan@jvales.net>
Date: Mon, 10 Jun 2013 15:53:05 +0200
Subject: [PATCH] [JMS] new WF works now (hopefully)

---
 .../tuwien/sbc/valesriegler/common/Util.java  |  2 +
 .../cook/actions/DeliveryOrderInfo.java       | 10 +-
 .../sbc/valesriegler/cook/jms/JMSCook.java    | 12 ++-
 .../DeliveryOrdersToCook.java                 | 49 +++++++++-
 .../jms/messageListeners/OrdersToCook.java    |  4 +-
 .../valesriegler/driver/jms/JMSDriver.java    | 14 +--
 .../CookedDeliveryOrders.java                 | 95 +++++++++++++++++++
 .../messageListeners/PendingDeliveries.java   | 64 -------------
 8 files changed, 167 insertions(+), 83 deletions(-)
 create mode 100644 src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/CookedDeliveryOrders.java
 delete mode 100644 src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/PendingDeliveries.java

diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/common/Util.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/common/Util.java
index 671dc09..3db298a 100644
--- a/src/main/java/at/ac/tuwien/sbc/valesriegler/common/Util.java
+++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/common/Util.java
@@ -66,6 +66,8 @@ public abstract class Util {
 	public static final int GROUP_AGENT_PORT = 9876;
 	public static final int DELIVERY_CUSTOMERS_PORT = 9877;
 
+	public static final String JMS_DELIVERY_DESTINATION = "tcp://localhost:61611?jms.prefetchPolicy.all=1";
+
 	public static String getId(int id) {
 		return (id != 0 && id != -1) ? String.valueOf(id) : "";
 	}
diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/actions/DeliveryOrderInfo.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/actions/DeliveryOrderInfo.java
index 71e75e6..0abcc11 100644
--- a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/actions/DeliveryOrderInfo.java
+++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/actions/DeliveryOrderInfo.java
@@ -2,8 +2,8 @@ package at.ac.tuwien.sbc.valesriegler.cook.actions;
 
 import java.io.Serializable;
 
-import at.ac.tuwien.sbc.valesriegler.common.AbstractAction;
-import at.ac.tuwien.sbc.valesriegler.types.GroupData;
+import at.ac.tuwien.sbc.valesriegler.common.AbstractDeliveryAction;
+import at.ac.tuwien.sbc.valesriegler.types.DeliveryGroupData;
 
 /**
  * response to the group's interest in pizza.
@@ -11,11 +11,11 @@ import at.ac.tuwien.sbc.valesriegler.types.GroupData;
  * @author jan
  * 
  */
-public class DeliveryOrderInfo extends AbstractAction implements Serializable {
+public class DeliveryOrderInfo extends AbstractDeliveryAction implements Serializable {
 	private final int cookId;
 
-	public DeliveryOrderInfo(GroupData groupdata, int cookId) {
-		super(groupdata);
+	public DeliveryOrderInfo(DeliveryGroupData deliveryGroupData, int cookId) {
+		super(deliveryGroupData);
 		this.cookId = cookId;
 	}
 
diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/JMSCook.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/JMSCook.java
index c2378da..79026eb 100644
--- a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/JMSCook.java
+++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/JMSCook.java
@@ -10,6 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import at.ac.tuwien.sbc.valesriegler.common.HasId;
+import at.ac.tuwien.sbc.valesriegler.cook.jms.messageListeners.DeliveryOrdersToCook;
 import at.ac.tuwien.sbc.valesriegler.cook.jms.messageListeners.OrdersToCook;
 
 /**
@@ -34,9 +35,14 @@ public class JMSCook implements HasId {
 			Connection connection = connectionFactory.createConnection();
 			connection.start();
 
-			Session sessWantToSit = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-			MessageConsumer consWantToSit = sessWantToSit.createConsumer(sessWantToSit.createQueue("OrdersToCook"));
-			consWantToSit.setMessageListener(new OrdersToCook(this));
+			Session sessOrdersToCook = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+			MessageConsumer consOrdersToCook = sessOrdersToCook.createConsumer(sessOrdersToCook.createQueue("OrdersToCook"));
+			consOrdersToCook.setMessageListener(new OrdersToCook(this));
+
+			Session sessDeliveryOrdersToCook = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+			MessageConsumer consDeliveryOrdersToCook = sessDeliveryOrdersToCook.createConsumer(sessDeliveryOrdersToCook
+					.createQueue("DeliveryOrdersToCook"));
+			consDeliveryOrdersToCook.setMessageListener(new DeliveryOrdersToCook(this));
 		} catch (JMSException e) {
 			log.error("EXCEPTION!", e);
 		}
diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/DeliveryOrdersToCook.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/DeliveryOrdersToCook.java
index 049add1..973b778 100644
--- a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/DeliveryOrdersToCook.java
+++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/DeliveryOrdersToCook.java
@@ -1,15 +1,25 @@
 package at.ac.tuwien.sbc.valesriegler.cook.jms.messageListeners;
 
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
+import javax.jms.Session;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import at.ac.tuwien.sbc.valesriegler.cook.actions.DeliveryOrderInfo;
 import at.ac.tuwien.sbc.valesriegler.cook.jms.JMSCook;
 import at.ac.tuwien.sbc.valesriegler.group.actions.DeliveryOrderRequest;
+import at.ac.tuwien.sbc.valesriegler.types.OrderStatus;
+import at.ac.tuwien.sbc.valesriegler.types.Pizza;
+import at.ac.tuwien.sbc.valesriegler.types.PizzaOrder;
+import at.ac.tuwien.sbc.valesriegler.types.PizzaOrderStatus;
 
 /**
  * Cook the requested pizza.
@@ -37,11 +47,44 @@ public class DeliveryOrdersToCook implements MessageListener {
 						DeliveryOrderRequest dor = (DeliveryOrderRequest) obj;
 						log.debug("Received: " + dor);
 
-						// generate random delay
-						Thread.sleep((long) (Math.random() * 10000));
+						for (PizzaOrder pizzaorder : dor.getDeliveryGroupData().getOrder().getOrderedPizzas()) {
+							pizzaorder.setStatus(PizzaOrderStatus.IN_PREPARATION);
+						}
 
-						// TODO:
+						ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(cook.getCONNECTSTRING());
+						Connection connection = connectionFactory.createConnection();
+						connection.start();
 
+						// inform pizzeria
+						Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+						MessageProducer prod = session.createProducer(session.createQueue("PizzeriaConnector"));
+						prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+						DeliveryOrderInfo doi = new DeliveryOrderInfo(dor.getDeliveryGroupData(), cook.getId());
+						prod.send(session.createObjectMessage(doi));
+						session.close();
+
+						// generate delay
+						for (PizzaOrder po : dor.getDeliveryGroupData().getOrder().getOrderedPizzas()) {
+							Thread.sleep(po.getPizzaType().duration * 1000);
+							po.setStatus(PizzaOrderStatus.DONE);
+							Pizza p = Pizza.createPizzaFromPizzaOrder(po, cook.getId(), false);
+							dor.getGroupdata().getOrder().getCookedPizzas().add(p);
+						}
+						dor.getGroupdata().getOrder().setStatus(OrderStatus.DELIVERY_PENDING);
+
+						// let pizzas be delivered.
+						session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+						prod = session.createProducer(session.createQueue("CookedDeliveryOrders"));
+						prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+						prod.send(session.createObjectMessage(doi));
+
+						// inform pizeria
+						prod = session.createProducer(session.createQueue("PizzeriaConnector"));
+						prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+						// WTF: oi = new OrderInfo(orderrequest.getGroupdata(), cook.getId());
+						prod.send(session.createObjectMessage(doi));
+						session.close();
+						connection.close();
 					} else {
 						log.warn("Received unknown Object: " + obj);
 					}
diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/OrdersToCook.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/OrdersToCook.java
index 5c61c7b..62b6875 100644
--- a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/OrdersToCook.java
+++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/OrdersToCook.java
@@ -63,7 +63,7 @@ public class OrdersToCook implements MessageListener {
 						prod.send(session.createObjectMessage(oi));
 						session.close();
 
-						// generate random delay
+						// generate delay
 						for (PizzaOrder po : orderrequest.getGroupdata().getOrder().getOrderedPizzas()) {
 							Thread.sleep(po.getPizzaType().duration * 1000);
 							po.setStatus(PizzaOrderStatus.DONE);
@@ -81,7 +81,7 @@ public class OrdersToCook implements MessageListener {
 						// inform pizeria
 						prod = session.createProducer(session.createQueue("PizzeriaConnector"));
 						prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-						oi = new OrderInfo(orderrequest.getGroupdata(), cook.getId());
+						// WTF: oi = new OrderInfo(orderrequest.getGroupdata(), cook.getId());
 						prod.send(session.createObjectMessage(oi));
 						session.close();
 
diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/JMSDriver.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/JMSDriver.java
index 2aac4e7..06318be 100644
--- a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/JMSDriver.java
+++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/JMSDriver.java
@@ -2,12 +2,15 @@ package at.ac.tuwien.sbc.valesriegler.driver.jms;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import at.ac.tuwien.sbc.valesriegler.common.HasId;
+import at.ac.tuwien.sbc.valesriegler.driver.jms.messageListeners.CookedDeliveryOrders;
 
 /**
  * JMSDriver registers all the Listeners.
@@ -27,7 +30,7 @@ public class JMSDriver implements HasId {
 	public JMSDriver(String jmsURL, int id) {
 		CONNECTSTRING = jmsURL;
 		this.id = id;
-		log.info("I AM A JMSCook WITH ID {}", this.id);
+		log.info("I AM A JMSDriver WITH ID {}", this.id);
 
 		try {
 			// Connecting to the Broker and to the output queue
@@ -35,11 +38,10 @@ public class JMSDriver implements HasId {
 			Connection connection = connectionFactory.createConnection();
 			connection.start();
 
-			// Session sessWantToSit = connection.createSession(false,
-			// Session.CLIENT_ACKNOWLEDGE);
-			// MessageConsumer consWantToSit =
-			// sessWantToSit.createConsumer(sessWantToSit.createQueue("OrdersToCook"));
-			// consWantToSit.setMessageListener(new OrdersToCook(this));
+			Session sessCookedDeliveryOrders = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+			MessageConsumer consCookedDeliveryOrders = sessCookedDeliveryOrders.createConsumer(sessCookedDeliveryOrders
+					.createQueue("OrdersToCook"));
+			consCookedDeliveryOrders.setMessageListener(new CookedDeliveryOrders(this));
 		} catch (JMSException e) {
 			log.error("EXCEPTION!", e);
 		}
diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/CookedDeliveryOrders.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/CookedDeliveryOrders.java
new file mode 100644
index 0000000..4d2d889
--- /dev/null
+++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/CookedDeliveryOrders.java
@@ -0,0 +1,95 @@
+package at.ac.tuwien.sbc.valesriegler.driver.jms.messageListeners;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import at.ac.tuwien.sbc.valesriegler.common.Util;
+import at.ac.tuwien.sbc.valesriegler.cook.actions.DeliveryOrderInfo;
+import at.ac.tuwien.sbc.valesriegler.driver.jms.JMSDriver;
+import at.ac.tuwien.sbc.valesriegler.group.actions.DeliveryOrderResponse;
+
+/**
+ * 
+ * 
+ * @author jan
+ * 
+ */
+public class CookedDeliveryOrders implements MessageListener {
+	private static final Logger log = LoggerFactory.getLogger(CookedDeliveryOrders.class);
+	private final JMSDriver driver;
+
+	public CookedDeliveryOrders(JMSDriver driver) {
+		this.driver = driver;
+	}
+
+	@Override
+	public void onMessage(Message msg) {
+		try {
+			synchronized (driver) {
+				if (msg instanceof ObjectMessage) {
+					ObjectMessage objMsg = (ObjectMessage) msg;
+					Object obj = objMsg.getObject();
+
+					if (obj instanceof DeliveryOrderResponse) {
+						DeliveryOrderResponse dor = (DeliveryOrderResponse) obj;
+						log.debug("Received: " + dor);
+
+						// generate random delay
+						Thread.sleep((long) (Math.random() * 10000));
+						String addr = dor.getDeliveryGroupData().getAddress();
+
+						// Deliver to destination broker
+						ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Util.JMS_DELIVERY_DESTINATION);
+						Connection connection = connectionFactory.createConnection();
+						connection.start();
+
+						Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+						MessageProducer prod = session.createProducer(session.createQueue(addr));
+						prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+						DeliveryOrderInfo doi = new DeliveryOrderInfo(dor.getDeliveryGroupData(), driver.getId());
+						prod.send(session.createObjectMessage(doi));
+						session.close();
+						connection.close();
+
+						// Inform pizzeria
+						connectionFactory = new ActiveMQConnectionFactory(driver.getCONNECTSTRING());
+						connection = connectionFactory.createConnection();
+						connection.start();
+
+						session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+						prod = session.createProducer(session.createQueue("PizzeriaConnector"));
+						prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+						prod.send(session.createObjectMessage(doi));
+						session.close();
+						connection.close();
+
+					} else {
+						log.warn("Received unknown Object: " + obj);
+					}
+				} else {
+					log.warn("Received unknown Message: " + msg);
+				}
+				msg.acknowledge();
+			}
+		} catch (JMSException e) {
+			log.error("EXCEPTION!", e);
+		} catch (InterruptedException e) {
+			log.error("EXCEPTION!", e);
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "PendingDeliveries [driver=" + driver + "]";
+	}
+}
diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/PendingDeliveries.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/PendingDeliveries.java
deleted file mode 100644
index 75f0308..0000000
--- a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/PendingDeliveries.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package at.ac.tuwien.sbc.valesriegler.driver.jms.messageListeners;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import at.ac.tuwien.sbc.valesriegler.driver.jms.JMSDriver;
-import at.ac.tuwien.sbc.valesriegler.group.actions.DeliveryOrderRequest;
-
-/**
- * 
- * 
- * @author jan
- * 
- */
-public class PendingDeliveries implements MessageListener {
-	private static final Logger log = LoggerFactory.getLogger(PendingDeliveries.class);
-	private final JMSDriver driver;
-
-	public PendingDeliveries(JMSDriver driver) {
-		this.driver = driver;
-	}
-
-	@Override
-	public void onMessage(Message msg) {
-		try {
-			synchronized (driver) {
-				if (msg instanceof ObjectMessage) {
-					ObjectMessage objMsg = (ObjectMessage) msg;
-					Object obj = objMsg.getObject();
-
-					if (obj instanceof DeliveryOrderRequest) {
-						DeliveryOrderRequest dor = (DeliveryOrderRequest) obj;
-						log.debug("Received: " + dor);
-
-						// generate random delay
-						Thread.sleep((long) (Math.random() * 10000));
-
-						// TODO:
-
-					} else {
-						log.warn("Received unknown Object: " + obj);
-					}
-				} else {
-					log.warn("Received unknown Message: " + msg);
-				}
-				msg.acknowledge();
-			}
-		} catch (JMSException e) {
-			log.error("EXCEPTION!", e);
-		} catch (InterruptedException e) {
-			log.error("EXCEPTION!", e);
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "PendingDeliveries [driver=" + driver + "]";
-	}
-}
-- 
2.43.0