Airflow Operational Considerations#
We use Airflow to orchestrate our data ingest processes. This page describes how to handle cases where an Airflow DAG task fails. For general information about Airflow development, see the Airflow README in the data-infra GitHub repo.
Monitoring DAGs#
When an Airflow DAG task fails, it will send an alert to the #alerts-data-infra channel in Slack.
In that case, someone should respond according to the considerations described below and in each individual DAG’s documentation (available in each DAG subdirectory in the data-infra GitHub repository).
Considerations for re-running or clearing DAGs#
Below are considerations to take into account when re-running or clearing DAGs to address failures. You can consult the individual DAG’s documentation for information on which of the following categories each DAG falls into.
Now vs. data interval processing#
There are roughly two types of Airflow DAGs in our system:
“Now” DAGs - mostly for executing code on a schedule (often scraping current data; essentially a fancy cron job), NOT for orchestrating distributed processing of existing data
When these DAGs fail and you’d like to re-run them, you should execute a new manual run rather than clearing a historical run (see the command sketch after this list)
Only the actual execution time matters, if it matters at all (it is usually used for timestamping data or artifacts)
Generally safe but not useful to execute multiple times simultaneously
There is no concept of backfilling via these DAGs
“Data interval processing” DAGs - these DAGs orchestrate processing of previously-captured data, or data that can be retrieved in a timestamped manner
When these DAGs fail, you should clear the historical task instances that failed. (Generally, these DAGs are expected to be 100% successful.)
Failures in these jobs may cause data to be missing from the data warehouse in unexpected ways: if a parse job fails, then the data that should have been processed will not be available in the warehouse. Sometimes this is resolved easily by clearing the failed parse job so that the data will be picked up in the next warehouse run (orchestrated by the transform_warehouse DAG). However, because the data warehouse uses incremental models, it’s possible that if the failed job is not cleared quickly enough, the missing data will not be picked up because the incremental lookback period will have passed.
Relies heavily on the execution_date or data_interval_start/end concepts
May not be entirely idempotent, though we try; for example, validating RT data depends on Schedule data, which may be late-arriving
Backfilling can generally be performed by clearing past task instances and letting them re-run
We try to avoid depends_on_past DAGs, so parallelization is possible during backfills
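As a sketch of both approaches, the commands below assume the same Composer environment, project, and location used elsewhere on this page, a hypothetical "now" DAG name, and Airflow 2.x CLI flags; verify the flags against your Airflow version before running anything.

```bash
# "Now" DAG: start a fresh manual run instead of clearing a historical one.
# "some_now_dag" is a placeholder; substitute the DAG that failed.
gcloud composer environments run calitp-staging-composer \
  --project cal-itp-data-infra-staging --location us-west2 \
  dags trigger -- some_now_dag

# "Data interval processing" DAG: clear the failed task instances for a date
# range so Airflow re-runs them. -f (--only-failed) restricts the clear to
# failed instances; -y skips the confirmation prompt.
gcloud composer environments run calitp-staging-composer \
  --project cal-itp-data-infra-staging --location us-west2 \
  tasks clear -- -f -y -s 2025-08-28 -e 2025-08-31 parse_and_validate_rt
```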
Scheduled vs. ad-hoc#
Additionally, DAGs can either be scheduled or ad-hoc:
Scheduled DAGs are designed to be run regularly, based on the cron schedule set in the DAG’s METADATA.yml file. All “data interval processing” DAGs will be scheduled.
Ad-hoc DAGs are designed to be run as one-offs, to automate a workflow that is risky or difficult for an individual user to run locally. These will have schedule_interval: None in their METADATA.yml files. Only “now” DAGs can be ad-hoc.
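To check which category a DAG falls into, you can inspect its METADATA.yml in a checkout of the data-infra repo; the path below is an assumption about the repo layout, so adjust it to wherever the DAG subdirectories actually live.

```bash
# Show the DAG's schedule; "schedule_interval: None" indicates an ad-hoc DAG.
# The airflow/dags/ prefix is assumed -- confirm against the repo layout.
grep -n "schedule_interval" airflow/dags/parse_and_validate_rt/METADATA.yml
```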
How to clear a DAG or DAG task#
Failures can be cleared (re-run) via the Airflow user interface, which is accessible through the Composer environment in the Google Cloud console.
The Airflow documentation’s UI guide can help you use and interpret the Airflow UI.
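If you prefer the command line, you can also list a DAG’s failed runs before deciding what to clear; this sketch assumes the Airflow 2.x dags list-runs command and uses parse_and_validate_rt as an example DAG.

```bash
# List failed runs for a DAG to see which data intervals need to be cleared.
gcloud composer environments run calitp-staging-composer \
  --project cal-itp-data-infra-staging --location us-west2 \
  dags list-runs -- -d parse_and_validate_rt --state failed
```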
PodOperators#
Before restarting a failed run of a DAG that uses a PodOperator, check the logs. If they show any indication that the prior run’s pod was not killed (for example, if the logs cut off abruptly without an explicit task failure), confirm that the Kubernetes pod associated with the failed task has in fact been killed before clearing or restarting the Airflow task. Users with proper access to Kubernetes Engine in Google Cloud can check for any live workloads that correspond to the pod referenced in the failed Airflow task’s run logs.
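One way to check for a lingering pod, assuming you have access to the environment’s GKE cluster (the placeholder names below are illustrative; confirm the cluster name against your environment):

```bash
# Find the GKE cluster backing the Composer environment.
gcloud composer environments describe calitp-staging-composer \
  --project cal-itp-data-infra-staging --location us-west2 \
  --format="value(config.gkeCluster)"

# Fetch credentials for that cluster (substitute the name/region returned
# above), then look for pods matching the name shown in the task's run logs.
gcloud container clusters get-credentials CLUSTER_NAME \
  --region us-west2 --project cal-itp-data-infra-staging
kubectl get pods --all-namespaces | grep POD_NAME_FROM_LOGS
```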
Re-triggering from the command line#
If you need to re-run a large number of DAGs, you can use the Airflow 2.x dags trigger command. Later versions of Airflow may have a different syntax.
This example creates a new DAG run for parse_and_validate_rt for the execution date of August 30, 2025 at 11:15pm UTC in the cal-itp-data-infra-staging environment:
$ gcloud composer environments run calitp-staging-composer --project cal-itp-data-infra-staging --location us-west2 dags trigger -- -e 2025-08-30T23:15:00+00:00 parse_and_validate_rt
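For example, to trigger one run per hour across a day, you could wrap the same command in a shell loop; this is a sketch, and you should pick execution dates that do not collide with existing runs for the same DAG.

```bash
# Trigger a parse_and_validate_rt run for each hour of August 30, 2025 (UTC).
# seq -w zero-pads the hour (00-23). Choose timestamps that do not overlap
# with existing scheduled or manual runs for this DAG.
for hour in $(seq -w 0 23); do
  gcloud composer environments run calitp-staging-composer \
    --project cal-itp-data-infra-staging --location us-west2 \
    dags trigger -- -e "2025-08-30T${hour}:15:00+00:00" parse_and_validate_rt
done
```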
Running a large number of tasks#
If you need to process a large number of tasks, you will need to increase the size of the Google Cloud Composer environment, add another scheduler, and increase the pool of running worker instances.
We use Terraform to manage the Composer environment. To tell Terraform to make these changes, open the environment.tf file for your environment, then make the following edits:
--- a/iac/cal-itp-data-infra-staging/composer/us/environment.tf
+++ b/iac/cal-itp-data-infra-staging/composer/us/environment.tf
@@ -16,8 +16,8 @@ resource "google_composer_environment" "calitp-composer" {
scheduler {
cpu = 2
memory_gb = 2
storage_gb = 1
- count = 1
+ count = 2
}
web_server {
cpu = 1
@@ -29,17 +29,17 @@ resource "google_composer_environment" "calitp-composer" {
memory_gb = 13
storage_gb = 5
min_count = 1
- max_count = 8
+ max_count = 32
}
}
- environment_size = "ENVIRONMENT_SIZE_MEDIUM"
+ environment_size = "ENVIRONMENT_SIZE_LARGE"
software_config {
image_version = "composer-2.10.2-airflow-2.9.3"
airflow_config_overrides = {
- celery-worker_concurrency = 4
+ celery-worker_concurrency = 6
core-dag_file_processor_timeout = 1200
core-dagbag_import_timeout = 600
core-dags_are_paused_at_creation = true
You will need to open a pull request for Terraform to apply these changes, and you should revert them once the large run is complete.
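Before opening the pull request, you can preview the effect of the edits locally if you have Terraform installed and appropriate Google Cloud credentials; this is a sketch, and the init step may require additional backend configuration depending on how state is managed.

```bash
# Preview the Composer environment change from the environment's Terraform
# directory before opening the pull request. Requires Terraform and
# credentials with read access to the existing state.
cd iac/cal-itp-data-infra-staging/composer/us
terraform init
terraform plan
```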