■ はじめに
https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2021/04/03/004254
の続き。 RDD(Resilient Distributed Dataset)をもう少し深堀する。
目次
【1】RDDの生成 1)parallelize 2)textFile 【2】アクション ~ Actions ~ 1)collect ※ 使用上の注意 2)first 3)count 4)saveAsTextFile 【3】変換 ~ Transformations ~ 1)filter 2)map 3)flatMap 4)reduce 【4】その他 1)isEmpty
【1】RDDの生成
1)parallelize
* リスト・タプルをRDDに変換する
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession spark_context = SparkContext() target_list = [1, 2, 3, 4, 5] rdd = spark_context.parallelize(target_list)
2)textFile
* ファイルを読み込む
サンプル
https://dk521123.hatenablog.com/entry/2021/04/03/004254
# より抜粋 # ファイルをRDDとして取得 rdd = spark_context.textFile("./test.csv")
【2】アクション ~ Actions ~
http://mogile.web.fc2.com/spark/rdd-programming-guide.html#actions
1)collect
* データセットの全要素を配列として返す
サンプル
print(rdd.collect())
※ 使用上の注意
* 大容量データを扱う際には、例外を発生する可能性があるので、 極力、使用を控えるように実装した方がいい。 => 詳細は、以下の関連記事を参照のこと
https://dk521123.hatenablog.com/entry/2021/04/22/131849
2)first
* 最初の行を返す
サンプル
https://dk521123.hatenablog.com/entry/2021/04/03/004254
# より抜粋 # 先頭行を取得 first_row = rdd.first() print(first_row)
3)count
* カウントする => ファイル行数数をカウントできる
サンプル
https://dk521123.hatenablog.com/entry/2021/04/03/004254
# より抜粋 # Count from RDD sam_count = rdd.filter(lambda row: 'Sam' in row).count() print(f'Count of Sam is {sam_count}')
4)saveAsTextFile
* Textファイルとして保存する
使用上の注意
saveAsTextFile(path) っとあるように、引数はパスであり、 rdd.saveAsTextFile('cannot_save_as_file.csv') のようにしても「cannot_save_as_file.csv」ってファイルができるのではなく 「cannot_save_as_file.csv」ってフォルダができて、その中にファイルが保存される
サンプル
rdd.saveAsTextFile('./rdds')
【3】変換 ~ Transformations ~
http://mogile.web.fc2.com/spark/rdd-programming-guide.html#transformations
1)filter
* フィルタ機能 => 条件が真のものだけ返す
サンプル
filtered_rdd = rdd.filter(lambda value: value >= 3) # [3, 4, 5] print(filtered_rdd.collect())
2)map
* RDDの各行に対して、実行結果を返す
サンプル
twice_rdd = rdd.map(lambda value: value * 2) # [2, 4, 6, 8, 10] print(twice_rdd.collect())
3)flatMap
* mapした結果をひとつのコレクション(flat)にする
サンプル
rdd = spark_context.parallelize(["Hello world!!", "Hi, Mike! What's up?"]) result_rdd = rdd.flatMap(lambda x: x.split()) # ['Hello', 'world!!', 'Hi,', 'Mike!', "What's", 'up?'] print(result_rdd.collect()) result2_rdd = rdd.map(lambda x: x.split()) # [['Hello', 'world!!'], ['Hi,', 'Mike!', "What's", 'up?']] print(result2_rdd.collect())
4)reduce
* 要素同士の演算を行う => 返却される結果は、RDDではない
サンプル
rdd = spark_context.parallelize([1, 2, 3, 4, 5]) result = rdd.reduce(lambda x, y: x + y) # 15 print(result)
【4】その他
1)isEmpty
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.isEmpty.html
# True print(sc.parallelize([]).isEmpty()) # False print(sc.parallelize([1]).isEmpty())
参考文献
http://mogile.web.fc2.com/spark/rdd-programming-guide.html
https://qiita.com/sotetsuk/items/6e4e2953799078fd6027
https://qiita.com/miyamotok0105/items/bf3638607ef6cb95f01b
https://ex-ture.com/blog/2019/06/27/learn-databricks-spark-rdd-operations/
https://tech.jxpress.net/entry/pyspark-nyumon
https://dev.classmethod.jp/articles/apache-spark_rdd_investigation/
関連記事
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PPySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する
https://dk521123.hatenablog.com/entry/2021/04/22/131849