【分散処理】PySpark ~ CSV / 基本編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2021/04/03/004254

の続き。
今回は、PySpark で CSV を扱う。

補足

エスケープされた区切り文字が含んだデータを扱う場合は
以下の関連記事を参照のこと。

PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832

目次

【1】関連するAPI
 1)読み込み
 2)書き込み
【2】補足
 1)読み込み時でファイルがなかった場合の挙動
【3】サンプル
 例1:CSVファイルの読み込み
 例2:CSVファイルの書き込み
 例3:distinct 重複値の削除

【1】関連するAPI

1)読み込み

spark.read.csv()
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read%20csv

2)書き込み

data_frame.write.mode('書き込みモード').csv()
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

* mode : 書き込みモード(ファイルがあった場合の振る舞いを指定)
  + append : 追記するモード
  + overwrite : 上書きするモード
  + ignore : 無視するモード(何もしない?)
  + error or errorifexists : 例外を発生させるモード(デフォルト)

* それ以外のオプションについては、以下の関連記事を参照のこと

PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226

【2】補足

1)読み込み時でファイルがなかった場合の挙動

ファイルが存在しない場合(アクセス権がない場合も)、以下の例外が発生する
=====
pyspark.sql.utils.AnalysisException: 'Path does not exist: file:/C:/xxx/xxx/xxxx.csv;'
=====

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException


spark_context = SparkContext()
spark = SparkSession(spark_context)

try:
  data_frame = spark.read.csv("dummy.csv", header=False)
  data_frame.show()
except AnalysisException as ex:
  print("AnalysisException = " + str(ex))
  raise ex
except Exception as ex:
  print("Exception = " + str(ex))
  raise ex

【3】サンプル

例1:CSVファイルの読み込み

from pyspark import SparkContext
from pyspark.sql import SparkSession

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  # ★ここが違う点★
  data_frame = spark.read.csv("hello.csv", header=True)
  data_frame.show()

if __name__ == '__main__':
  main()

入力ファイル:hello.csv

id,name,remarks
001,Mike,-
002,Tom,Hello
003,Smith,World
004,Nick,!!

出力結果

+---+-----+-------+
| id| name|remarks|
+---+-----+-------+
|001| Mike|      -|
|002|  Tom|  Hello|
|003|Smith|  World|
|004| Nick|     !!|
+---+-----+-------+

例2:CSVファイルの書き込み

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

spark_context = SparkContext()
spark = SparkSession(spark_context)

data_frame = spark.read.csv(
  "./hello_without_header.csv")
# StructFiled([項目名], [データ型], [Nullを許容するか(True:許容する)])
schema = StructType([
  StructField('id', StringType(), True),
  StructField('name', StringType(), True)
])
data_frame = spark.createDataFrame(
  data_frame.rdd, schema)
# 書き込み
data_frame.write.mode('overwrite').csv(
  "./out",
  header=True,
  quoteAll=True)

data_frame.show()

print("Done")

入力ファイル:hello_without_header.csv

1,Mike
2,Tom
3,Naomi

出力ファイル:out/part-xxxxx-xxxxx-xxxx-xxxx-xxx-xxxxxxxxxxx-xxxx.csv

"id","name"
"1","Mike"
"2","Tom"
"3","Naomi"

例3:distinct 重複値の削除

from pyspark import SparkContext
from pyspark.sql import SparkSession

def main():
  spark_context = SparkContext()
  spark = SparkSession(spark_context)

  data_frame = spark.read.csv("hello.csv", header=True)
  # 重複値の削除
  count_for_id = data_frame.select("id").distinct().count()
  print(count_for_id)

if __name__ == '__main__':
  main()

入力ファイル:hello.csv

id,name,remarks
001,Mike,-
002,Tom,Hello
003,Smith,World
001,Mike,!!
004,Nick,!!
004,Nick,!!
003,Smith,!!
003,Smith,!!
005,Ken,!!

出力結果

5

参考文献

https://fisproject.jp/2018/04/methods-of-the-pyspark-sql-dataframe-class/
https://qiita.com/taka4sato/items/4ab2cf9e941599f1c0ca
CSVの書き込み
https://qiita.com/paulxll/items/1c0833782cd4e1de86e2
今後役に立ちそうなサイト
https://www.atmarkit.co.jp/ait/articles/0902/27/news129.html
https://www.atmarkit.co.jp/ait/articles/0903/09/news094.html

関連記事

PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ CSV / Null・空文字・異常値の扱い ~
https://dk521123.hatenablog.com/entry/2021/06/01/142457
PySpark ~ _corrupt_record ~
https://dk521123.hatenablog.com/entry/2022/02/14/153845
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / MultiLine対応 ~
https://dk521123.hatenablog.com/entry/2022/02/04/181842
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ パーティション
https://dk521123.hatenablog.com/entry/2021/05/13/110811
PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147
PySpark で 出力ファイル名を変更する
https://dk521123.hatenablog.com/entry/2021/05/12/003047
PySpark ~ 環境設定編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
Apache Spark ~ 環境設定 / Windows編 ~
https://dk521123.hatenablog.com/entry/2019/09/18/214814