【Snowflake】指定した日付分アンロードするSQLを生成するPythonコード

■ はじめに

テーブル内のデータをアンロードして
テストデータを作るって作業をしていて
大分、久しぶりに、 Snowflakeスクリプトを書いたのだが
仕様とか色々大幅に変わりそうな予感がしているので
Pythonコード化して対応しておく

【1】前提条件

* ストレージ統合(STORAGE INTEGRATION)は事前に作成しておくこと
 => 以下の関連記事を参照のこと

Snowflake ~ ストレージ統合の作成手順 ~
https://dk521123.hatenablog.com/entry/2023/07/27/000000

【2】サンプル

import codecs

# ---- Settings read by the SQL generation below ----

# Path of the SQL script that gets written.
output_file = "unload.sql"

# Database / schema selected via USE; the unload stages are created in it.
base_db, base_schema = "demo_db", "demo_schema"

# First and last date (both inclusive) handled by the generated WHILE loop.
start_date, end_date = "2023-08-01", "2023-08-03"

# S3 bucket used in the stage URL and the storage integration of the stages.
target_s3_bucket_name = "your-s3-bucket"
target_integration = "storage_integration_for_unload"

# One entry per table to unload:
#   (database, schema, table, file name, field delimiter, has header)
# The file name is a SQL expression: it is pasted unquoted into the generated
# script, where it may refer to the loop variable target_date.
target_unload_list = (
  (
    "demo_db1",
    "demo_sche1",
    "demo_table1",
    "'unload_demo_table1' || target_date || '_.txt'",
    "|",
    True,
  ),
  (
    "demo_db2",
    "demo_sche2",
    "demo_table2",
    "'unload_demo_table2' || target_date || '_.txt'",
    ",",
    False,
  ),
)

def build_sql(sql: str, target: str = "") -> str:
  """Return ``sql`` extended by ``target`` and a line break.

  Args:
    sql: The SQL text accumulated so far.
    target: The line to append; when omitted an empty line is appended.

  Returns:
    The new text; ``sql`` itself is not modified.
  """
  pieces = (sql, target, "\n")
  return "".join(pieces)

# Generate the unload script and write it to output_file.
#
# Layout of the generated SQL:
#   1. USE statements for base_db / base_schema.
#   2. One CREATE OR REPLACE STAGE per entry of target_unload_list.
#   3. An anonymous Snowflake Scripting block that walks from start_date to
#      end_date one day at a time and, per table, first REMOVEs the output
#      path of that date and then COPYs the rows of that date into it.
# Every line is appended through build_sql(), which adds the line break.
sql = ""
sql = build_sql(sql, f"use database {base_db};")
sql = build_sql(sql, f"use schema {base_schema};")

sql = build_sql(sql)
sql = build_sql(sql, "-- Step1: Create each stage of table to unload")
for number, target_table_info in enumerate(target_unload_list, start=1):
  target_db_name, target_schema_name, target_table_name = target_table_info[:3]
  target_table = f"{target_db_name}.{target_schema_name}.{target_table_name}"
  stage_name = f"{base_db}.{base_schema}.{target_table_name}_unload_stage"

  print(f"target_table = {target_table}")
  sql = build_sql(sql, f"-- [{number}] {target_table}")
  sql = build_sql(sql, f"CREATE OR REPLACE STAGE {stage_name}")
  sql = build_sql(sql, f"  URL='s3://{target_s3_bucket_name}/demo/{target_db_name}/{target_schema_name}/{target_table_name}/'")
  sql = build_sql(sql, f"  STORAGE_INTEGRATION={target_integration};")
  sql = build_sql(sql)

sql = build_sql(sql, "execute immediate $$")
sql = build_sql(sql, "declare")
sql = build_sql(sql, f"  start_date date default date('{start_date}');")
sql = build_sql(sql, f"  end_date date default date('{end_date}');")
sql = build_sql(sql, "  final_statement varchar default '';")
sql = build_sql(sql, "begin")
sql = build_sql(sql, "  let target_date := start_date;")
sql = build_sql(sql, "  while (target_date <= end_date) do")
sql = build_sql(sql, "    let year := to_varchar(:target_date, 'yyyy');")
sql = build_sql(sql, "    let month := to_varchar(:target_date, 'MM');")
sql = build_sql(sql, "    let day := to_varchar(:target_date, 'dd');")
sql = build_sql(sql, "    let target_date_as_int := to_char(:target_date, 'YYYYMMDD')::int;")

