【Python】CSVデータをPostgreSQLにインポートする ~ copy_from() 編 ~

■ はじめに

Pythonコードにより、CSVファイルを入力データとして、
PostgreSQLに対してデータをInsertする

なお、PostgreSQLライブラリについては、
以下の関連記事の「psycopg2」を使う

PythonPostgreSQL を使う ~ psycopg2編 ~
https://dk521123.hatenablog.com/entry/2020/05/06/141029

■ 実装案

実装案1:COPYコマンドを利用する
実装案2:オーソドックスにInsertする

実装案1:COPYコマンドを利用する

copy_from(file, table, sep=',', columns=(item1, item2, ...))を使う

https://www.psycopg.org/docs/cursor.html#cursor.copy_from

※ PostgreSQL の COPY コマンド については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2020/06/11/112650

使用上の注意

【1】文字・文字列データ「""」で囲うと意図したデータが挿入されない
 ⇒ CHAR(1)で囲むとエラーになった
 ⇒ VARCHARで囲むと””まで含んでデータを挿入してしまった
 ⇒ 例えば、「,"xxxx,xxxx",」のようなことができない

【2】スペースが「...,[Space],...」な感じで含まれている場合、そのままスペースが挿入される

実装案2:オーソドックスにInsertする

以下の関連記事を参照のこと

https://dk521123.hatenablog.com/entry/2020/05/09/113559

Hello World

使用しているテーブル「customers」は、
後述の「使用するデータ」の「SQL (CREATE TABLE)」を参照のこと

Pythonコード

import io
import psycopg2

with psycopg2.connect(
  dbname="hive_db", user="postgres", password="password",
  host="localhost", port=5432) as connection:
    connection.set_isolation_level(0)
    with connection.cursor() as cursor:
      # 一旦クリア
      cursor.execute("DELETE FROM customers")
      # データ
      input = \
        '1,X0001,Mike,m,1972-08-09,2020-05-08 11:22:22.222\n' + \
        '2,X0002,Smith,\\N,1972-08-09,2020-05-08 11:22:22.222\n' + \
        '3,X0003,Naomi,,1972-08-09,2020-05-08 11:22:22.222\n'
      with io.StringIO(input) as file_io:
        # ここに注目
        print("Copy From")
        cursor.copy_from(
          file_io,
          'customers',
          sep=',',
          # \\N=>Noneにしてくれる
          # (ただ単に値がない場合は、半角SP' 'になる)
          null='\\N',
          columns=(
            'id',
            'customer_no',
            'customer_name',
            'sex',
            'birth_date',
            'created_at'))
      # 確認
      cursor.execute("select * from customers;")
      print(cursor.fetchall())

出力結果

Copy From
[(1, 'X0001', 'Mike', 'm', datetime.date(1972, 8, 9), datetime.datetime(2020, 5, 8, 11, 22, 22, 222000)), (2, 'X0002', 'Smith', 
None, datetime.date(1972, 8, 9), datetime.datetime(2020, 5, 8, 11, 22, 22, 222000)), (3, 'X0003', 'Naomi', ' ', datetime.date(1972, 8, 9), datetime.datetime(2020, 5, 8, 11, 22, 22, 222000))]

■ 使用するデータ

SQL (CREATE TABLE)

