【分散処理】PySpark ~ DB・テーブル・項目取得編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2020/01/04/150942
https://dk521123.hatenablog.com/entry/2020/05/18/154829
https://dk521123.hatenablog.com/entry/2020/08/28/183706

の続き。

 今回は、テーブルデータを集計するにあたり、
テーブル名一覧や項目名を取得するやり方をメモする。

目次

【1】DB一覧を取得する
【2】テーブル一覧を取得する
【3】テーブル項目一覧を取得する
 1)DataFrame.columns
 2)DataFrame.printSchema()
 注意)spark.catalog.listColumns() について
【4】サンプル

【1】DB一覧を取得する

* spark.catalog.listDatabases() で取得可能

サンプル

# 一部抜粋

print("** DB一覧の取得 ************************")
for db_info in spark.catalog.listDatabases():
  print(db_info)
  print(db_info.name)

出力結果

** DB一覧の取得 ************************
Database(name='default', description='default database', locationUri='file:/C:/xxx/spark-warehouse')
default

【2】テーブル一覧を取得する

spark.catalog.listTables() で取得可能

サンプル

# 一部抜粋

print("** Table一覧の取得 ************************")
for table_info in spark.catalog.listTables(db_info.name):
  print(table_info)
  print(table_info.name)

出力結果

** Table一覧の取得 ************************
Table(name='table_dummy_1', database=None, description=None, tableType='TEMPORARY', isTemporary=True)
table_dummy_1

【3】テーブル項目一覧を取得する

* DataFrame.columns で取得

※ おまけ:デバッグ表示として…
* DataFrame.printSchema()で表示

1)DataFrame.columns

* 以下の関連記事でも使用している。

https://dk521123.hatenablog.com/entry/2020/08/28/183706

API仕様
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.columns.html
サンプル

# 一部抜粋

print(df.columns)

出力結果

['id', 'name', 'job']

2)DataFrame.printSchema()

サンプル

# 一部抜粋

df.printSchema()

出力結果

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- job: string (nullable = true)

注意点)spark.catalog.listColumns() について

Standalone環境において、
spark.catalog.listColumns() で取得を試みたのだが、
うまくいかなかった
~~~~~~~~~~~~~~~~~~~~~
  raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException:
"Table 'table_dummy_1' does not exist in database 'default'.;"
~~~~~~~~~~~~~~~~~~~~~

調べてみたところ、以下のサイトを見つけた。

https://github.com/delta-io/delta/issues/573

より抜粋
~~~~~~~
spark.catalog.listColumns is a private Spark API.
Spark doesn't provide a proper interface to allow Delta to inject its metadata.
~~~~~~~

【4】サンプル

from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Demo") \
  .getOrCreate()

df = spark.read.csv(
  "./hello_world.csv", header=True)

print(df.columns)

df.printSchema()

# 動作確認用にダミーでテーブルを作成
df.createOrReplaceTempView("table_dummy_1")
df.createOrReplaceTempView("table_dummy_2")
df.createOrReplaceTempView("table_dummy_3")

# DB一覧の取得
print("** DB一覧の取得 ************************")
for db_info in spark.catalog.listDatabases():
  print(db_info)
  print(db_info.name)
  # Table一覧の取得
  print("** Table一覧の取得 ************************")
  for table_info in spark.catalog.listTables(db_info.name):
    print(table_info)
    print(table_info.name)
    # 例外発生(「注意点)spark.catalog.listColumns() について」参照)
    # 項目一覧の取得
    #print("** 項目一覧の取得 ************************")
    #for column_info in spark.catalog.listColumns('table_dummy_3'):
    #  print(column_info)

入力ファイル「hello_world.csv

id,name,job
x0001,Mike,Sales
x0002,Tom,IT
x0003,Sam,Sales
x0004,Kevin,Human resources
x0005,Bob,IT
x0006,Alice,Banking
x0007,Carol,IT
x0008,Tom,Banking
x0009,Mike,IT
x0010,Bob,Sales

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / データ集計編 ~
https://dk521123.hatenablog.com/entry/2021/05/25/111051
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ DataFrame / 項目数を取得するには ~
https://dk521123.hatenablog.com/entry/2020/08/28/183706
PySpark ~ DataFrame / show() 編 ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342