Tema 2: JMS: Java Message Service (continuación)

2.5.Browser de Mensajes

En el tema anterior vimos como recibir mensajes de una cola, pero supongamos que solo queremos consultar la cola. Esto se puede hacer reemplazando los objetos QueueReceiver con objetos QueueBrowswer. En el método init() del ejemplo QueueBrowse.java se crea uno de estos objetos:

qbrowser = qsession.createBrowser(queue);

En el método displayQueue() se muestra como consultar la cola. En primer lugar se obtiene un Enumeration con los mensajes:

Enumeration e = qbrowser.getEnumeration();

A partir de aquí se itera obteniendo las propiedades del mensaje, tales como la prioridad, el identificador, etc...

...
if (! e.hasMoreElements()) {
      System.out.println("There are no messages on this queue.");
    } else {
      System.out.println("Queued JMS Messages: ");
      while (e.hasMoreElements()) {
        m = (Message) e.nextElement();
        System.out.println("Message ID " + m.getJMSMessageID() +
                           " delivered " + new Date(m.getJMSTimestamp()) +
                           " to " + m.getJMSDestination());
        System.out.print("\tExpires        ");
        if (m.getJMSExpiration() > 0) {
          System.out.println( new Date( m.getJMSExpiration()));
            }
        else
          System.out.println("never");
        System.out.println("\tPriority       " + m.getJMSPriority());
        System.out.println("\tMode           " + (
                      m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ?
                                       "PERSISTENT" : "NON_PERSISTENT"));
        System.out.println("\tCorrelation ID " + m.getJMSCorrelationID());
        System.out.println("\tReply to       " + m.getJMSReplyTo());
        System.out.println("\tMessage type   " + m.getJMSType());
        if (m instanceof TextMessage) {
          System.out.println("\tTextMessage    \"" + ((TextMessage)m).getText() + "\"");
        }
      }
    }

Estos métodos getXXXX tienen sus contrapartidas setXXXX. De todas maneras estas propiedades se establecen en el momento en que enviamos un mensaje.

2.6. Transacciones en JMS

El objetivo de las transacciones en este contexto es tratar un grupo de mensajes que se producen o consumen como una unidad atómica. Esto implica que en el momento en que una aplicacion realiza el commit de una transacción, todos los mensajes que recibió durante esta transacción se borran del sistema de mensajes y todos los que envió son distribuidos.

Por el contrario, si se realiza un rollback los mensajes recibidos son devueltos al sistema de mensajes y los que envió son descartados. Si un suscriptor de tópicos hace un rollback de un mensaje éste se re-envía al tópico. Cuandi un receptor de mensajes PTP hace el rollback el mensaje se re-envía a la cola para que otro consumidor lo pueda recibir.

La forma más directa de usar transacciones en JMS es crear una sesión transaccional, lo cual implica poner un flag a verdadero en el momento en que creamos la sesión. Tomando como referencia el ejemplo JMSDrawDemo.java, que iremos detallando durante esta sección, en el método initJMS() que es donde se inicia el contexto, se crea la sesión no-transaccional y otra transaccional, en un esquema Pub/sub:

...
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
sessionTX = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
...

donde session y sessionTX son objetos TopicSession. A partir de este punto los TopicPublisher que se creeen serán no-transaccionales o transaccionales en función de si se crean a partir de una sesión no-transaccional o transaccional:

topic = (Topic) ctx.lookup(TOPIC);
publisher = session.createPublisher(topic);
publisherTX = sessionTX.createPublisher(topic);

En lo que respecta al objeto TopicSubscriber llamado subscriber no haremos tal distinción y solamente nos interesa especificar si es de tipo durable o no. En el primer caso JMS almacena el mensaje en un medio persistente (como una BD) hasta asegurarse de que ha sido entregado o ha expirado, incluso si el suscriptor no estaba activo en el momento en que se envió el mensaje. A continuación se registra el oyente, se crea un mensaje y se inicia la conexión:

if (durableSubscriberID == null) {
      subscriber = session.createSubscriber(topic, "TRUE", noLocal);
    } else {
      subscriber = session.createDurableSubscriber(topic, durableSubscriberID);
    }
subscriber.setMessageListener(this);
msg = session.createMessage();
connection.start();

