Airflowのメモ
目次
はじめに
Airflowはオープンソースでバッチ処理の開発・スケジュール・監視ができるアプリケーションです。
Pythonで構築されており、バッチ手順をコード化することでより柔軟に管理・作成することができます。
インストール
Dockerを利用する場合は、それぞれ下記のイメージが利用できます。
Dockerが入っていない場合は、Dockerインストールを参照ください。
- apache/airflow
# Airflowを管理するディレクトリを設定
export AIRFLOW_HOME=~/airflow
# インストールするバージョンの指定
AIRFLOW_VERSION=2.6.2
# Pythonのバージョン取得
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# インストールに必要なファイルをファイルパスを取得
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# インストール
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
初期設定
Airflowに利用するデータベースとユーザを作成する。
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
Airflowの初期DBがSQLiteになっているので、MySQLに変更する。
[core]
executor = LocalExecutor
[database]
sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost/airflow_db
# データベースの初期化
airflow db init
# ユーザの登録
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email [email protected]
スケジューラー
DAGのスケジュールにはcron方式とtimedelta方式があります。
ここではcronを利用したいと思います。
シンプルなルールであればAirflowで準備された下記のtimetableを利用することができます。
CronTriggerTimetable
下記の設定では2024年9月15日から毎日1時に起動します。
timetable=CronTriggerTimetable("0 1 * * *", timezone=pendulum.timezone("Asia/Tokyo")),
start_date=pendulum.datetime(2024, 9, 15, tz=pendulum.timezone("Asia/Tokyo")),
CronDataIntervalTimetable
cronの起動時刻の間隔に注目した設定です。
前回起動から今回起動のデータを処理するイメージです。
設定によって初回起動時にCronTriggerTimetableとは異なる動作をします。
Data interval startがStart date以降かつData interval endを迎えたタイミングで実行されます。
下記の設定では2024年9月16日から毎日1時に起動します。
15日の1時起動はData interval startが14日の1時のため実行されません。
timetable=CronDataIntervalTimetable("0 1 * * *", timezone=pendulum.timezone("Asia/Tokyo")),
start_date=pendulum.datetime(2024, 9, 15, tz=pendulum.timezone("Asia/Tokyo")),
Airflowコマンド
起動
バックグランドでスケジューラーとサーバを起動します。
airflow scheduler -D
airflow webserver -D
Dagの確認
下記コマンドで現在登録されているDagを確認できます。
airflow dags list
エラーが有る場合は、下記コマンドで原因を確認できます。
airflow dags list-import-errors
Taskの確認
特定のDagに入っているTaskを確認するコマンドです。
# リスト形式
airflow tasks list {Dag名}
# ツリー形式
airflow tasks list {Dag名} --tree
ローカル実行
Taskをローカル実行するコマンドです。
airflow tasks test {Dag名} {Task名}
メモ
SQLiteからMySQLにデータ移行
AirflowのデフォルトDBがSQLiteのため、途中からMySQLに変更する方法をメモします。
特にデータを引き継ぐ必要がなければ、初期設定から実施すれば大丈夫です。
SQLiteのデータをそのままインサートすることはできないため、SQLite3 to MySQLを利用します。
-f
でSQLiteのデータベースファイルを指定し、-d
でMySQLのデータベースを指定します。
他のオプションは、MySQLへの接続に利用します。
pip install sqlite3-to-mysql
sqlite3mysql -f ~/airflow/airflow.db -d airflow_db -uroot -h localhost -p
Hashicorp Vault
from airflow.providers.hashicorp.hooks.vault import VaultHook
vaultHook = VaultHook()
secretDict = vaultHook.get_secret("secret_path")
Log Mask
from airflow.utils.log.secrets_masker import mask_secret
mask_secret(mask_variable)
Remote log
S3に保存を参照してください。