From 23f19205ba56c7c4f14fe6a7f797e9359c2cd124 Mon Sep 17 00:00:00 2001 From: Jan Vales Date: Mon, 10 Jun 2013 15:53:05 +0200 Subject: [PATCH] [JMS] new WF works now (hopefully) --- .../tuwien/sbc/valesriegler/common/Util.java | 2 + .../cook/actions/DeliveryOrderInfo.java | 10 +- .../sbc/valesriegler/cook/jms/JMSCook.java | 12 ++- .../DeliveryOrdersToCook.java | 49 +++++++++- .../jms/messageListeners/OrdersToCook.java | 4 +- .../valesriegler/driver/jms/JMSDriver.java | 14 +-- .../CookedDeliveryOrders.java | 95 +++++++++++++++++++ .../messageListeners/PendingDeliveries.java | 64 ------------- 8 files changed, 167 insertions(+), 83 deletions(-) create mode 100644 src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/CookedDeliveryOrders.java delete mode 100644 src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/PendingDeliveries.java diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/common/Util.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/common/Util.java index 671dc09..3db298a 100644 --- a/src/main/java/at/ac/tuwien/sbc/valesriegler/common/Util.java +++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/common/Util.java @@ -66,6 +66,8 @@ public abstract class Util { public static final int GROUP_AGENT_PORT = 9876; public static final int DELIVERY_CUSTOMERS_PORT = 9877; + public static final String JMS_DELIVERY_DESTINATION = "tcp://localhost:61611?jms.prefetchPolicy.all=1"; + public static String getId(int id) { return (id != 0 && id != -1) ? String.valueOf(id) : ""; } diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/actions/DeliveryOrderInfo.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/actions/DeliveryOrderInfo.java index 71e75e6..0abcc11 100644 --- a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/actions/DeliveryOrderInfo.java +++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/actions/DeliveryOrderInfo.java @@ -2,8 +2,8 @@ package at.ac.tuwien.sbc.valesriegler.cook.actions; import java.io.Serializable; -import at.ac.tuwien.sbc.valesriegler.common.AbstractAction; -import at.ac.tuwien.sbc.valesriegler.types.GroupData; +import at.ac.tuwien.sbc.valesriegler.common.AbstractDeliveryAction; +import at.ac.tuwien.sbc.valesriegler.types.DeliveryGroupData; /** * response to the group's interest in pizza. @@ -11,11 +11,11 @@ import at.ac.tuwien.sbc.valesriegler.types.GroupData; * @author jan * */ -public class DeliveryOrderInfo extends AbstractAction implements Serializable { +public class DeliveryOrderInfo extends AbstractDeliveryAction implements Serializable { private final int cookId; - public DeliveryOrderInfo(GroupData groupdata, int cookId) { - super(groupdata); + public DeliveryOrderInfo(DeliveryGroupData deliveryGroupData, int cookId) { + super(deliveryGroupData); this.cookId = cookId; } diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/JMSCook.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/JMSCook.java index c2378da..79026eb 100644 --- a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/JMSCook.java +++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/JMSCook.java @@ -10,6 +10,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import at.ac.tuwien.sbc.valesriegler.common.HasId; +import at.ac.tuwien.sbc.valesriegler.cook.jms.messageListeners.DeliveryOrdersToCook; import at.ac.tuwien.sbc.valesriegler.cook.jms.messageListeners.OrdersToCook; /** @@ -34,9 +35,14 @@ public class JMSCook implements HasId { Connection connection = connectionFactory.createConnection(); connection.start(); - Session sessWantToSit = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer consWantToSit = sessWantToSit.createConsumer(sessWantToSit.createQueue("OrdersToCook")); - consWantToSit.setMessageListener(new OrdersToCook(this)); + Session sessOrdersToCook = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consOrdersToCook = sessOrdersToCook.createConsumer(sessOrdersToCook.createQueue("OrdersToCook")); + consOrdersToCook.setMessageListener(new OrdersToCook(this)); + + Session sessDeliveryOrdersToCook = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consDeliveryOrdersToCook = sessDeliveryOrdersToCook.createConsumer(sessDeliveryOrdersToCook + .createQueue("DeliveryOrdersToCook")); + consDeliveryOrdersToCook.setMessageListener(new DeliveryOrdersToCook(this)); } catch (JMSException e) { log.error("EXCEPTION!", e); } diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/DeliveryOrdersToCook.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/DeliveryOrdersToCook.java index 049add1..973b778 100644 --- a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/DeliveryOrdersToCook.java +++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/DeliveryOrdersToCook.java @@ -1,15 +1,25 @@ package at.ac.tuwien.sbc.valesriegler.cook.jms.messageListeners; +import javax.jms.Connection; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.ObjectMessage; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import at.ac.tuwien.sbc.valesriegler.cook.actions.DeliveryOrderInfo; import at.ac.tuwien.sbc.valesriegler.cook.jms.JMSCook; import at.ac.tuwien.sbc.valesriegler.group.actions.DeliveryOrderRequest; +import at.ac.tuwien.sbc.valesriegler.types.OrderStatus; +import at.ac.tuwien.sbc.valesriegler.types.Pizza; +import at.ac.tuwien.sbc.valesriegler.types.PizzaOrder; +import at.ac.tuwien.sbc.valesriegler.types.PizzaOrderStatus; /** * Cook the requested pizza. @@ -37,11 +47,44 @@ public class DeliveryOrdersToCook implements MessageListener { DeliveryOrderRequest dor = (DeliveryOrderRequest) obj; log.debug("Received: " + dor); - // generate random delay - Thread.sleep((long) (Math.random() * 10000)); + for (PizzaOrder pizzaorder : dor.getDeliveryGroupData().getOrder().getOrderedPizzas()) { + pizzaorder.setStatus(PizzaOrderStatus.IN_PREPARATION); + } - // TODO: + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(cook.getCONNECTSTRING()); + Connection connection = connectionFactory.createConnection(); + connection.start(); + // inform pizzeria + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = session.createProducer(session.createQueue("PizzeriaConnector")); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + DeliveryOrderInfo doi = new DeliveryOrderInfo(dor.getDeliveryGroupData(), cook.getId()); + prod.send(session.createObjectMessage(doi)); + session.close(); + + // generate delay + for (PizzaOrder po : dor.getDeliveryGroupData().getOrder().getOrderedPizzas()) { + Thread.sleep(po.getPizzaType().duration * 1000); + po.setStatus(PizzaOrderStatus.DONE); + Pizza p = Pizza.createPizzaFromPizzaOrder(po, cook.getId(), false); + dor.getGroupdata().getOrder().getCookedPizzas().add(p); + } + dor.getGroupdata().getOrder().setStatus(OrderStatus.DELIVERY_PENDING); + + // let pizzas be delivered. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + prod = session.createProducer(session.createQueue("CookedDeliveryOrders")); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + prod.send(session.createObjectMessage(doi)); + + // inform pizeria + prod = session.createProducer(session.createQueue("PizzeriaConnector")); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + // WTF: oi = new OrderInfo(orderrequest.getGroupdata(), cook.getId()); + prod.send(session.createObjectMessage(doi)); + session.close(); + connection.close(); } else { log.warn("Received unknown Object: " + obj); } diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/OrdersToCook.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/OrdersToCook.java index 5c61c7b..62b6875 100644 --- a/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/OrdersToCook.java +++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/cook/jms/messageListeners/OrdersToCook.java @@ -63,7 +63,7 @@ public class OrdersToCook implements MessageListener { prod.send(session.createObjectMessage(oi)); session.close(); - // generate random delay + // generate delay for (PizzaOrder po : orderrequest.getGroupdata().getOrder().getOrderedPizzas()) { Thread.sleep(po.getPizzaType().duration * 1000); po.setStatus(PizzaOrderStatus.DONE); @@ -81,7 +81,7 @@ public class OrdersToCook implements MessageListener { // inform pizeria prod = session.createProducer(session.createQueue("PizzeriaConnector")); prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - oi = new OrderInfo(orderrequest.getGroupdata(), cook.getId()); + // WTF: oi = new OrderInfo(orderrequest.getGroupdata(), cook.getId()); prod.send(session.createObjectMessage(oi)); session.close(); diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/JMSDriver.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/JMSDriver.java index 2aac4e7..06318be 100644 --- a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/JMSDriver.java +++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/JMSDriver.java @@ -2,12 +2,15 @@ package at.ac.tuwien.sbc.valesriegler.driver.jms; import javax.jms.Connection; import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import at.ac.tuwien.sbc.valesriegler.common.HasId; +import at.ac.tuwien.sbc.valesriegler.driver.jms.messageListeners.CookedDeliveryOrders; /** * JMSDriver registers all the Listeners. @@ -27,7 +30,7 @@ public class JMSDriver implements HasId { public JMSDriver(String jmsURL, int id) { CONNECTSTRING = jmsURL; this.id = id; - log.info("I AM A JMSCook WITH ID {}", this.id); + log.info("I AM A JMSDriver WITH ID {}", this.id); try { // Connecting to the Broker and to the output queue @@ -35,11 +38,10 @@ public class JMSDriver implements HasId { Connection connection = connectionFactory.createConnection(); connection.start(); - // Session sessWantToSit = connection.createSession(false, - // Session.CLIENT_ACKNOWLEDGE); - // MessageConsumer consWantToSit = - // sessWantToSit.createConsumer(sessWantToSit.createQueue("OrdersToCook")); - // consWantToSit.setMessageListener(new OrdersToCook(this)); + Session sessCookedDeliveryOrders = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consCookedDeliveryOrders = sessCookedDeliveryOrders.createConsumer(sessCookedDeliveryOrders + .createQueue("OrdersToCook")); + consCookedDeliveryOrders.setMessageListener(new CookedDeliveryOrders(this)); } catch (JMSException e) { log.error("EXCEPTION!", e); } diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/CookedDeliveryOrders.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/CookedDeliveryOrders.java new file mode 100644 index 0000000..4d2d889 --- /dev/null +++ b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/CookedDeliveryOrders.java @@ -0,0 +1,95 @@ +package at.ac.tuwien.sbc.valesriegler.driver.jms.messageListeners; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import at.ac.tuwien.sbc.valesriegler.common.Util; +import at.ac.tuwien.sbc.valesriegler.cook.actions.DeliveryOrderInfo; +import at.ac.tuwien.sbc.valesriegler.driver.jms.JMSDriver; +import at.ac.tuwien.sbc.valesriegler.group.actions.DeliveryOrderResponse; + +/** + * + * + * @author jan + * + */ +public class CookedDeliveryOrders implements MessageListener { + private static final Logger log = LoggerFactory.getLogger(CookedDeliveryOrders.class); + private final JMSDriver driver; + + public CookedDeliveryOrders(JMSDriver driver) { + this.driver = driver; + } + + @Override + public void onMessage(Message msg) { + try { + synchronized (driver) { + if (msg instanceof ObjectMessage) { + ObjectMessage objMsg = (ObjectMessage) msg; + Object obj = objMsg.getObject(); + + if (obj instanceof DeliveryOrderResponse) { + DeliveryOrderResponse dor = (DeliveryOrderResponse) obj; + log.debug("Received: " + dor); + + // generate random delay + Thread.sleep((long) (Math.random() * 10000)); + String addr = dor.getDeliveryGroupData().getAddress(); + + // Deliver to destination broker + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Util.JMS_DELIVERY_DESTINATION); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = session.createProducer(session.createQueue(addr)); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + DeliveryOrderInfo doi = new DeliveryOrderInfo(dor.getDeliveryGroupData(), driver.getId()); + prod.send(session.createObjectMessage(doi)); + session.close(); + connection.close(); + + // Inform pizzeria + connectionFactory = new ActiveMQConnectionFactory(driver.getCONNECTSTRING()); + connection = connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + prod = session.createProducer(session.createQueue("PizzeriaConnector")); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + prod.send(session.createObjectMessage(doi)); + session.close(); + connection.close(); + + } else { + log.warn("Received unknown Object: " + obj); + } + } else { + log.warn("Received unknown Message: " + msg); + } + msg.acknowledge(); + } + } catch (JMSException e) { + log.error("EXCEPTION!", e); + } catch (InterruptedException e) { + log.error("EXCEPTION!", e); + } + } + + @Override + public String toString() { + return "PendingDeliveries [driver=" + driver + "]"; + } +} diff --git a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/PendingDeliveries.java b/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/PendingDeliveries.java deleted file mode 100644 index 75f0308..0000000 --- a/src/main/java/at/ac/tuwien/sbc/valesriegler/driver/jms/messageListeners/PendingDeliveries.java +++ /dev/null @@ -1,64 +0,0 @@ -package at.ac.tuwien.sbc.valesriegler.driver.jms.messageListeners; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import at.ac.tuwien.sbc.valesriegler.driver.jms.JMSDriver; -import at.ac.tuwien.sbc.valesriegler.group.actions.DeliveryOrderRequest; - -/** - * - * - * @author jan - * - */ -public class PendingDeliveries implements MessageListener { - private static final Logger log = LoggerFactory.getLogger(PendingDeliveries.class); - private final JMSDriver driver; - - public PendingDeliveries(JMSDriver driver) { - this.driver = driver; - } - - @Override - public void onMessage(Message msg) { - try { - synchronized (driver) { - if (msg instanceof ObjectMessage) { - ObjectMessage objMsg = (ObjectMessage) msg; - Object obj = objMsg.getObject(); - - if (obj instanceof DeliveryOrderRequest) { - DeliveryOrderRequest dor = (DeliveryOrderRequest) obj; - log.debug("Received: " + dor); - - // generate random delay - Thread.sleep((long) (Math.random() * 10000)); - - // TODO: - - } else { - log.warn("Received unknown Object: " + obj); - } - } else { - log.warn("Received unknown Message: " + msg); - } - msg.acknowledge(); - } - } catch (JMSException e) { - log.error("EXCEPTION!", e); - } catch (InterruptedException e) { - log.error("EXCEPTION!", e); - } - } - - @Override - public String toString() { - return "PendingDeliveries [driver=" + driver + "]"; - } -} -- 2.43.0