Airflow Operational Considerations#

We use Airflow to orchestrate our data ingest processes. This page describes how to handle cases where an Airflow DAG task fails. For general information about Airflow development, see the Airflow README in the data-infra GitHub repo.

Monitoring DAGs#

When an Airflow DAG task fails, an alert is posted to the #alerts-data-infra channel in Slack.

In that case, someone should respond according to the considerations described below and in each individual DAG’s documentation (available in each DAG subdirectory in the data-infra GitHub repository).

Considerations for re-running or clearing DAGs#

Below are considerations to take into account when re-running or clearing DAGs to address failures. You can consult the individual DAG’s documentation for information on which of the following categories each DAG falls into.

Now vs. data interval processing#

There are roughly two types of Airflow DAGs in our system:

  • “Now” DAGs - mostly for executing code on a schedule (often scraping current data; essentially a fancy cron job), NOT orchestrating distributed processing of existing data

    • When these DAGs fail and you’d like to re-run them, execute a new manual run rather than clearing a historical run.

    • Only the actual execution time matters, if timing is relevant at all (usually for timestamping data or artifacts)

    • Generally safe but not useful to execute multiple times simultaneously

    • There is no concept of backfilling via these DAGs

  • “Data interval processing” DAGs - these DAGs orchestrate processing of previously-captured data, or data that can be retrieved in a timestamped manner

    • When these DAGs fail, you should clear the historical task instances that failed. (Generally, these DAGs are expected to be 100% successful.)

    • Failures in these jobs may cause data to be missing from the data warehouse in unexpected ways: if a parse job fails, the data it should have processed will not be available in the warehouse. Sometimes this is easily resolved by clearing the failed parse job so that the data is picked up in the next warehouse run (orchestrated by the transform_warehouse DAG). However, because the data warehouse uses incremental models, if the failed job is not cleared quickly enough, the missing data may fall outside the incremental lookback period and never be picked up.

    • Relies heavily on the execution_date or data_interval_start/end concepts

    • May not be entirely idempotent, though we try; for example, validating RT data depends on Schedule data, which may be late-arriving

    • Backfilling can generally be performed by clearing past task instances and letting them re-run (see the command-line sketch after this list)

    • We try to avoid depends_on_past DAGs, so parallelization is possible during backfills
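
For example, backfilling a failed window for a data interval processing DAG usually amounts to clearing the failed task instances over the relevant date range from the command line. The command below is a minimal sketch: some_parse_dag is a hypothetical DAG ID, and it uses the same staging Composer environment as the trigger example later on this page. --only-failed restricts the clear to failed task instances, and -y skips the confirmation prompt:

# Clear failed task instances for some_parse_dag (hypothetical DAG ID) between Aug 1 and Aug 31, 2025
$ gcloud composer environments run calitp-staging-composer --project cal-itp-data-infra-staging --location us-west2 tasks clear -- --only-failed -s 2025-08-01 -e 2025-08-31 -y some_parse_dag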

Scheduled vs. ad-hoc#

Additionally, DAGs can be either scheduled or ad-hoc:

  • Scheduled DAGs are designed to be run regularly, based on the cron schedule set in the DAG’s METADATA.yml file. All “data interval processing” DAGs will be scheduled.

  • Ad-hoc DAGs are designed to be run as one-offs, to automate a workflow that is risky or difficult for an individual user to run locally. These will have schedule_interval: None in their METADATA.yml files. Only “now” DAGs can be ad-hoc. (One way to check whether a given DAG has a schedule is sketched below.)
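
If you are not sure whether a particular DAG is scheduled or ad-hoc, you can check its METADATA.yml file, or ask Airflow when the DAG will next run; a DAG with no schedule will not report an upcoming execution. This is a sketch only, using the same staging environment as the examples further down this page and a hypothetical DAG ID some_dag:

# Show the next scheduled run (if any) for some_dag (hypothetical DAG ID)
$ gcloud composer environments run calitp-staging-composer --project cal-itp-data-infra-staging --location us-west2 dags next-execution -- some_dag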

How to clear a DAG or DAG task#

Failures can be cleared (re-run) via the Airflow user interface (accessible via Composer here).

This Airflow guide can help you use and interpret the Airflow UI.

PodOperators#

Before restarting a failed run of a DAG that uses a PodOperator, check the logs. If they show any indication that the prior run’s pod was not killed (for example, if the logs cut off abruptly without an explicit task failure), confirm that the Kubernetes pod associated with the failed task has in fact been killed before clearing or restarting the Airflow task. Users with appropriate access to Kubernetes Engine in Google Cloud can check for any live workloads that correspond to the pod referenced in the failed Airflow task’s run logs.
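
A minimal sketch of that check, assuming you have kubectl installed and access to the relevant GKE cluster (the cluster name below is a placeholder, and the pod name should be taken from the failed task’s logs):

# Fetch credentials for the GKE cluster backing the Composer environment (placeholder cluster name)
$ gcloud container clusters get-credentials <composer-gke-cluster> --region us-west2 --project cal-itp-data-infra-staging
# Look for a still-running pod matching the name from the failed task's logs
$ kubectl get pods --all-namespaces | grep <pod-name-from-task-logs>

If a matching pod is still running, delete it (for example with kubectl delete pod) before clearing the Airflow task.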

Re-triggering from the command line#

If you need to re-run a large number of DAGs, you can use the Airflow 2.x dags trigger command. Later versions of Airflow may use a different syntax.

This example creates a new DAG run for parse_and_validate_rt for the execution date of August 30, 2025 at 11:15pm UTC in the cal-itp-data-infra-staging environment:

$ gcloud composer environments run calitp-staging-composer --project cal-itp-data-infra-staging --location us-west2 dags trigger -- -e 2025-08-30T23:15:00+00:00 parse_and_validate_rt
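
If you need to create runs for many execution dates, you can wrap the same command in a small shell loop. As a sketch, the following would trigger a run for each hour of August 30, 2025 (it assumes seq -w is available to zero-pad the hour; adjust the dates and DAG ID as needed):

# Trigger one run of parse_and_validate_rt per hour of 2025-08-30
$ for h in $(seq -w 0 23); do gcloud composer environments run calitp-staging-composer --project cal-itp-data-infra-staging --location us-west2 dags trigger -- -e "2025-08-30T${h}:15:00+00:00" parse_and_validate_rt; done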

Running a large number of tasks#

If you need to process a large number of tasks, you will have to increase the size of the Google Cloud Composer environment, add another scheduler, and increase the pool of running worker instances.

We use Terraform to manage the Composer environment. To tell Terraform to make these changes, open the environment.tf file for your environment, then make the following edits:

--- a/iac/cal-itp-data-infra-staging/composer/us/environment.tf
+++ b/iac/cal-itp-data-infra-staging/composer/us/environment.tf
@@ -16,8 +16,8 @@ resource "google_composer_environment" "calitp-composer" {
      scheduler {
         cpu        = 2
         memory_gb  = 2
         storage_gb = 1
-        count      = 1
+        count      = 2
       }
       web_server {
         cpu        = 1
@@ -29,17 +29,17 @@ resource "google_composer_environment" "calitp-composer" {
         memory_gb  = 13
         storage_gb = 5
         min_count  = 1
-        max_count  = 8
+        max_count  = 32
       }
     }

-    environment_size = "ENVIRONMENT_SIZE_MEDIUM"
+    environment_size = "ENVIRONMENT_SIZE_LARGE"

     software_config {
       image_version = "composer-2.10.2-airflow-2.9.3"

       airflow_config_overrides = {
-        celery-worker_concurrency                  = 4
+        celery-worker_concurrency                  = 6
         core-dag_file_processor_timeout            = 1200
         core-dagbag_import_timeout                 = 600
         core-dags_are_paused_at_creation           = true

You will need to open a pull request in order for Terraform to apply these changes, and you should revert them when you are done.