■ はじめに
AWS Glue(Spark) から Snowflake へ接続する必要がでてきたので 方法について、調べてみた
目次
【0】単純にSQLを実行する 【1】Sparkコネクタ 【2】全体構成 【3】転送モード 1)内部転送 2)外部転送 【4】Snowflakeコネクタの設定 1)サポートしているバージョン 2)リンク先 3)Snowflakeコネクタのバージョン 【5】サンプル
【0】単純にSQLを実行する
* 単純にSQLを実行するだけなら、Pythonコネクタなどで実行すればいい。
https://dev.classmethod.jp/articles/how-to-install-snowflake-python-connector/
https://dev.classmethod.jp/articles/exec-query-to-snowflake-with-sql-file-via-python/
* JDBCドライバも用意されている。 => 以下の関連記事も参照
Scala ~ JDBC / DB接続 ~
https://dk521123.hatenablog.com/entry/2023/03/26/000950
【1】Sparkコネクタ
* Spark用のSnowflakeコネクターが用意されている
https://docs.snowflake.com/ja/user-guide/spark-connector-overview
【2】全体構成
* 以下の公式ドキュメントの構成図が分かりやすい => Spark の DataFrame -> JDBC -> Snowflakeコネクタ -> Snowflake でアクセスしている
メモ:JDBCとの違い
* 上記の図を見ると、JDBCドライバから使って、直接アクセスした方が早いのでは っと思ってしまったのだが、公式ドキュメントに以下の記述あり
より抜粋 (文中の「ただし...」以降に注目) ~~~~~~~~~ 注釈 SnowflakeとApache Sparkを接続するために、 Spark用のSnowflakeコネクターは厳密には必要ありません。 他のサードパーティの JDBC ドライバーを使用できます。 ---★ここから注目 ただし、Snowflake JDBC ドライバーと組み合わせたコネクターは 2つのシステム間で大量のデータを転送するために最適化されているため、 Spark用のSnowflakeコネクターを使用することをお勧めします。 また、SparkからSnowflakeへのクエリプッシュダウンをサポートすることにより、 パフォーマンスが向上します。 ~~~~~~~~~
【3】転送モード
https://docs.snowflake.com/ja/user-guide/spark-connector-overview#data-transfer
* 以下の2つの転送モードがある 1)内部転送 << 基本、こっちを使う方がいいらしい 2)外部転送
使用上の注意
* 以下の場合、「2)外部転送」になってしまう [1]古いSparkコネクター(バージョン2.1.x以下) [2] 転送に36時間以上かかる可能性がある場合
1)内部転送
* Snowflakeによって内部/透過的に作成および管理される一時的な場所を使用 => 動作は、以下の公式ドキュメントの通り。
https://docs.snowflake.com/ja/user-guide/spark-connector-overview#internal-data-transfer
より抜粋 ~~~~~~ [1] Snowflakeに接続し、Snowflakeでセッションを初期化すると、 コネクターは内部ステージを作成します。 [2] Snowflakeセッションの期間中、コネクターはステージを使用して データを格納し、そのデータを宛先に転送します。 <= AWS の場合、S3に一時的にデータを格納し、 [3] Snowflakeセッションの最後に、コネクターはステージをドロップし、 それによりステージ内のすべての一時データを削除します。 ~~~~~~
メモ:気になった点
*『セッション期間中』とあるので、Glue からだと Glue のTimeoutを調整しておかないと短かったら 切られてしまう可能性がありそう * 利用すると奇麗に書けそうだが、 転送期間中もGlueは動作するので、 GlueコストとSnowflakeコスト同時払うことになるので この程度なら、「GlueでS3上に保存し、コネクタからSQLを Snowflakeへ向かって実行(SQL文の最後にREMOVE文でファイル削除)」 でいいんじゃないかなっと少し思った。
2)外部転送
https://docs.snowflake.com/ja/user-guide/spark-connector-overview#external-data-transfer
* ユーザーが作成および管理する通常は一時的な保管場所を使用 => AWSの場合、転送データファイルが作成され、S3バケットに格納
【4】Snowflakeコネクタの設定
https://docs.snowflake.com/ja/user-guide/spark-connector-install
1)サポートしているバージョン
https://docs.snowflake.com/ja/user-guide/spark-connector-install#supported-versions
項目 | バージョン | メモ |
---|---|---|
コネクターバージョン | 2.x | |
対応するSparkバージョン | Spark 3.3、3.2、3.1 | Glue4.0(Apache Spark 3.3.0)/3.0(Apache Spark 3.1.1)ともに問題ない |
対応するScalaバージョン | Scala 2.13、2.12 | Glue4.0(Scala2.12?13?)/3.0(Scala2.12)ともに問題ない |
データソース名 | net.snowflake.spark.snowflake --- v2.4.14 | |
パッケージ名(インポートされたクラスの場合) | net.snowflake.spark.snowflake |
2)リンク先
パッケージ配布
https://central.sonatype.com/artifact/net.snowflake/spark-snowflake_2.13/2.11.1-spark_3.3
ソースコード
https://github.com/snowflakedb/spark-snowflake
https://github.com/snowflakedb/spark-snowflake/releases
3)Snowflakeコネクタのバージョン
https://docs.snowflake.com/ja/user-guide/spark-connector-install#label-spark-connector-install-maven
パッケージの命名規則
N.N.N-spark_P.P N.N.N は、Snowflakeバージョン(例: 2.11.0) P.P は、Sparkバージョン(例: 3.3) => 2.11.0-spark_3.3
【5】サンプル
https://docs.snowflake.com/ja/user-guide/spark-connector-use#sample-scala-program
import org.apache.spark.sql._ //import org.apache.spark.sql.SaveMode // ★ Snowflakeへのアクセス情報 ★ // // Configure your Snowflake environment // var sfOptions = Map( "sfURL" -> "<account_identifier>.snowflakecomputing.com", "sfUser" -> "<user_name>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" )
Read
// // Create a DataFrame from a Snowflake table // val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t1") .load() // // DataFrames can also be populated via a SQL query // val df: DataFrame = sqlContext.read .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("query", "select c1, count(*) from t1 group by c1") .load()
Write
// // Join, augment, aggregate, etc. the data in Spark and then use the // Data Source API to write the data back to a table in Snowflake // df.write .format(SNOWFLAKE_SOURCE_NAME) .options(sfOptions) .option("dbtable", "t2") .mode(SaveMode.Overwrite) .save()
関連記事
AWS Glue ~ Scalaでの実装 ~
https://dk521123.hatenablog.com/entry/2023/03/17/000000
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world ~
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Scala ~ JDBC / DB接続 ~
https://dk521123.hatenablog.com/entry/2023/03/26/000950