【分散処理】PySpark ~ PySpark経由でDBに接続する ~

■ はじめに

PySpark経由でDB(今回は「PostgreSQL」)に接続する方法を
メモっておく。

■ 準備

JDBC 接続するので、DB の JDBC ドライバを用意しておくこと

今回は、PostgreSQLなので、以下のサイトからダウンロードした
(今回は、「postgresql-42.2.14.jar」)

https://jdbc.postgresql.org/download.html

■ はまりポイント

PostgreSQLのドライバをうまく読み込めなくて、
以下のエラーが発生した点。
 => 以下のサンプルのように
  環境変数「PYSPARK_SUBMIT_ARGS」を設定
  したりして解決した

エラー内容

An error occurred while calling o34.load. :
java.lang.ClassNotFoundException: org.postgresql.Drive

■ サンプル

例1:環境変数を設定しないバージョン

from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Hello Word") \
  .config("spark.jars", "postgresql-42.2.14.jar") \
  .getOrCreate()

# テーブル「person」のデータをロードする
data_frame = spark.read \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://localhost:5432/sample_db") \
  .option("user", "postgres") \
  .option("dbtable", "person") \
  .option("password", "password") \
  .option("driver", "org.postgresql.Driver") \
  .load()

# 表示
data_frame.show()

# 項目を絞る
data_frame.select("name").show()

# 複数で絞りたい場合
# data_frame.select("id", "name").show()

# WHERE的に条件で絞る
data_frame.filter(data_frame["name"] == "Mike").show()

出力結果例

+--------+-----+-------------+
|      id| name|position_code|
+--------+-----+-------------+
|X0000000|admin|          P01|
|X0000001| Mike|          P01|
|X0000002|  Tom|          P01|
+--------+-----+-------------+

+-----+
| name|
+-----+
|admin|
| Mike|
|  Tom|
+-----+

+--------+----+-------------+
|      id|name|position_code|
+--------+----+-------------+
|X0000001|Mike|          P01|
+--------+----+-------------+

例2:環境変数を設定したバージョン

import os

from pyspark.sql import SparkSession

# DB情報
db_host = "localhost"
db_port = 5432
db_name = "sample_db"
db_user = "postgres"
db_password = "password"
jdbc_driver_path = "C:\\tmp\\postgresql-42.2.19.jar"

# 実行したいSQL
query = "SELECT * FROM users"

os.environ['PYSPARK_SUBMIT_ARGS'] = \
  f'--packages org.postgresql:postgresql:42.2.19 pyspark-shell'

spark = SparkSession.builder \
  .appName("PySpark SQL Hello world") \
  .config("spark.jars", jdbc_driver_path) \
  .getOrCreate()

df = spark.read \
  .format("jdbc") \
  .option("driver", "org.postgresql.Driver") \
  .option("url", f"jdbc:postgresql://{db_host}:{db_port}/{db_name}") \
  .option("user", db_user) \
  .option("password", db_password) \
  .option("query", query) \
  .load()

df.show()

実行結果

Ivy Default Cache set to: C:\Users\admin\.ivy2\cache
The jars for the packages stored in: C:\Users\admin\.ivy2\jars
:: loading settings :: url = jar:file:/C:/hoge/spark-2.4.7-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXX;1.0
        confs: [default]
        found org.postgresql#postgresql;42.2.19 in central
        found org.checkerframehoge#checker-qual;3.5.0 in central
:: resolution report :: resolve 273ms :: artifacts dl 14ms
        :: modules in use:
        org.checkerframehoge#checker-qual;3.5.0 from central in [default]
        org.postgresql#postgresql;42.2.19 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXX
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/13ms)
21/04/07 23:11:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
★用意していたテーブルデータ★
+-------+---------+--------------------+
|user_id|user_name|          created_at|
+-------+---------+--------------------+
|      1|     Mike|2019-12-19 21:50:...|
|      2|      Tom|2019-12-19 21:50:...|
|      3|      Sam|2019-12-19 21:50:...|
|      4|    Kevin|2019-12-19 21:50:...|
|      5|    Smith|2019-12-19 21:50:...|
|      6|      Ken|2019-12-19 21:50:...|
|     10|      Sam|2020-08-26 22:39:...|
|     12|    Naomi|2020-08-26 22:39:...|
+-------+---------+--------------------+


C:\hoge\python>成功: PID 2064 のプロセス (PID 22664 の子プロセス) を終了しました。
成功: PID 22664 のプロセス (PID 932 の子プロセス) を終了しました。
成功: PID 932 のプロセス (PID 20424 の子プロセス) を終了しました。

参考文献

https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql/37058979
https://cloudfish.hatenablog.com/entry/2018/08/03/191424
https://github.com/jupyter/notebook/issues/743
https://stackoverflow.com/questions/60183150/error-while-connecting-to-postgresql-with-pyspark-in-shell-org-postgresql-driv

関連記事

PySpark ~ 環境設定 + Hello World編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ DataFrame / show() 編 ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741