【分散処理】PySpark ~ RDD / あれこれ編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2021/04/03/004254
https://dk521123.hatenablog.com/entry/2021/04/04/111057

の続き。
RDD(Resilient Distributed Dataset)について、
ちょっとしたTipsを纏めておく。

目次

【1】ファイル・エンコードを指定する
 [補足] textFile() について
【2】ファイル行数数をカウントする
【3】RDD <=> DataFrame の相互変換
【4】不正な行を取り除く又はピックアップする場合
 [補足] 不正と有効な行を効率よく分ける場合

【1】ファイル・エンコードを指定する

* use_unicode=False + .map(lambda x: x.decode(【指定エンコード】))で可能

サンプル

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

# ファイルをRDDとして取得
rdd = spark_context.textFile(
  "./test.txt", use_unicode=False) \
  .map(lambda x: x.decode('utf-8'))

[補足] textFile() について
https://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.SparkContext.textFile

textFile(name, minPartitions=None, use_unicode=True)

* 「use_unicode=False」の場合
 If use_unicode is False,
the strings will be kept as str (encoding as utf-8),
which is faster and smaller than unicode. (Added in Spark 1.2)
=> つまりは、文字列 は str (エンコーディング:utf-8) として扱う?
=> じゃー「use_unicode=True」の意味が気になるので、
 実際のコードをのぞいてみたが、以下のコードにあるように
 stream.read()した際にそのまま返すかUTF-8でデコードするかの違い。

https://github.com/apache/spark/blob/21232377ba654babd0dc929b5278835beefcc6a1/python/pyspark/context.py#L626

def textFile(self, name, minPartitions=None, use_unicode=True):
  minPartitions = minPartitions or min(self.defaultParallelism, 2)
  return RDD(self._jsc.textFile(name, minPartitions), self,
    UTF8Deserializer(use_unicode))

https://github.com/apache/spark/blob/21232377ba654babd0dc929b5278835beefcc6a1/python/pyspark/serializers.py#L515

class UTF8Deserializer(Serializer):

    """
    Deserializes streams written by String.getBytes.
    """

    def __init__(self, use_unicode=True):
        self.use_unicode = use_unicode

    def loads(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        elif length == SpecialLengths.NULL:
            return None
        s = stream.read(length)
        return s.decode("utf-8") if self.use_unicode else s

参考文献
https://stackoverflow.com/questions/39278774/pyspark-how-to-read-file-having-string-with-multiple-encoding

【2】ファイル行数数をカウントする

* count() でできる

サンプル

line_count = rdd.count()
print(f'Line count of raws : {line_count}')

【3】RDD <=> DataFrame の相互変換

長くなったので、以下の関連記事に分冊。

https://dk521123.hatenablog.com/entry/2021/05/19/143043

【4】不正な行を取り除く又はピックアップする場合

* filter を使う

サンプル

# 例:『不正な行=囲み文字(”)があった場合、囲まれていない行』の場合
import re

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark_context = SparkContext()
spark = SparkSession(spark_context)

# ファイルをRDDとして取得
rdd = spark_context.textFile("./test.csv")
# 不正行をピックアップする
bad_raw_rdd = rdd.filter(lambda x: is_bad_row(x))
print(f'Count = {bad_raw_rdd.count()}')

# Ref : https://www.geeksforgeeks.org/python-program-to-count-words-in-a-sentence/
def is_bad_row(row):
  print(len(re.findall('"', row)))
  # ここでは、囲み文字(”)が偶数でない場合を不正とする
  return len(re.findall('"', row)) % 2 != 0

[補足] 不正と有効な行を効率よく分ける場合

* map() を使う

サンプル

import re

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark_context = SparkContext()
spark = SparkSession(spark_context)

# ファイルをRDDとして取得
rdd = spark_context.textFile("./test.csv")

# ここで、不正であったかどうかの行を追加する
rdd_with_is_bad_row = rdd.map(lambda x: (x, is_bad_row(x)))
df = rdd_with_is_bad_row.toDF(['row', 'is_bad_row'])

# is_bad_rowで振り分けて、元に戻す
bad_row_rdd = df.rdd.filter(lambda x: x['is_bad_row']) \
  .map(lambda x: x['row'])
good_row_rdd = df.rdd.filter(lambda x: not x['is_bad_row']) \
  .map(lambda x: x['row'])

# DataFrameにして、確認してみる
bad_row_df = spark.createDataFrame(bad_row_rdd, StringType())
bad_row_df.show()
good_row_df = spark.createDataFrame(good_row_rdd, StringType())
good_row_df.show()

def is_bad_row(row):
  print(len(re.findall('"', row)))
  return len(re.findall('"', row)) % 2 != 0

関連記事

PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD <=> DataFrame の相互変換 ~
https://dk521123.hatenablog.com/entry/2021/05/19/143043
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Parquet / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/11/101305
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741
PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する
https://dk521123.hatenablog.com/entry/2021/04/22/131849