【Snowflake】SparkからSnowflakeへの接続について考える

■ はじめに

AWS Glue(Spark) から Snowflake へ接続する必要がでてきたので
方法について、調べてみた

目次

【0】単純にSQLを実行する
【1】Sparkコネクタ
【2】全体構成
【3】転送モード
 1)内部転送
 2)外部転送
【4】Snowflakeコネクタの設定
 1)サポートしているバージョン
 2)リンク先
 3)Snowflakeコネクタのバージョン
【5】サンプル

【0】単純にSQLを実行する

* 単純にSQLを実行するだけなら、Pythonコネクタなどで実行すればいい。

https://dev.classmethod.jp/articles/how-to-install-snowflake-python-connector/
https://dev.classmethod.jp/articles/exec-query-to-snowflake-with-sql-file-via-python/

* JDBCドライバも用意されている。
 => 以下の関連記事も参照

ScalaJDBC / DB接続 ~
https://dk521123.hatenablog.com/entry/2023/03/26/000950

【1】Sparkコネクタ

* Spark用のSnowflakeコネクターが用意されている

https://docs.snowflake.com/ja/user-guide/spark-connector-overview

【2】全体構成

* 以下の公式ドキュメントの構成図が分かりやすい
 => Spark の DataFrame -> JDBC -> Snowflakeコネクタ -> Snowflake
  でアクセスしている

https://docs.snowflake.com/ja/user-guide/spark-connector-overview#interaction-between-snowflake-and-spark

メモ:JDBCとの違い

* 上記の図を見ると、JDBCドライバから使って、直接アクセスした方が早いのでは
 っと思ってしまったのだが、公式ドキュメントに以下の記述あり

https://docs.snowflake.com/ja/user-guide/spark-connector-overview#interaction-between-snowflake-and-spark

より抜粋 (文中の「ただし...」以降に注目)
~~~~~~~~~
注釈
SnowflakeとApache Sparkを接続するために、
Spark用のSnowflakeコネクターは厳密には必要ありません。
他のサードパーティの JDBC ドライバーを使用できます。
---★ここから注目
ただし、Snowflake JDBC ドライバーと組み合わせたコネクターは
2つのシステム間で大量のデータを転送するために最適化されているため、
Spark用のSnowflakeコネクターを使用することをお勧めします。
また、SparkからSnowflakeへのクエリプッシュダウンをサポートすることにより、
パフォーマンスが向上します。
~~~~~~~~~

【3】転送モード

https://docs.snowflake.com/ja/user-guide/spark-connector-overview#data-transfer

* 以下の2つの転送モードがある
1)内部転送 << 基本、こっちを使う方がいいらしい
2)外部転送

使用上の注意

* 以下の場合、「2)外部転送」になってしまう

[1]古いSparkコネクター(バージョン2.1.x以下)
[2] 転送に36時間以上かかる可能性がある場合

1)内部転送

* Snowflakeによって内部/透過的に作成および管理される一時的な場所を使用
 => 動作は、以下の公式ドキュメントの通り。

https://docs.snowflake.com/ja/user-guide/spark-connector-overview#internal-data-transfer

より抜粋
~~~~~~
[1] Snowflakeに接続し、Snowflakeでセッションを初期化すると、
 コネクターは内部ステージを作成します。
[2] Snowflakeセッションの期間中、コネクターはステージを使用して
 データを格納し、そのデータを宛先に転送します。
 <= AWS の場合、S3に一時的にデータを格納し、
[3] Snowflakeセッションの最後に、コネクターはステージをドロップし、
 それによりステージ内のすべての一時データを削除します。
~~~~~~

メモ:気になった点

*『セッション期間中』とあるので、Glue からだと
 Glue のTimeoutを調整しておかないと短かったら
 切られてしまう可能性がありそう
* 利用すると奇麗に書けそうだが、
 転送期間中もGlueは動作するので、
 GlueコストとSnowflakeコスト同時払うことになるので
 この程度なら、「GlueでS3上に保存し、コネクタからSQLを
 Snowflakeへ向かって実行(SQL文の最後にREMOVE文でファイル削除)」
 でいいんじゃないかなっと少し思った。

2)外部転送

https://docs.snowflake.com/ja/user-guide/spark-connector-overview#external-data-transfer

* ユーザーが作成および管理する通常は一時的な保管場所を使用
 => AWSの場合、転送データファイルが作成され、S3バケットに格納

【4】Snowflakeコネクタの設定

https://docs.snowflake.com/ja/user-guide/spark-connector-install

1)サポートしているバージョン

https://docs.snowflake.com/ja/user-guide/spark-connector-install#supported-versions

項目 バージョン メモ
ネクターバージョン 2.x
対応するSparkバージョン Spark 3.3、3.2、3.1 Glue4.0(Apache Spark 3.3.0)/3.0(Apache Spark 3.1.1)ともに問題ない
対応するScalaバージョン Scala 2.13、2.12 Glue4.0(Scala2.12?13?)/3.0(Scala2.12)ともに問題ない
データソース名 net.snowflake.spark.snowflake --- v2.4.14
パッケージ名(インポートされたクラスの場合) net.snowflake.spark.snowflake

2)リンク先

パッケージ配布
https://central.sonatype.com/artifact/net.snowflake/spark-snowflake_2.13/2.11.1-spark_3.3

ソースコード
https://github.com/snowflakedb/spark-snowflake
https://github.com/snowflakedb/spark-snowflake/releases

3)Snowflakeコネクタのバージョン

https://docs.snowflake.com/ja/user-guide/spark-connector-install#label-spark-connector-install-maven

パッケージの命名規則

N.N.N-spark_P.P

N.N.N は、Snowflakeバージョン(例: 2.11.0)
P.P は、Sparkバージョン(例: 3.3)
 => 2.11.0-spark_3.3

【5】サンプル

https://docs.snowflake.com/ja/user-guide/spark-connector-use#sample-scala-program

import org.apache.spark.sql._
//import org.apache.spark.sql.SaveMode

// ★ Snowflakeへのアクセス情報 ★

//
// Configure your Snowflake environment
//
var sfOptions = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

Read

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) from t1 group by c1")
    .load()

Write

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

関連記事

AWS Glue ~ Scalaでの実装 ~
https://dk521123.hatenablog.com/entry/2023/03/17/000000
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world
https://dk521123.hatenablog.com/entry/2021/11/22/212520
ScalaJDBC / DB接続 ~
https://dk521123.hatenablog.com/entry/2023/03/26/000950