Useful Python Libraries#
The following libraries are available and recommended for use by Cal-ITP data analysts. Dependencies are managed in the data-analyses repo using a uv workspace. Running uv sync from the data-analyses/ root installs everything into a local .venv, which the “Pyproject Local” Jupyter kernel uses automatically.
calitp-data-analysis#
calitp-data-analysis is an internal library of utility functions used to access our warehouse data for analysis purposes. It lives in data-analyses/calitp-data-analysis/ as a uv workspace member and is installed automatically by uv sync.
import tbls#
DEPRECATED: tbls will be removed after calitp_data_analysis version 2025.8.10. Instead of AutoTable or the tbls
instance, use query_sql() from calitp_data_analysis.sql to connect to and query a SQL database. See the query_sql()
section below.
Most notably, you can import tbls at the top of your notebook and use it to access warehouse tables:
from calitp_data_analysis.tables import tbls
Example:
from calitp_data_analysis.tables import tbls
tbls.mart_gtfs.dim_agency()
Query SQL#
query_sql#
query_sql() is useful for simple queries against BigQuery datasets. You can optionally pass as_df=True
to turn a SQL query result into a pandas DataFrame. If you need to construct a more detailed query or are implementing
a query builder that has multiple use cases, you should probably use SQLAlchemy query building instead.
As an example, in a notebook:
from calitp_data_analysis.sql import query_sql
df_dim_agency = query_sql("""
SELECT
*
FROM cal-itp-data-infra-staging.mart_gtfs.dim_agency
LIMIT 10""", as_df=True)
df_dim_agency.head()
| | key | _gtfs_key | feed_key | agency_id | agency_name | agency_url | agency_timezone | agency_lang | agency_phone | agency_fare_url | agency_email | base64_url | _dt | _feed_valid_from | _line_number | feed_timezone |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0a6b3005f15b55c36d7410de0bec017c | 030477c8802e075a4d8aec4dca06115e | 02e47543885ff7ca07fb0e14a848db3b | 2 | County Connection | http://www.countyconnection.com/ | America/Los_Angeles | EN | 925-676-7500 | None | None | aHR0cDovL2NjY3RhLm9yZy9HVEZTL2dvb2dsZV90cmFuc2... | 2026-01-06 | 2026-01-06 00:00:00+00:00 | 1 | America/Los_Angeles |
| 1 | 1af2a886017b908bb77093275dcc0b24 | e7d07d5ad541db59f0f827ab94d86c95 | 5e4da850e3c56f72ab0d2867dde97200 | 2 | County Connection | http://www.countyconnection.com/ | America/Los_Angeles | EN | 925-676-7500 | None | None | aHR0cDovL2NjY3RhLm9yZy9HVEZTL2dvb2dsZV90cmFuc2... | 2026-01-09 | 2026-01-09 00:00:00+00:00 | 1 | America/Los_Angeles |
| 2 | bc03fbd73c743f96bf7bf7bcbe5ee4b7 | bc03fbd73c743f96bf7bf7bcbe5ee4b7 | 2c611838944999887239bbe5af76b8ee | 1 | SLO Transit | https://www.slocity.org/government/department-... | America/Los_Angeles | en | (805) 541-2877 | None | slotransit@slocity.org | aHR0cDovL2RhdGEucGVha3RyYW5zaXQuY29tL3N0YXRpY2... | 2026-01-09 | 2026-01-09 00:00:00+00:00 | 1 | America/Los_Angeles |
| 3 | b63515e5a4da9aeacfd36b85cc4ca42c | b63515e5a4da9aeacfd36b85cc4ca42c | ab159738f2929e515b7c4ac647238148 | 1 | Golden Empire Transit District | http://www.getbus.org/ | America/Los_Angeles | en | (661) 869-2438 | None | None | aHR0cDovL2V0YS5nZXRidXMub3JnL3J0dC9wdWJsaWMvdX... | 2026-02-12 | 2026-02-12 03:00:00.971922+00:00 | 1 | America/Los_Angeles |
| 4 | 9309c476bcf544d526a4ae568e17852b | 9309c476bcf544d526a4ae568e17852b | 433540dc67834e8453ebe6bca5530a44 | 1 | Golden Empire Transit District | http://www.getbus.org/ | America/Los_Angeles | en | (661) 869-2438 | None | None | aHR0cDovL2V0YS5nZXRidXMub3JnL3J0dC9wdWJsaWMvdX... | 2026-01-19 | 2026-01-19 03:00:00+00:00 | 1 | America/Los_Angeles |
Accessing Google Cloud Storage Data#
GCSPandas#
The GCSPandas class fetches the Google Cloud Storage (GCS) filesystem and surfaces functions that give analysts a clear, consistent way of accessing data on GCS.
It’s recommended to memoize initialization of the class so that the GCS filesystem is fetched and cached the first time you call it and subsequent calls can reuse that cached filesystem.
from functools import cache
from calitp_data_analysis.gcs_pandas import GCSPandas
@cache
def gcs_pandas():
return GCSPandas()
read_parquet#
Delegates to pandas.read_parquet, providing the GCS filesystem
gcs_pandas().read_parquet("gs://path/to/your/file.parquet")
read_csv#
Delegates to pandas.read_csv with the file at the path specified in the GCS filesystem
gcs_pandas().read_csv("gs://path/to/your/file.csv")
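Extra keyword arguments should behave like their pandas counterparts; the sketch below assumes GCSPandas.read_csv forwards them to pandas.read_csv (the column names and options are illustrative, and the pass-through behavior is an assumption worth verifying against the class):
# Assumes GCSPandas.read_csv forwards keyword arguments to pandas.read_csv
df = gcs_pandas().read_csv(
    "gs://path/to/your/file.csv",
    usecols=["agency_id", "agency_name"],  # illustrative column names
    dtype={"agency_id": "string"},
)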
read_excel#
Delegates to pandas.read_excel with the file at the path specified in the GCS filesystem
gcs_pandas().read_excel("gs://path/to/your/file.xlsx")
data_frame_to_parquet#
Delegates to DataFrame.to_parquet, providing the GCS filesystem
import pandas as pd
data_frame = pd.DataFrame({'col1': ['name1', 'name2']})
gcs_pandas().data_frame_to_parquet(data_frame, "gs://path/to/your/file.parquet")
GCSGeoPandas#
The GCSGeoPandas class fetches the Google Cloud Storage (GCS) filesystem and surfaces functions that give analysts a clear, consistent way of accessing geospatial resources.
It’s recommended to memoize initialization of the class so that the GCS filesystem is fetched and cached the first time you call it and subsequent calls can reuse that cached filesystem.
from functools import cache
from calitp_data_analysis.gcs_geopandas import GCSGeoPandas
@cache
def gcs_geopandas():
return GCSGeoPandas()
read_parquet#
Delegates to geopandas.read_parquet, providing the GCS filesystem
gcs_geopandas().read_parquet("gs://path/to/your/file.parquet")
read_file#
Delegates to geopandas.read_file with the file at the path specified in the GCS filesystem
gcs_geopandas().read_file("gs://path/to/your/file.geojson")
geo_data_frame_to_parquet#
Delegates to GeoDataFrame.to_parquet, providing the GCS filesystem
import geopandas as gpd
from shapely.geometry import Point

# Example point geometries; replace with your actual geometries
data = {'col1': ['name1', 'name2'], 'geometry': [Point(-121.49, 38.58), Point(-118.24, 34.05)]}
geo_data_frame = gpd.GeoDataFrame(data, crs="EPSG:4326")
gcs_geopandas().geo_data_frame_to_parquet(geo_data_frame, "gs://path/to/your/file.parquet")
Querying Data in BigQuery#
SQLAlchemy#
For more detailed queries, or where a query builder pattern is needed for multiple use cases of the same base query, you can use SQLAlchemy’s ORM functionality. One of the biggest benefits of using an ORM like SQLAlchemy is its security features, which mitigate risks common to writing raw SQL; accidentally introducing SQL injection vulnerabilities is one of the most common issues that ORMs help prevent. Other benefits include more manageable, testable code, and ORM models that serve as an easy-to-reference schema description collocated with your code.
Steps to querying with the ORM:
1. Declare a model. For each of the tables you need to query, you will need to define a model if there isn’t one already (a minimal sketch follows this list).
2. Construct a query. Full documentation on query construction here.
3. Establish a DB connection and send the query.
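A minimal sketch of step 1, assuming a hypothetical warehouse table; in practice, check whether a model for your table already exists in the shared code before declaring a new one:
from sqlalchemy import Column, Date, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class YourModel(Base):
    # Hypothetical table and columns, for illustration only
    __tablename__ = "your_table"
    __table_args__ = {"schema": "mart_gtfs"}  # the BigQuery dataset acts as the schema

    key = Column(String, primary_key=True)
    service_date = Column(Date)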
import pandas as pd
from calitp_data_analysis.sql import get_engine
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker

db_engine = get_engine(project="cal-itp-data-infra-staging")
DBSession = sessionmaker(db_engine)

...

statement = select(YourModel).where(YourModel.service_date == "2025-12-01")

with DBSession() as session:
    return pd.read_sql(statement, session.bind)
See SQLAlchemy Session Basics for more info.
pandas#
The library pandas is very commonly used in data analysis. See this Pandas Cheat Sheet for more inspiration.
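For example, a quick sketch combining query_sql with everyday pandas operations (the query and the aggregation are illustrative):
from calitp_data_analysis.sql import query_sql

df = query_sql(
    """
    SELECT feed_key, agency_name
    FROM mart_gtfs.dim_agency
    LIMIT 1000
    """,
    as_df=True,
)

# Count distinct agency names per feed and inspect the largest feeds
agencies_per_feed = (
    df.groupby("feed_key")["agency_name"]
    .nunique()
    .sort_values(ascending=False)
    .head(10)
)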
siuba (DEPRECATED)#
DEPRECATED: siuba will be removed from calitp_data_analysis after version 2025.8.10, and from other shared code within the
next few months. Use SQLAlchemy and pandas or GeoPandas functions directly instead; siuba uses these under the hood.
siuba is a tool that allows the same analysis code to run on a pandas DataFrame,
as well as generate SQL for different databases.
It supports most pandas Series methods analysts use. See the siuba docs for more information.
The examples below go through the basics of using siuba, collecting a database query to a local DataFrame, and showing SQL test queries that siuba code generates.
Basic Queries#
query_sql#
from calitp_data_analysis.sql import query_sql
# query agency information, then filter for a single gtfs feed,
# and then count how often each feed key occurs
query_sql(
"""
SELECT feed_key, COUNT(*) AS n
FROM mart_gtfs.dim_agency
WHERE agency_id = 'BA' AND base64_url = 'aHR0cHM6Ly9hcGkuNTExLm9yZy90cmFuc2l0L2RhdGFmZWVkcz9vcGVyYXRvcl9pZD1SRw=='
GROUP BY feed_key
ORDER BY n DESC
LIMIT 5
"""
)
siuba equivalent (DEPRECATED)#
DEPRECATED: We are removing siuba from the codebase. This is only here for illustration. Please do not introduce new siuba code.
from calitp_data_analysis.tables import tbls
from siuba import _, filter, count, collect, show_query
# query agency information, then filter for a single gtfs feed,
# and then count how often each feed key occurs
(tbls.mart_gtfs.dim_agency()
>> filter(_.agency_id == 'BA', _.base64_url == 'aHR0cHM6Ly9hcGkuNTExLm9yZy90cmFuc2l0L2RhdGFmZWVkcz9vcGVyYXRvcl9pZD1SRw==')
>> count(_.feed_key)
)
filter & collect#
query_sql#
Use SQL with SELECT and WHERE clauses for filtering
from calitp_data_analysis.sql import query_sql
agencies = query_sql(
"""
SELECT *
FROM cal-itp-data-infra.mart_ntd.dim_annual_service_agencies
WHERE state = 'CA' AND report_year = 2023
""",
as_df=True
)
# Use pandas `head()` method to show first 5 rows of data
agencies.head()
siuba equivalent (DEPRECATED)#
Note that siuba by default prints out a preview of the SQL query results.
In order to fetch the results of the query as a pandas DataFrame, run collect().
from calitp_data_analysis.tables import tbls
from siuba import _, collect, filter
annual_service_agencies = (
tbls.mart_ntd.dim_annual_service_agencies()
>> filter(_.state == "CA", _.report_year == 2023)
>> collect()
)
select#
pandas/geopandas#
import geopandas as gpd
blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks[['GEOID20', 'POP20', 'HOUSING20', 'geometry']]
siuba equivalent (DEPRECATED)#
import geopandas as gpd
from siuba import _, select
blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks >> select(_.GEOID20, _.POP20, _.HOUSING20, _.geometry)
rename#
pandas/geopandas#
import geopandas as gpd
blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks.rename(columns={'GEOID20': 'GEO_ID'})
siuba equivalent (DEPRECATED)#
import geopandas as gpd
from siuba import _, rename
blocks = gpd.read_file('./tl_2020_06_tabblock20.zip')
blocks = blocks >> rename(GEO_ID = _.GEOID20)
inner_join#
pandas/geopandas#
The code below will result in a table having both GEO_ID and w_geocode columns with redundant data.
To avoid this, you could either (a) first rename one of the columns to match the other and
do a simpler merge using just the on parameter (no need then for left_on and
right_on), or (b) do as shown below and subsequently drop one of the redundant columns.
# df and df2 are both pandas DataFrames
joined = df2.merge(df, left_on='GEO_ID', right_on='w_geocode')
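Sketches of both cleanup approaches mentioned above (assuming df has a w_geocode column and df2 has a GEO_ID column with matching values):
# Option (a): rename first, then merge on the single shared column name
joined = df2.merge(df.rename(columns={'w_geocode': 'GEO_ID'}), on='GEO_ID')

# Option (b): merge on differently named columns, then drop the redundant one
joined = df2.merge(df, left_on='GEO_ID', right_on='w_geocode').drop(columns='w_geocode')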
siuba equivalent (DEPRECATED)#
from siuba import _, inner_join

# df and df2 are both pandas DataFrames
joined = df2 >> inner_join(_, df, on={'GEO_ID': 'w_geocode'})
Advanced Queries#
If you need to construct a more detailed query or are implementing a query builder that has multiple use cases, you should probably use SQLAlchemy ORM’s query building features.
Reusable base query#
Here’s an example adapted from shared_utils.schedule_rt_utils.filter_dim_gtfs_datasets.
# Simplified from shared_utils.schedule_rt_utils.filter_dim_gtfs_datasets
# (assumes the DimGtfsDataset model and DBSession from the examples above are in scope)
from typing import Union

import pandas as pd
import sqlalchemy
from sqlalchemy import and_, select
def filter_dim_gtfs_datasets(
keep_cols: list[str] = ["key", "name", "type", "regional_feed_type", "uri", "base64_url"],
custom_filtering: dict = None,
get_df: bool = True,
) -> Union[pd.DataFrame, sqlalchemy.sql.selectable.Select]:
"""
Filter mart_transit_database.dim_gtfs_dataset table
and keep only the valid rows that passed data quality checks.
"""
dim_gtfs_dataset_columns = []
for column in keep_cols:
new_column = getattr(DimGtfsDataset, column)
dim_gtfs_dataset_columns.append(new_column)
search_conditions = [DimGtfsDataset.data_quality_pipeline == True]
for k, v in (custom_filtering or {}).items():
search_conditions.append(getattr(DimGtfsDataset, k).in_(v))
statement = select(*dim_gtfs_dataset_columns).where(and_(*search_conditions))
if get_df:
with DBSession() as session:
return pd.read_sql(statement, session.bind)
else:
return statement
Use of this function for query building
# Simplified from shared_utils.gtfs_utils_v2.schedule_daily_feed_to_gtfs_dataset_name
# (the FctDailyScheduleFeed model and selected_date are assumed to be defined elsewhere)
dim_gtfs_datasets = schedule_rt_utils.filter_dim_gtfs_datasets(
keep_cols=["key", "name", "type", "regional_feed_type"],
custom_filtering={"type": ["schedule"]},
get_df=False, # return a SQLAlchemy statement so that you can continue to build the query
)
statement = (
dim_gtfs_datasets.with_only_columns(
DimGtfsDataset.regional_feed_type,
DimGtfsDataset.type,
FctDailyScheduleFeed.date,
FctDailyScheduleFeed.feed_key,
FctDailyScheduleFeed.gtfs_dataset_key,
FctDailyScheduleFeed.gtfs_dataset_name,
)
.join(
FctDailyScheduleFeed,
and_(
FctDailyScheduleFeed.gtfs_dataset_key == DimGtfsDataset.key,
FctDailyScheduleFeed.gtfs_dataset_name == DimGtfsDataset.name,
),
)
.where(FctDailyScheduleFeed.date == selected_date)
)
with DBSession() as session:
return pd.read_sql(statement, session.bind)
Useful functions for query building#
- Select.add_columns - Add columns to an existing Select clause (a small sketch follows this list)
- Select.with_only_columns - Replace the columns in an existing Select clause
- Column.label - Provide an alias for a column you’re selecting. Results in SQL like <column name> AS <name>.
- expression.func - Generate SQL function expressions. This is useful when you need to use a BigQuery-specific expression that is not generically supported in SQLAlchemy. For example:

from sqlalchemy import String, cast, func, select

...

select(
    # Produces a column like {"caltrans_district": "07 - Los Angeles / Ventura"}
    func.concat(
        func.lpad(cast(DimCountyGeography.caltrans_district, String), 2, "0"),
        " - ",
        DimCountyGeography.caltrans_district_name,
    ).label("caltrans_district"),
).where(...)
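A small sketch combining Select.add_columns and Column.label, reusing the DimGtfsDataset model from the earlier examples (the alias name is illustrative):
from sqlalchemy import select

statement = select(DimGtfsDataset.key)
# Add another column to the existing Select and alias it in the result set
statement = statement.add_columns(DimGtfsDataset.name.label("gtfs_dataset_name"))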
Add New Packages#
Most packages an analyst needs are already in the workspace baseline (installed by uv sync). If your project needs an additional package:
- Workspace-wide dep (useful to many projects): add it to the [project] dependencies in data-analyses/pyproject.toml, then run uv lock && uv sync.
- Project-specific dep: add a pyproject.toml in your project’s subdirectory (see portfolio/pyproject.toml for an example), add the project as a workspace member, and run uv lock && uv sync.
Commit the updated uv.lock alongside your changes so teammates get the same versions.
Updating calitp-data-analysis#
calitp-data-analysis now lives in the data-analyses repo as a uv workspace member. To update it:
1. Edit the source code in data-analyses/calitp-data-analysis/calitp_data_analysis/.
2. If you’re adding a new dependency, add it to calitp-data-analysis/pyproject.toml and run uv lock from the workspace root.
3. Open a PR in the data-analyses repo. Your changes take effect for all workspace users after merge; no PyPI publish step needed.
The PyPI package is frozen at its last published version for external consumers (e.g. reports). It does not need to be updated for internal use.
Appendix: calitp-data-infra#
The calitp-data-infra package, used primarily by warehouse maintainers and data pipeline developers, includes utilities that analysts will likely not need to interact with directly (and therefore generally won’t need to install), but which may be helpful to be aware of. For instance, the get_secret_by_name() and get_secrets_by_label() functions in the package’s auth module are used to interact with Google’s Secret Manager, the service that securely stores API keys and other sensitive information that underpins many of our data syncs.
You can read more about the calitp-data-infra Python package here.