Worker Thread パターンについて
* 別名 = 「Background Thread」、「Thread Pool(スレッドプール)」 * 呼び出しと実行の分離 * スレッドを使いまわすことによる処理能力(スループット)の向上
Worker Thread パターンをjava.util.concurrent を使って実装するには
* 以下の関連記事を参照のこと。http://blogs.yahoo.co.jp/dk521123/36245450.html
http://blogs.yahoo.co.jp/dk521123/33942167.html
構成
* Client = 依頼者(Threadを継承し、順次依頼する) + 仕事の Request (依頼)を作成し、Channel 役に渡す * Channel = 通信路(キューを保持) + Client から仕事の Request を受け取り、Worker に渡す * Worker = 作業者(Threadを継承し、順次処理する) + Channel から仕事の Request をもらい、その Request を処理する。 + Request がなかったら、新しい Request が届くまで待つ。 * Request = 依頼 + 仕事の Request 自体を表す + その仕事を実行するのに必要な情報を保持する
シーケンス
* よくできてる+--------+ +---------+ +--------+ | Client | | Channel | | Worker | +--------+ +---------+ +--------+ | | | | NEW +---------+ | | |------> | Request | | takeRequest | | +---------+ |<-------------| | | | | | putRequest | | | |------------------------->| | | | | takeRequest | | | |<-------------| | | | | | | | Request | | | |------------->| | | | | | | execute | | | |<--------------------------| | | | | | | | | | |-------------------------->| | | | | | | | |
補足
■ 関連しているパターン
[1] Guarded Suspensionパターン => 処理要求に対して、実行可能な状態になるまで待機させるデザインパターンhttp://blogs.yahoo.co.jp/dk521123/32928012.html
[2] Producer-Consumer パターンhttp://blogs.yahoo.co.jp/dk521123/32914080.html
■ Thread-Per-Message パターン との違い
* Worker Thread パターンではスレッドを使いまわし、リサイクルする
サンプル
* 実際には、以下の関連記事に書かれているjava.util.concurrentを使った方がいいがhttp://blogs.yahoo.co.jp/dk521123/36245450.html
http://blogs.yahoo.co.jp/dk521123/33942167.html
とりあえず、勉強なんで以下の本に近い形でサンプルを作成してみるhttp://www.hyuki.com/dp/dp2_ch08.pdf
Main.java
public class Main { public static void main(String[] args) { Channel channel = new Channel(5); channel.startWorker(); new Client("Yahoo", channel).start(); new Client("Google", channel).start(); new Client("Oracle", channel).start(); } }
Client.java
import java.util.Random; public class Client extends Thread { private final Channel channel; public Client(String clientName, Channel channel) { super(clientName); this.channel = channel; } public void run() { try { for (int i = 0; true; i++) { Request request = new Request(getName(), i); this.channel.putRequest(request); this.doHeavyJob(); } } catch (InterruptedException e) { } } private void doHeavyJob() throws InterruptedException { Random random = new Random(); Thread.sleep(random.nextInt(1000)); } }
Channel.java
import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; public class Channel { // リクエストを保持しておくキュー ArrayBlockingQueue<Request> requests = new ArrayBlockingQueue<Request>(100); // Worker Thread の保持 private final List<Worker> threadPools; public Channel(int workerThreadNumber) { this.threadPools = new ArrayList<Worker>(); for (int i = 0; i < workerThreadNumber; i++) { this.threadPools.add(new Worker("Worker-" + i, this)); } } public void startWorker() { for (Worker threadPool : this.threadPools) { threadPool.start(); } } public synchronized void putRequest(Request request) { while (this.requests.size() > 0) { try { wait(); } catch (InterruptedException e) { } } this.requests.offer(request); notifyAll(); } public synchronized Request takeRequest() { while (this.requests.size() <= 0) { try { wait(); } catch (InterruptedException e) { } } Request request = this.requests.poll(); notifyAll(); return request; } }
Worker.java
public class Worker extends Thread { private final Channel channel; public Worker(String name, Channel channel) { super(name); this.channel = channel; } public void run() { while (true) { Request request = this.channel.takeRequest(); request.execute(); } } }
Request.java
import java.util.Random; public class Request { // 依頼者 private final String companyName; // リクエストの番号 private final int requestNumber; public Request(String companyName, int requestNumber) { this.companyName = companyName; this.requestNumber = requestNumber; } public void execute() { System.out.println( Thread.currentThread().getName() + " executes " + this); try { this.doHeavyJob(); } catch (InterruptedException e) { } } public String toString() { return "[ Request from " + this.companyName + " No." + this.requestNumber + " ]"; } private void doHeavyJob() throws InterruptedException { Random random = new Random(); Thread.sleep(random.nextInt(1000)); } }
出力
* Worker Threadは、誰が発行した Request かを考えず、受け取った Request をただひたする実行Worker-4 executes [ Request from Yahoo No.0 ] Worker-0 executes [ Request from Oracle No.0 ] Worker-3 executes [ Request from Google No.0 ] Worker-3 executes [ Request from Google No.1 ] Worker-4 executes [ Request from Oracle No.1 ] Worker-0 executes [ Request from Yahoo No.1 ] Worker-2 executes [ Request from Yahoo No.2 ] Worker-1 executes [ Request from Yahoo No.3 ]
参考文献
http://www.hyuki.com/dp/dp2_ch08.pdfhttp://pgcafe.moo.jp/JAVA/thread/main_7.htm