【Python】Python ~ 非同期 / concurrent.futures ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/02/26/000000

で作ったテストデータファイルを作成する処理で
パフォーマンステスト用に大容量のデータ行で作りたい。
ファイルは複数、別の種類もあるので、非同期で作って
なるべく時間を節約したい。

そこで、Pythonの非同期処理をメモる。

目次

【1】Future パターン
【2】concurrent.futures
【3】サンプル
 例1:実験コード
 例2:テストデータファイル作成ツールの非同期版

【1】Future パターン

* 以下の関連記事を参照のこと
 =>  処理の実行担当者は、処理が渡されると別スレッド上で処理を開始して、
  メインスレッドには即座にFutureオブジェクトを返すこと

Future パターン
https://dk521123.hatenablog.com/entry/2014/01/18/000804

【2】concurrent.futures

* 非同期に実行できる呼び出し可能オブジェクトの高水準のインターフェース
 => 非同期のデザインパターン「Future パターン」を実装可能にしてくれるライブラリ

cf. concurrent(コンカレント) = 同時に起こる、並列の

https://docs.python.org/ja/3/library/concurrent.futures.html

【3】サンプル

例1:実験コード

import concurrent.futures
import logging
import time

logging.basicConfig(
  level=logging.DEBUG,
  format='%(threadName)s: %(message)s'
)

def worker(x, y):
  logging.debug("start")
  time.sleep(5)
  result = x + y
  logging.debug(f"Done, {result}")
  return result

def main():
  with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    f1 = executor.submit(worker, 2, 5)
    f2 = executor.submit(worker, 1, 4)
    f3 = executor.submit(worker, 3, 8)
    logging.debug(f"{f1.result()}")
    logging.debug(f"{f2.result()}")
    logging.debug(f"{f3.result()}")

if __name__ == "__main__":
  main()

出力結果 (max_workers=3)

ThreadPoolExecutor-0_0: start
ThreadPoolExecutor-0_1: start
ThreadPoolExecutor-0_2: start
<約5秒後>
ThreadPoolExecutor-0_2: Done, 11
ThreadPoolExecutor-0_1: Done, 5
ThreadPoolExecutor-0_0: Done, 7
MainThread: 7
MainThread: 5
MainThread: 11

出力結果 (max_workers=1)

ThreadPoolExecutor-0_0: start
ThreadPoolExecutor-0_0: Done, 7 << 終わるのが1つ1つ
ThreadPoolExecutor-0_0: start
MainThread: 7
ThreadPoolExecutor-0_0: Done, 5 << 終わるのが1つ1つ
ThreadPoolExecutor-0_0: start
MainThread: 5
ThreadPoolExecutor-0_0: Done, 11 << 終わるのが1つ1つ
MainThread: 11

例2:テストデータファイル作成ツールの非同期版

import concurrent.futures
import logging
import time
import os
import csv
import datetime
import random

# To initialize
output_path = './output/test'
os.makedirs(output_path, exist_ok=True)

now_datetime = datetime.datetime.now()
today = now_datetime.strftime('%Y%m%d')

logging.basicConfig(
  level=logging.DEBUG,
  format='%(threadName)s: %(message)s'
)

def generate_csv(file_path, body_data, header_list=[], delimiter=","):
  with open(file_path, 'w', newline="") as csv_file:
    writer = csv.writer(csv_file, delimiter=delimiter)
    if (len(header_list) != 0):
      writer.writerow(header_list)
    for row in body_data:
      writer.writerow(row)

person_name_list = ["Mike", "Tom", "Kevin", "Smith"]
city_list = ["Tokyo", "Osaka", "Fukuoka"]
date_list = ["2022-12-31", "2019-01-01", "2023-08-15", "2020-01-29"]

def generate_body(header_list, max_line=10):
  csv_body = []
  for index in range(max_line):
    row = []
    for csv_item in header_list:
      value = ""
      item = csv_item.lower()
      if item == 'id':
        id = str(index).zfill(10)
        value = f"44062120-d060-4c21-9cb5-{id}"
      elif item == 'name':
        value = get_value_from_list(index, person_name_list)
      elif item == 'city':
        value = get_value_from_list(index, city_list)
      elif item == 'age':
        value = str(int(random.uniform(15, 80)))
      elif item == 'registration_date' or item == 'birthday':
        value = get_value_from_list(index, date_list)
      else:
        value = "aaa"
      row.append(value)
    csv_body.append(row)
  return csv_body

def get_value_from_list(index, target_list):
  target_index = index % (len(target_list))
  return target_list[target_index]

def csv_generate_worker(base_file_name, extension, header_list, nax_line, delimiter=","):
  body_data = generate_body(header_list, nax_line)
  generate_csv(
    f"{output_path}/{base_file_name}_{today}_003{extension}",
    body_data,
    header_list,
    delimiter
  )
  return True

def main():
  logging.debug("Start")
  with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    f1 = executor.submit(
      csv_generate_worker,
      "person",
      ".csv",
      ["id", "name", "city", "age", "registration_date", "remarks"],
      1000000
    )
    f2 = executor.submit(
      csv_generate_worker,
      "customer",
      ".txt",
      ["id", "name", "birthday", "remarks"],
      1000000
    )
    f3 = executor.submit(
      csv_generate_worker,
      "name_list",
      ".tsv",
      ["id", "name"],
      1000000,
      "\t"
    )
    logging.debug(f"{f1.result()}")
    logging.debug(f"{f2.result()}")
    logging.debug(f"{f3.result()}")

if __name__ == "__main__":
  main()

関連記事

Future パターン
https://dk521123.hatenablog.com/entry/2014/01/18/000804
非同期処理 ~ async/await, Promise ~
https://dk521123.hatenablog.com/entry/2021/01/16/202822
Python ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2014/08/07/231242
Python ~ 基本編 / 文字列 ~
https://dk521123.hatenablog.com/entry/2019/10/12/075251
Python ~ 非同期 / マルチスレッド ~
https://dk521123.hatenablog.com/entry/2023/07/05/195802