Practice: Refactoring assets to use resources
The following assets use the DuckDB database:
- taxi_zones
- manhattan_stats
- trips_by_week
Update these assets to use the DuckDBResource.
Check your work
The updated assets should look similar to the following code snippets. Click View answer to view them.
The rest of this module assumes your code matches the snippets below. If your solution differs from the code provided, update it to match the answers and re-materialize these assets to prepare for the next part of this lesson.
In assets/trips.py:
# assets/trips.py
@asset(deps=["taxi_zones_file"])
def taxi_zones(database: DuckDBResource) -> None:
    """
      Load the raw taxi zones file into the DuckDB `zones` table.

      The table is created or replaced on each run and exposes the columns
      zone_id, zone, borough and geometry.
    """
    zones_file = constants.TAXI_ZONES_FILE_PATH
    create_zones_sql = f"""
      create or replace table zones as (
        select
          LocationID as zone_id,
          zone,
          borough,
          the_geom as geometry
        from '{zones_file}'
      );
    """
    # The connection is provided by the injected DuckDB resource.
    with database.get_connection() as connection:
        connection.execute(create_zones_sql)
In assets/metrics.py:
Update the imports in assets/metrics.py to the following:
from datetime import datetime, timedelta
from dagster import asset
from dagster_duckdb import DuckDBResource
import geopandas as gpd
import pandas as pd
from . import constants
Update the manhattan_stats asset:
# assets/metrics.py
@asset(
    deps=["taxi_trips", "taxi_zones"]
)
def manhattan_stats(database: DuckDBResource) -> None:
    """
      Metrics on taxi trips in Manhattan.

      Counts the trips picked up in each Manhattan zone that has a geometry,
      parses the zone geometry from WKT, and writes the output of
      `GeoDataFrame.to_json()` to `constants.MANHATTAN_STATS_FILE_PATH`.
    """
    # The borough filter keeps zones from the other boroughs out of the
    # Manhattan stats file; rows without a geometry cannot be mapped and are
    # dropped as well.
    query = """
      select
        zones.zone,
        zones.borough,
        zones.geometry,
        count(1) as num_trips,
      from trips
      left join zones on trips.pickup_zone_id = zones.zone_id
      where zones.borough = 'Manhattan' and zones.geometry is not null
      group by zone, borough, geometry
    """
    with database.get_connection() as conn:
        trips_by_zone = conn.execute(query).fetch_df()

    # The geometry column comes back as WKT text; convert it so the frame can
    # be serialized as a GeoDataFrame.
    trips_by_zone["geometry"] = gpd.GeoSeries.from_wkt(trips_by_zone["geometry"])
    trips_by_zone = gpd.GeoDataFrame(trips_by_zone)

    # Explicit encoding so the output does not depend on the platform default.
    with open(constants.MANHATTAN_STATS_FILE_PATH, 'w', encoding="utf-8") as output_file:
        output_file.write(trips_by_zone.to_json())
Update the trips_by_week asset:
# assets/metrics.py
@asset(
    deps=["taxi_trips"]
)
def trips_by_week(database: DuckDBResource) -> None:
    """
      Weekly totals of taxi trips, written to a CSV file.

      Starting at 2023-01-01 and stepping seven days at a time up to the
      current time, queries the `trips` table for each window and records the
      number of trips plus the summed total_amount, trip_distance and
      passenger_count. The result is sorted by period and saved to
      `constants.TRIPS_BY_WEEK_FILE_PATH`.
    """
    current_date = datetime.strptime("2023-01-01", constants.DATE_FORMAT)
    end_date = datetime.now()

    # Collect one single-row frame per week and concatenate once at the end;
    # concatenating inside the loop re-copies all earlier rows every time.
    weekly_aggregates = []

    # A single connection serves every weekly query instead of reconnecting
    # on each iteration.
    with database.get_connection() as conn:
        while current_date < end_date:
            current_date_str = current_date.strftime(constants.DATE_FORMAT)
            query = f"""
          select
            vendor_id, total_amount, trip_distance, passenger_count
          from trips
          where pickup_datetime >= '{current_date_str}' and pickup_datetime < '{current_date_str}'::date + interval '1 week'
        """
            data_for_week = conn.execute(query).fetch_df()

            # Counting vendor_id yields the number of trips in the window.
            aggregate = data_for_week.agg({
                "vendor_id": "count",
                "total_amount": "sum",
                "trip_distance": "sum",
                "passenger_count": "sum"
            }).rename({"vendor_id": "num_trips"}).to_frame().T # type: ignore
            aggregate["period"] = current_date

            weekly_aggregates.append(aggregate)
            current_date += timedelta(days=7)

    result = pd.concat(weekly_aggregates)

    # clean up the formatting of the dataframe
    result['num_trips'] = result['num_trips'].astype(int)
    result['passenger_count'] = result['passenger_count'].astype(int)
    result['total_amount'] = result['total_amount'].round(2).astype(float)
    result['trip_distance'] = result['trip_distance'].round(2).astype(float)
    result = result[["period", "num_trips", "total_amount", "trip_distance", "passenger_count"]]
    result = result.sort_values(by="period")

    result.to_csv(constants.TRIPS_BY_WEEK_FILE_PATH, index=False)