■ はじめに
https://dk521123.hatenablog.com/entry/2023/02/26/000000
で作ったテストデータファイルを作成する処理で パフォーマンステスト用に大容量のデータ行で作りたい。 ファイルは複数、別の種類もあるので、非同期で作って なるべく時間を節約したい。 そこで、Pythonの非同期処理をメモる。
目次
【1】Future パターン 【2】concurrent.futures 【3】サンプル 例1:実験コード 例2:テストデータファイル作成ツールの非同期版
【1】Future パターン
* 以下の関連記事を参照のこと => 処理の実行担当者は、処理が渡されると別スレッド上で処理を開始して、 メインスレッドには即座にFutureオブジェクトを返すこと
Future パターン
https://dk521123.hatenablog.com/entry/2014/01/18/000804
【2】concurrent.futures
* 非同期に実行できる呼び出し可能オブジェクトの高水準のインターフェース => 非同期のデザインパターン「Future パターン」を実装可能にしてくれるライブラリ cf. concurrent(コンカレント) = 同時に起こる、並列の
https://docs.python.org/ja/3/library/concurrent.futures.html
【3】サンプル
例1:実験コード
import concurrent.futures import logging import time logging.basicConfig( level=logging.DEBUG, format='%(threadName)s: %(message)s' ) def worker(x, y): logging.debug("start") time.sleep(5) result = x + y logging.debug(f"Done, {result}") return result def main(): with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: f1 = executor.submit(worker, 2, 5) f2 = executor.submit(worker, 1, 4) f3 = executor.submit(worker, 3, 8) logging.debug(f"{f1.result()}") logging.debug(f"{f2.result()}") logging.debug(f"{f3.result()}") if __name__ == "__main__": main()
出力結果 (max_workers=3)
ThreadPoolExecutor-0_0: start ThreadPoolExecutor-0_1: start ThreadPoolExecutor-0_2: start <約5秒後> ThreadPoolExecutor-0_2: Done, 11 ThreadPoolExecutor-0_1: Done, 5 ThreadPoolExecutor-0_0: Done, 7 MainThread: 7 MainThread: 5 MainThread: 11
出力結果 (max_workers=1)
ThreadPoolExecutor-0_0: start ThreadPoolExecutor-0_0: Done, 7 << 終わるのが1つ1つ ThreadPoolExecutor-0_0: start MainThread: 7 ThreadPoolExecutor-0_0: Done, 5 << 終わるのが1つ1つ ThreadPoolExecutor-0_0: start MainThread: 5 ThreadPoolExecutor-0_0: Done, 11 << 終わるのが1つ1つ MainThread: 11
例2:テストデータファイル作成ツールの非同期版
import concurrent.futures import logging import time import os import csv import datetime import random # To initialize output_path = './output/test' os.makedirs(output_path, exist_ok=True) now_datetime = datetime.datetime.now() today = now_datetime.strftime('%Y%m%d') logging.basicConfig( level=logging.DEBUG, format='%(threadName)s: %(message)s' ) def generate_csv(file_path, body_data, header_list=[], delimiter=","): with open(file_path, 'w', newline="") as csv_file: writer = csv.writer(csv_file, delimiter=delimiter) if (len(header_list) != 0): writer.writerow(header_list) for row in body_data: writer.writerow(row) person_name_list = ["Mike", "Tom", "Kevin", "Smith"] city_list = ["Tokyo", "Osaka", "Fukuoka"] date_list = ["2022-12-31", "2019-01-01", "2023-08-15", "2020-01-29"] def generate_body(header_list, max_line=10): csv_body = [] for index in range(max_line): row = [] for csv_item in header_list: value = "" item = csv_item.lower() if item == 'id': id = str(index).zfill(10) value = f"44062120-d060-4c21-9cb5-{id}" elif item == 'name': value = get_value_from_list(index, person_name_list) elif item == 'city': value = get_value_from_list(index, city_list) elif item == 'age': value = str(int(random.uniform(15, 80))) elif item == 'registration_date' or item == 'birthday': value = get_value_from_list(index, date_list) else: value = "aaa" row.append(value) csv_body.append(row) return csv_body def get_value_from_list(index, target_list): target_index = index % (len(target_list)) return target_list[target_index] def csv_generate_worker(base_file_name, extension, header_list, nax_line, delimiter=","): body_data = generate_body(header_list, nax_line) generate_csv( f"{output_path}/{base_file_name}_{today}_003{extension}", body_data, header_list, delimiter ) return True def main(): logging.debug("Start") with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: f1 = executor.submit( csv_generate_worker, "person", ".csv", ["id", "name", "city", "age", "registration_date", "remarks"], 1000000 ) f2 = executor.submit( csv_generate_worker, "customer", ".txt", ["id", "name", "birthday", "remarks"], 1000000 ) f3 = executor.submit( csv_generate_worker, "name_list", ".tsv", ["id", "name"], 1000000, "\t" ) logging.debug(f"{f1.result()}") logging.debug(f"{f2.result()}") logging.debug(f"{f3.result()}") if __name__ == "__main__": main()
関連記事
Future パターン
https://dk521123.hatenablog.com/entry/2014/01/18/000804
非同期処理 ~ async/await, Promise ~
https://dk521123.hatenablog.com/entry/2021/01/16/202822
Python ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2014/08/07/231242
Python ~ 基本編 / 文字列 ~
https://dk521123.hatenablog.com/entry/2019/10/12/075251