[Distributed Processing] PySpark ~ Hive ~

■ Introduction

https://dk521123.hatenablog.com/entry/2019/11/14/221126
https://dk521123.hatenablog.com/entry/2019/11/24/225534
https://dk521123.hatenablog.com/entry/2020/01/04/150942

This entry is a follow-up to the articles above.
This time, I looked into how to work with Hive / HiveQL from Python (PySpark), so I am summarizing it here.

■ Samples

Example 1

from os.path import abspath

from pyspark.sql import SparkSession

def main():
  # warehouse_location points to the default location for managed databases and tables
  warehouse_location = abspath('spark-warehouse')

  spark = SparkSession.builder \
    .appName("PySpark Demo") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
  
  # spark is an existing SparkSession
  spark.sql("DROP TABLE IF EXISTS hello")
  spark.sql("CREATE TABLE hello (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'")
  # Linux
  # spark.sql("LOAD DATA LOCAL INPATH '/home/user/hello.csv' INTO TABLE hello")
  spark.sql("LOAD DATA LOCAL INPATH 'C:/work/hello/hello.csv' INTO TABLE hello")

  # Queries are expressed in HiveQL
  df = spark.sql("SELECT * FROM hello")
  df.show()

if __name__ == '__main__':
  main()

hello.csv

1,Mike
2,Tom
3,Naomi

Output

+---+-----+
|key|value|
+---+-----+
|  1| Mike|
|  2|  Tom|
|  3|Naomi|
+---+-----+
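
As a supplement to Example 1: once Hive support is enabled, a DataFrame can also be written back as a managed Hive table via saveAsTable(). Below is a minimal sketch that assumes the same `spark` session as in Example 1; the table name hello_copy is only an example.

# Minimal sketch (assumes the `spark` session from Example 1 with enableHiveSupport()).
# "hello_copy" is a hypothetical table name used only for illustration.
df = spark.sql("SELECT * FROM hello")

# Persist the query result as a managed Hive table under spark.sql.warehouse.dir
df.write.mode("overwrite").saveAsTable("hello_copy")

# The new table can then be queried with HiveQL like any other table
spark.sql("SELECT COUNT(*) FROM hello_copy").show()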

Example 2

from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession.builder \
  .appName("PySpark Demo") \
  .config("spark.sql.warehouse.dir", warehouse_location) \
  .enableHiveSupport() \
  .getOrCreate()
spark_context = spark.sparkContext
# Use v2 of the FileOutputCommitter algorithm (tasks commit output directly,
# which avoids slow renames at job commit time)
spark_context._jsc.hadoopConfiguration().set(
  "mapreduce.fileoutputcommitter.algorithm.version", "2")

# Logger (log4j via the JVM gateway)
logger = spark_context._jvm.org.apache.log4j.LogManager.getLogger(
  "hello_spark")
logger.info("Start")

# Set up SQLContext
sql_context = SQLContext(spark_context)

# Read the CSV file
customers = sql_context.read.format("com.databricks.spark.csv").options(
  delimiter=",", charset="UTF-8", header=True
  ).load("C:\\work\\hello\\customer.csv").toDF(
    "id", "name", "sex")
# Register a temporary view
customers.createOrReplaceTempView("customers")

# Run the SQL query
result = sql_context.sql(
  "SELECT id, name, sex FROM customers AS c WHERE c.id=2"
).toDF("ID", "Name", "Sex")

# Write the result to files
result.write.mode("overwrite").csv("C:\\work\\hello\\out")

# Print to stdout
result.show()

logger.info("Done")

customer.csv

id,name,sex
1,Mike,m
2,Tom,m
3,Naomi,f

Output

+---+----+---+
| ID|Name|Sex|
+---+----+---+
|  2| Tom|  m|
+---+----+---+
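
Note: SQLContext and the com.databricks.spark.csv format used in Example 2 are legacy APIs; in Spark 2.x and later, the same processing can be written against SparkSession directly. The following is a minimal sketch under that assumption, reusing the `spark` session and the customer.csv path from Example 2.

# Minimal sketch with the SparkSession-based API (Spark 2.x+); assumes the
# same `spark` session and C:\work\hello\customer.csv as in Example 2.
customers = (spark.read
  .option("header", True)
  .option("encoding", "UTF-8")
  .csv("C:\\work\\hello\\customer.csv"))

# Register a temporary view and query it with SQL
customers.createOrReplaceTempView("customers")
result = spark.sql(
  "SELECT id, name, sex FROM customers AS c WHERE c.id = 2"
).toDF("ID", "Name", "Sex")

# Write to files and print to stdout, as in Example 2
result.write.mode("overwrite").csv("C:\\work\\hello\\out")
result.show()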

References

http://mogile.web.fc2.com/spark/spark210/sql-programming-guide.html#hive-tables
https://www.tutorialspoint.com/spark_sql/spark_sql_hive_tables.htm
https://qiita.com/tomizawa-masaru/items/7238c42e214eca5ee788

Related articles

PySpark ~ Environment Setup + Hello World ~
https://dk521123.hatenablog.com/entry/2019/11/14/221126
PySpark ~ CSV ~
https://dk521123.hatenablog.com/entry/2019/11/24/225534
PySpark ~ DataFrame / Data Manipulation ~
https://dk521123.hatenablog.com/entry/2020/01/04/150942
PySpark ~ DataFrame / Table & Column Operations ~
https://dk521123.hatenablog.com/entry/2020/05/18/154829
PySpark ~ User-Defined Functions (UDF) ~
https://dk521123.hatenablog.com/entry/2020/05/20/195621
PySpark ~ Cleaning Data ~
https://dk521123.hatenablog.com/entry/2020/07/08/162552
Hive / HiveQL ~ Miscellaneous ~
https://dk521123.hatenablog.com/entry/2020/03/04/225943
Hive / HiveQL ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2020/02/25/231235