【Airflow】Apache Airflow ~ Basics ~

■ Introduction

https://dk521123.hatenablog.com/entry/2021/07/18/004531
https://dk521123.hatenablog.com/entry/2021/07/24/233012

This post continues from the entries above.

It is a memo on the basic architecture of Apache Airflow.

Table of Contents

【1】Basic Architecture of Airflow
 1)Web Server
 2)Scheduler
 3)Executor
【2】Operators
 1)BashOperator
 2)PythonOperator
 3)EmailOperator
 4)Sensor
【3】DAG (Directed Acyclic Graph)
 1)How to Define a DAG
 2)Specifying Task Dependencies
【4】Sample

【1】Basic Architecture of Airflow

Airflow is made up of the following three components:

1)Web Server
2)Scheduler
3)Executor

1)Web Server

* Serves the admin/monitoring UI
* Implemented with Flask, a Python web application framework

2)Scheduler

* Manages the scheduling of job execution and triggers DAG runs

3)Executor

* Runs the jobs (tasks)
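
For reference, each component maps to its own CLI entry point. A minimal way to bring them up locally (assuming the metadata database has already been initialized with "airflow db init", as in the environment setup article):

# Start the Flask-based admin UI (http://localhost:8080 by default)
airflow webserver --port 8080

# Start the scheduler, which triggers DAG runs and hands tasks to the executor
airflow scheduler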

【2】Operators

* Classes that each define a single unit of work (a job) in an ETL pipeline
* An ETL flow is defined by wiring these operators together in a DAG

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/index.html

1)BashOperator

* Class used to execute a Bash shell command

2)PythonOperator

* Class used to execute Python code (a Python callable)

3)EmailOperator

* Class used to send an email

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/email/index.html

4)Sensor

* Class used to wait until a condition is met, e.g., until a given time passes or a file becomes available

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/index.html
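
As a rough illustration of a sensor gating a downstream task, here is a minimal sketch assuming Airflow 2.x (where FileSensor lives in airflow.sensors.filesystem); the DAG name, file path, and task IDs are made up for this example:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago

with DAG(
  'sensor_demo',
  start_date=days_ago(1),
  schedule_interval=None,
) as dag:
  # Poke every 30 seconds until the file appears; give up after 10 minutes
  wait_for_file = FileSensor(
    task_id='wait_for_input_file',
    filepath='/tmp/input.csv',  # hypothetical path
    fs_conn_id='fs_default',    # default filesystem connection (assumed to exist)
    poke_interval=30,
    timeout=600,
  )
  # Process the file once the sensor has succeeded
  process_file = BashOperator(
    task_id='process_input_file',
    bash_command='wc -l /tmp/input.csv',
  )
  wait_for_file >> process_file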

【3】DAG (Directed Acyclic Graph)

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html

1)How to Define a DAG

All three notations below assume the usual imports
(e.g. from airflow import DAG / from airflow.operators.dummy import DummyOperator,
and from airflow.decorators import dag for the decorator style in Airflow 2.x).

Notation 1

with DAG("my_dag_name") as dag:
  op = DummyOperator(task_id="task")

Notation 2

my_dag = DAG("my_dag_name")
op = DummyOperator(task_id="task", dag=my_dag)

Notation 3

@dag(start_date=days_ago(2))
def generate_dag():
  op = DummyOperator(task_id="task")

dag = generate_dag()

2)Specifying Task Dependencies

Airflow 1.8 and later

job1 >> job2

# "job2 << job1" also works

Versions before Airflow 1.8

job1.set_downstream(job2)

# "job2.set_upstream(job1)" also works
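
Lists can also appear on either side of ">>" to express fan-out and fan-in in one line:

# job1 fans out to job2 and job3; both feed into job4
job1 >> [job2, job3]
[job2, job3] >> job4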

【4】Sample

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
import logging

from airflow.utils.trigger_rule import TriggerRule

# The "hello world" function called by job4 (PythonOperator) below
def say_hello():
  logging.info('hello world')

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
  'owner': 'your-name',
  'depends_on_past': False,
  'email': ['your-email@gmail.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  # 'queue': 'bash_queue',
  # 'pool': 'backfill',
  # 'priority_weight': 10,
  # 'end_date': datetime(2016, 1, 1),
  # 'wait_for_downstream': False,
  # 'dag': dag,
  # 'sla': timedelta(hours=2),
  # 'execution_timeout': timedelta(seconds=300),
  # 'on_failure_callback': some_function,
  # 'on_success_callback': some_other_function,
  # 'on_retry_callback': another_function,
  # 'sla_miss_callback': yet_another_function,
  # 'trigger_rule': 'all_success'
}

with DAG(
  'hello_world',
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=timedelta(days=1),
  start_date=days_ago(2),
  tags=['hello_world'],
) as dag:
  # job1 to job4 are examples of tasks
  #  created by instantiating operators
  job1 = BashOperator(
    task_id='printing_date',
    bash_command='date',
  )
  job2 = BashOperator(
    task_id='sleep_5',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
  )
  job1.doc_md = dedent(
    """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
  )
  dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
  dag.doc_md = """
    This is a documentation placed anywhere
  """  # otherwise, type it like this
  templated_command = dedent(
    """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
  )
  job3 = BashOperator(
    task_id='templated_command',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
  )
  # PythonOperator that runs the say_hello function defined above
  job4 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    # run as soon as at least one upstream task has succeeded
    trigger_rule=TriggerRule.ONE_SUCCESS,
  )

  job1 >> job2 >> job4
  job1 >> job3 >> job4

References

https://future-architect.github.io/articles/20200131/
https://analytics.livesense.co.jp/entry/2018/02/06/132842

Related Articles

Apache Airflow ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012

【Python】Pandas ~ apply / transform ~

■ Introduction

Pandas' apply() and transform() recently came up in my work,
so this is a quick memo about them.

Table of Contents

【1】apply()
【2】transform()

【1】apply()

* Use when you want to run the same operation over an entire row or column

API
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html

Sample

import pandas as pd

data_frame = pd.DataFrame([
  ['Mike', 21],
  ['Tom', 19],
  ['Jimmy', 34],
], columns=["name", "age"])

print(data_frame)

print('**************')

# Lowercase every name except 'Mike'
data_frame['name'] = data_frame['name'].apply(
  lambda x: x if (x == 'Mike') else (x.lower()))
# Add 1 to every age
data_frame['age'] = data_frame['age'].apply(
  lambda x: x + 1)

print(data_frame)

Output

    name  age
0   Mike   21
1    Tom   19
2  Jimmy   34
**************
    name  age
0   Mike   22
1    tom   20
2  jimmy   35
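
apply() also works across a whole DataFrame: with axis=1, the function receives one row at a time as a Series. A minimal sketch, reusing the data_frame from above (the 'label' column is made up for illustration):

# axis=1 passes each row as a Series; combine two columns into one string
data_frame['label'] = data_frame.apply(
  lambda row: f"{row['name']} ({row['age']})", axis=1)
print(data_frame)
# => the new 'label' column holds "Mike (22)", "tom (20)", "jimmy (35)"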

References

https://note.nkmk.me/python-pandas-map-applymap-apply/

【2】transform()

* Use when you want to attach per-group statistics to every row

API
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.transform.html

Sample

import pandas as pd

data_frame = pd.DataFrame([
  ['Tokyo', '2021-07-01', 28],
  ['Tokyo', '2021-07-02', 35],
  ['Tokyo', '2021-07-03', 34],
  ['Osaka', '2021-07-01', 27],
  ['Osaka', '2021-07-02', 40],
  ['Osaka', '2021-07-03', 38],
], columns=["city", "date", "temperature"])

print(data_frame)

print('**************')

# For each city, compute the mean temperature and broadcast it back to every row
data_frame['mean'] = \
  data_frame.groupby(['city'])['temperature'].transform(lambda x: x.mean())
print(data_frame)

Output

    city        date  temperature
0  Tokyo  2021-07-01           28
1  Tokyo  2021-07-02           35
2  Tokyo  2021-07-03           34
3  Osaka  2021-07-01           27
4  Osaka  2021-07-02           40
5  Osaka  2021-07-03           38
**************
    city        date  temperature       mean
0  Tokyo  2021-07-01           28  32.333333
1  Tokyo  2021-07-02           35  32.333333
2  Tokyo  2021-07-03           34  32.333333
3  Osaka  2021-07-01           27  35.000000
4  Osaka  2021-07-02           40  35.000000
5  Osaka  2021-07-03           38  35.000000
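
The key difference from an ordinary aggregation: agg() collapses each group to a single row, while transform() broadcasts the group result back onto the original index (same length as the input), which is why it can be assigned straight into a new column. A quick comparison on the same data_frame:

# agg(): one row per group (here: Osaka 35.0, Tokyo 32.333...)
print(data_frame.groupby(['city'])['temperature'].agg('mean'))

# transform(): one value per original row (length 6 here)
print(data_frame.groupby(['city'])['temperature'].transform('mean'))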

References

https://qiita.com/greenteabiscuit/items/132e0f9b1479926e07e0
https://toonanote.com/pandas-%E3%81%AE-apply-%E3%81%A8-transform-%E3%81%AE%E9%81%95%E3%81%84%E3%82%92%E7%9F%A5%E3%82%8B/

Related Articles

Pandas ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2019/10/22/014957
Pandas ~ Basics ~
https://dk521123.hatenablog.com/entry/2020/10/14/000000
Pandas ~ Basics / Data Cleansing ~
https://dk521123.hatenablog.com/entry/2020/04/06/235555
Pandas ~ Data Aggregation ~
https://dk521123.hatenablog.com/entry/2021/04/07/105858
Python ~ Basics / CSV ~
https://dk521123.hatenablog.com/entry/2019/11/07/214108
NumPy ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2018/03/28/224532
Python 3 Engineer Certification Data Analysis Exam
https://dk521123.hatenablog.com/entry/2020/12/12/000000

【Airflow】Apache Airflow ~ Getting Started ~

■ Introduction

https://dk521123.hatenablog.com/entry/2021/07/18/004531

This post continues from the entry above.
This time, we try something like a "Hello World" with Apache Airflow.

Table of Contents

【1】Useful References
【2】Sample
【3】Commands
 1)Validating the Code
 2)Checking the DAG
 3)Testing Each Task
 4)Running the DAG (Backfill)
 5)Starting the Scheduler

【1】Useful References

* This walkthrough follows the official tutorial below:

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html

【2】Sample

* Save the following code under "~/airflow/dags"
* The destination folder is set by "dags_folder" in "~/airflow/airflow.cfg":
~~~~~~
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/your-user-name/airflow/dags
~~~~~~
* Name the DAG "hello_world"
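
If in doubt about where Airflow is looking for DAGs, the configured value can also be printed directly (assuming the config CLI available in Airflow 2.1+):

airflow config get-value core dags_folder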

~/airflow/dags/hello_world_dag.py

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
  'owner': 'your-name',
  'depends_on_past': False,
  'email': ['your-email@gmail.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  # 'queue': 'bash_queue',
  # 'pool': 'backfill',
  # 'priority_weight': 10,
  # 'end_date': datetime(2016, 1, 1),
  # 'wait_for_downstream': False,
  # 'dag': dag,
  # 'sla': timedelta(hours=2),
  # 'execution_timeout': timedelta(seconds=300),
  # 'on_failure_callback': some_function,
  # 'on_success_callback': some_other_function,
  # 'on_retry_callback': another_function,
  # 'sla_miss_callback': yet_another_function,
  # 'trigger_rule': 'all_success'
}

# Create a DAG (Directed Acyclic Graph)
# named "hello_world"
with DAG(
  'hello_world',
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=timedelta(days=1),
  start_date=days_ago(2),
  tags=['hello_world'],
) as dag:
  # job1, job2 and job3 are examples of tasks
  #  created by instantiating operators
  job1 = BashOperator(
    task_id='printing_date',
    bash_command='date',
  )
  job2 = BashOperator(
    task_id='sleep_5',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
  )
  job1.doc_md = dedent(
    """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
  )
  dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
  dag.doc_md = """
    This is a documentation placed anywhere
  """  # otherwise, type it like this
  templated_command = dedent(
    """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
  )
  job3 = BashOperator(
    task_id='templated_command',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
  )
  job1 >> [job2, job3]

【3】実行コマンド

1)Validating the Code

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#running-the-script

# Run the script and confirm that no errors are raised
python ~/airflow/dags/hello_world_dag.py

2)Checking the DAG

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#command-line-metadata-validation

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "hello_world" DAG
airflow tasks list hello_world

# prints the hierarchy of tasks in the "hello_world" DAG
airflow tasks list hello_world --tree

3)Testing Each Task

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#id2

Note: "airflow tasks test" runs a single task instance locally
and does not record state in the metadata database.

airflow tasks test hello_world printing_date 2021-07-24

airflow tasks test hello_world sleep_5 2021-07-24

airflow tasks test hello_world templated_command 2021-07-24

4)Running the DAG (Backfill)

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#backfill

Unlike "tasks test", a backfill respects dependencies and records
run state in the database, so progress can be followed in the Web UI.

airflow dags backfill hello_world \
    --start-date 2021-07-24 \
    --end-date 2021-07-26

5)Starting the Scheduler

airflow scheduler
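
Note that with the scheduler running, the DAG still has to be unpaused before its scheduled runs start; it can also be triggered manually. Both subcommands below are part of the standard Airflow 2.x CLI:

# enable scheduled runs for the DAG
airflow dags unpause hello_world

# or kick off a single run immediately
airflow dags trigger hello_world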

References

https://qiita.com/chan-p/items/526bbed95fdc73142c59

■ Related Articles

Apache Airflow ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319