■ はじめに
https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2021/04/03/004254
の続き。 今回は、PySpark で CSV を扱う。
補足
エスケープされた区切り文字が含んだデータを扱う場合は 以下の関連記事を参照のこと。
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
目次
【1】関連するAPI 1)読み込み 2)書き込み 【2】補足 1)読み込み時でファイルがなかった場合の挙動 【3】サンプル 例1:CSVファイルの読み込み 例2:CSVファイルの書き込み 例3:distinct 重複値の削除
【1】関連するAPI
1)読み込み
spark.read.csv()
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read%20csv
2)書き込み
data_frame.write.mode('書き込みモード').csv()
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter
* mode : 書き込みモード(ファイルがあった場合の振る舞いを指定) + append : 追記するモード + overwrite : 上書きするモード + ignore : 無視するモード(何もしない?) + error or errorifexists : 例外を発生させるモード(デフォルト) * それ以外のオプションについては、以下の関連記事を参照のこと
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
【2】補足
1)読み込み時でファイルがなかった場合の挙動
ファイルが存在しない場合(アクセス権がない場合も)、以下の例外が発生する ===== pyspark.sql.utils.AnalysisException: 'Path does not exist: file:/C:/xxx/xxx/xxxx.csv;' =====
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.utils import AnalysisException spark_context = SparkContext() spark = SparkSession(spark_context) try: data_frame = spark.read.csv("dummy.csv", header=False) data_frame.show() except AnalysisException as ex: print("AnalysisException = " + str(ex)) raise ex except Exception as ex: print("Exception = " + str(ex)) raise ex
【3】サンプル
例1:CSVファイルの読み込み
from pyspark import SparkContext from pyspark.sql import SparkSession def main(): spark_context = SparkContext() spark = SparkSession(spark_context) # ★ここが違う点★ data_frame = spark.read.csv("hello.csv", header=True) data_frame.show() if __name__ == '__main__': main()
入力ファイル:hello.csv
id,name,remarks 001,Mike,- 002,Tom,Hello 003,Smith,World 004,Nick,!!
出力結果
+---+-----+-------+ | id| name|remarks| +---+-----+-------+ |001| Mike| -| |002| Tom| Hello| |003|Smith| World| |004| Nick| !!| +---+-----+-------+
例2:CSVファイルの書き込み
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType spark_context = SparkContext() spark = SparkSession(spark_context) data_frame = spark.read.csv( "./hello_without_header.csv") # StructFiled([項目名], [データ型], [Nullを許容するか(True:許容する)]) schema = StructType([ StructField('id', StringType(), True), StructField('name', StringType(), True) ]) data_frame = spark.createDataFrame( data_frame.rdd, schema) # 書き込み data_frame.write.mode('overwrite').csv( "./out", header=True, quoteAll=True) data_frame.show() print("Done")
入力ファイル:hello_without_header.csv
1,Mike 2,Tom 3,Naomi
出力ファイル:out/part-xxxxx-xxxxx-xxxx-xxxx-xxx-xxxxxxxxxxx-xxxx.csv
"id","name" "1","Mike" "2","Tom" "3","Naomi"
例3:distinct 重複値の削除
from pyspark import SparkContext from pyspark.sql import SparkSession def main(): spark_context = SparkContext() spark = SparkSession(spark_context) data_frame = spark.read.csv("hello.csv", header=True) # 重複値の削除 count_for_id = data_frame.select("id").distinct().count() print(count_for_id) if __name__ == '__main__': main()
入力ファイル:hello.csv
id,name,remarks 001,Mike,- 002,Tom,Hello 003,Smith,World 001,Mike,!! 004,Nick,!! 004,Nick,!! 003,Smith,!! 003,Smith,!! 005,Ken,!!
出力結果
5
参考文献
https://fisproject.jp/2018/04/methods-of-the-pyspark-sql-dataframe-class/
https://qiita.com/taka4sato/items/4ab2cf9e941599f1c0ca
CSVの書き込み
https://qiita.com/paulxll/items/1c0833782cd4e1de86e2
今後役に立ちそうなサイト
https://www.atmarkit.co.jp/ait/articles/0902/27/news129.html
https://www.atmarkit.co.jp/ait/articles/0903/09/news094.html
関連記事
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ CSV / Null・空文字・異常値の扱い ~
https://dk521123.hatenablog.com/entry/2021/06/01/142457
PySpark ~ _corrupt_record ~
https://dk521123.hatenablog.com/entry/2022/02/14/153845
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / MultiLine対応 ~
https://dk521123.hatenablog.com/entry/2022/02/04/181842
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ パーティション ~
https://dk521123.hatenablog.com/entry/2021/05/13/110811
PySpark ~ パーティション単位で上書きするには ~
https://dk521123.hatenablog.com/entry/2021/07/07/093147
PySpark で 出力ファイル名を変更する
https://dk521123.hatenablog.com/entry/2021/05/12/003047
PySpark ~ 環境設定編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
Apache Spark ~ 環境設定 / Windows編 ~
https://dk521123.hatenablog.com/entry/2019/09/18/214814