-- customers
DROP TABLE IF EXISTS customers;
CREATE TABLE customers(
  id INTEGER PRIMARY KEY,
  customer_no VARCHAR (50),
  customer_name VARCHAR (50) NOT NULL,
  sex CHAR(1),
  birth_date DATE,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- products
DROP TABLE IF EXISTS products;
CREATE TABLE products(
  id INTEGER PRIMARY KEY,
  product_name VARCHAR (50) NOT NULL,
  price INTEGER NOT NULL,
  description VARCHAR (200),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CSV

今回は、「【テーブル名】.csv」として、
一行目は、ヘッダー(項目名)とする

customers.csv

id,customer_no,customer_name,sex,birth_date,created_at
1,X0001,Mike,m,1972-08-09,2020-05-08 11:22:22.222
2,X0002,Tom,m,2001-12-29,2020-05-08 11:22:22.222
3,X0003,Naomi,f,1991-10-19,2020-05-08 11:22:22.222

products.csv

id,product_name,price,description,created_at
1,product - A,1200,Hello world\nTest,2020-05-08 11:22:22.222
2,product - B,234900,,2020-05-08 11:22:22.222
3,product - C,23000,,2020-05-08 11:22:22.222

pip

pip install -r requirements.txt
でインストールできるようにしておく

requirements.txt

psycopg2

■ サンプル

import sys
import os
import glob
import logging
import csv
import traceback
import psycopg2

HOST_NAME = "127.0.0.1"
DB_NAME = "your_db"
PORT_NUMBER = 5432
DB_USER = "postgres"
DB_PASSWORD = "password"
CSV_ENCODING = "utf-8"

TARGET_PATH = "C:\\"

LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

logging.basicConfig(
  level=logging.DEBUG, stream=sys.stdout, format=LOG_FORMAT)

def main():
  if not os.path.exists(TARGET_PATH):
    logging.error("Not exists target path : {}".format(TARGET_PATH))
    return

  try:
    with psycopg2.connect(
      dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD,
      host=HOST_NAME, port=PORT_NUMBER) as connection:
      with connection.cursor() as cursor:

        target_csv_path = os.path.join(TARGET_PATH, "*.csv")
        logging.debug("target_csv_path : {}".format(
          target_csv_path))
        for index, csv_path in enumerate(
          glob.glob(target_csv_path)):
          logging.info("*" * 20 + " CSV path[{}] : {} ".format(
            index, csv_path) + "*" * 20)

          initialize_table_data_by_csv(cursor, csv_path)

      logging.info("Commit")
      connection.commit()
  except Exception as ex:
    logging.error(ex)

def get_filename_without_ext(target_path: str) -> str:
  file_name = os.path.basename(target_path)
  return os.path.splitext(file_name)[0]

def initialize_table_data_by_csv(
  cursor, csv_path: str, csv_encoding=CSV_ENCODING):
  try:
    with open(csv_path, mode="r", encoding=csv_encoding) as csv_file:
      csv_reader = csv.reader(csv_file)
      header = next(csv_reader)
      logging.debug("Header = {}".format(header))

      table_name = get_filename_without_ext(csv_path)

      logging.info("Clean up(Delete) for table name [{}]".format(
        table_name))
      cursor.execute("DELETE FROM {}".format(table_name))

      logging.info("Inserting(COPY) for table name [{}]".format(
        table_name))
      cursor.copy_from(
        csv_file,
        table_name,
        sep=",",
        columns=tuple(header))

  except Exception as ex:
    logging.error(
      "Error in initialize_table_data_by_csv : {}, {}".format(
      ex, traceback.format_exc()))
    raise ex

if __name__ == '__main__':
  logging.info("Starting")

  main()

  logging.info("Done")

出力結果

2020-05-08 23:27:39,671 - root - INFO - Starting
2020-05-08 23:27:39,735 - root - DEBUG - target_csv_path : C:\work\*.csv
2020-05-08 23:27:39,736 - root - INFO - ******************** CSV path[0] : .\customers.csv ********************     
2020-05-08 23:27:39,737 - root - DEBUG - Header = ['id', 'customer_no', 'customer_name', 'sex', 'birth_date', 'created_at']
2020-05-08 23:27:39,745 - root - INFO - Deleting for table name [customers]
2020-05-08 23:27:39,751 - root - INFO - Inserting(COPY) for table name [customers]
2020-05-08 23:27:39,758 - root - INFO - ******************** CSV path[1] : .\products.csv ********************      
2020-05-08 23:27:39,760 - root - DEBUG - Header = ['id', 'product_name', 'price', 'description', 'created_at']
2020-05-08 23:27:39,761 - root - INFO - Deleting for table name [products]
2020-05-08 23:27:39,762 - root - INFO - Inserting(COPY) for table name [products]
2020-05-08 23:27:39,764 - root - INFO - Commit
2020-05-08 23:27:39,773 - root - INFO - Done

参考文献

https://qiita.com/yokotate/items/d2c67668585efac2861b

関連記事

CSVデータをPostgreSQLにインポートする ~ execute_values() 編 ~
https://dk521123.hatenablog.com/entry/2020/05/09/113559
PythonPostgreSQL を使う ~ psycopg2編 ~
https://dk521123.hatenablog.com/entry/2020/05/06/141029
Python ~ 基本編 / CSV
https://dk521123.hatenablog.com/entry/2019/11/07/214108
Python ~ 標準ログ / Logging ~
https://dk521123.hatenablog.com/entry/2020/02/03/231518
Python ~ 基本編 / コマンドライン引数 ~
https://dk521123.hatenablog.com/entry/2019/10/11/223651
Python ~ 基本編 / フォルダ・ファイル操作 ~
https://dk521123.hatenablog.com/entry/2019/09/02/000000
Python ~ 基本編 / 文字列 ~
https://dk521123.hatenablog.com/entry/2019/10/12/075251
DBクライアントツール
https://dk521123.hatenablog.com/entry/2016/05/08/152815
COPY コマンド ~ COPY FROM / TO ~
https://dk521123.hatenablog.com/entry/2020/06/11/112650