【Python】CSVデータをPostgreSQLにインポートする ~ execute_values() 編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2020/05/08/175525

の続き。

今回は、「実装案2:オーソドックスにInsertする」を行う。

■ 実行メソッドについて

実行メソッドの候補として、以下がある。

1)executemany() ← パフォーマンスがいまいちだとか(詳細は、以下)
2)execute_batch()
3)execute_values() ← 今回は、実装のしやすさからこっちを選択

公式サイト:executemany()
https://www.psycopg.org/docs/cursor.html#cursor.executemany
公式サイト:execute_batch()
https://www.psycopg.org/docs/extras.html#psycopg2.extras.execute_batch
公式サイト:execute_values()
https://www.psycopg.org/docs/extras.html#psycopg2.extras.execute_values

executemany() だとパフォーマンスがいまいち

http://minamitama-minamitama.blogspot.com/2011/04/pythonpostgresqlinsert.html

で言っているように、現在は、どうも速度は大して変わらないらしい。

公式サイト:executemany()
https://www.psycopg.org/docs/cursor.html#cursor.executemany

より抜粋
~~~~~~~~~~~
Warning
In its current implementation this method is not faster than executing execute() in a loop.
For better performance you can use the functions described in Fast execution helpers.
~~~~~~~~~~~

https://www.psycopg.org/docs/extras.html#fast-exec

psycopg2.extras.execute_batch(cur, sql, argslist, page_size=100)
psycopg2.extras.execute_values(cur, sql, argslist, template=None, page_size=100, fetch=False)
が早いらしい。

■ 使用上の注意

【1】null値と空文字の値の入力に気を付けること

【1】null値と空文字の値の入力に気を付けること

 CSVの空文字は、そのまま空文字になってしまう。
null を設定する場合、明示的に、Pythonコード側で、
None を設定する必要がある。

 この問題だけであれば、以下の関連記事で行っている
copy_from() の 引数 null='\\N' を使えば、解決できる。
しかし、copy_from()は、以下の関連記事の「■ 使用上の注意」にある
別の問題がある、、、
(結局、要件次第で使い分ければいいと思うが、、、)

https://dk521123.hatenablog.com/entry/2020/05/08/175525

 代替え文字(例 "Nil", "null" etc)をCSVに埋め込んでおき、
コードの中で None に置き換えれば対応はできるが、、、
 ⇒ 代替え文字は、使用しないであろう文字がいいが、
  ぱっとみて null だと分かりやすくする必要があるっと
  (今回は「Nil」にしている)
 ⇒ 空文字を一律、Noneに替えることもできるが、
  PostgreSQLは、空文字 と null を区別している思想なので、
  上記の代替え文字を考えてみた。
  (空文字を null に変更したことによる影響も考える必要もあるので)

ちなみに、以下の関連記事にあるSQLで気が付いた問題。。。

https://dk521123.hatenablog.com/entry/2020/05/12/223114

■ 使用するデータ

SQL (CREATE TABLE) / pip

以下の関連記事と同じなので、省略。

https://dk521123.hatenablog.com/entry/2020/05/08/175525

CSV

今回も、「【テーブル名】.csv」として、
一行目は、ヘッダー(項目名)とする

※ 今回は、「""」に囲んでも問題なくINSERTできる。

customers.csv

id,customer_no,customer_name,sex,birth_date,created_at
"1","X0001","Mike","m","1972-08-09","2020-05-08 11:22:22.222"
"2","X0002","Smith","m","2001-12-29","2020-05-08 11:22:22.222"
"3","X0003","Naomi","f","1991-10-19","2020-05-08 11:22:22.222"
"4","X0004","Ken","m","1891-09-13","2020-05-08 11:22:22.222"

products.csv

