【トラブル】【PySpark】PySpark でエラー「Exception: It appears ...」が表示された

■ 現象

https://dk521123.hatenablog.com/entry/2020/07/09/000832

の機能を、クラス内部に組み込んだ際(詳細は後述の「修正後」を参照)に
以下「■ エラーメッセージ」が表示された

■ エラーメッセージ

Exception: It appears that you are attempting
 to reference SparkContext from a broadcast variable, action, or transforamtion.
SparkContext can only be used on the driver, not in code that it run on workers.
For more information, see SPARK-5063.

■ 原因

以下のサイトに載っていた

https://github.com/maxpumperla/elephas/issues/47

class Foo:
    def __init__(self, spark_context: pyspark.SparkContext):
        self._spark_context = spark_context
        self.just_plain_string = 'foobar'

    def do_work(self):
        rdd = self._spark_context.range(0, 10)
        rdd.map(lambda i: self.just_plain_string + str(i)) # self.just_plain_string が原因
        return rdd.collect()

# So the problem here is that i using class variable in my lambda expression.
# Internally pyspark tries to pickle everything it gets,
# so it pickles the whole object Foo wich contains reference to spark context.

■ 解決案

PySparkでlambda を使用し、その内部でクラスのプロパティ・定数を参照する場合、
クラスのプロパティ・定数を一旦ローカル変数に置き換える
(以下の「修正後」の「★一時的にローカル変数として定義★」を参照)

修正前

class SampleDemo:
    DELIMITER = ","
    DELIMITER_WITH_ESCAPE = "\\" + DELIMITER
    REPLACED_CHAR = "@@@DELIMITER@@@"

    def __init__(self, spark_context: pyspark.SparkContext):
        self._spark_context = spark_context
        self.just_plain_string = 'foobar'

    def read_csv(self, input_file):
        input_file = spark_context.textFile(input_file)
        rdd = csv_file.map(
            lambda line: line.replace(
                    self.DELIMITER_WITH_ESCAPE, self.REPLACED_CHAR).split(self.DELIMITER))
        # 以下省略

修正後

class SampleDemo:
    DELIMITER = ","
    DELIMITER_WITH_ESCAPE = "\\" + DELIMITER
    REPLACED_CHAR = "@@@DELIMITER@@@"

    def __init__(self, spark_context: pyspark.SparkContext):
        self._spark_context = spark_context
        self.just_plain_string = 'foobar'

    def read_csv(self, input_file):
        input_file = spark_context.textFile(input_file)

        # ★一時的にローカル変数として定義★
        local_delimiter_with_escape = self.DELIMITER_WITH_ESCAPE
        local_replaced_char = self.REPLACED_CHAR
        local_delimiter = self.DELIMITER
        rdd = csv_file.map(
            lambda line: line.replace(
                    local_delimiter_with_escape , local_replaced_char).split(local_delimiter))
        # 以下省略

関連記事

PySpark ~ 環境設定 + Hello World編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する
https://dk521123.hatenablog.com/entry/2021/04/22/131849