Adding partitions to assets
In this section, you’ll update the assets in assets/trips.py
to use the partitions.
Starting with taxi_trips_file
, the asset code should currently look like this:
@asset
def taxi_trips_file() -> None:
"""
The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
"""
month_to_fetch = '2023-03'
raw_trips = requests.get(
f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month_to_fetch}.parquet"
)
with open(constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch), "wb") as output_file:
output_file.write(raw_trips.content)
In this asset, month_to_fetch
is set to 2023-03
to retrieve trip data for March 2023. Using the monthly_partition
, you can update the asset to use the partition to retrieve three months of data.
To add the partition to the asset:
Import the
monthly_partition
from thepartitions
folder by adding the following to the top ofassets/trips.py
:from ..partitions import monthly_partition
In the asset decorator (
@asset
), add apartitions_def
parameter equal tomonthly_partition
:@asset( partitions_def=monthly_partition )
In Dagster, the
context
argument provides you with metadata about the current materialization. To access it, include it as the first argument in the asset definition function. You can enable type hinting for this by importingAssetExecutionContext
fromdagster
and adding it to the function signature. For example, the updated asset definition should look like this:from dagster import asset, AssetExecutionContext @asset( partitions_def=monthly_partition ) def taxi_trips_file(context: AssetExecutionContext) -> None:
Note: The
context
argument isn’t specific to partitions. However, this is the first time you've used it in Dagster University. Thecontext
argument provides information about how Dagster is running and materializing your asset. For example, you can use it to find out which partition Dagster is materializing, which job triggered the materialization, or what metadata was attached to its previous materializations.In the original asset code, the logic was hard-coded to specifically fetch data for March 2023 (
'2023-03'
). Use thecontext
argument’spartition_key
property to dynamically fetch a specific partition’s month of data:@asset( partitions_def=monthly_partition ) def taxi_trips_file(context) -> None: partition_date_str = context.partition_key
In the NYC OpenData source system, the taxi trip files are structured in a
YYYY-MM
format. However,context.partition_key
supplies the materializing partition’s date as a string in theYYYY-MM-DD
format. Slice the string to make it match the format expected by our source system and replace our existing declaration of themonth_to_fetch
variable:@asset( partitions_def=monthly_partition ) def taxi_trips_file(context) -> None: partition_date_str = context.partition_key month_to_fetch = partition_date_str[:-3]
After following the steps above, the taxi_trips_file
asset should look similar to the code snippet below:
from ..partitions import monthly_partition
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context) -> None:
"""
The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
"""
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]
raw_trips = requests.get(
f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month_to_fetch}.parquet"
)
with open(constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch), "wb") as output_file:
output_file.write(raw_trips.content)