◾️Introduction
https://dk521123.hatenablog.com/entry/2024/07/20/034930
This post continues from the entry above and builds a simple sample that uses dbtRunner from Python.
Table of contents
【1】Preparation
 1)SQL
【2】INSERT
 1)SQL
 2)Python code
【3】SELECT
 1)Python code
【1】Preparation
* For the dbt CLI used here, see the related post below
dbt ~ dbt CLI ~
https://dk521123.hatenablog.com/entry/2024/07/21/234811
1)SQL
-- Create the source table
DROP TABLE IF EXISTS daily_sample_user;
CREATE TABLE daily_sample_user (
  user_id VARCHAR NOT NULL,
  user_name VARCHAR NOT NULL,
  remarks VARCHAR NULL,
  updated_at DATE NOT NULL,
  created_at DATE NOT NULL
);

-- Clear the data
-- Truncate table daily_sample_user;

-- Data to load
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
  VALUES ('X001', 'Mike', 'This is a demo', '2024-07-20', '2024-07-20');
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
  VALUES ('X002', 'Tom', 'This is a demo', '2024-07-20', '2024-07-20');
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
  VALUES ('X003', 'Smith', 'This is a demo', '2024-07-20', '2024-07-20');
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
  VALUES ('X004', 'Kevin', 'This is a demo', '2024-07-20', '2024-07-20');
INSERT INTO daily_sample_user(user_id, user_name, remarks, updated_at, created_at)
  VALUES ('X005', 'Ann', 'This is a demo', '2024-07-20', '2024-07-20');
【2】INSERT
* Build a sample that INSERTs data through a dbt model
1)SQL
full_sample_user.sql
{{
  config(
    materialized='incremental',
    unique_key='user_id',
    incremental_strategy='append',
    tags=["sample_tag"]
  )
}}

WITH insert_user AS (
  SELECT
    user_id AS user_id,
    user_name AS user_name,
    remarks AS remarks,
    updated_at AS updated_at,
    created_at AS created_at
  FROM
    {{ source('demo_source', 'daily_sample_user') }}
)
SELECT * FROM insert_user
2)Python code
demo.py
from dbt.cli.main import dbtRunner, dbtRunnerResult

# initialize
dbt = dbtRunner()

# create CLI args as a list of strings
cli_args = ["run", "--select", "tag:sample_tag"]

# run the command
response: dbtRunnerResult = dbt.invoke(cli_args)

if response.success:
    for r in response.result:
        print(f"{r.node.name}: {r.status}")
        print(r.adapter_response)
else:
    print(f"response = {response}")
Example output
16:26:06 Running with dbt=1.8.3
16:26:06 Registered adapter: postgres=1.8.2
16:26:07 Found 5 models, 4 data tests, 2 sources, 415 macros
16:26:07
16:26:07 Concurrency: 1 threads (target='dev')
16:26:07
16:26:07 1 of 1 START sql incremental model hello.full_sample_user ...................... [RUN]
16:26:07 1 of 1 OK created sql incremental model hello.full_sample_user ................. [SELECT 5 in 0.24s]
16:26:08
16:26:08 Finished running 1 incremental model in 0 hours 0 minutes and 0.84 seconds (0.84s).
16:26:08
16:26:08 Completed successfully
16:26:08
16:26:08 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
full_sample_user: success << 「print(f"{r.node.name}: {r.status}")」
{'_message': 'SELECT 5', 'code': 'SELECT', 'rows_affected': 5} << 「print(r.adapter_response)」
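The loop over response.result can be pulled into a small helper so the status and rows_affected of each model are collected in one place. This is only a sketch: summarize_results and fake_result are hypothetical names, and fake_result is a stand-in shaped like one entry of response.result (node.name, status, adapter_response, as printed in the sample) so the snippet runs without a dbt project.

```python
from types import SimpleNamespace


def summarize_results(results):
    """Collect (model name, status, rows_affected) for each result entry."""
    summary = []
    for r in results:
        # adapter_response is printed as a plain dict in the sample output;
        # rows_affected may be absent, so fall back to None.
        rows = (r.adapter_response or {}).get("rows_affected")
        summary.append((r.node.name, str(r.status), rows))
    return summary


# Hypothetical stand-in for one entry of response.result
fake_result = SimpleNamespace(
    node=SimpleNamespace(name="full_sample_user"),
    status="success",
    adapter_response={"_message": "SELECT 5", "code": "SELECT", "rows_affected": 5},
)

for name, status, rows in summarize_results([fake_result]):
    print(f"{name}: {status} (rows_affected={rows})")
```

With a real run, the same helper would presumably be called as summarize_results(response.result) inside the `if response.success:` branch.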
【3】SELECT
* Build a sample that runs a SELECT and displays the inserted data
1)Python code
from dbt.cli.main import dbtRunner, dbtRunnerResult

# initialize
dbt = dbtRunner()

# create CLI args as a list of strings
# (each option and its value must be separate list elements)
cli_args = [
    "show", "--inline", "select * from hello.full_sample_user", "--limit", "20",
    # "show", "--inline", "select * from hello.daily_sample_user", "--limit", "20",
    # "--profiles-dir", "./", "--project-dir", "./dbt/my_project/",
]

# run the command
response: dbtRunnerResult = dbt.invoke(cli_args)

if response.success:
    # inspect the results
    for r in response.result:
        print(f"{r.node.name}: {r.status}")
        # the query result is returned as an agate table
        table = r.agate_table
        print(table)
        for row in table:
            print(f'{row["user_id"].strip()} | {row["user_name"].strip()} | {row["remarks"].strip()}')
else:
    print(f"response = {response}")
Example output
17:53:05 Running with dbt=1.8.3
17:53:06 Registered adapter: postgres=1.8.2
17:53:06 Found 5 models, 4 data tests, 1 sql operation, 2 sources, 415 macros
17:53:06
17:53:07 Concurrency: 1 threads (target='dev')
17:53:07
17:53:07 Previewing inline node:
| user_id | user_name | remarks        | updated_at | created_at |
| ------- | --------- | -------------- | ---------- | ---------- |
| X001    | Mike      | This is a demo | 2024-07-20 | 2024-07-20 |
| X002    | Tom       | This is a demo | 2024-07-20 | 2024-07-20 |
| X003    | Smith     | This is a demo | 2024-07-20 | 2024-07-20 |
| X004    | Kevin     | This is a demo | 2024-07-20 | 2024-07-20 |
| X005    | Ann       | This is a demo | 2024-07-20 | 2024-07-20 |

inline_query: success << 「print(f"{r.node.name}: {r.status}")」
| column     | data_type | << 「print(table)」
| ---------- | --------- |
| user_id    | Text      |
| user_name  | Text      |
| remarks    | Text      |
| updated_at | Date      |
| created_at | Date      |

X001 | Mike | This is a demo << 「print(f'{row["user_id"].strip()} | {row["user_name"].strip()} | {row["remarks"].strip()}')」
X002 | Tom | This is a demo
X003 | Smith | This is a demo
X004 | Kevin | This is a demo
X005 | Ann | This is a demo
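Because cli_args is a plain list of strings, it is easy to get subtly wrong: Python silently joins adjacent string literals when a comma is missing, and an option such as --limit needs its value as a separate element. The sketch below shows the pitfall and one way to avoid it. build_show_args is a hypothetical helper, not part of dbt; it only assembles the list that would be passed to dbt.invoke().

```python
def build_show_args(sql, limit=None, project_dir=None, profiles_dir=None):
    """Build the argument list for `dbt show --inline`, one token per element."""
    args = ["show", "--inline", sql]
    if limit is not None:
        args += ["--limit", str(limit)]
    if project_dir is not None:
        args += ["--project-dir", project_dir]
    if profiles_dir is not None:
        args += ["--profiles-dir", profiles_dir]
    return args


# Pitfall: without a comma, the two literals become a single string,
# so "--limit 20" ends up inside the SQL text instead of being an option.
joined = ["select * from t" "--limit 20"]
print(joined)

# One token per list element
args = build_show_args("select * from hello.full_sample_user", limit=20)
print(args)
```

Keeping the option names in one helper also makes it harder to pass an option and its value as a single "--limit 20" string.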
References
https://dev.classmethod.jp/articles/dbt-try-show-command/
Related posts
dbt ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2023/06/30/000000
dbt ~ Environment setup ~
https://dk521123.hatenablog.com/entry/2023/12/16/152147
dbt ~ Getting started ~
https://dk521123.hatenablog.com/entry/2023/05/30/151003
dbt CLI ~ Getting started ~
https://dk521123.hatenablog.com/entry/2024/07/21/234811
dbt CLI ~ Running SELECT / dbt show ~
https://dk521123.hatenablog.com/entry/2025/04/27/125533
Python with dbt ~ Getting started ~
https://dk521123.hatenablog.com/entry/2024/07/20/034930
Python with dbt ~ rows_affected ~
https://dk521123.hatenablog.com/entry/2024/09/02/234559