カタカタブログ

SIerで働くITエンジニアがカタカタした記録を残す技術ブログ。Java, Oracle Database, Linuxが中心です。たまに数学やデータ分析なども。

WebLogic 12c(12.1.3)でJMSキューを試す

WebLogic 12c上にJMSキューを作成し、キューとのメッセージ送受信を試してみたのでメモ。キューのエンキュー・デキューのプログラムはJNDIでキューにアクセスしているので、WebLogicのキューであることは特に意識していない。

JMSキュー作成

まずWebLogic管理コンソールにログインし、
[サービス] > [メッセージング] > [JMSモジュール]を選択する。
f:id:osn_th:20150501002113p:plain
JMSモジュールを新規で作成する。
f:id:osn_th:20150501002116p:plain
接続ファクトリとキューを作成する。プログラムからはJNDI名でアクセスするので、以下のように定義しておく。

接続ファクトリ : jms/cf

キュー : jms/q

f:id:osn_th:20150501002119p:plain

キュー送信処理(エンキュー)

キューができたので、キューとのメッセージをやりとりするクラスを作り、そこに送受信用のメソッドを定義していく。とりあえず送信用はこんなかんじで作成。引数としてテキストをStringで受け取り、それをエンキューするというシンプルな処理だと以下のようなコードで実現できる(コネクションのクローズ周りはちょっとロジックが甘いかも。。)なお、ソースコードの全体は最後に載せるので、ここではメソッドのみ抜粋する。

  public static void sendQueue(String text) throws NamingException {
    Context ctx = new InitialContext();
    QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("jms/cf");
    Queue queue = (Queue) ctx.lookup("jms/q");
    try {
      QueueConnection conn = factory.createQueueConnection();
      QueueSession session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
      QueueSender sender = session.createSender(queue);
      TextMessage message = session.createTextMessage();
      message.setText(text);
      sender.send(message);
      
      sender.close();
      session.close();
      conn.close();
    } catch (JMSException e) {
      e.printStackTrace();
    } finally {
      ctx.close();
    }
  }
  

キュー受信処理(デキュー)

続いて受信側を実装する。送信のときとほぼ同じような形になり、Senderクラスの代わりにReceiverクラスを使う。receiveNoWaitメソッドを使っているので、キューにメッセージがなければすぐにメソッドが終了し、nullが返ってくる。ここでrecieveメソッドを使うと、次のエンキューまで同期で待つことができ、receive(long timeout)とするとtimeout値(ミリ秒)を設定できる。

  public static String receiveQueue() throws NamingException {
    Context ctx = new InitialContext();
    QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("jms/cf");
    Queue queue = (Queue) ctx.lookup("jms/q");
    try {
      QueueConnection conn = factory.createQueueConnection();
      QueueSession session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
      QueueReceiver receiver = session.createReceiver(queue);
      conn.start();
      TextMessage message = (TextMessage) receiver.receiveNoWait();
      if (message == null) {
        return "No message recieved.";
      }
      return message.getText();
    } catch (JMSException e) {
      e.printStackTrace();
      return "recieve message failed.";
    } finally {
      ctx.close();
    }
  }

ちなみにこれは1件だけのメッセージを受信するが、全てのメッセージを一度に受信したい場合は以下のようにdo-whileでnullが返ってくるまでreceiveメソッドを呼び出すと、複数メッセージを取得できる。また、デキューするときにロールバックするようにすると全てのキューのメッセージを取り出しつつ、メッセージをロールバックするのでメッセージはキューに残ったままとなる。これにより、擬似的にキュー内のメッセージを一覧化することもできる(引数のisRollbackをtrueにして呼び出す)。

  public static String receiveQueueAll(boolean isRollback) throws NamingException {
    Context ctx = new InitialContext();
    QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("jms/cf");
    Queue queue = (Queue) ctx.lookup("jms/q");
    try {
      QueueConnection conn = factory.createQueueConnection();
      QueueSession session = isRollback ? conn.createQueueSession(true, -1) : conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
      QueueReceiver receiver = session.createReceiver(queue);
      conn.start();
      StringBuilder sb = new StringBuilder();
      TextMessage message;
      do {
        message = (TextMessage)receiver.receiveNoWait();
        if (message != null) {
          sb.append(message.getText()).append(System.lineSeparator());
        }
      } while (message != null);
      if (isRollback) {
        session.rollback();
      }
      
      String ret = sb.toString();
      if (ret.length() == 0) {
        return "No message recieved.";
      }
      return ret;
    } catch (JMSException e) {
      e.printStackTrace();
      return "recieving message was failed.";
    } finally {
      ctx.close();
    }
  }

