■ Introduction
https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2019/11/24/225534
https://dk521123.hatenablog.com/entry/2020/01/04/150942
This entry continues from the articles above. This time I looked into how to use Hive / HiveQL from Python, so here is a summary.
■ Samples
Example 1
from os.path import abspath

from pyspark.sql import SparkSession


def main():
    # warehouse_location points to the default location
    # for managed databases and tables
    warehouse_location = abspath('spark-warehouse')
    spark = SparkSession.builder \
        .appName("PySpark Demo") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("DROP TABLE IF EXISTS hello")
    # Note: '\\n' so that Hive receives the two characters backslash + n,
    # not a raw newline embedded in the statement
    spark.sql(
        "CREATE TABLE hello (key INT, value STRING) "
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "
        "LINES TERMINATED BY '\\n'")
    # Linux
    # spark.sql("LOAD DATA LOCAL INPATH '/home/user/hello.csv' INTO TABLE hello")
    spark.sql("LOAD DATA LOCAL INPATH 'C:/work/hello/hello.csv' INTO TABLE hello")

    # Queries are expressed in HiveQL
    df = spark.sql("SELECT * FROM hello")
    df.show()


if __name__ == '__main__':
    main()
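A note on the `LINES TERMINATED BY` clause: in a regular Python string literal, `'\n'` is converted to a real newline before the SQL text ever reaches Hive, so the escape must be written as `'\\n'`. A minimal plain-Python check (no Spark needed) of how the statement string should look:

```python
# Build the CREATE TABLE statement the same way as in the sample above.
# With '\\n', the SQL text contains the two characters backslash + n,
# which is what Hive's parser expects for LINES TERMINATED BY.
sql = ("CREATE TABLE hello (key INT, value STRING) "
       "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "
       "LINES TERMINATED BY '\\n'")

assert "\\n" in sql   # the escape sequence survives as backslash + n
assert "\n" not in sql  # no raw newline is embedded in the statement
print(sql)
```

Writing `'\n'` instead would embed a literal line break inside the quoted SQL, which Hive may reject or misinterpret.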
hello.csv
1,Mike
2,Tom
3,Naomi
Output
+---+-----+
|key|value|
+---+-----+
|  1| Mike|
|  2|  Tom|
|  3|Naomi|
+---+-----+
Example 2
from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# warehouse_location points to the default location
# for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession.builder \
    .appName("PySpark Demo") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
spark_context = spark.sparkContext
spark_context._jsc.hadoopConfiguration().set(
    "mapreduce.fileoutputcommitter.algorithm.version", "2")

# Logger
logger = spark_context._jvm.org.apache.log4j.LogManager.getLogger(
    "hello_spark")
logger.info("Start")

# Set up the SQLContext
sql_context = SQLContext(spark_context)

# Read the CSV file
customers = sql_context.read.format("com.databricks.spark.csv").options(
    delimiter=",", charset="UTF-8", header=True
).load("C:\\work\\hello\\customer.csv").toDF(
    "id", "name", "sex")

# Register a temporary view
customers.createOrReplaceTempView("customers")

# Run the SQL query
result = sql_context.sql(
    "SELECT id, name, sex FROM customers AS c WHERE c.id=2"
).toDF("ID", "Name", "Sex")

# Write the result to a file
result.write.mode("overwrite").csv("C:\\work\\hello\\out")

# Print to stdout
result.show()

logger.info("Done")
customer.csv
id,name,sex
1,Mike,m
2,Tom,m
3,Naomi,f
Output
+---+----+---+
| ID|Name|Sex|
+---+----+---+
|  2| Tom|  m|
+---+----+---+
Related articles
PySpark ~ Environment setup + Hello World ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / data manipulation ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / table and column operations ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ User-defined functions (UDF) ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ Cleaning data ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
Hive / HiveQL ~ Miscellaneous notes ~
https://dk521123.hatenablog.com/entry/2020/03/04/225943
Hive / HiveQL ~ Introduction ~
https://dk521123.hatenablog.com/entry/2020/02/25/231235