■ 現象
https://dk521123.hatenablog.com/entry/2020/07/09/000832
の機能を、クラス内部に組み込んだ際(詳細は後述の「修正後」を参照)に 以下「■ エラーメッセージ」が表示された
■ エラーメッセージ
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
■ 原因
以下のサイトに載っていた
https://github.com/maxpumperla/elephas/issues/47
class Foo: def __init__(self, spark_context: pyspark.SparkContext): self._spark_context = spark_context self.just_plain_string = 'foobar' def do_work(self): rdd = self._spark_context.range(0, 10) rdd.map(lambda i: self.just_plain_string + str(i)) # self.just_plain_string が原因 return rdd.collect() # So the problem here is that i using class variable in my lambda expression. # Internally pyspark tries to pickle everything it gets, # so it pickles the whole object Foo wich contains reference to spark context.
■ 解決案
PySparkでlambda を使用し、その内部でクラスのプロパティ・定数を参照する場合、 クラスのプロパティ・定数を一旦ローカル変数に置き換える (以下の「修正後」の「★一時的にローカル変数として定義★」を参照)
修正前
class SampleDemo: DELIMITER = "," DELIMITER_WITH_ESCAPE = "\\" + DELIMITER REPLACED_CHAR = "@@@DELIMITER@@@" def __init__(self, spark_context: pyspark.SparkContext): self._spark_context = spark_context self.just_plain_string = 'foobar' def read_csv(self, input_file): input_file = spark_context.textFile(input_file) rdd = csv_file.map( lambda line: line.replace( self.DELIMITER_WITH_ESCAPE, self.REPLACED_CHAR).split(self.DELIMITER)) # 以下省略
修正後
class SampleDemo: DELIMITER = "," DELIMITER_WITH_ESCAPE = "\\" + DELIMITER REPLACED_CHAR = "@@@DELIMITER@@@" def __init__(self, spark_context: pyspark.SparkContext): self._spark_context = spark_context self.just_plain_string = 'foobar' def read_csv(self, input_file): input_file = spark_context.textFile(input_file) # ★一時的にローカル変数として定義★ local_delimiter_with_escape = self.DELIMITER_WITH_ESCAPE local_replaced_char = self.REPLACED_CHAR local_delimiter = self.DELIMITER rdd = csv_file.map( lambda line: line.replace( local_delimiter_with_escape , local_replaced_char).split(local_delimiter)) # 以下省略
関連記事
PySpark ~ 環境設定 + Hello World編 ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ エスケープされた区切り文字が含んだデータを扱う ~
https://dk521123.hatenablog.com/entry/2020/07/09/000832
PySpark ~ CSV / あれこれ編 ~
https://dk521123.hatenablog.com/entry/2020/07/30/195226
PySpark でエラー「Total size ... is bigger than spark.driver.maxResultSize」が発生する
https://dk521123.hatenablog.com/entry/2021/04/22/131849