【分散処理】PySpark ~ RDD / 基本編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2021/04/03/004254

の続き。

RDD(Resilient Distributed Dataset)をもう少し深堀する。

目次

【1】RDDの生成
 1)parallelize
 2)textFile
【2】アクション ~ Actions ~
 1)collect
  ※ 使用上の注意
 2)first
 3)count
 4)saveAsTextFile
【3】変換 ~ Transformations ~
 1)filter
 2)map
 3)flatMap
 4)reduce
【4】その他
 1)isEmpty

【1】RDDの生成

1)parallelize

* リスト・タプルをRDDに変換する

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()

target_list = [1, 2, 3, 4, 5]
rdd = spark_context.parallelize(target_list)

2)textFile

* ファイルを読み込む

サンプル
https://dk521123.hatenablog.com/entry/2021/04/03/004254

# より抜粋
# ファイルをRDDとして取得
rdd = spark_context.textFile("./test.csv")

【2】アクション ~ Actions ~

http://mogile.web.fc2.com/spark/rdd-programming-guide.html#actions

1)collect

* データセットの全要素を配列として返す

サンプル

print(rdd.collect())

※ 使用上の注意

* 大容量データを扱う際には、例外を発生する可能性があるので、
 極力、使用を控えるように実装した方がいい。
 => 詳細は、以下の関連記事を参照のこと

https://dk521123.hatenablog.com/entry/2021/04/22/131849

2)first

* 最初の行を返す

サンプル
https://dk521123.hatenablog.com/entry/2021/04/03/004254

# より抜粋

# 先頭行を取得
first_row = rdd.first()
print(first_row)

3)count

* カウントする
 => ファイル行数数をカウントできる

サンプル
https://dk521123.hatenablog.com/entry/2021/04/03/004254

# より抜粋

# Count from RDD
sam_count = rdd.filter(lambda row: 'Sam' in row).count()
print(f'Count of Sam is {sam_count}')

4)saveAsTextFile

* Textファイルとして保存する

使用上の注意

saveAsTextFile(path)
っとあるように、引数はパスであり、
rdd.saveAsTextFile('cannot_save_as_file.csv')
のようにしても「cannot_save_as_file.csv」ってファイルができるのではなく
「cannot_save_as_file.csv」ってフォルダができて、その中にファイルが保存される

サンプル

rdd.saveAsTextFile('./rdds')

【3】変換 ~ Transformations ~

http://mogile.web.fc2.com/spark/rdd-programming-guide.html#transformations

1)filter

* フィルタ機能
 => 条件が真のものだけ返す

サンプル

filtered_rdd = rdd.filter(lambda value: value >= 3)
# [3, 4, 5]
print(filtered_rdd.collect())

2)map

* RDDの各行に対して、実行結果を返す

サンプル

twice_rdd = rdd.map(lambda value: value * 2)
# [2, 4, 6, 8, 10]
print(twice_rdd.collect())

3)flatMap

* mapした結果をひとつのコレクション(flat)にする

サンプル

rdd = spark_context.parallelize(["Hello world!!", "Hi, Mike! What's up?"])
result_rdd = rdd.flatMap(lambda x: x.split())
# ['Hello', 'world!!', 'Hi,', 'Mike!', "What's", 'up?']
print(result_rdd.collect())

result2_rdd = rdd.map(lambda x: x.split())
# [['Hello', 'world!!'], ['Hi,', 'Mike!', "What's", 'up?']]
print(result2_rdd.collect())

4)reduce

* 要素同士の演算を行う
 => 返却される結果は、RDDではない

サンプル

rdd = spark_context.parallelize([1, 2, 3, 4, 5])
result = rdd.reduce(lambda x, y: x + y)
# 15
print(result)

【4】その他

1)isEmpty

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.isEmpty.html

# True
print(sc.parallelize([]).isEmpty())
# False
print(sc.parallelize([1]).isEmpty())

参考文献

http://mogile.web.fc2.com/spark/rdd-programming-guide.html
https://qiita.com/sotetsuk/items/6e4e2953799078fd6027
https://qiita.com/miyamotok0105/items/bf3638607ef6cb95f01b
https://ex-ture.com/blog/2019/06/27/learn-databricks-spark-rdd-operations/
https://tech.jxpress.net/entry/pyspark-nyumon
https://dev.classmethod.jp/articles/apache-spark_rdd_investigation/

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PPySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する
https://dk521123.hatenablog.com/entry/2021/04/22/131849