■ はじめに
https://dk521123.hatenablog.com/entry/2023/12/07/060129
の続き。 今回は、Delete and Insert について、深堀っていく
目次
【1】構文 【2】サンプル 【3】補足:処理フロー 1)実際のログ
【1】構文
* delete and Insert に関しては、以下を組み合わせる + materialized='incremental' + incremental_strategy='delete+insert'
イメージ
{{ config( materialized='incremental', unique_key=['user_id' , ...], incremental_strategy='delete+insert' ) }}
【2】サンプル
[0] サンプルテーブル定義
DROP TABLE IF EXISTS daily_sample_purchase_history; CREATE TABLE daily_sample_purchase_history ( purchase_id VARCHAR NOT NULL, item_name VARCHAR NOT NULL, number INT NOT NULL, created_at DATE NOT NULL );
[1] 初期データ (1回目)
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y001', 'Apple', 2, '2023-12-01'); INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y002', 'Melon', 5, '2023-12-01'); SELECT * FROM daily_sample_purchase_history; ~~~~~~ "purchase_id","item_name","number","created_at" "Y001","Apple","2","2023-12-01" "Y002","Melon","5","2023-12-01" ~~~~~~
[3] モデル「models/full_sample_purchase_history.sql」定義 (1回目)
{{ config( materialized='incremental', unique_key=['created_at'], incremental_strategy='delete+insert' ) }} WITH updated_purchase_history AS ( SELECT purchase_id AS purchase_id, item_name AS item_name, number AS number, created_at::DATE AS created_at FROM {{ source('demo_source', 'daily_sample_purchase_history') }} ) SELECT * FROM updated_purchase_history
[4] dbt run (1回目)
dbt run
[5] 確認
SELECT * FROM full_sample_purchase_history; ~~~~~~ "purchase_id","item_name","number","created_at" "X001","Apple","2","2023-12-01" "X002","Melon","5","2023-12-01" ~~~~~~
[6] 追加データ投入 (2回目)
-- 1回目のデータをクリア Truncate table daily_sample_purchase_history; -- 更新データ INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y001', 'Orange', 12, '2023-12-01'); INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y003', 'Pineapple', 21, '2023-12-01'); SELECT * FROM daily_sample_purchase_history; ~~~~~~ "purchase_id","item_name","number","created_at" "Y001","Orange","12","2023-12-01" "Y003","Pineapple","21","2023-12-01" ~~~~~~
[7] dbt run (2回目)
dbt run
[8] 確認
SELECT * FROM full_sample_purchase_history ORDER BY purchase_id; ~~~~~~ "purchase_id","item_name","number","created_at" "Y001","Orange","12","2023-12-01" "Y003","Pineapple","21","2023-12-01" ~~~~~~
[9] 追加データ投入 (3回目)
-- 2回目のデータをクリア Truncate table daily_sample_purchase_history; -- 更新データ INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y001', 'Mango', 23, '2023-12-02'); INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y005', 'Banana', 34, '2023-12-02'); SELECT * FROM daily_sample_purchase_history; ~~~~~~ "purchase_id","item_name","number","created_at" "Y001","Mango","23","2023-12-02" "Y005","Banana","34","2023-12-02" ~~~~~~
[10] dbt run (3回目)
dbt run
[11] 確認
SELECT * FROM full_sample_purchase_history ORDER BY purchase_id;
【3】補足:処理フロー
dbt が、裏でどういった処理を行なっているのか確認するために、
https://dk521123.hatenablog.com/entry/2025/09/04/000906
のコードをDelete and Insertに書き換えて、 dbt run --log-level DEBUGで、ログだししたところ、 以下のような処理をしていることがわかった。
処理フロー・概要
Step1:TMPテーブル(user_full__dbt_tmp042824425257)で新規作成 Step2:オリジナルテーブル(user_full)と TMPテーブル(user_full__dbt_tmp042824425257)の カラム情報を取得(一致しているか確認してる?) Step3:オリジナルテーブル(user_full)に対して、 TMPテーブル(user_full__dbt_tmp042824425257)を使って、 DELETE+INSERTをかける
1)実際のログ
[0m04:28:24 Began running node model.my_project.user_full [0m04:28:24 2 of 3 START sql incremental model public.user_full ............................ [RUN] [0m04:28:24 Re-using an available connection from the pool (formerly model.my_project.my_first_dbt_model, now model.my_project.user_full) [0m04:28:24 Began compiling node model.my_project.user_full [0m04:28:24 Writing injected SQL for node "model.my_project.user_full" [0m04:28:24 Began executing node model.my_project.user_full [0m04:28:24 Using postgres connection "model.my_project.user_full" [0m04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */ ★ Step1:TMPテーブル(user_full__dbt_tmp042824425257)で新規作成 create temporary table "user_full__dbt_tmp042824425257" as ( WITH FULL_TABLE AS ( SELECT * FROM "dbt_db"."public"."user_full" ), USER_TABLE AS ( SELECT * FROM "dbt_db"."public"."user" ), USER_PII AS ( SELECT * FROM "dbt_db"."public"."user_pii" ), FINAL_TABLE AS ( SELECT COALESCE(u.user_id, pi.user_id) AS user_id, COALESCE(pi.name, f.name) AS name, COALESCE(u.gender, f.gender) AS gender, COALESCE(pi.email, f.email) AS email, COALESCE(u.remarks, f.remarks) AS remarks FROM USER_TABLE AS u FULL OUTER JOIN USER_PII AS pi ON u.user_id = pi.user_id LEFT OUTER JOIN FULL_TABLE AS f ON u.user_id = f.user_id OR pi.user_id = f.user_id ) SELECT * FROM FINAL_TABLE ); [0m04:28:24 Opening a new connection, currently in state closed [0m04:28:24 SQL status: SELECT 7 in 0.055 seconds [0m04:28:24 Using postgres connection "model.my_project.user_full" [0m04:28:24 On model.my_project.user_full: BEGIN [0m04:28:24 SQL status: BEGIN in 0.000 seconds [0m04:28:24 Using postgres connection "model.my_project.user_full" [0m04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */ ★Step2:オリジナルテーブル(user_full)と TMPテーブル(user_full__dbt_tmp042824425257)の カラム情報を取得(一致しているか確認してる?) select column_name, data_type, character_maximum_length, numeric_precision, numeric_scale from INFORMATION_SCHEMA.columns where table_name = 'user_full__dbt_tmp042824425257' order by ordinal_position [0m04:28:24 SQL status: SELECT 5 in 0.020 seconds [0m04:28:24 Using postgres connection "model.my_project.user_full" [0m04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */ select column_name, data_type, character_maximum_length, numeric_precision, numeric_scale from "dbt_db".INFORMATION_SCHEMA.columns where table_name = 'user_full' and table_schema = 'public' order by ordinal_position [0m04:28:24 SQL status: SELECT 5 in 0.001 seconds [0m04:28:24 Using postgres connection "model.my_project.user_full" [0m04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */ select column_name, data_type, character_maximum_length, numeric_precision, numeric_scale from "dbt_db".INFORMATION_SCHEMA.columns where table_name = 'user_full' and table_schema = 'public' order by ordinal_position [0m04:28:24 SQL status: SELECT 5 in 0.001 seconds [0m04:28:24 Writing runtime sql for node "model.my_project.user_full" [0m04:28:24 Using postgres connection "model.my_project.user_full" [0m04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */ ★Step3:オリジナルテーブル(user_full)に対して、 TMPテーブル(user_full__dbt_tmp042824425257)を使って、 DELETE+INSERTをかける delete from "dbt_db"."public"."user_full" where ( user_id) in ( select (user_id) from "user_full__dbt_tmp042824425257" ); insert into "dbt_db"."public"."user_full" ("user_id", "name", "gender", "email", "remarks") ( select "user_id", "name", "gender", "email", "remarks" from "user_full__dbt_tmp042824425257" ) [0m04:28:24 SQL status: INSERT 0 7 in 0.005 seconds [0m04:28:24 On model.my_project.user_full: COMMIT [0m04:28:24 Using postgres connection "model.my_project.user_full" [0m04:28:24 On model.my_project.user_full: COMMIT [0m04:28:24 SQL status: COMMIT in 0.001 seconds [0m04:28:24 On model.my_project.user_full: Close [0m04:28:24 Sending event: {'category': 'dbt', 'action': 'run_model', 'label': 'c3d43d45-e240-42e7-bcec-a93436871f57', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7ffff8b74860>]} [0m04:28:24 2 of 3 OK created sql incremental model public.user_full ....................... [[32mINSERT 0 7[0m in 0.15s] [0m04:28:24 Finished running node model.my_project.user_full
関連記事
dbt ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/06/30/000000
dbt ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/30/151003
dbt ~ 基本編 / Source ~
https://dk521123.hatenablog.com/entry/2023/12/08/111012
dbt ~ 更新 / 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/12/07/060129
dbt ~ 更新 / Update or Insert / Insert ~
https://dk521123.hatenablog.com/entry/2023/12/19/224453
dbt ~ 更新 / DROP + CTAS ~
https://dk521123.hatenablog.com/entry/2023/12/04/000000