id,product_name,price,description,created_at
"1","product - A","1200","Hello world\nTest","2020-05-08 11:22:22.222"
"2","product - B","234900","Nil","2020-05-08 11:22:22.222"
"3","product - C","200","","2020-05-08 11:22:22.222"
"4","product - D","1200","Hello, world!!","2020-05-08 11:22:22.222"
"5","product - E","1210","Test, AB","2020-05-08 11:22:22.222"

■ サンプル

pythonコード

import sys
import os
import glob
import logging
import csv
import traceback
import psycopg2
from psycopg2.extras import execute_values

HOST_NAME = "127.0.0.1"
DB_NAME = "your_db"
PORT_NUMBER = 5432
DB_USER = "postgres"
DB_PASSWORD = "password"
CSV_ENCODING = "utf-8"

TARGET_PATH = "C:\\work"

LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

logging.basicConfig(
  level=logging.DEBUG, stream=sys.stdout, format=LOG_FORMAT)

def main():
  if not os.path.exists(TARGET_PATH):
    logging.error("Not exists target path : {}".format(TARGET_PATH))
    return

  try:
    with psycopg2.connect(
      dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD,
      host=HOST_NAME, port=PORT_NUMBER) as connection:
      with connection.cursor() as cursor:

        target_csv_path = os.path.join(TARGET_PATH, "*.csv")
        logging.debug("target_csv_path : {}".format(
          target_csv_path))
        for index, csv_path in enumerate(
          glob.glob(target_csv_path)):
          logging.info("*" * 20 + " CSV path[{}] : {} ".format(
            index, csv_path) + "*" * 20)

          initialize_table_data_by_csv(cursor, csv_path)
        # 確認
        logging.info("=-" * 20)
        cursor.execute("select * from customers;")
        logging.info(cursor.fetchall())
        logging.info("=-" * 20)
        cursor.execute("select * from products;")
        logging.info(cursor.fetchall())
        logging.info("=-" * 20)

      logging.info("Commit")
      connection.commit()
  except Exception as ex:
    logging.error(ex)

def get_filename_without_ext(target_path: str) -> str:
  file_name = os.path.basename(target_path)
  return os.path.splitext(file_name)[0]

# ここが前回と違う点
def generate_insert_statement(
  table_name: str, csv_header: str) -> str:
  item = ""
  for index, csv_item in enumerate(csv_header):
    if index != 0:
      item = item + ","
    item = item + csv_item
  return "INSERT INTO {} ({}) VALUES %s".format(table_name, item)

def initialize_table_data_by_csv(
  cursor, csv_path: str, csv_encoding=CSV_ENCODING):
  try:
    with open(csv_path, mode="r", encoding=csv_encoding) as csv_file:
      csv_reader = csv.reader(csv_file)
      csv_header = next(csv_reader)
      logging.debug("CSV Header = {}".format(csv_header))

      table_name = get_filename_without_ext(csv_path)

      logging.info("Clean up(Delete) for table name [{}]".format(
        table_name))
      cursor.execute("DELETE FROM {}".format(table_name))

      # ここが前回と違う点
      insert_sql = generate_insert_statement(table_name, csv_header)

      params = []
      for csv_row in csv_reader:
        row_values = []
        for index, csv_value in enumerate(csv_row):
          if csv_value == "Nil":
            # None(Null)に変換
            logging.debug("[{}] - None".format(index))
            row_values.append(None)
          else:
            logging.debug("[{}] - [{}]".format(index, csv_value))
            row_values.append(csv_value)
        logging.debug("row loop done!")
        params.append(tuple(row_values))

      logging.info("Inserting [{}]".format(insert_sql))
      execute_values(cursor, insert_sql, params)
  except Exception as ex:
    logging.error(
      "Error in initialize_table_data_by_csv : {}, {}".format(
      ex, traceback.format_exc()))
    raise ex

if __name__ == '__main__':
  logging.info("Starting")

  main()

  logging.info("Done")

出力結果 (デバッグログ)