for number, target_table_info in enumerate(target_unload_list, start=1):
  (
    target_db_name,
    target_schema_name,
    target_table_name,
    target_file_name,
    target_delimiter,
    has_header,
  ) = target_table_info
  target_table = f"{target_db_name}.{target_schema_name}.{target_table_name}"
  stage_name = f"{base_db}.{base_schema}.{target_table_name}_unload_stage"
  # NOTE: the "taget_date=" spelling of the path segment is kept as-is so the
  # output location does not move; REMOVE and COPY must use the same path.
  stage_path = f"@{stage_name}/taget_date="

  sql = build_sql(sql, f"    -- ******* [{number}] {target_table} *******")
  sql = build_sql(sql, f"    -- Step2-{number}: Clean data file path if exists")
  sql = build_sql(sql, f"    let to_remove_file_{target_table_name} := 'REMOVE {stage_path}' || target_date || '/';")
  sql = build_sql(sql, f"    final_statement := to_remove_file_{target_table_name};")
  sql = build_sql(sql, f"    EXECUTE IMMEDIATE :to_remove_file_{target_table_name};")
  sql = build_sql(sql, f"    -- Step3-{number}: Unload")
  # The COPY statement is assembled from SQL string fragments joined by ||.
  # Fragments are glued together without any separator, so every fragment that
  # starts a new clause carries its own leading space.
  # "\\'" below becomes \' in the generated script, i.e. an escaped quote
  # inside the SQL string literal, which yields a plain ' in the statement
  # that is finally executed.
  sql = build_sql(sql, f"    let to_unload_file_{target_table_name} := 'COPY INTO {stage_path}' || target_date || '/' || {target_file_name} ||")
  sql = build_sql(sql, "    ' FROM (' ||")
  # The date is wrapped in quotes; unquoted, 2023-08-01 would be evaluated as
  # a subtraction instead of being compared as a date.
  sql = build_sql(sql, f"    '  SELECT * FROM {target_table} WHERE insertion_date=\\'' || target_date || '\\'' ||")
  sql = build_sql(sql, "    ')' ||")
  sql = build_sql(sql, "    ' FILE_FORMAT = (' ||")
  sql = build_sql(sql, "    '  TYPE=CSV' ||")
  sql = build_sql(sql, f"    '  FIELD_DELIMITER = \\'{target_delimiter}\\'' ||")
  sql = build_sql(sql, "    '  FIELD_OPTIONALLY_ENCLOSED_BY = \\'\\\"\\'' ||")
  sql = build_sql(sql, "    '  NULL_IF=(\\'\\')' ||")
  sql = build_sql(sql, "    '  COMPRESSION = NONE' ||")
  sql = build_sql(sql, "    ')' ||")
  if has_header:
    sql = build_sql(sql, "    ' HEADER = TRUE' ||")
  sql = build_sql(sql, "    ' SINGLE=TRUE';")
  sql = build_sql(sql, f"    final_statement := to_unload_file_{target_table_name};")
  sql = build_sql(sql, f"    EXECUTE IMMEDIATE :to_unload_file_{target_table_name};")
  sql = build_sql(sql)

sql = build_sql(sql, "    -- loop end")
sql = build_sql(sql, "    target_date := target_date + interval '1 days';")
sql = build_sql(sql, "  end while;")
sql = build_sql(sql, "  return final_statement;")
sql = build_sql(sql, "end;")
sql = build_sql(sql, "$$")
sql = build_sql(sql, ";")

with codecs.open(output_file, 'w', 'utf-8') as file:
  file.write(sql)

print(f"Done. See {output_file}")

出力例

use database demo_db;
use schema demo_schema;

-- Step1: Create each stage of table to unload
-- [1] demo_db1.demo_sche1.demo_table1
CREATE OR REPLACE STAGE demo_db.demo_schema.demo_table1_unload_stage
  URL='s3://your-s3-bucket/demo/demo_db1/demo_sche1/demo_table1/'
  STORAGE_INTEGRATION=storage_integration_for_unload;

-- [2] demo_db2.demo_sche2.demo_table2
CREATE OR REPLACE STAGE demo_db.demo_schema.demo_table2_unload_stage
  URL='s3://your-s3-bucket/demo/demo_db2/demo_sche2/demo_table2/'
  STORAGE_INTEGRATION=storage_integration_for_unload;