適当にメッセージ送受信画面を作る

上のメソッドを呼び出すような画面を適当に作って読んでみる。単純なServletとJSPで上記メソッドを呼び出すだけのものなのでコードの掲載は割愛するが、WebLogicにデプロイして画面を開くとこんなかんじ。

まず、aaaというメッセージをEnqueueしてみる。Enqueueボタンをクリックすると、入力したTextを引数にsendQueueメソッドが呼ばれるようになっている。
f:id:osn_th:20150501002121p:plain
続いて、Dequeueしてみる。Dequeueボタンをクリックすると、receiveQueueメソッドを呼び出し、結果を画面に表示する。
f:id:osn_th:20150501002124p:plain
もう一度Dequeueすると、今度はメッセージがないと言われる。
f:id:osn_th:20150501002126p:plain
最後は、[aaa] -> [bbb] -> [ccc]と3つのメッセージをEnqueueしてみる。
その後、DequeueAllする。DequeueAllボタンをクリックすると、receiveQueueAll(true)メソッドを呼び出す。引数のisRollbackがtrueなので、メッセージは取得できるもののデキューはロールバックされるため、キューの中身は変わらない。
f:id:osn_th:20150501002128p:plain
全てのメッセージが一度に取得できる。もう一度クリックすると、同じ内容が確認できることから、キューの中身が変わっていないことが分かる。

まとめ

以上、WebLogic上のJMSキュー作成と、簡単なキュー送受信のアプリケーションを作ってみた。キューを使うと複数のアプリケーション間の通信が疎結合になるので、異なるサーバ間にデプロイされるものや外部サーバーと連携するようなアプリケーションに使うと便利。

Appendix 〜JMS連携クラスのサンプル・コード〜

JMSLogic.java

package jms;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JmsLogic {
  
  public static void sendQueue(String text) throws NamingException {
    Context ctx = new InitialContext();
    QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("jms/cf");
    Queue queue = (Queue) ctx.lookup("jms/q");
    try {
      QueueConnection conn = factory.createQueueConnection();
      QueueSession session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
      QueueSender sender = session.createSender(queue);
      TextMessage message = session.createTextMessage();
      message.setText(text);
      sender.send(message);
      
      sender.close();
      session.close();
      conn.close();
    } catch (JMSException e) {
      e.printStackTrace();
    } finally {
      ctx.close();
    }
  }
  
  public static String receiveQueue() throws NamingException {
    Context ctx = new InitialContext();
    QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("jms/cf");
    Queue queue = (Queue) ctx.lookup("jms/q");
    try {
      QueueConnection conn = factory.createQueueConnection();
      QueueSession session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
      QueueReceiver receiver = session.createReceiver(queue);
      conn.start();
      TextMessage message = (TextMessage) receiver.receiveNoWait();
      if (message == null) {
        return "No message recieved.";
      }
      return message.getText();
    } catch (JMSException e) {
      e.printStackTrace();
      return "recieve message failed.";
    } finally {
      ctx.close();
    }
  }
  
  public static String receiveQueueAll() throws NamingException {
    return receiveQueueAll(true);
  }
  
  public static String receiveQueueAll(boolean isRollback) throws NamingException {
    Context ctx = new InitialContext();
    QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("jms/cf");
    Queue queue = (Queue) ctx.lookup("jms/q");
    try {
      QueueConnection conn = factory.createQueueConnection();
      QueueSession session = isRollback ? conn.createQueueSession(true, -1) : conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
      QueueReceiver receiver = session.createReceiver(queue);
      conn.start();
      StringBuilder sb = new StringBuilder();
      TextMessage message;
      do {
        message = (TextMessage)receiver.receiveNoWait();
        if (message != null) {
          sb.append(message.getText()).append(System.lineSeparator());
        }
      } while (message != null);
      if (isRollback) {
        session.rollback();
      }
      
      String ret = sb.toString();
      if (ret.length() == 0) {
        return "No message recieved.";
      }
      return ret;
    } catch (JMSException e) {
      e.printStackTrace();
      return "recieving message was failed.";
    } finally {
      ctx.close();
    }
  }
}

以上。