2020-05-13 21:46:33,215 - root - INFO - Starting
2020-05-13 21:46:33,266 - root - DEBUG - target_csv_path : C:\work\*.csv
2020-05-13 21:46:33,274 - root - INFO - ******************** CSV path[0] : C:\work\customers.csv ********************
2020-05-13 21:46:33,274 - root - DEBUG - CSV Header = ['id', 'customer_no', 'customer_name', 'sex', 'birth_date', 'created_at']
2020-05-13 21:46:33,274 - root - INFO - Clean up(Delete) for table name [customers]
2020-05-13 21:46:33,278 - root - DEBUG - [0] - [1]
2020-05-13 21:46:33,280 - root - DEBUG - [1] - [X0001]
2020-05-13 21:46:33,281 - root - DEBUG - [2] - [Mike]
2020-05-13 21:46:33,282 - root - DEBUG - [3] - [m]
2020-05-13 21:46:33,283 - root - DEBUG - [4] - [1972-08-09]
2020-05-13 21:46:33,294 - root - DEBUG - [5] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,295 - root - DEBUG - row loop done!
2020-05-13 21:46:33,296 - root - DEBUG - [0] - [2]
2020-05-13 21:46:33,297 - root - DEBUG - [1] - [X0002]
2020-05-13 21:46:33,298 - root - DEBUG - [2] - [Smith]
2020-05-13 21:46:33,299 - root - DEBUG - [3] - [m]
2020-05-13 21:46:33,309 - root - DEBUG - [4] - [2001-12-29]
2020-05-13 21:46:33,309 - root - DEBUG - [5] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,310 - root - DEBUG - row loop done!
2020-05-13 21:46:33,311 - root - DEBUG - [0] - [3]
2020-05-13 21:46:33,312 - root - DEBUG - [1] - [X0003]
2020-05-13 21:46:33,312 - root - DEBUG - [2] - [Naomi]
2020-05-13 21:46:33,313 - root - DEBUG - [3] - [f]
2020-05-13 21:46:33,313 - root - DEBUG - [4] - [1991-10-19]
2020-05-13 21:46:33,313 - root - DEBUG - [5] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,314 - root - DEBUG - row loop done!
2020-05-13 21:46:33,314 - root - DEBUG - [0] - [4]
2020-05-13 21:46:33,314 - root - DEBUG - [1] - [X0004]
2020-05-13 21:46:33,315 - root - DEBUG - [2] - [Ken]
2020-05-13 21:46:33,315 - root - DEBUG - [3] - [m]
2020-05-13 21:46:33,316 - root - DEBUG - [4] - [1891-09-13]
2020-05-13 21:46:33,325 - root - DEBUG - [5] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,326 - root - DEBUG - row loop done!
2020-05-13 21:46:33,327 - root - INFO - Inserting [INSERT INTO customers (id,customer_no,customer_name,sex,birth_date,created_at) VALUES %s]
2020-05-13 21:46:33,331 - root - INFO - ******************** CSV path[1] : C:\work\products.csv ********************
2020-05-13 21:46:33,332 - root - DEBUG - CSV Header = ['id', 'product_name', 'price', 'description', 'created_at']
2020-05-13 21:46:33,342 - root - INFO - Clean up(Delete) for table name [products]
2020-05-13 21:46:33,345 - root - DEBUG - [0] - [1]
2020-05-13 21:46:33,346 - root - DEBUG - [1] - [product - A]
2020-05-13 21:46:33,347 - root - DEBUG - [2] - [1200]
2020-05-13 21:46:33,348 - root - DEBUG - [3] - [Hello world\nTest]
2020-05-13 21:46:33,348 - root - DEBUG - [4] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,349 - root - DEBUG - row loop done!
2020-05-13 21:46:33,358 - root - DEBUG - [0] - [2]
2020-05-13 21:46:33,358 - root - DEBUG - [1] - [product - B]
2020-05-13 21:46:33,359 - root - DEBUG - [2] - [234900]
2020-05-13 21:46:33,360 - root - DEBUG - [3] - None
2020-05-13 21:46:33,361 - root - DEBUG - [4] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,361 - root - DEBUG - row loop done!
2020-05-13 21:46:33,362 - root - DEBUG - [0] - [3]
2020-05-13 21:46:33,362 - root - DEBUG - [1] - [product - C]
2020-05-13 21:46:33,362 - root - DEBUG - [2] - [200]
2020-05-13 21:46:33,363 - root - DEBUG - [3] - []
2020-05-13 21:46:33,363 - root - DEBUG - [4] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,363 - root - DEBUG - row loop done!
2020-05-13 21:46:33,364 - root - DEBUG - [0] - [4]
2020-05-13 21:46:33,364 - root - DEBUG - [1] - [product - D]
2020-05-13 21:46:33,364 - root - DEBUG - [2] - [1200]
2020-05-13 21:46:33,365 - root - DEBUG - [3] - [Hello, world!!]
2020-05-13 21:46:33,374 - root - DEBUG - [4] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,374 - root - DEBUG - row loop done!
2020-05-13 21:46:33,375 - root - DEBUG - [0] - [5]
2020-05-13 21:46:33,376 - root - DEBUG - [1] - [product - E]
2020-05-13 21:46:33,377 - root - DEBUG - [2] - [1210]
2020-05-13 21:46:33,377 - root - DEBUG - [3] - [Test, AB]
2020-05-13 21:46:33,377 - root - DEBUG - [4] - [2020-05-08 11:22:22.222]
2020-05-13 21:46:33,378 - root - DEBUG - row loop done!
2020-05-13 21:46:33,378 - root - INFO - Inserting [INSERT INTO products (id,product_name,price,description,created_at) VALUES %s]
2020-05-13 21:46:33,380 - root - INFO - =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
2020-05-13 21:46:33,380 - root - INFO - [(1, 'X0001', 'Mike', 'm', datetime.date(1972, 8, 9), datetime.datetime(2020, 5, 8, 11, 
22, 22, 222000)), (2, 'X0002', 'Smith', 'm', datetime.date(2001, 12, 29), datetime.datetime(2020, 5, 8, 11, 22, 22, 222000)), (3, 'X0003', 'Naomi', 'f', datetime.date(1991, 10, 19), datetime.datetime(2020, 5, 8, 11, 22, 22, 222000)), (4, 'X0004', 'Ken', 'm', datetime.date(1891, 9, 13), datetime.datetime(2020, 5, 8, 11, 22, 22, 222000))]
2020-05-13 21:46:33,382 - root - INFO - =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
2020-05-13 21:46:33,391 - root - INFO - [(1, 'product - A', 1200, 'Hello world\\nTest', datetime.datetime(2020, 5, 8, 11, 22, 22, 222000)), (2, 'product - B', 234900, None, datetime.datetime(2020, 5, 8, 11, 22, 22, 222000)), (3, 'product - C', 200, '', datetime.datetime(2020, 5, 8, 11, 22, 22, 222000)), (4, 'product - D', 1200, 'Hello, world!!', datetime.datetime(2020, 5, 8, 11, 22, 22, 222000)), (5, 'product - E', 1210, 'Test, AB', datetime.datetime(2020, 5, 8, 11, 22, 22, 222000))]
2020-05-13 21:46:33,393 - root - INFO - =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
2020-05-13 21:46:33,394 - root - INFO - Commit
2020-05-13 21:46:33,396 - root - INFO - Done

関連記事

Pythonを使ってCSVデータをPostgreSQLにインポートする ~ copy_from() 編 ~
https://dk521123.hatenablog.com/entry/2020/05/08/175525
Python ~ 基本編 / 文字列 ~
https://dk521123.hatenablog.com/entry/2019/10/12/075251
DBクライアントツール
https://dk521123.hatenablog.com/entry/2016/05/08/152815
psqlCSV出力する
https://dk521123.hatenablog.com/entry/2020/02/08/001155