Active Object パターン
* 並行処理のパターン * 別名 : Actor パターン
用途
* 外部から非同期にメッセージを受け取っても、自分固有のスレッドで、 自分の都合のいいタイミングで処理を実行させたい場合に利用する
処理の流れ
* Client :処理を要求するスレッド * Servant :処理内容を記述したオブジェクト * Scheduler:Servant役の呼び出す(Worker Threadパターン) * Proxy :Client役からの要求は、Proxy役へのメソッド呼び出しとして実現 → Proxy役は、その要求を1つのオブジェクトに変換 → Producer-Consumerパターンを使って、Scheduler役に渡す [1] 「メソッドの呼び出し(invoke)要求」を非同期に受け取る [2] スケジューラがその要求順序を制御する。 [3] 実際に要求を実行(execution)するのはScheduler役 → 実行結果をFutureパターンを使って、Client役に返す
サンプル
Main.java
public class Main { public static void main(String[] args) { ActiveObject activeObject = ActiveObjectFactory.create(); try { new MakerClientThread("Alice", activeObject).start(); new MakerClientThread("Bobby", activeObject).start(); new DisplayClientThread("Chris", activeObject).start(); Thread.sleep(5000); } catch (InterruptedException e) { } finally { System.out.println("*** shutdown ***"); activeObject.shutdown(); } } }
IActiveObject.java
import java.util.concurrent.Future; public interface IActiveObject { public abstract Future<String> makeString(int count, char fillchar); public abstract void displayString(String string); public abstract void shutdown(); }
ActiveObject.java
import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.Future; // ActiveObjectインタフェースの実装クラス public class ActiveObject implements IActiveObject { private final ExecutorService service = Executors.newSingleThreadExecutor(); // サービスの終了 public void shutdown() { this.service.shutdown(); } // 戻り値のある呼び出し public Future<String> makeString(final int count, final char fillchar) { // リクエスト class MakeStringRequest implements Callable<String> { public String call() { char[] buffer = new char[count]; for (int i = 0; i < count; i++) { buffer[i] = fillchar; try { Thread.sleep(100); } catch (InterruptedException e) { } } return new String(buffer); } } // リクエストの発行 return this.service.submit(new MakeStringRequest()); } // 戻り値のない呼び出し public void displayString(final String string) { // リクエスト class DisplayStringRequest implements Runnable { public void run() { try { System.out.println("displayString: " + string); Thread.sleep(10); } catch (InterruptedException e) { } } } // リクエストの発行 this.service.execute(new DisplayStringRequest()); } }
ActiveObjectFactory.java
public class ActiveObjectFactory { public static IActiveObject create() { return new ActiveObject(); } }
MakerClientThread.java
import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; public class MakerClientThread extends Thread { private final IActiveObject activeObject; private final char fillchar; public MakerClientThread(String name, IActiveObject activeObject) { super(name); this.activeObject = activeObject; this.fillchar = name.charAt(0); } public void run() { try { for (int i = 0; true; i++) { // 戻り値のある呼び出し Future<String> future = activeObject.makeString(i, fillchar); Thread.sleep(10); String value = future.get(); System.out.println(Thread.currentThread().getName() + ": value = " + value); } } catch (RejectedExecutionException e) { System.out.println(Thread.currentThread().getName() + ":" + e); } catch (CancellationException e) { System.out.println(Thread.currentThread().getName() + ":" + e); } catch (ExecutionException e) { System.out.println(Thread.currentThread().getName() + ":" + e); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + ":" + e); } } }
DisplayClientThread.java
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.CancellationException; public class DisplayClientThread extends Thread { private final IActiveObject activeObject; public DisplayClientThread(String name, IActiveObject activeObject) { super(name); this.activeObject = activeObject; } public void run() { try { for (int i = 0; true; i++) { // 戻り値のない呼び出し String string = Thread.currentThread().getName() + " " + i; activeObject.displayString(string); Thread.sleep(200); } } catch (RejectedExecutionException e) { System.out.println(Thread.currentThread().getName() + ":" + e); } catch (CancellationException e) { System.out.println(Thread.currentThread().getName() + ":" + e); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + ":" + e); } } }
補足
関連しているパターン
[1] Producer-Consumer パターンhttp://blogs.yahoo.co.jp/dk521123/32914080.html
[2] Future パターンhttp://blogs.yahoo.co.jp/dk521123/32918692.html
[3] Worker Thread パターンhttp://blogs.yahoo.co.jp/dk521123/32918314.html
[4] Thread-Specific Storage パターンhttp://blogs.yahoo.co.jp/dk521123/32910143.html
参考文献
http://alpha-netzilla.blogspot.jp/2010/11/design-pattern.htmlhttp://waman.hatenablog.com/entry/20111222/1324505791
http://d.hatena.ne.jp/otuzak/20080527/1211864889