【dbt】dbt ~ Updates / Delete and Insert ~

■ Introduction

This entry is a continuation of:

https://dk521123.hatenablog.com/entry/2023/12/07/060129

This time, we dig deeper into Delete and Insert.

Contents

【1】Syntax
【2】Sample
【3】Supplement: Processing Flow
 1) Actual log

【1】Syntax

* Delete and Insert is achieved by combining the following settings:
 + materialized='incremental'
 + incremental_strategy='delete+insert'

Example

{{
    config(
        materialized='incremental',
        unique_key=['user_id', ...],
        incremental_strategy='delete+insert'
    )
}}
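
With this configuration, each dbt run materializes the model's result into a temporary table, deletes the target rows whose unique_key values appear in that new batch, and then inserts the whole batch. Conceptually it looks like the following sketch, written against the sample in 【2】 below (unique_key=['created_at']); the __dbt_tmp table name is a placeholder, and the exact SQL dbt generates can be seen in the log in 【3】.

-- 1) Delete target rows whose created_at appears in the new batch
DELETE FROM full_sample_purchase_history
WHERE (created_at) IN (SELECT created_at FROM full_sample_purchase_history__dbt_tmp);

-- 2) Insert the entire new batch
INSERT INTO full_sample_purchase_history (purchase_id, item_name, number, created_at)
SELECT purchase_id, item_name, number, created_at
FROM full_sample_purchase_history__dbt_tmp;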

【2】Sample

[0] Sample table definition

DROP TABLE IF EXISTS daily_sample_purchase_history;
CREATE TABLE daily_sample_purchase_history (
  purchase_id VARCHAR NOT NULL,
  item_name VARCHAR NOT NULL,
  number INT NOT NULL,
  created_at DATE NOT NULL
);

[1] Initial data (1st run)

INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y001', 'Apple', 2, '2023-12-01');
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y002', 'Melon', 5, '2023-12-01');

SELECT * FROM daily_sample_purchase_history;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Apple","2","2023-12-01"
"Y002","Melon","5","2023-12-01"
~~~~~~
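
[2] Source definition (assumed)

The model in [3] reads this table through {{ source('demo_source', 'daily_sample_purchase_history') }}, so a source definition along the following lines is assumed (a minimal sketch; the file name and schema are placeholders for this environment):

models/sources.yml

version: 2

sources:
  - name: demo_source
    schema: public
    tables:
      - name: daily_sample_purchase_history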

[3] Model definition: models/full_sample_purchase_history.sql (1st run)

{{
  config(
    materialized='incremental',
    unique_key=['created_at'],
    incremental_strategy='delete+insert'
  )
}}

WITH
updated_purchase_history AS (
  SELECT
    purchase_id AS purchase_id,
    item_name AS item_name,
    number AS number,
    created_at::DATE AS created_at
  FROM
    {{ source('demo_source', 'daily_sample_purchase_history') }}
)

SELECT * FROM updated_purchase_history
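
Note that this model simply re-selects the whole source table on every run, which works here because the source is truncated and reloaded before each run. If the source keeps accumulating rows instead, a common pattern is to restrict incremental runs with is_incremental() (a sketch under that assumption; choose the filter column to fit your data):

WITH
updated_purchase_history AS (
  SELECT
    purchase_id,
    item_name,
    number,
    created_at::DATE AS created_at
  FROM
    {{ source('demo_source', 'daily_sample_purchase_history') }}
  {% if is_incremental() %}
  -- On incremental runs, only pick up rows at or after the newest date already loaded
  WHERE created_at >= (SELECT MAX(created_at) FROM {{ this }})
  {% endif %}
)

SELECT * FROM updated_purchase_history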

[4] dbt run (1st run)

dbt run
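
To build only this model instead of every model in the project, the run can be narrowed with dbt's standard node selection:

dbt run --select full_sample_purchase_history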

[5] Check

SELECT * FROM full_sample_purchase_history;
~~~~~~
"purchase_id","item_name","number","created_at"
"X001","Apple","2","2023-12-01"
"X002","Melon","5","2023-12-01"
~~~~~~

[6] Load additional data (2nd run)

-- Clear the data from the 1st run
TRUNCATE TABLE daily_sample_purchase_history;

-- Updated data
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y001', 'Orange', 12, '2023-12-01');
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y003', 'Pineapple', 21, '2023-12-01');

SELECT * FROM daily_sample_purchase_history;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Orange","12","2023-12-01"
"Y003","Pineapple","21","2023-12-01"
~~~~~~

[7] dbt run (2nd run)

dbt run

[8] Check

Note that Y002 (Melon) has disappeared as well: because unique_key is created_at, every existing row with created_at = '2023-12-01' was deleted before the new batch was inserted.

SELECT * FROM full_sample_purchase_history ORDER BY purchase_id;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Orange","12","2023-12-01"
"Y003","Pineapple","21","2023-12-01"
~~~~~~

[9] Load additional data (3rd run)

-- Clear the data from the 2nd run
TRUNCATE TABLE daily_sample_purchase_history;

-- Updated data
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y001', 'Mango', 23, '2023-12-02');
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at) VALUES ('Y005', 'Banana', 34, '2023-12-02');

SELECT * FROM daily_sample_purchase_history;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Mango","23","2023-12-02"
"Y005","Banana","34","2023-12-02"
~~~~~~

[10] dbt run (3rd run)

dbt run

[11] Check

SELECT * FROM full_sample_purchase_history ORDER BY purchase_id;
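
Expected result (deduced from the delete+insert behavior: the new batch's created_at of 2023-12-02 does not yet exist in the target, so nothing is deleted and the two rows are simply appended to the result of the 2nd run; the relative order of the two Y001 rows is not guaranteed):

~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Orange","12","2023-12-01"
"Y001","Mango","23","2023-12-02"
"Y003","Pineapple","21","2023-12-01"
"Y005","Banana","34","2023-12-02"
~~~~~~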

【3】Supplement: Processing Flow

To see what dbt is doing behind the scenes, I took the code from

https://dk521123.hatenablog.com/entry/2025/09/04/000906

rewrote it to use Delete and Insert, and ran it with dbt run --log-level debug.
The debug log shows that dbt performs the following processing.

Processing flow (overview)

Step 1: Create a new TMP table (user_full__dbt_tmp042824425257) from the model's result

Step 2: Get the column information of the original table (user_full) and
   the TMP table (user_full__dbt_tmp042824425257)
   (presumably to check that the two match)

Step 3: Against the original table (user_full), run DELETE + INSERT
   using the TMP table (user_full__dbt_tmp042824425257)

1) Actual log

04:28:24  Began running node model.my_project.user_full
04:28:24  2 of 3 START sql incremental model public.user_full ............................ [RUN]
04:28:24  Re-using an available connection from the pool (formerly model.my_project.my_first_dbt_model, now model.my_project.user_full)
04:28:24  Began compiling node model.my_project.user_full
04:28:24  Writing injected SQL for node "model.my_project.user_full"
04:28:24  Began executing node model.my_project.user_full
04:28:24  Using postgres connection "model.my_project.user_full"
04:28:24  On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */

★ Step 1: Create a new TMP table (user_full__dbt_tmp042824425257) from the model's result
  create temporary table "user_full__dbt_tmp042824425257"
    as  
  (
WITH
FULL_TABLE AS (    
        SELECT * FROM "dbt_db"."public"."user_full"
),
USER_TABLE AS (
    SELECT * FROM "dbt_db"."public"."user"
),
USER_PII AS (
    SELECT * FROM "dbt_db"."public"."user_pii"
),
FINAL_TABLE AS (
    SELECT
    COALESCE(u.user_id, pi.user_id) AS user_id,
    COALESCE(pi.name, f.name) AS name,
    COALESCE(u.gender, f.gender) AS gender,
    COALESCE(pi.email, f.email) AS email,
    COALESCE(u.remarks, f.remarks) AS remarks
    FROM
    USER_TABLE AS u
    FULL OUTER JOIN
    USER_PII AS pi
    ON
    u.user_id = pi.user_id
    LEFT OUTER JOIN
    FULL_TABLE AS f
    ON
    u.user_id = f.user_id
    OR pi.user_id = f.user_id
)

SELECT * FROM FINAL_TABLE
  );
  
04:28:24  Opening a new connection, currently in state closed
04:28:24  SQL status: SELECT 7 in 0.055 seconds
04:28:24  Using postgres connection "model.my_project.user_full"
04:28:24  On model.my_project.user_full: BEGIN
04:28:24  SQL status: BEGIN in 0.000 seconds
04:28:24  Using postgres connection "model.my_project.user_full"
04:28:24  On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */

★ Step 2: Get the column information of the original table (user_full) and
 the TMP table (user_full__dbt_tmp042824425257)
 (presumably to check that the two match)

      select
          column_name,
          data_type,
          character_maximum_length,
          numeric_precision,
          numeric_scale
      from INFORMATION_SCHEMA.columns
      where table_name = 'user_full__dbt_tmp042824425257'
      order by ordinal_position
  
04:28:24  SQL status: SELECT 5 in 0.020 seconds
04:28:24  Using postgres connection "model.my_project.user_full"
04:28:24  On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */

      select
          column_name,
          data_type,
          character_maximum_length,
          numeric_precision,
          numeric_scale
      from "dbt_db".INFORMATION_SCHEMA.columns
      where table_name = 'user_full'
        and table_schema = 'public'
      order by ordinal_position
  
04:28:24  SQL status: SELECT 5 in 0.001 seconds
04:28:24  Using postgres connection "model.my_project.user_full"
04:28:24  On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */

      select
          column_name,
          data_type,
          character_maximum_length,
          numeric_precision,
          numeric_scale
      from "dbt_db".INFORMATION_SCHEMA.columns
      where table_name = 'user_full'
        and table_schema = 'public'
      order by ordinal_position

04:28:24  SQL status: SELECT 5 in 0.001 seconds
04:28:24  Writing runtime sql for node "model.my_project.user_full"
04:28:24  Using postgres connection "model.my_project.user_full"
04:28:24  On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */

★ Step 3: Against the original table (user_full), run DELETE + INSERT
 using the TMP table (user_full__dbt_tmp042824425257)

            delete from "dbt_db"."public"."user_full"
            where (
                user_id) in (
                select (user_id)
                from "user_full__dbt_tmp042824425257"
            );

    insert into "dbt_db"."public"."user_full" ("user_id", "name", "gender", "email", "remarks")
    (
        select "user_id", "name", "gender", "email", "remarks"
        from "user_full__dbt_tmp042824425257"
    )
  
04:28:24  SQL status: INSERT 0 7 in 0.005 seconds
04:28:24  On model.my_project.user_full: COMMIT
04:28:24  Using postgres connection "model.my_project.user_full"
04:28:24  On model.my_project.user_full: COMMIT
04:28:24  SQL status: COMMIT in 0.001 seconds
04:28:24  On model.my_project.user_full: Close
04:28:24  Sending event: {'category': 'dbt', 'action': 'run_model', 'label': 'c3d43d45-e240-42e7-bcec-a93436871f57', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7ffff8b74860>]}
04:28:24  2 of 3 OK created sql incremental model public.user_full ....................... [INSERT 0 7 in 0.15s]
04:28:24  Finished running node model.my_project.user_full

Related articles

dbt ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2023/06/30/000000
dbt ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/05/30/151003
dbt ~ Basics / Source ~
https://dk521123.hatenablog.com/entry/2023/12/08/111012
dbt ~ Updates / Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2023/12/07/060129
dbt ~ Updates / Update or Insert / Insert ~
https://dk521123.hatenablog.com/entry/2023/12/19/224453
dbt ~ Updates / DROP + CTAS ~
https://dk521123.hatenablog.com/entry/2023/12/04/000000