Useful Python Libraries#

The following libraries are available and recommended for use by Cal-ITP data analysts. Dependencies are managed in the data-analyses repo using a uv workspace. Running uv sync from the data-analyses/ root installs everything into a local .venv, which the “Pyproject Local” Jupyter kernel uses automatically.

Table of Contents#

  1. shared utils

  2. calitp-data-analysis
    - Query SQL
    - Accessing Google Cloud Storage Data

  3. Querying Data in BigQuery
    - SQLAlchemy
    - pandas
    - siuba
    - Basic Queries
    - Advanced Queries

  4. Add New Packages

  5. Updating calitp-data-analysis

  6. Appendix: calitp-data-infra

shared utils#

A set of shared utility functions used across analysis projects. Both shared_utils and calitp_data_analysis live in the data-analyses repo as workspace members. They are installed automatically when you run uv sync from the workspace root.

Setup#

From a JupyterHub terminal (or locally):

cd ~/data-analyses
uv sync

This installs shared_utils, calitp_data_analysis, and all baseline dependencies into .venv. Select the “Pyproject Local” kernel in JupyterLab to use this environment.

In notebook#

from calitp_data_analysis import geography_utils

geography_utils.WGS84

See data-analyses/starter_kit for examples on how to use shared_utils for general functions, charts, and maps.

calitp-data-analysis#

calitp-data-analysis is an internal library of utility functions used to access our warehouse data for analysis purposes. It lives in data-analyses/calitp-data-analysis/ as a uv workspace member and is installed automatically by uv sync.

import tbls#

DEPRECATED: tbls will be removed after calitp_data_analysis version 2025.8.10. Instead of AutoTable or the tbls instance, use query_sql() from calitp_data_analysis.sql to connect to and query a SQL database. See the query_sql() section below.

Previously, you could import tbls at the top of your notebook to access warehouse tables as attributes of the tbls object:

from calitp_data_analysis.tables import tbls

Example:

from calitp_data_analysis.tables import tbls

tbls.mart_gtfs.dim_agency()

Query SQL#

query_sql#

query_sql() is useful for simple queries against BigQuery datasets. Pass as_df=True to return the query result as a pandas DataFrame. If you need to construct a more detailed query, or you are implementing a query builder with multiple use cases, consider SQLAlchemy query building instead (see Advanced Queries below).

As an example, in a notebook:

from calitp_data_analysis.sql import query_sql

df_dim_agency = query_sql("""
SELECT
    *
FROM cal-itp-data-infra-staging.mart_gtfs.dim_agency
LIMIT 10""", as_df=True)

df_dim_agency.head()
The output is the first five rows of dim_agency, with columns including key, _gtfs_key, feed_key, agency_id, agency_name, agency_url, agency_timezone, base64_url, _dt, and feed_timezone, for agencies such as County Connection, SLO Transit, and Golden Empire Transit District.

Accessing Google Cloud Storage Data#

GCSPandas#

The GCSPandas class wraps the Google Cloud Storage (GCS) filesystem and exposes functions that give analysts a clear, consistent way of accessing data on GCS.

It’s recommended to memoize initialization of the class so that the GCS filesystem is fetched once, on the first call, and reused by subsequent calls.

from functools import cache

from calitp_data_analysis.gcs_pandas import GCSPandas

@cache
def gcs_pandas():
    return GCSPandas()

read_parquet#

Delegates to pandas.read_parquet, providing the GCS filesystem.

gcs_pandas().read_parquet("gs://path/to/your/file.parquet")

read_csv#

Delegates to pandas.read_csv, reading the file at the specified path from the GCS filesystem.

gcs_pandas().read_csv("gs://path/to/your/file.csv")

read_excel#

Delegates to pandas.read_excel, reading the file at the specified path from the GCS filesystem.

gcs_pandas().read_excel("gs://path/to/your/file.xlsx")

data_frame_to_parquet#

Delegates to DataFrame.to_parquet, providing the GCS filesystem.

import pandas as pd

data_frame = pd.DataFrame({'col1': ['name1', 'name2']})
gcs_pandas().data_frame_to_parquet(data_frame, "gs://path/to/your/file.parquet")

GCSGeoPandas#

The GCSGeoPandas class wraps the Google Cloud Storage (GCS) filesystem and exposes functions that give analysts a clear, consistent way of accessing geospatial resources.

It’s recommended to memoize initialization of the class so that the GCS filesystem is fetched once, on the first call, and reused by subsequent calls.

from functools import cache

from calitp_data_analysis.gcs_geopandas import GCSGeoPandas

@cache
def gcs_geopandas():
    return GCSGeoPandas()

read_parquet#

Delegates to geopandas.read_parquet, providing the GCS filesystem.

gcs_geopandas().read_parquet("gs://path/to/your/file.parquet")

read_file#

Delegates to geopandas.read_file, reading the file at the specified path from the GCS filesystem.

gcs_geopandas().read_file("gs://path/to/your/file.geojson")

geo_data_frame_to_parquet#

Delegates to GeoDataFrame.to_parquet, providing the GCS filesystem.

import geopandas as gpd
from shapely.geometry import Point

# Example geometries; replace with your real data
data = {'col1': ['name1', 'name2'], 'geometry': [Point(0, 0), Point(1, 1)]}
geo_data_frame = gpd.GeoDataFrame(data, crs="EPSG:4326")
gcs_geopandas().geo_data_frame_to_parquet(geo_data_frame, "gs://path/to/your/file.parquet")

Querying Data in BigQuery#

SQLAlchemy#

For more detailed queries, or where a query-builder pattern is needed for multiple use cases of the same base query, you can use SQLAlchemy’s ORM functionality. One of the biggest benefits of an ORM like SQLAlchemy is security: parameterized query construction mitigates SQL injection, one of the most common risks of writing raw SQL. Other benefits include more manageable, testable code, and ORM models that serve as an easy-to-reference description of each table, collocated with your code.

Steps to querying with the ORM

  • Declare a model. For each of the tables you need to query, you will need to define a model if there isn’t one already.

  • Construct a query. Full documentation on query construction here.

  • Establish a DB connection and send the query.

    import pandas as pd
    from sqlalchemy import select
    from sqlalchemy.orm import sessionmaker

    from calitp_data_analysis.sql import get_engine

    db_engine = get_engine(project="cal-itp-data-infra-staging")
    DBSession = sessionmaker(db_engine)

    ...

    statement = select(YourModel).where(YourModel.service_date == "2025-12-01")

    with DBSession() as session:
        df = pd.read_sql(statement, session.bind)
    

    See SQLAlchemy Session Basics for more info.
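
A model declaration (the first step above) can be sketched with SQLAlchemy’s declarative mapping. This is a minimal, hypothetical model loosely based on the dim_agency columns shown earlier, not the project’s actual model code:

```python
import datetime

from sqlalchemy import Column, Date, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# Hypothetical model; a real model should mirror the warehouse table
# it queries, one class per table.
class DimAgency(Base):
    __tablename__ = "dim_agency"
    __table_args__ = {"schema": "mart_gtfs"}

    key = Column(String, primary_key=True)
    agency_name = Column(String)
    _dt = Column(Date)

# The class attributes can then be used to build typed, parameterized statements
statement = select(DimAgency.agency_name).where(
    DimAgency._dt == datetime.date(2025, 12, 1)
)
```

Because the model lives in code, column names are checked by Python rather than embedded in raw SQL strings.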

pandas#

The library pandas is very commonly used in data analysis. See this Pandas Cheat Sheet for more inspiration.
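
A minimal example of two common pandas operations, filtering with a boolean mask and aggregating with groupby (the data here is made up for illustration):

```python
import pandas as pd

# Made-up data for illustration
df = pd.DataFrame({
    "agency": ["A", "A", "B"],
    "routes": [10, 12, 7],
})

# Keep rows where the condition holds
big = df[df["routes"] > 8]

# Sum routes per agency
per_agency = df.groupby("agency")["routes"].sum().reset_index()
```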

siuba (DEPRECATED)#

DEPRECATED: siuba will be removed from calitp_data_analysis after version 2025.8.10 and other shared code within the next few months. Use SQLAlchemy and Pandas or GeoPandas functions directly instead. siuba uses these under the hood.

siuba is a tool that allows the same analysis code to run on a pandas DataFrame, as well as generate SQL for different databases. It supports most pandas Series methods analysts use. See the siuba docs for more information.

The examples below go through the basics of using siuba, collecting a database query to a local DataFrame, and showing SQL test queries that siuba code generates.

Basic Queries#

query_sql#

from calitp_data_analysis.sql import query_sql

# query agency information, then filter for a single gtfs feed,
# and then count how often each feed key occurs
query_sql(
    """
    SELECT feed_key, COUNT(*) AS n
    FROM mart_gtfs.dim_agency
    WHERE agency_id = 'BA' AND base64_url = 'aHR0cHM6Ly9hcGkuNTExLm9yZy90cmFuc2l0L2RhdGFmZWVkcz9vcGVyYXRvcl9pZD1SRw=='
    GROUP BY feed_key
    ORDER BY n DESC
    LIMIT 5
    """
)

siuba equivalent (DEPRECATED)#

DEPRECATED: We are removing siuba from the codebase. This is only here for illustration. Please do not introduce new siuba code.

from calitp_data_analysis.tables import tbls
from siuba import _, filter, count, collect, show_query

# query agency information, then filter for a single gtfs feed,
# and then count how often each feed key occurs
(tbls.mart_gtfs.dim_agency()
    >> filter(_.agency_id == 'BA', _.base64_url == 'aHR0cHM6Ly9hcGkuNTExLm9yZy90cmFuc2l0L2RhdGFmZWVkcz9vcGVyYXRvcl9pZD1SRw==')
    >> count(_.feed_key)
)

filter & collect#

query_sql#

Use SQL SELECT and WHERE clauses for filtering:

from calitp_data_analysis.sql import query_sql

agencies = query_sql(
    """
    SELECT * 
    FROM cal-itp-data-infra.mart_ntd.dim_annual_service_agencies
    WHERE state = 'CA' AND report_year = 2023
    """,
    as_df=True
)

# Use pandas `head()` method to show first 5 rows of data
agencies.head()

siuba equivalent (DEPRECATED)#

Note that siuba by default prints out a preview of the SQL query results. In order to fetch the results of the query as a pandas DataFrame, run collect().

from calitp_data_analysis.tables import tbls
from siuba import _, collect, filter

annual_service_agencies = (
    tbls.mart_ntd.dim_annual_service_agencies()
    >> filter(_.state == "CA", _.report_year == 2023)
    >> collect()
)

select#

pandas/geopandas#

import geopandas as gpd

blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks[['GEOID20', 'POP20', 'HOUSING20', 'geometry']]

siuba equivalent (DEPRECATED)#

import geopandas as gpd
from siuba import _, select


blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks >> select(_.GEOID20, _.POP20, _.HOUSING20, _.geometry)

rename#

pandas/geopandas#

import geopandas as gpd

blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks.rename(columns={'GEOID20': 'GEO_ID'})

siuba equivalent (DEPRECATED)#

import geopandas as gpd
from siuba import _, rename

blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks >> rename(GEO_ID = _.GEOID20)

inner_join#

pandas/geopandas#

The code below results in a table having both GEO_ID and w_geocode columns with redundant data. To avoid this, you can either (a) first rename one of the columns to match the other and merge using just the on parameter (no need then for left_on and right_on), or (b) merge as shown below and then drop one of the redundant columns.

# df and df2 are both DataFrames
joined = df2.merge(df, left_on='GEO_ID', right_on='w_geocode')
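
Both alternatives can be sketched in pandas. The column names follow the example above; the data is made up for illustration:

```python
import pandas as pd

# Made-up data using the column names from the example above
df = pd.DataFrame({"w_geocode": ["060010001", "060010002"], "jobs": [100, 250]})
df2 = pd.DataFrame({"GEO_ID": ["060010001", "060010002"], "pop": [300, 500]})

# (a) rename first, then merge on a single shared key
joined_a = df2.merge(df.rename(columns={"w_geocode": "GEO_ID"}), on="GEO_ID")

# (b) merge on differing keys, then drop the redundant column
joined_b = (
    df2.merge(df, left_on="GEO_ID", right_on="w_geocode")
    .drop(columns="w_geocode")
)
```

Both produce the same result; (a) avoids ever materializing the redundant column.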

siuba equivalent (DEPRECATED)#

from siuba import _, inner_join

# df and df2 are both DataFrames
joined = df2 >> inner_join(_, df, on={'GEO_ID': 'w_geocode'})

Advanced Queries#

If you need to construct a more detailed query or are implementing a query builder that has multiple use cases, you should probably use SQLAlchemy ORM’s query building features.

Reusable base query#

Here’s an example adapted from shared_utils.schedule_rt_utils.filter_dim_gtfs_datasets:

# Simplified from shared_utils.schedule_rt_utils.filter_dim_gtfs_datasets

from typing import Union

import pandas as pd
import sqlalchemy
from sqlalchemy import and_, select

# Assumes DimGtfsDataset (the ORM model) and DBSession (a sessionmaker)
# are defined elsewhere in the module.

def filter_dim_gtfs_datasets(
    keep_cols: list[str] = ["key", "name", "type", "regional_feed_type", "uri", "base64_url"],
    custom_filtering: dict = None,
    get_df: bool = True,
) -> Union[pd.DataFrame, sqlalchemy.sql.selectable.Select]:
    """
    Filter mart_transit_database.dim_gtfs_dataset table
    and keep only the valid rows that passed data quality checks.
    """
    dim_gtfs_dataset_columns = []

    for column in keep_cols:
        new_column = getattr(DimGtfsDataset, column)
        dim_gtfs_dataset_columns.append(new_column)

    search_conditions = [DimGtfsDataset.data_quality_pipeline == True]

    for k, v in (custom_filtering or {}).items():
        search_conditions.append(getattr(DimGtfsDataset, k).in_(v))

    statement = select(*dim_gtfs_dataset_columns).where(and_(*search_conditions))

    if get_df:
        with DBSession() as session:
            return pd.read_sql(statement, session.bind)
    else:
        return statement

Use of this function for query building:

# Simplified from shared_utils.gtfs_utils_v2.schedule_daily_feed_to_gtfs_dataset_name

dim_gtfs_datasets = schedule_rt_utils.filter_dim_gtfs_datasets(
    keep_cols=["key", "name", "type", "regional_feed_type"],
    custom_filtering={"type": ["schedule"]},
    get_df=False, # return a SQLAlchemy statement so that you can continue to build the query
)

statement = (
    dim_gtfs_datasets.with_only_columns(
        DimGtfsDataset.regional_feed_type,
        DimGtfsDataset.type,
        FctDailyScheduleFeed.date,
        FctDailyScheduleFeed.feed_key,
        FctDailyScheduleFeed.gtfs_dataset_key,
        FctDailyScheduleFeed.gtfs_dataset_name,
    )
    .join(
        FctDailyScheduleFeed,
        and_(
            FctDailyScheduleFeed.gtfs_dataset_key == DimGtfsDataset.key,
            FctDailyScheduleFeed.gtfs_dataset_name == DimGtfsDataset.name,
        ),
    )
    .where(FctDailyScheduleFeed.date == selected_date)
)

with DBSession() as session:
    daily_feeds = pd.read_sql(statement, session.bind)

Useful functions for query building#

  • Select.add_columns - Add columns to existing Select clause

  • Select.with_only_columns - Replace columns in an existing Select clause

  • Column.label - Provide an alias for a column you’re selecting. Results in SQL like <columnname> AS <name>.

  • expression.func - Generate SQL function expressions. This is useful for when you need to use a BigQuery-specific expression that is not generically supported in SQLAlchemy. For example:

    from sqlalchemy import String, cast, func, select

    ...

    select(
        # Produces a column like {"caltrans_district": "07 - Los Angeles / Ventura"}
        func.concat(
            func.lpad(cast(DimCountyGeography.caltrans_district, String), 2, "0"),
            " - ",
            DimCountyGeography.caltrans_district_name,
        ).label("caltrans_district"),
    ).where(...)
    

Add New Packages#

Most packages an analyst needs are already in the workspace baseline (installed by uv sync). If your project needs an additional package:

  1. Workspace-wide dep (useful to many projects): add it to the [project] dependencies in data-analyses/pyproject.toml, then run uv lock && uv sync.

  2. Project-specific dep: add a pyproject.toml in your project’s subdirectory (see portfolio/pyproject.toml for an example), add the project as a workspace member, and run uv lock && uv sync.

Commit the updated uv.lock alongside your changes so teammates get the same versions.
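
As a sketch, a project-specific pyproject.toml might look like this. The project name and dependency here are hypothetical; see portfolio/pyproject.toml for a real example:

```toml
[project]
name = "my-analysis-project"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    # Hypothetical project-specific dependency
    "some-extra-package>=1.0",
]
```

After adding the file and registering the project as a workspace member, uv lock && uv sync from the workspace root picks it up.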

Updating calitp-data-analysis#

calitp-data-analysis now lives in the data-analyses repo as a uv workspace member. To update it:

  1. Edit the source code in data-analyses/calitp-data-analysis/calitp_data_analysis/.

  2. If you’re adding a new dependency, add it to calitp-data-analysis/pyproject.toml and run uv lock from the workspace root.

  3. Open a PR in the data-analyses repo. Your changes take effect for all workspace users after merge — no PyPI publish step needed.

The PyPI package is frozen at its last published version for external consumers (e.g. reports). It does not need to be updated for internal use.

Appendix: calitp-data-infra#

The calitp-data-infra package, used primarily by warehouse maintainers and data pipeline developers, includes utilities that analysts will likely not need to interact with directly (and therefore generally won’t need to install), but which may be helpful to be aware of. For instance, the get_secret_by_name() and get_secrets_by_label() functions in the package’s auth module interact with Google’s Secret Manager, the service that securely stores API keys and other sensitive information underpinning many of our data syncs.

You can read more about the calitp-data-infra Python package here.