Airflow Operational Considerations
We use Airflow to orchestrate our data ingest processes. This page describes how to handle cases where an Airflow DAG task fails. For general information about Airflow development, see the Airflow README in the data-infra GitHub repo.
When an Airflow DAG task fails, it sends an alert to the #data-infra-alerts channel in Slack. Someone should then respond according to the considerations described below and in the individual DAG’s documentation (available in each DAG’s subdirectory in the data-infra GitHub repository).
Considerations for re-running or clearing DAGs
Below are considerations to take into account when re-running or clearing DAGs to address failures. You can consult the individual DAG’s documentation for information on which of the following categories each DAG falls into.
Now vs. data interval processing
There are roughly two types of Airflow DAGs in our system:
- “Now” DAGs: mostly for executing code on a schedule (often scraping current data, or acting as a fancy cron job), NOT for orchestrating distributed processing of existing data.
  - If these DAGs fail and you’d like to re-run them, execute a new manual run rather than clearing a historical run.
  - Only the actual execution time matters, if anything (usually for timestamping data or artifacts).
  - Generally safe, but not useful, to execute multiple times simultaneously.
  - There is no concept of backfilling via these DAGs.
- “Data interval processing” DAGs: these DAGs orchestrate processing of previously captured data, or of data that can be retrieved in a timestamped manner.
  - When these DAGs fail, you should clear the historical task instances that failed. (Generally, these DAGs are expected to be 100% successful.)
  - Failures in these jobs may cause data to be missing from the data warehouse in unexpected ways: if a parse job fails, the data it should have processed will not be available in the warehouse. Sometimes this is easily resolved by clearing the failed parse job so that the data is picked up in the next warehouse run (orchestrated by the transform_warehouse DAG). However, because the data warehouse uses incremental models, if the failed job is not cleared quickly enough, the incremental lookback period may pass and the missing data will not be picked up.
  - Relies heavily on the data interval of each DAG run.
  - May not be entirely idempotent, though we aim for that; for example, validating RT data depends on Schedule data, which may arrive late.
  - Backfilling can generally be performed by clearing past task instances and letting them re-run.
  - We try to avoid depends_on_past DAGs, so parallelization is possible during backfills.
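As a rough illustration of the lookback risk described above, the following plain-Python sketch (no Airflow import) models a daily run's data interval and checks whether an incremental model would still reach that interval the next time the warehouse runs. It assumes a one-day schedule, a hypothetical three-day lookback, and that the incremental models reprocess source rows by timestamp within a fixed window before each warehouse run; the real lookback varies by model, and the helper names are made up for this example.

```python
from datetime import datetime, timedelta, timezone

# Assumed values for illustration only.
SCHEDULE = timedelta(days=1)  # one run per day
LOOKBACK = timedelta(days=3)  # hypothetical incremental lookback window


def data_interval(start: datetime, schedule: timedelta = SCHEDULE) -> tuple[datetime, datetime]:
    """Return the [start, end) slice of data owned by the run whose interval begins at `start`.

    Airflow schedules such a run only once its interval has ended, so a
    scheduled run executes at or after `end`.
    """
    return start, start + schedule


def fully_in_lookback(interval_start: datetime, warehouse_run: datetime,
                      lookback: timedelta = LOOKBACK) -> bool:
    """True if an incremental model run at `warehouse_run` still reaches back to `interval_start`.

    Assumes the model only reprocesses rows timestamped within `lookback`
    of its own run time; older rows that arrive late are skipped.
    """
    return interval_start >= warehouse_run - lookback


if __name__ == "__main__":
    start, _ = data_interval(datetime(2024, 1, 1, tzinfo=timezone.utc))
    # Failed parse run cleared promptly: the next warehouse run still covers it.
    print(fully_in_lookback(start, datetime(2024, 1, 3, tzinfo=timezone.utc)))  # True
    # Cleared a week later: the interval has fallen out of the lookback window.
    print(fully_in_lookback(start, datetime(2024, 1, 8, tzinfo=timezone.utc)))  # False
```

The takeaway matches the guidance above: the longer a failed "data interval processing" task sits uncleared, the more likely its data falls outside what the next incremental warehouse run looks at.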
Scheduled vs. ad-hoc
Additionally, DAGs can either be scheduled or ad-hoc:
- Scheduled DAGs are designed to be run regularly, based on the cron schedule set in the DAG’s METADATA.yml file. All “data interval processing” DAGs will be scheduled.
- Ad-hoc DAGs are designed to be run as one-offs, to automate a workflow that is risky or difficult for an individual user to run locally. These will have schedule_interval: None in their METADATA.yml files. Only “now” DAGs can be ad-hoc.
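The scheduled/ad-hoc rule above can be checked mechanically. The sketch below assumes a METADATA.yml file has already been parsed into a Python dict (for example with a YAML library); the helper names are hypothetical, and the processes_data_intervals flag is a hypothetical stand-in for the category recorded in each DAG's documentation.

```python
from typing import Any, Mapping, Optional


def schedule_of(metadata: Mapping[str, Any]) -> Optional[str]:
    """Return the cron schedule from parsed METADATA.yml contents, or None if ad-hoc.

    A bare `None` in YAML is read as the string "None" (YAML spells null as
    `null` or `~`), so both the string and Python's None mean "no schedule".
    """
    if "schedule_interval" not in metadata:
        # Refuse to guess: the default schedule depends on how the DAG is built.
        raise KeyError("METADATA.yml should set schedule_interval explicitly")
    value = metadata["schedule_interval"]
    if value is None or value == "None":
        return None
    return str(value)


def dag_kind(metadata: Mapping[str, Any], processes_data_intervals: bool) -> str:
    """Classify a DAG as "scheduled" or "ad-hoc", enforcing that only "now" DAGs are ad-hoc.

    `processes_data_intervals` is a hypothetical stand-in for the DAG's documented category.
    """
    if schedule_of(metadata) is not None:
        return "scheduled"
    if processes_data_intervals:
        raise ValueError("data interval processing DAGs must have a schedule")
    return "ad-hoc"


if __name__ == "__main__":
    print(dag_kind({"schedule_interval": "0 0 * * *"}, processes_data_intervals=True))  # scheduled
    print(dag_kind({"schedule_interval": "None"}, processes_data_intervals=False))  # ad-hoc
```

Treating a missing schedule_interval as an error, rather than as ad-hoc, keeps the check from silently misclassifying a DAG whose metadata is incomplete.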
How to clear a DAG or DAG task
Failures can be cleared (re-run) via the Airflow user interface (accessible via Composer here).
This Airflow guide can help you use and interpret the Airflow UI.
The following DAGs may still be listed in the Airflow UI even though they are deprecated or indefinitely paused. They never need to be re-run. (They show up in the UI because the Airflow database has historical DAG/task entries even though the code has been deleted.)
When restarting a failed PodOperator run, check the logs before restarting. If the logs show any indication that the prior run’s pod was not killed (for example, if they cut off abruptly without showing an explicit task failure), check that the pod associated with the failed task run has in fact been killed before clearing or restarting the Airflow task. If you don’t know how to check a pod’s status, please ask in the #data-infra channel on Slack before proceeding.
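The pod check above boils down to a small decision rule. This hypothetical helper takes the old pod's status.phase (one of the standard Kubernetes pod phases: Pending, Running, Succeeded, Failed, or Unknown), or None if the pod no longer exists, and reports whether clearing the task looks safe. Fetching the phase is left to you; for example, kubectl get pod POD_NAME -n NAMESPACE -o jsonpath='{.status.phase}' prints it, where POD_NAME and NAMESPACE are placeholders for the failed run's pod.

```python
from typing import Optional

# Kubernetes pod phases (status.phase) in which all of the pod's containers have stopped.
FINISHED_PHASES = frozenset({"Succeeded", "Failed"})


def safe_to_clear(pod_phase: Optional[str]) -> bool:
    """Decide whether a failed PodOperator task can be cleared without risking two live pods.

    pod_phase is the old pod's status.phase, or None if the pod no longer exists.
    Pending, Running, and Unknown all mean the old pod may still start or run.
    """
    if pod_phase is None:
        return True  # the pod is gone, so nothing can still be running
    return pod_phase in FINISHED_PHASES


if __name__ == "__main__":
    for phase in ["Running", "Unknown", "Failed", None]:
        print(phase, safe_to_clear(phase))
```

Unknown is deliberately treated as unsafe: Kubernetes reports it when the pod's state cannot be obtained (typically because the node is unreachable), so the old pod may still be running.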
Backfilling from the command line
From time to time, some DAGs may need to be re-run in order to populate new data.
Subject to the considerations outlined above, backfilling can be performed by clearing historical runs in the web interface, or via the CLI:
gcloud composer environments run calitp-airflow-prod --location=us-west2 backfill -- --start_date 2021-04-18 --end_date 2021-11-03 -x --reset_dagruns -y -t "gtfs_schedule_history_load" -i gtfs_loader
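When backfilling several DAGs or date ranges, it can help to build the command programmatically instead of editing it by hand. The sketch below reproduces the command above as an argument list; the helper name and its defaults are hypothetical, and the flag spellings are copied from that command (Airflow 1.x CLI style). Newer Airflow versions use the dags backfill subcommand with hyphenated flags such as --start-date, so check which version the Composer environment runs before reusing it.

```python
import shlex
from datetime import date


def backfill_command(dag_id: str, start: date, end: date, task_regex: str,
                     environment: str = "calitp-airflow-prod",
                     location: str = "us-west2") -> list[str]:
    """Build the Composer backfill command for one DAG, task regex, and date range.

    Flag spellings are copied from the documented command (Airflow 1.x CLI style).
    """
    if end < start:
        raise ValueError("end date must not be earlier than start date")
    return [
        "gcloud", "composer", "environments", "run", environment,
        f"--location={location}",
        "backfill", "--",          # everything after "--" goes to the Airflow CLI
        "--start_date", start.isoformat(),
        "--end_date", end.isoformat(),
        "-x",                      # --donot_pickle: workers use their own copy of the DAG code
        "--reset_dagruns",         # reset existing DAG runs in the range and start them afresh
        "-y",                      # --yes: skip the confirmation prompt for the reset
        "-t", task_regex,          # only run tasks whose IDs match this regex
        "-i",                      # --ignore_dependencies: skip upstream tasks of the matches
        dag_id,                    # positional DAG ID
    ]


if __name__ == "__main__":
    cmd = backfill_command("gtfs_loader", date(2021, 4, 18), date(2021, 11, 3),
                           "gtfs_schedule_history_load")
    print(shlex.join(cmd))
```

Passing the list to subprocess.run(cmd, check=True) runs it without a shell, so a task regex containing characters such as | or * needs no extra quoting.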