【分散処理】PySpark ~ 入門編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/11/14/221126

の続き。

PySpark を使って、データ処理をすることになったので、メモする。

目次

【1】関連用語
 1)RDD(Resilient Distributed Dataset)
 2)DataFrame
【2】サンプル
 例1:テキストデータの読み込み

【1】関連用語

1)RDD(Resilient Distributed Dataset)
2)DataFrame

1)RDD(Resilient Distributed Dataset)

* 大規模データセットを操作するオブジェクト
 => このRDDによって、データおよび処理を分散する(っと理解。。。)

* Resilient Distributed Dataset = 耐障害分散データセット

cf. Resilient(レジリエント)
 = 回復力のある
 =  able to become strong, happy, or successful again after a difficult situation or event 

2)DataFrame

以下の関連記事を参照のこと

PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829

【2】サンプル

例1:テキストデータの読み込み

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import TimestampType


def main():
  # SparkContextオブジェクトを生成
  #  => SparkContextで Sparkアプリケーション全体を通して
  #    状態を保持・管理する
  spark_context = SparkContext()
  # ファイルをRDDとして取得
  rdd = spark_context.textFile("./test.csv")

  # 先頭行を取得
  first_row = rdd.first()
  print('************')
  print(first_row)
  print('************')

  # スキーマを定義
  # http://mogile.web.fc2.com/spark/sql-ref-datatypes.html
  schema = StructType(
    [
      StructField('id', IntegerType(), False),
      StructField('name', StringType(), False),
      StructField('height', DoubleType(), True),
      StructField('datetime', TimestampType(), True),
    ]
  )
  
  spark = SparkSession(spark_context)
  data_frame = spark.read \
    .option("header", "False") \
    .format("csv") \
    .schema(schema) \
    .csv(rdd)
  data_frame.show()

  # Count from data frame
  mike_count = data_frame.filter(data_frame['name'] == "Mike").count()
  print('++++++++++')
  print(f'Count of Mike is {mike_count}')
  print('++++++++++')
  # Count from RDD
  sam_count = rdd.filter(lambda row: 'Sam' in row).count()
  print('++++++++++')
  print(f'Count of Sam is {sam_count}')
  print('++++++++++')

if __name__ == '__main__':
  main()

test.csv

100,Mike,179.6,2018-03-20 10:41:20
101,Sam,167.9,2018-03-03 11:32:34
102,Kevin,189.2,2018-01-28 20:20:11

出力結果

************
100,Mike,179.6,2018-03-20 10:41:20
************
+---+-----+------+-------------------+
| id| name|height|           datetime|
+---+-----+------+-------------------+
|100| Mike| 179.6|2018-03-20 10:41:20|
|101|  Sam| 167.9|2018-03-03 11:32:34|
|102|Kevin| 189.2|2018-01-28 20:20:11|
+---+-----+------+-------------------+

++++++++++
Count of Mike is 1
++++++++++
++++++++++
Count of Sam is 1
++++++++++

参考文献

https://tech.jxpress.net/entry/pyspark-nyumon
https://qiita.com/t-yotsu/items/f15057f131a1b1ec6864
https://qiita.com/taka4sato/items/4ab2cf9e941599f1c0ca

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/04/06/001709
PySpark ~ RDD <=> DataFrame の相互変換 ~
https://dk521123.hatenablog.com/entry/2021/05/19/143043
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / データ集計編 ~
https://dk521123.hatenablog.com/entry/2021/05/25/111051
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ DataFrame / 項目数を取得するには ~
https://dk521123.hatenablog.com/entry/2020/08/28/183706
PySpark ~ DataFrame / show() 編 ~
https://dk521123.hatenablog.com/entry/2021/04/26/161342
PySpark ~ DB・テーブル・項目取得編 ~
https://dk521123.hatenablog.com/entry/2021/05/24/144317
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / Read/Writeのプロパティ ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / White Spaceの扱い ~
https://dk521123.hatenablog.com/entry/2021/04/29/075903
PySpark ~ CSV / Null・空文字・異常値の扱い ~
https://dk521123.hatenablog.com/entry/2021/06/01/142457
PySpark ~ _corrupt_record ~
https://dk521123.hatenablog.com/entry/2022/02/14/153845
PySpark ~ CSV / escape ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ CSV / MultiLine対応 ~
https://dk521123.hatenablog.com/entry/2022/02/04/181842
PySpark ~ 出力ファイル / 空ファイル対応, 1ファイルに纏める ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Parquet / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/11/101305
PySpark ~ パーティション
https://dk521123.hatenablog.com/entry/2021/05/13/110811
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ UDFの各定義方法でのサンプル ~
https://dk521123.hatenablog.com/entry/2021/05/27/100132
PySpark ~ UDF の使用上の注意 ~
https://dk521123.hatenablog.com/entry/2021/05/20/095706
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark で 出力ファイル名を変更する
https://dk521123.hatenablog.com/entry/2021/05/12/003047
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
PySpark でエラー「Exception: It appears ...」が表示された
https://dk521123.hatenablog.com/entry/2020/07/10/192404
PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する
https://dk521123.hatenablog.com/entry/2021/04/22/131849
AWS Glue上で エラー「Dynamic partition strict mode requires ...」が発生する
https://dk521123.hatenablog.com/entry/2021/05/17/120443
Apache Spark ~ 環境設定 / Windows編 ~
https://dk521123.hatenablog.com/entry/2019/09/18/214814
Apache Spark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/09/14/123206
Visual Studio Code ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2019/10/20/230323