地方在住IT系ニート

bkds

Airflowのメモ

はじめに

Airflowはオープンソースでバッチ処理の開発・スケジュール・監視ができるアプリケーションです。
Pythonで構築されており、バッチ手順をコード化することでより柔軟に管理・作成することができます。

インストール

Dockerを利用する場合は、それぞれ下記のイメージが利用できます。
Dockerが入っていない場合は、Dockerインストールを参照ください。

  • apache/airflow
# Airflowを管理するディレクトリを設定
export AIRFLOW_HOME=~/airflow

# インストールするバージョンの指定
AIRFLOW_VERSION=2.6.2

# Pythonのバージョン取得
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# インストールに必要なファイルをファイルパスを取得
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# インストール
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

初期設定

Airflowに利用するデータベースとユーザを作成する。

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

Airflowの初期DBがSQLiteになっているので、MySQLに変更する。

[core]
executor = LocalExecutor

[database]
sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost/airflow_db
# データベースの初期化
airflow db init 

# ユーザの登録
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email [email protected]

スケジューラー

DAGのスケジュールにはcron方式とtimedelta方式があります。
ここではcronを利用したいと思います。
シンプルなルールであればAirflowで準備された下記のtimetableを利用することができます。

CronTriggerTimetable

下記の設定では2024年9月15日から毎日1時に起動します。

timetable=CronTriggerTimetable("0 1 * * *", timezone=pendulum.timezone("Asia/Tokyo")),
start_date=pendulum.datetime(2024, 9, 15, tz=pendulum.timezone("Asia/Tokyo")),

CronDataIntervalTimetable

cronの起動時刻の間隔に注目した設定です。
前回起動から今回起動のデータを処理するイメージです。
設定によって初回起動時にCronTriggerTimetableとは異なる動作をします。
Data interval startがStart date以降かつData interval endを迎えたタイミングで実行されます。
下記の設定では2024年9月16日から毎日1時に起動します。
15日の1時起動はData interval startが14日の1時のため実行されません。

timetable=CronDataIntervalTimetable("0 1 * * *", timezone=pendulum.timezone("Asia/Tokyo")),
start_date=pendulum.datetime(2024, 9, 15, tz=pendulum.timezone("Asia/Tokyo")),

Airflowコマンド

起動

バックグランドでスケジューラーとサーバを起動します。

airflow scheduler -D
airflow webserver -D

Dagの確認

下記コマンドで現在登録されているDagを確認できます。

airflow dags list

エラーが有る場合は、下記コマンドで原因を確認できます。

airflow dags list-import-errors

Taskの確認

特定のDagに入っているTaskを確認するコマンドです。

# リスト形式
airflow tasks list {Dag名}

# ツリー形式
airflow tasks list {Dag名} --tree

ローカル実行

Taskをローカル実行するコマンドです。

airflow tasks test {Dag名} {Task名}

メモ

SQLiteからMySQLにデータ移行

AirflowのデフォルトDBがSQLiteのため、途中からMySQLに変更する方法をメモします。
特にデータを引き継ぐ必要がなければ、初期設定から実施すれば大丈夫です。

SQLiteのデータをそのままインサートすることはできないため、SQLite3 to MySQLを利用します。
-fでSQLiteのデータベースファイルを指定し、-dでMySQLのデータベースを指定します。
他のオプションは、MySQLへの接続に利用します。

pip install sqlite3-to-mysql
sqlite3mysql -f ~/airflow/airflow.db -d airflow_db -uroot -h localhost -p

Hashicorp Vault

from airflow.providers.hashicorp.hooks.vault import VaultHook

vaultHook = VaultHook()
secretDict = vaultHook.get_secret("secret_path")

Log Mask

from airflow.utils.log.secrets_masker import mask_secret
mask_secret(mask_variable)

Remote log

S3に保存を参照してください。

にほんブログ村 IT技術ブログ IT技術メモへPVアクセスランキング にほんブログ村