【dbt】Python with dbt ~基本編 ~

◾️はじめに

https://dk521123.hatenablog.com/entry/2024/07/20/034930

の続き。

Python の dbtRunner を使った簡単なサンプルを作ってみる

目次

【1】前準備
 1)SQL
【2】INSERT
 1)SQL
 2)Pythonコード
【3】SELECT
 1)Pythonコード

【1】前準備

* 使用している dbt CLI については、以下の関連記事を参照のこと

dbt ~ dbt CLI
https://dk521123.hatenablog.com/entry/2024/07/21/234811

1)SQL

-- データ作成
DROP TABLE IF EXISTS daily_sample_user;
CREATE TABLE daily_sample_user (
  user_id VARCHAR NOT NULL,
  user_name VARCHAR NOT NULL,
  remarks VARCHAR NULL,
  updated_at DATE NOT NULL,
  created_at DATE NOT NULL
);

-- データをクリア
-- Truncate table daily_sample_user;

-- 更新データ
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
 VALUES ('X001', 'Mike', 'This is a demo', '2024-07-20', '2024-07-20');
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
 VALUES ('X002', 'Tom', 'This is a demo', '2024-07-20', '2024-07-20');
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
 VALUES ('X003', 'Smith', 'This is a demo', '2024-07-20', '2024-07-20');
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
 VALUES ('X004', 'Kevin', 'This is a demo', '2024-07-20', '2024-07-20');
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
 VALUES ('X005', 'Ann', 'This is a demo', '2024-07-20', '2024-07-20');

【1】INSERT

* INSERTするサンプルを作ってみる

1)SQL

full_sample_user.sql

{{
  config(
    materialized='incremental',
    unique_key='user_id',
    incremental_strategy='append',
    tags=["sample_tag"]
  )
}}

WITH
insert_user AS (
  SELECT
    user_id AS user_id,
    user_name AS user_name,
    remarks AS remarks,
    updated_at AS updated_at,
    created_at AS created_at
  FROM
    {{ source('demo_source', 'daily_sample_user') }}
)

SELECT * FROM insert_user

2)Pythonコード

demo.py

from dbt.cli.main import dbtRunner, dbtRunnerResult


# initialize
dbt = dbtRunner()

# create CLI args as a list of strings
cli_args = ["run", "--select", "tag:sample_tag"]

# run the command
response: dbtRunnerResult = dbt.invoke(cli_args)

if response.success:
  for r in response.result:
    print(f"{r.node.name}: {r.status}")
    print(r.adapter_response)
else:
  print(f"response = {response}")

出力結果例

16:26:06  Running with dbt=1.8.3
16:26:06  Registered adapter: postgres=1.8.2
16:26:07  Found 5 models, 4 data tests, 2 sources, 415 macros
16:26:07
16:26:07  Concurrency: 1 threads (target='dev')
16:26:07
16:26:07  1 of 1 START sql incremental model hello.full_sample_user ...................... [RUN]
16:26:07  1 of 1 OK created sql incremental model hello.full_sample_user ................. [SELECT 5 in 0.24s]
16:26:08  
16:26:08  Finished running 1 incremental model in 0 hours 0 minutes and 0.84 seconds (0.84s).
16:26:08  
16:26:08  Completed successfully
16:26:08
16:26:08  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
full_sample_user: success << 「print(f"{r.node.name}: {r.status}")」
{'_message': 'SELECT 5', 'code': 'SELECT', 'rows_affected': 5} << 「print(r.adapter_response)」

【2】SELECT

* SELECTするサンプルを作って、INSERTしたデータを表示する

1)Pythonコード

from dbt.cli.main import dbtRunner, dbtRunnerResult


# initialize
dbt = dbtRunner()

# create CLI args as a list of strings
cli_args = [
  "show", "--inline", "select * from hello.full_sample_user" "--limit 20",
#  "show", "--inline", "select * from hello.daily_sample_user" "--limit 20",
#   "--profiles-dir", "./", "--project-dir", "./dbt/my_project/",
]

# run the command
response: dbtRunnerResult = dbt.invoke(cli_args)

if response.success:
  # inspect the results
  for r in res.result:
    print(f"{r.node.name}: {r.status}")
    table = r.agate_table
    print(table)
    for row in table:
      print(f'{row["user_id"].strip()} | {row["user_name"].strip()} | {row["remarks"].strip()}')
else:
  print(f"response = {response}")

出力結果例

17:53:05  Running with dbt=1.8.3
17:53:06  Registered adapter: postgres=1.8.2
17:53:06  Found 5 models, 4 data tests, 1 sql operation, 2 sources, 415 macros
17:53:06
17:53:07  Concurrency: 1 threads (target='dev')
17:53:07
17:53:07  Previewing inline node:
| user_id | user_name | remarks        | updated_at | created_at |
| ------- | --------- | -------------- | ---------- | ---------- |
| X001    | Mike      | This is a demo | 2024-07-20 | 2024-07-20 |
| X002    | Tom       | This is a demo | 2024-07-20 | 2024-07-20 |
| X003    | Smith     | This is a demo | 2024-07-20 | 2024-07-20 |
| X004    | Kevin     | This is a demo | 2024-07-20 | 2024-07-20 |
| X005    | Ann       | This is a demo | 2024-07-20 | 2024-07-20 |

inline_query: success << 「print(f"{r.node.name}: {r.status}")」
| column     | data_type | << 「print(table)」
| ---------- | --------- |
| user_id    | Text      |
| user_name  | Text      |
| remarks    | Text      |
| updated_at | Date      |
| created_at | Date      |

X001 | Mike | This is a demo << 「print(f'{row["user_id"].strip()} | {row["user_name"].strip()} | {row["remarks"].strip()}')」
X002 | Tom | This is a demo
X003 | Smith | This is a demo
X004 | Kevin | This is a demo
X005 | Ann | This is a demo

参考文献

https://dev.classmethod.jp/articles/dbt-try-show-command/

関連記事

dbt ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/06/30/000000
dbt ~ 環境設定編 ~
https://dk521123.hatenablog.com/entry/2023/12/16/152147
dbt ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/30/151003
dbt CLI ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2024/07/21/234811
dbt CLI ~ SELECT実行 / dbt show ~
https://dk521123.hatenablog.com/entry/2025/04/27/125533
Python with dbt ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2024/07/20/034930
Python with dbt ~ rows_affected ~
https://dk521123.hatenablog.com/entry/2024/09/02/234559