■ はじめに
PySpark経由でDB(今回は「PostgreSQL」)に接続する方法を メモっておく。
■ 準備
JDBC 接続するので、DB の JDBC ドライバを用意しておくこと 今回は、PostgreSQLなので、以下のサイトからダウンロードした (今回は、「postgresql-42.2.14.jar」)
https://jdbc.postgresql.org/download.html
■ はまりポイント
PostgreSQLのドライバをうまく読み込めなくて、 以下のエラーが発生した点。 => 以下のサンプルのように 環境変数「PYSPARK_SUBMIT_ARGS」を設定 したりして解決した
エラー内容
An error occurred while calling o34.load. : java.lang.ClassNotFoundException: org.postgresql.Drive
■ サンプル
例1:環境変数を設定しないバージョン
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Hello Word") \ .config("spark.jars", "postgresql-42.2.14.jar") \ .getOrCreate() # テーブル「person」のデータをロードする data_frame = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql://localhost:5432/sample_db") \ .option("user", "postgres") \ .option("dbtable", "person") \ .option("password", "password") \ .option("driver", "org.postgresql.Driver") \ .load() # 表示 data_frame.show() # 項目を絞る data_frame.select("name").show() # 複数で絞りたい場合 # data_frame.select("id", "name").show() # WHERE的に条件で絞る data_frame.filter(data_frame["name"] == "Mike").show()
出力結果例
+--------+-----+-------------+ | id| name|position_code| +--------+-----+-------------+ |X0000000|admin| P01| |X0000001| Mike| P01| |X0000002| Tom| P01| +--------+-----+-------------+ +-----+ | name| +-----+ |admin| | Mike| | Tom| +-----+ +--------+----+-------------+ | id|name|position_code| +--------+----+-------------+ |X0000001|Mike| P01| +--------+----+-------------+
例2:環境変数を設定したバージョン
import os from pyspark.sql import SparkSession # DB情報 db_host = "localhost" db_port = 5432 db_name = "sample_db" db_user = "postgres" db_password = "password" jdbc_driver_path = "C:\\tmp\\postgresql-42.2.19.jar" # 実行したいSQL query = "SELECT * FROM users" os.environ['PYSPARK_SUBMIT_ARGS'] = \ f'--packages org.postgresql:postgresql:42.2.19 pyspark-shell' spark = SparkSession.builder \ .appName("PySpark SQL Hello world") \ .config("spark.jars", jdbc_driver_path) \ .getOrCreate() df = spark.read \ .format("jdbc") \ .option("driver", "org.postgresql.Driver") \ .option("url", f"jdbc:postgresql://{db_host}:{db_port}/{db_name}") \ .option("user", db_user) \ .option("password", db_password) \ .option("query", query) \ .load() df.show()
実行結果
Ivy Default Cache set to: C:\Users\admin\.ivy2\cache The jars for the packages stored in: C:\Users\admin\.ivy2\jars :: loading settings :: url = jar:file:/C:/hoge/spark-2.4.7-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml org.postgresql#postgresql added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXX;1.0 confs: [default] found org.postgresql#postgresql;42.2.19 in central found org.checkerframehoge#checker-qual;3.5.0 in central :: resolution report :: resolve 273ms :: artifacts dl 14ms :: modules in use: org.checkerframehoge#checker-qual;3.5.0 from central in [default] org.postgresql#postgresql;42.2.19 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 2 | 0 | 0 | 0 || 2 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-XXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXX confs: [default] 0 artifacts copied, 2 already retrieved (0kB/13ms) 21/04/07 23:11:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ★用意していたテーブルデータ★ +-------+---------+--------------------+ |user_id|user_name| created_at| +-------+---------+--------------------+ | 1| Mike|2019-12-19 21:50:...| | 2| Tom|2019-12-19 21:50:...| | 3| Sam|2019-12-19 21:50:...| | 4| Kevin|2019-12-19 21:50:...| | 5| Smith|2019-12-19 21:50:...| | 6| Ken|2019-12-19 21:50:...| | 10| Sam|2020-08-26 22:39:...| | 12| Naomi|2020-08-26 22:39:...| +-------+---------+--------------------+ C:\hoge\python>成功: PID 2064 のプロセス (PID 22664 の子プロセス) を終了しました。 成功: PID 22664 のプロセス (PID 932 の子プロセス) を終了しました。 成功: PID 932 のプロセス (PID 20424 の子プロセス) を終了しました。
参考文献
https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql/37058979
https://cloudfish.hatenablog.com/entry/2018/08/03/191424
https://github.com/jupyter/notebook/issues/743
https://stackoverflow.com/questions/60183150/error-while-connecting-to-postgresql-with-pyspark-in-shell-org-postgresql-driv
関連記事
PySpark ~ 環境設定 + Hello World編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ DataFrame / show() 編 ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741