【デザインパターン】【非同期】Active Object パターン

Active Object パターン

 * 並行処理のパターン
 * 別名 : Actor パターン

用途

 * 外部から非同期にメッセージを受け取っても、自分固有のスレッドで、
  自分の都合のいいタイミングで処理を実行させたい場合に利用する

処理の流れ

 * Client   :処理を要求するスレッド
 * Servant  :処理内容を記述したオブジェクト
 * Scheduler:Servant役の呼び出す(Worker Threadパターン)
 * Proxy    :Client役からの要求は、Proxy役へのメソッド呼び出しとして実現
 → Proxy役は、その要求を1つのオブジェクトに変換
 → Producer-Consumerパターンを使って、Scheduler役に渡す

 [1] 「メソッドの呼び出し(invoke)要求」を非同期に受け取る
 [2] スケジューラがその要求順序を制御する。
 [3] 実際に要求を実行(execution)するのはScheduler役
 → 実行結果をFutureパターンを使って、Client役に返す

サンプル

Main.java

public class Main {
   public static void main(String[] args) {
      ActiveObject activeObject = ActiveObjectFactory.create();
      try {
         new MakerClientThread("Alice", activeObject).start();
         new MakerClientThread("Bobby", activeObject).start();
         new DisplayClientThread("Chris", activeObject).start();
         Thread.sleep(5000);
      } catch (InterruptedException e) {
      } finally {
         System.out.println("*** shutdown ***");
         activeObject.shutdown();
      }
   }
}

IActiveObject.java

import java.util.concurrent.Future;

public interface IActiveObject {
   public abstract Future<String> makeString(int count, char fillchar);

   public abstract void displayString(String string);

   public abstract void shutdown();
}

ActiveObject.java

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

// ActiveObjectインタフェースの実装クラス
public class ActiveObject implements IActiveObject {
   private final ExecutorService service = Executors.newSingleThreadExecutor();

   // サービスの終了
   public void shutdown() {
      this.service.shutdown();
   }

   // 戻り値のある呼び出し
   public Future<String> makeString(final int count, final char fillchar) {
      // リクエスト
      class MakeStringRequest implements Callable<String> {
         public String call() {
            char[] buffer = new char[count];
            for (int i = 0; i < count; i++) {
               buffer[i] = fillchar;
               try {
                  Thread.sleep(100);
               } catch (InterruptedException e) {
               }
            }
            return new String(buffer);
         }
      }
      // リクエストの発行
      return this.service.submit(new MakeStringRequest());
   }

   // 戻り値のない呼び出し
   public void displayString(final String string) {
      // リクエスト
      class DisplayStringRequest implements Runnable {
         public void run() {
            try {
               System.out.println("displayString: " + string);
               Thread.sleep(10);
            } catch (InterruptedException e) {
            }
         }
      }
      // リクエストの発行
      this.service.execute(new DisplayStringRequest());
   }
}

ActiveObjectFactory.java

public class ActiveObjectFactory {
   public static IActiveObject create() {
      return new ActiveObject();
   }
}

MakerClientThread.java

import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

public class MakerClientThread extends Thread {
   private final IActiveObject activeObject;
   private final char fillchar;

   public MakerClientThread(String name, IActiveObject activeObject) {
      super(name);
      this.activeObject = activeObject;
      this.fillchar = name.charAt(0);
   }

   public void run() {
      try {
         for (int i = 0; true; i++) {
            // 戻り値のある呼び出し
            Future<String> future = activeObject.makeString(i, fillchar);
            Thread.sleep(10);
            String value = future.get();
            System.out.println(Thread.currentThread().getName() + ": value = "
                  + value);
         }
      } catch (RejectedExecutionException e) {
         System.out.println(Thread.currentThread().getName() + ":" + e);
      } catch (CancellationException e) {
         System.out.println(Thread.currentThread().getName() + ":" + e);
      } catch (ExecutionException e) {
         System.out.println(Thread.currentThread().getName() + ":" + e);
      } catch (InterruptedException e) {
         System.out.println(Thread.currentThread().getName() + ":" + e);
      }
   }
}

DisplayClientThread.java

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.CancellationException;

public class DisplayClientThread extends Thread {
   private final IActiveObject activeObject;

   public DisplayClientThread(String name, IActiveObject activeObject) {
      super(name);
      this.activeObject = activeObject;
   }

   public void run() {
      try {
         for (int i = 0; true; i++) {
            // 戻り値のない呼び出し
            String string = Thread.currentThread().getName() + " " + i;
            activeObject.displayString(string);
            Thread.sleep(200);
         }
      } catch (RejectedExecutionException e) {
         System.out.println(Thread.currentThread().getName() + ":" + e);
      } catch (CancellationException e) {
         System.out.println(Thread.currentThread().getName() + ":" + e);
      } catch (InterruptedException e) {
         System.out.println(Thread.currentThread().getName() + ":" + e);
      }
   }
}

補足

関連しているパターン

 [1] Producer-Consumer パターン
http://blogs.yahoo.co.jp/dk521123/32914080.html
 [2] Future パターン
http://blogs.yahoo.co.jp/dk521123/32918692.html
 [3] Worker Thread パターン
http://blogs.yahoo.co.jp/dk521123/32918314.html
 [4] Thread-Specific Storage パターン
http://blogs.yahoo.co.jp/dk521123/32910143.html


関連記事

デザインパターン / マルチスレッド の分類 ~目次~

http://blogs.yahoo.co.jp/dk521123/34195603.html

java.util.concurrent について

http://blogs.yahoo.co.jp/dk521123/32538961.html