■ はじめに
https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2021/04/03/004254
https://dk521123.hatenablog.com/entry/2021/04/04/111057
の続き。 RDD(Resilient Distributed Dataset)について、 ちょっとしたTipsを纏めておく。
目次
【1】ファイル・エンコードを指定する [補足] textFile() について 【2】ファイル行数数をカウントする 【3】RDD <=> DataFrame の相互変換 【4】不正な行を取り除く又はピックアップする場合 [補足] 不正と有効な行を効率よく分ける場合
【1】ファイル・エンコードを指定する
* use_unicode=False + .map(lambda x: x.decode(【指定エンコード】))で可能
サンプル
from pyspark import SparkContext from pyspark.sql import SparkSession spark_context = SparkContext() spark = SparkSession(spark_context) # ファイルをRDDとして取得 rdd = spark_context.textFile( "./test.txt", use_unicode=False) \ .map(lambda x: x.decode('utf-8'))
[補足] textFile() について
https://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.SparkContext.textFile
textFile(name, minPartitions=None, use_unicode=True) * 「use_unicode=False」の場合 If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2) => つまりは、文字列 は str (エンコーディング:utf-8) として扱う? => じゃー「use_unicode=True」の意味が気になるので、 実際のコードをのぞいてみたが、以下のコードにあるように stream.read()した際にそのまま返すかUTF-8でデコードするかの違い。
def textFile(self, name, minPartitions=None, use_unicode=True): minPartitions = minPartitions or min(self.defaultParallelism, 2) return RDD(self._jsc.textFile(name, minPartitions), self, UTF8Deserializer(use_unicode))
class UTF8Deserializer(Serializer): """ Deserializes streams written by String.getBytes. """ def __init__(self, use_unicode=True): self.use_unicode = use_unicode def loads(self, stream): length = read_int(stream) if length == SpecialLengths.END_OF_DATA_SECTION: raise EOFError elif length == SpecialLengths.NULL: return None s = stream.read(length) return s.decode("utf-8") if self.use_unicode else s
【2】ファイル行数数をカウントする
* count() でできる
サンプル
line_count = rdd.count() print(f'Line count of raws : {line_count}')
【3】RDD <=> DataFrame の相互変換
長くなったので、以下の関連記事に分冊。
https://dk521123.hatenablog.com/entry/2021/05/19/143043
【4】不正な行を取り除く又はピックアップする場合
* filter を使う
サンプル
# 例:『不正な行=囲み文字(”)があった場合、囲まれていない行』の場合 import re from pyspark import SparkContext from pyspark.sql import SparkSession spark_context = SparkContext() spark = SparkSession(spark_context) # ファイルをRDDとして取得 rdd = spark_context.textFile("./test.csv") # 不正行をピックアップする bad_raw_rdd = rdd.filter(lambda x: is_bad_row(x)) print(f'Count = {bad_raw_rdd.count()}') # Ref : https://www.geeksforgeeks.org/python-program-to-count-words-in-a-sentence/ def is_bad_row(row): print(len(re.findall('"', row))) # ここでは、囲み文字(”)が偶数でない場合を不正とする return len(re.findall('"', row)) % 2 != 0
[補足] 不正と有効な行を効率よく分ける場合
* map() を使う
サンプル
import re from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StringType spark_context = SparkContext() spark = SparkSession(spark_context) # ファイルをRDDとして取得 rdd = spark_context.textFile("./test.csv") # ここで、不正であったかどうかの行を追加する rdd_with_is_bad_row = rdd.map(lambda x: (x, is_bad_row(x))) df = rdd_with_is_bad_row.toDF(['row', 'is_bad_row']) # is_bad_rowで振り分けて、元に戻す bad_row_rdd = df.rdd.filter(lambda x: x['is_bad_row']) \ .map(lambda x: x['row']) good_row_rdd = df.rdd.filter(lambda x: not x['is_bad_row']) \ .map(lambda x: x['row']) # DataFrameにして、確認してみる bad_row_df = spark.createDataFrame(bad_row_rdd, StringType()) bad_row_df.show() good_row_df = spark.createDataFrame(good_row_rdd, StringType()) good_row_df.show() def is_bad_row(row): print(len(re.findall('"', row))) return len(re.findall('"', row)) % 2 != 0
関連記事
PySpark ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/04/03/004254
PySpark ~ RDD / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/04/111057
PySpark ~ RDD <=> DataFrame の相互変換 ~
https://dk521123.hatenablog.com/entry/2021/05/19/143043
PySpark ~ DataFrame / データ操作編 ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / テーブル・項目操作編 ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ CSV / 基本編 ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark ~ CSV / escape 編 ~
https://dk521123.hatenablog.com/entry/2020/11/23/224349
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ Parquet / 基本編 ~
https://dk521123.hatenablog.com/entry/2021/04/11/101305
PySpark ~ Hive ~
https://dk521123.hatenablog.com/entry/2020/05/14/222415
PySpark ~ ユーザ定義関数 UDF 編 ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ データをクリーニングする ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
PySparkで入力ファイル名を取得するには
https://dk521123.hatenablog.com/entry/2021/04/12/145133
PySpark ~ PySpark経由でDBに接続する ~
https://dk521123.hatenablog.com/entry/2020/07/02/000000
Glue から Redshift/PostgreSQL に接続する ~ Python shell編 ~
https://dk521123.hatenablog.com/entry/2020/08/26/193237
Glue から Redshift/PostgreSQL に接続する ~ PySpark編 ~
https://dk521123.hatenablog.com/entry/2020/09/23/111741
PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する
https://dk521123.hatenablog.com/entry/2021/04/22/131849