execute immediate $$
declare
  start_date date default date('2023-08-01');
  end_date date default date('2023-08-03');
  final_statement varchar default '';
begin
  let target_date := start_date;
  while (target_date <= end_date) do
    let year := to_varchar(:target_date, 'yyyy');
    let month := to_varchar(:target_date, 'MM');
    let day := to_varchar(:target_date, 'dd');
    let target_date_as_int := to_char(:target_date, 'YYYYMMDD')::int;
    -- ******* [1] demo_db1.demo_sche1.demo_table1 *******
    -- Step2-1: Clean data file path if exists
    let to_remove_file_demo_table1 := 'REMOVE @demo_db.demo_schema.demo_table1_unload_stage/taget_date=' || target_date || '/';
    final_statement := to_remove_file_demo_table1;
    EXECUTE IMMEDIATE :to_remove_file_demo_table1;
    -- Step3-1: Unload
    let to_unload_file_demo_table1 := 'COPY INTO @demo_db.demo_schema.demo_table1_unload_stage/taget_date=' || target_date || '/' || 'unload_demo_table1' || target_date || '_.txt' ||
    ' FROM (' ||
    '  SELECT * FROM demo_db1.demo_sche1.demo_table1 WHERE insertion_date=\'' || target_date || '\'' ||
    ')' ||
    ' FILE_FORMAT = (' ||
    '  TYPE=CSV' ||
    '  FIELD_DELIMITER = \'|\'' ||
    '  FIELD_OPTIONALLY_ENCLOSED_BY = \'\"\'' ||
    '  NULL_IF=(\'\')' ||
    '  COMPRESSION = NONE' ||
    ')' ||
    ' HEADER = TRUE' ||
    ' SINGLE=TRUE';
    final_statement := to_unload_file_demo_table1;
    EXECUTE IMMEDIATE :to_unload_file_demo_table1;

    -- ******* [2] demo_db2.demo_sche2.demo_table2 *******
    -- Step2-2: Clean data file path if exists
    let to_remove_file_demo_table2 := 'REMOVE @demo_db.demo_schema.demo_table2_unload_stage/taget_date=' || target_date || '/';
    final_statement := to_remove_file_demo_table2;
    EXECUTE IMMEDIATE :to_remove_file_demo_table2;
    -- Step3-2: Unload
    let to_unload_file_demo_table2 := 'COPY INTO @demo_db.demo_schema.demo_table2_unload_stage/taget_date=' || target_date || '/' || 'unload_demo_table2' || target_date || '_.txt' ||
    ' FROM (' ||
    '  SELECT * FROM demo_db2.demo_sche2.demo_table2 WHERE insertion_date=\'' || target_date || '\'' ||
    ')' ||
    ' FILE_FORMAT = (' ||
    '  TYPE=CSV' ||
    '  FIELD_DELIMITER = \',\'' ||
    '  FIELD_OPTIONALLY_ENCLOSED_BY = \'\"\'' ||
    '  NULL_IF=(\'\')' ||
    '  COMPRESSION = NONE' ||
    ')' ||
    ' SINGLE=TRUE';
    final_statement := to_unload_file_demo_table2;
    EXECUTE IMMEDIATE :to_unload_file_demo_table2;

    -- loop end
    target_date := target_date + interval '1 days';
  end while;
  return final_statement;
end;
$$
;

関連記事

Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake ~ データ アンロード ~
https://dk521123.hatenablog.com/entry/2022/07/04/172738
Snowflake ~ Removeコマンド ~
https://dk521123.hatenablog.com/entry/2022/09/26/150259
Snowflake ~ ストレージ統合の作成手順 ~
https://dk521123.hatenablog.com/entry/2023/07/27/000000
Snowflake ~ 日時関連 ~
https://dk521123.hatenablog.com/entry/2022/06/17/113003
Snowflake ~ 日時関連 / 日時取得関数 ~
https://dk521123.hatenablog.com/entry/2022/09/02/092144
Snowflake ~ 文字列操作関連 ~
https://dk521123.hatenablog.com/entry/2022/10/01/000000
Snowflake ~ SEQUENCE ~
https://dk521123.hatenablog.com/entry/2023/07/14/091918
【Snowflake】ストアド ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2022/12/11/202904
【Snowflake】ストアド ~ 基本編 / ループ ~
https://dk521123.hatenablog.com/entry/2022/12/16/143349
ファイル読込・書込
https://dk521123.hatenablog.com/entry/2019/10/07/000000