Useful Python Libraries#

The following libraries are available and recommended for use by Cal-ITP data analysts. Dependencies are managed in the data-analyses repo using a uv workspace. Running uv sync from the data-analyses/ root installs everything into a local .venv, which the “Pyproject Local” Jupyter kernel uses automatically.

Table of Contents#

  1. shared utils

  2. calitp-data-analysis
    - Query SQL
    - Accessing Google Cloud Storage Data

  3. Querying Data in BigQuery
    - SQLAlchemy
    - pandas
    - siuba
    - Basic Queries
    - Advanced Queries

  4. Add New Packages

  5. Updating calitp-data-analysis

  6. Appendix: calitp-data-infra

shared utils#

A set of shared utility functions used across analysis projects. Both shared_utils and calitp_data_analysis live in the data-analyses repo as workspace members. They are installed automatically when you run uv sync from the workspace root.

Setup#

From a JupyterHub terminal (or locally):

cd ~/data-analyses
uv sync

This installs shared_utils, calitp_data_analysis, and all baseline dependencies into .venv. Select the “Pyproject Local” kernel in JupyterLab to use this environment.

In notebook#

from calitp_data_analysis import geography_utils

geography_utils.WGS84
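
For instance, WGS84 can be passed anywhere geopandas expects a coordinate reference system. A minimal sketch, assuming you have some geospatial file to reproject (the file path here is a hypothetical placeholder):

import geopandas as gpd
from calitp_data_analysis import geography_utils

# read any geospatial file and reproject it to WGS84 using the shared constant
gdf = gpd.read_file("./my_boundaries.geojson")
gdf = gdf.to_crs(geography_utils.WGS84)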

See data-analyses/starter_kit for examples on how to use shared_utils for general functions, charts, and maps.

calitp-data-analysis#

calitp-data-analysis is an internal library of utility functions used to access our warehouse data for analysis purposes. It lives in data-analyses/calitp-data-analysis/ as a uv workspace member and is installed automatically by uv sync.

import tbls#

DEPRECATED: tbls will be removed after calitp_data_analysis version 2025.8.10. Instead of AutoTable or the tbls instance, use query_sql() from calitp_data_analysis.sql to connect to and query a SQL database. See the query_sql() section below.

While it remains available, you can import tbls at the top of your notebook and access warehouse tables as attributes of the tbls object:

from calitp_data_analysis.tables import tbls

Example:

from calitp_data_analysis.tables import tbls

tbls.mart_gtfs.dim_agency()

Query SQL#

query_sql#

query_sql() is useful for simple queries against BigQuery datasets. You can optionally pass as_df=True to turn a SQL query result into a pandas DataFrame. If you need to construct a more detailed query or are implementing a query builder that has multiple use cases, you should probably use SQLAlchemy query building instead.

As an example, in a notebook:

from calitp_data_analysis.sql import query_sql

df_dim_agency = query_sql("""
SELECT
    *
FROM cal-itp-data-infra-staging.mart_gtfs.dim_agency
LIMIT 10""", as_df=True)

df_dim_agency.head()
The output of head() shows the first five rows, with columns key, _gtfs_key, feed_key, agency_id, agency_name, agency_url, agency_timezone, agency_lang, agency_phone, agency_fare_url, agency_email, base64_url, _dt, _feed_valid_from, _line_number, and feed_timezone; each row is one agency record (for example, Stanford Marguerite Shuttle, South San Francisco Shuttle, Bell Gardens, San Fernando, and Covina).

Accessing Google Cloud Storage Data#

GCSPandas#

The GCSPandas class fetches the Google Cloud Storage (GCS) filesystem and surfaces functions to provide analysts a clear and consistent way of accessing data on GCS.

It’s recommended to memoize initialization of the class so that the GCS filesystem is fetched and cached the first time you call it and subsequent calls can reuse that cached filesystem.

from functools import cache

from calitp_data_analysis.gcs_pandas import GCSPandas

@cache
def gcs_pandas():
    return GCSPandas()

read_parquet#

Delegates to pandas.read_parquet, providing the GCS filesystem

gcs_pandas().read_parquet("gs://path/to/your/file.parquet")

read_csv#

Delegates to pandas.read_csv with the file at the path specified in the GCS filesystem

gcs_pandas().read_csv("gs://path/to/your/file.csv")

read_excel#

Delegates to pandas.read_excel with the file at the path specified in the GCS filesystem

gcs_pandas().read_excel("gs://path/to/your/file.xlsx")

data_frame_to_parquet#

Delegates to DataFrame.to_parquet, providing the GCS filesystem

import pandas as pd

data_frame = pd.DataFrame({'col1': ['name1', 'name2']})
gcs_pandas().data_frame_to_parquet(data_frame, "gs://path/to/your/file.parquet")

GCSGeoPandas#

The GCSGeoPandas class fetches the Google Cloud Storage (GCS) filesystem and surfaces functions to provide analysts a clear and consistent way of accessing geospatial resources.

It’s recommended to memoize initialization of the class so that the GCS filesystem is fetched and cached the first time you call it and subsequent calls can reuse that cached filesystem.

from functools import cache

from calitp_data_analysis.gcs_geopandas import GCSGeoPandas

@cache
def gcs_geopandas():
    return GCSGeoPandas()

read_parquet#

Delegates to geopandas.read_parquet, providing the GCS filesystem

gcs_geopandas().read_parquet("gs://path/to/your/file.parquet")

read_file#

Delegates to geopandas.read_file with the file at the path specified in the GCS filesystem

gcs_geopandas().read_file("gs://path/to/your/file.geojson")

geo_data_frame_to_parquet#

Delegates to GeoDataFrame.to_parquet, providing the GCS filesystem

import geopandas as gpd
from shapely.geometry import Point

# arbitrary example points stand in for real geometries
data = {'col1': ['name1', 'name2'], 'geometry': [Point(-121.5, 38.6), Point(-118.2, 34.1)]}
geo_data_frame = gpd.GeoDataFrame(data, crs="EPSG:4326")
gcs_geopandas().geo_data_frame_to_parquet(geo_data_frame, "gs://path/to/your/file.parquet")

Querying Data in BigQuery#

SQLAlchemy#

For more detailed queries, or where a query builder pattern is needed for multiple use cases of the same base query, you can use SQLAlchemy’s ORM functionality. One of the biggest benefits of using an ORM like SQLAlchemy is its security features, which mitigate risks common to hand-written raw SQL; accidentally introducing SQL injection vulnerabilities is one of the most common issues that ORMs help prevent. Other benefits include more manageable and testable code, and ORM models give you an easy-to-reference description of each table collocated with your code.
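
As a minimal, self-contained sketch of that difference (using SQLAlchemy’s lightweight table() and column() constructs purely for illustration; the hypothetical user_input stands in for an untrusted value):

from sqlalchemy import String, column, select, table

dim_agency = table("dim_agency", column("agency_id", String), schema="mart_gtfs")

user_input = "BA"  # imagine this arrived from an untrusted source

# Risky pattern: f"... WHERE agency_id = '{user_input}'" splices the value into raw SQL text.
# The SQLAlchemy expression instead binds the value as a parameter:
statement = select(dim_agency).where(dim_agency.c.agency_id == user_input)
print(statement)  # the rendered SQL contains a placeholder (e.g. :agency_id_1), not the raw value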

Steps to querying with the ORM

  • Declare a model. For each of the tables you need to query, you will need to define a model if there isn’t one already (a minimal sketch of this step appears after this list).

  • Construct a query. Full documentation on query construction here.

  • Establish a DB connection and send the query.

    from calitp_data_analysis.sql import get_engine
    from sqlalchemy import select
    from sqlalchemy.orm import sessionmaker
    import pandas as pd

    db_engine = get_engine(project="cal-itp-data-infra-staging")
    DBSession = sessionmaker(db_engine)

    ...

    # YourModel is the declarative model you defined in the first step
    statement = select(YourModel).where(YourModel.service_date == "2025-12-01")

    with DBSession() as session:
        df = pd.read_sql(statement, session.bind)
    

    See SQLAlchemy Session Basics for more info.
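
A minimal sketch of the model-declaration step (this is a hypothetical, trimmed model for mart_gtfs.dim_agency; match the real table’s column names and types when declaring your own):

from sqlalchemy import Column, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class DimAgency(Base):
    __tablename__ = "dim_agency"
    __table_args__ = {"schema": "mart_gtfs"}

    key = Column(String, primary_key=True)
    feed_key = Column(String)
    agency_id = Column(String)
    agency_name = Column(String)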

pandas#

The library pandas is very commonly used in data analysis. See this Pandas Cheat Sheet for more inspiration.
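
A minimal sketch of a typical pattern, pairing query_sql(..., as_df=True) with ordinary pandas operations (the table and columns reuse the dim_agency examples above):

from calitp_data_analysis.sql import query_sql

agencies = query_sql(
    "SELECT feed_key, agency_name FROM mart_gtfs.dim_agency LIMIT 1000",
    as_df=True,
)

# standard pandas from here on: group, aggregate, sort
agency_counts = (
    agencies.groupby("feed_key")
    .agg(n_agencies=("agency_name", "count"))
    .sort_values("n_agencies", ascending=False)
)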

siuba (DEPRECATED)#

DEPRECATED: siuba will be removed from calitp_data_analysis after version 2025.8.10, and from other shared code within the next few months. Use SQLAlchemy and pandas or geopandas functions directly instead; siuba uses these under the hood.

siuba is a tool that allows the same analysis code to run on a pandas DataFrame, as well as generate SQL for different databases. It supports most pandas Series methods analysts use. See the siuba docs for more information.

The examples below go through the basics of using siuba, collecting a database query to a local DataFrame, and showing SQL test queries that siuba code generates.

Basic Queries#

query_sql#

from calitp_data_analysis.sql import query_sql

# query agency information, then filter for a single gtfs feed,
# and then count how often each feed key occurs
query_sql(
    """
    SELECT feed_key, COUNT(*) AS n
    FROM mart_gtfs.dim_agency
    WHERE agency_id = 'BA' AND base64_url = 'aHR0cHM6Ly9hcGkuNTExLm9yZy90cmFuc2l0L2RhdGFmZWVkcz9vcGVyYXRvcl9pZD1SRw=='
    GROUP BY feed_key
    ORDER BY n DESC
    LIMIT 5
    """
)

siuba equivalent (DEPRECATED)#

DEPRECATED: We are removing siuba from the codebase. This is only here for illustration. Please do not introduce new siuba code.

from calitp_data_analysis.tables import tbls
from siuba import _, filter, count, collect, show_query

# query agency information, then filter for a single gtfs feed,
# and then count how often each feed key occurs
(tbls.mart_gtfs.dim_agency()
    >> filter(_.agency_id == 'BA', _.base64_url == 'aHR0cHM6Ly9hcGkuNTExLm9yZy90cmFuc2l0L2RhdGFmZWVkcz9vcGVyYXRvcl9pZD1SRw==')
    >> count(_.feed_key)
)

filter & collect#

query_sql#

Use SQL with SELECT and WHERE clauses for filtering

from calitp_data_analysis.sql import query_sql

agencies = query_sql(
    """
    SELECT * 
    FROM cal-itp-data-infra.mart_ntd.dim_annual_service_agencies
    WHERE state = 'CA' AND report_year = 2023
    """,
    as_df=True
)

# Use pandas `head()` method to show first 5 rows of data
agencies.head()

siuba equivalent (DEPRECATED)#

Note that siuba by default prints out a preview of the SQL query results. In order to fetch the results of the query as a pandas DataFrame, run collect().

from calitp_data_analysis.tables import tbls
from siuba import _, collect, filter

annual_service_agencies = (
    tbls.mart_ntd.dim_annual_service_agencies()
    >> filter(_.state == "CA", _.report_year == 2023)
    >> collect()
)

select#

pandas/geopandas#

import geopandas as gpd

blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks[['GEOID20', 'POP20', 'HOUSING20', 'geometry']]

siuba equivalent (DEPRECATED)#

import geopandas as gpd
from siuba import _, select


blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks >> select(_.GEOID20, _.POP20, _.HOUSING20, _.geometry)

rename#

pandas/geopandas#

import geopandas as gpd

blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks.rename(columns={'GEOID20': 'GEO_ID'})

siuba equivalent (DEPRECATED)#

import geopandas as gpd
from siuba import _, rename

blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks >> rename(GEO_ID = _.GEOID20)

inner_join#

pandas/geopandas#

The below code will result in a table having both GEO_ID and w_geocode columns with redundant data. To avoid this, you could either (a) first rename one of the columns to match the other and do a simpler merge using just the on parameter (no need then for left_on and right_on), or (b) do as shown below and subsequently drop one of the redundant columns.

# df and df2 are both DataFrames
joined = df2.merge(df, left_on='GEO_ID', right_on='w_geocode')
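
A minimal sketch of both options (df and df2 are the same hypothetical DataFrames, joined on w_geocode and GEO_ID respectively):

# Option (a): rename so both frames share the join column, then merge on it
joined = df2.merge(df.rename(columns={'w_geocode': 'GEO_ID'}), on='GEO_ID')

# Option (b): merge on the differing column names, then drop the redundant column
joined = (
    df2.merge(df, left_on='GEO_ID', right_on='w_geocode')
    .drop(columns='w_geocode')
)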

siuba equivalent (DEPRECATED)#

from siuba import _, inner_join

# df and df2 are both DataFrames
joined = df2 >> inner_join(_, df, on={'GEO_ID':'w_geocode'})

Advanced Queries#

If you need to construct a more detailed query or are implementing a query builder that has multiple use cases, you should probably use SQLAlchemy ORM’s query building features.

Reusable base query#

Here’s an example adapted from shared_utils.schedule_rt_utils.filter_dim_gtfs_datasets:

# Simplified from shared_utils.schedule_rt_utils.filter_dim_gtfs_datasets
# Assumes DimGtfsDataset (a declarative model) and DBSession (see the SQLAlchemy section above) are defined elsewhere.
from typing import Union

import pandas as pd
import sqlalchemy
from sqlalchemy import and_, select

def filter_dim_gtfs_datasets(
    keep_cols: list[str] = ["key", "name", "type", "regional_feed_type", "uri", "base64_url"],
    custom_filtering: dict = None,
    get_df: bool = True,
) -> Union[pd.DataFrame, sqlalchemy.sql.selectable.Select]:
    """
    Filter mart_transit_database.dim_gtfs_dataset table
    and keep only the valid rows that passed data quality checks.
    """
    dim_gtfs_dataset_columns = []

    for column in keep_cols:
        new_column = getattr(DimGtfsDataset, column)
        dim_gtfs_dataset_columns.append(new_column)

    search_conditions = [DimGtfsDataset.data_quality_pipeline == True]

    for k, v in (custom_filtering or {}).items():
        search_conditions.append(getattr(DimGtfsDataset, k).in_(v))

    statement = select(*dim_gtfs_dataset_columns).where(and_(*search_conditions))

    if get_df:
        with DBSession() as session:
            return pd.read_sql(statement, session.bind)
    else:
        return statement

Using this function for further query building:

# Simplified from shared_utils.gtfs_utils_v2.schedule_daily_feed_to_gtfs_dataset_name
# Assumes the FctDailyScheduleFeed model and selected_date are defined elsewhere.

dim_gtfs_datasets = schedule_rt_utils.filter_dim_gtfs_datasets(
    keep_cols=["key", "name", "type", "regional_feed_type"],
    custom_filtering={"type": ["schedule"]},
    get_df=False, # return a SQLAlchemy statement so that you can continue to build the query
)

statement = (
    dim_gtfs_datasets.with_only_columns(
        DimGtfsDataset.regional_feed_type,
        DimGtfsDataset.type,
        FctDailyScheduleFeed.date,
        FctDailyScheduleFeed.feed_key,
        FctDailyScheduleFeed.gtfs_dataset_key,
        FctDailyScheduleFeed.gtfs_dataset_name,
    )
    .join(
        FctDailyScheduleFeed,
        and_(
            FctDailyScheduleFeed.gtfs_dataset_key == DimGtfsDataset.key,
            FctDailyScheduleFeed.gtfs_dataset_name == DimGtfsDataset.name,
        ),
    )
    .where(FctDailyScheduleFeed.date == selected_date)
)

with DBSession() as session:
    df = pd.read_sql(statement, session.bind)

Useful functions for query building#

  • Select.add_columns - Add columns to an existing Select clause (see the sketch after this list)

  • Select.with_only_columns - Replace columns in an existing Select clause

  • Column.label - Provide an alias for a column you’re selecting. Results in SQL like <columnname> AS <name>.

  • expression.func - Generate SQL function expressions. This is useful when you need a BigQuery-specific expression that is not generically supported in SQLAlchemy. For example:

    from sqlalchemy import String, cast, func, select

    ...

    select(
        # Produces a column like {"caltrans_district": "07 - Los Angeles / Ventura"}
        func.concat(
            func.lpad(cast(DimCountyGeography.caltrans_district, String), 2, "0"),
            " - ",
            DimCountyGeography.caltrans_district_name,
        ).label("caltrans_district"),
    ).where(...)
    
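A minimal sketch of add_columns and label together (DimGtfsDataset is assumed to be a declared model, as in the advanced-query examples above):

from sqlalchemy import select

# start from a narrow select, then add an aliased column to it
statement = select(DimGtfsDataset.key)
statement = statement.add_columns(DimGtfsDataset.name.label("gtfs_dataset_name"))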

Add New Packages#

Most packages an analyst needs are already in the workspace baseline (installed by uv sync). If your project needs an additional package:

  1. Workspace-wide dependency (useful to many projects): add it to the [project] dependencies in data-analyses/pyproject.toml, then run uv lock && uv sync.

  2. Project-specific dependency: add a pyproject.toml in your project’s subdirectory (see portfolio/pyproject.toml for an example), add the project as a workspace member, and run uv lock && uv sync.

Commit the updated uv.lock alongside your changes so teammates get the same versions.
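
For the workspace-wide case, the workflow looks roughly like this (a sketch of the steps above; the package itself is whatever your project needs):

cd ~/data-analyses
# edit pyproject.toml to add the package under [project] dependencies, then:
uv lock && uv sync
# commit the manifest and lockfile together
git add pyproject.toml uv.lock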

Updating calitp-data-analysis#

calitp-data-analysis now lives in the data-analyses repo as a uv workspace member. To update it:

  1. Edit the source code in data-analyses/calitp-data-analysis/calitp_data_analysis/.

  2. If you’re adding a new dependency, add it to calitp-data-analysis/pyproject.toml and run uv lock from the workspace root.

  3. Open a PR in the data-analyses repo. Your changes take effect for all workspace users after merge — no PyPI publish step needed.

The PyPI package is frozen at its last published version for external consumers (e.g. reports). It does not need to be updated for internal use.

Appendix: calitp-data-infra#

The calitp-data-infra package, used primarily by warehouse maintainers and data pipeline developers, includes utilities that analysts will likely not need to interact with directly (and therefore generally won’t need to install), but which may be helpful to be aware of. For instance, the get_secret_by_name() and get_secrets_by_label() functions in the package’s auth module are used to interact with Google’s Secret Manager, the service that securely stores API keys and other sensitive information that underpins many of our data syncs.

You can read more about the calitp-data-infra Python package here.