Esta aplicación ejemplo se lanza desde dos clientes (ver instrucciones HTML en el directorio de ejemplos de JMS). Cada cliente crea un panel en donde se puede seleccionar si la sesión va a ser transaccional o no y después puede pinchar para añadir iconos. Cada icono se traduce en un mensaje que se publica. Esto se hace en el método publishPoint que recibe como argumento las coordenadas donde hemos pinchado con el ratón. A partir de estas coordenadas construye un mensaje y almacena todos los puntos enviados en el vector pointsSent:

private void publishPoint(int x, int y)
  {
    try {
      msg.setIntProperty("x", x);
      msg.setIntProperty("y", y);
      pointsSent.addElement(new Point(x, y));
      if (isTransacted) {
        publisherTX.publish(msg, deliveryMode(), 5, 0);
      } else {
        publisher.publish(msg, deliveryMode(), 5, 0);
      }
    } catch (JMSException jmse) {
      jmse.printStackTrace();
    }
  }

El mensaje es publicado de forma transaccional o no según el caso. El método setIntProperty() establece un campo de propiedad (property field).

Una vez publicado, el suscriptor lo recibe a través del método onMessage() que obtiene con getIntProperty() los enteros y pinta una imagen de una furgoneta (caso no-transaccional) o un asterisco (caso transaccional):

public void onMessage(Message msg)
  {
    try {
      String command = msg.getStringProperty("command");
      if (command != null && command.equals("clearScreen")) {
        clearScreen();
      } else {
        int x = msg.getIntProperty("x");
        int y = msg.getIntProperty("y");
        Point p = new Point(x,y);
        updateImage(p);
      }
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

En el panel de cada cliente hay un botón para hacer commit y otro para hacer rollback. Con ellos se llama respectivamente a los métodos doCommit() y doRollback() desde los que se invocan a los métodos commit() y rollback() de la sesión:

  private void doCommit()
  {
    try {
      sessionTX.commit();
      commitButton.setEnabled(false);
      rollbackButton.setEnabled(false);
      repaint();
    } catch (JMSException jmse) {
      jmse.printStackTrace();
    }
  }

private void doRollback()
  {
    try {
      sessionTX.rollback();
      pointsSent.removeAllElements();
      commitButton.setEnabled(false);
      rollbackButton.setEnabled(false);
      offScreenImage = null;
      repaint();
    } catch (JMSException jmse) {
      jmse.printStackTrace();
    }
  }

En una sesión transaccional, los asteriscos no se pintan hasta que se hace commit y se borran del panel si se decide hacer rollback.

2.7. Interacción con Aplicaciones

Vamos a ver un ejemplo de un servlet que interacciona, a través de un mensaje MAP (pares de atributo valor) con un cliente que a su vez invoca a un EJB para realizar una tarea concreta. El código del cliente se encuentra en el fichero TraderReceive.java. También en este ejemplo utilizamos transacciones, pero a través del package javax.transaction. Se usa un esquema Pub/sub

En el método init() se acaba creando un objeto Trader a partir de una sesión EJB, y una transacción:

public void init(Context ctx, String topicName)
       throws NamingException, JMSException, RemoteException, CreateException
  {
    connectionFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);

    connection = connectionFactory.createTopicConnection();
    connection.setClientID("traderReceive");
    session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    topic = (Topic) ctx.lookup(topicName);
    subscriber = session.createDurableSubscriber(topic, "traderReceive");
    TraderHome brokerage = (TraderHome) ctx.lookup(EJB_HOME);
    ejbTrader = brokerage.create();
    tx = (javax.transaction.UserTransaction) ctx.lookup(TX);
    connection.start();
  }

La recepción y tratamiento de los mensajes se realiza en el método processMessages() en donde:

1. Nos quedamos en modo recepción:

Message msg = subscriber.receive();

2. Iniciamos la transacción:

tx.begin();

3. Leemos el contenido del objeto MapMessage:

MapMessage m = (MapMessage) msg;
String customerName = m.getString("CustomerName");
String tradeType = m.getString("TradeType");
String symbol = m.getString("Symbol");
int numberOfShares = m.getInt("Shares");

4. Detectamos el tipo de comando y lanzamos las órden correspondiente. Si ésta tiene éxtito entonces se confirma la transacción y en caso contrario se deshace:

...
if ("sell".equalsIgnoreCase(tradeType)) {
  tr = ejbTrader.sell(symbol, numberOfShares);
  System.out.println("Sold " + tr.getNumberTraded());
  tx.commit();
} else {
  System.out.println("Rolling Back Transaction");
  tx.rollback();
  System.out.println("Unknown TradeType: "+tradeType);
}
...