Loading dbt models into Dagster as assets
Now is the moment that we’ve been building up to since the beginning of this module. Let’s see your dbt models in a Dagster asset graph!
Turning dbt models into assets with @dbt_assets
The star of the show here is the @dbt_assets
decorator. This is a specialized asset decorator that wraps around a dbt project to tell Dagster what dbt models exist. In the body of the @dbt_assets
definition, you write exactly how you want Dagster to run your dbt models.
Many Dagster projects may only need one @dbt_assets
-decorated function that manages the entire dbt project. However, you may need to create multiple definitions for various reasons, such as:
- You have multiple dbt projects
- You want to exclude certain dbt models
- You want to only execute
dbt run
and notdbt build
on specific models - You want to customize what happens after certain models finish, such as sending a notification
- You need to configure some sets of models differently
We’ll only create one @dbt_assets
definition for now, but in a later lesson, we’ll encounter a use case for needing another @dbt_assets
definition.
Loading the models as assets
Create a new file in the
assets
directory calleddbt.py
.Add the following imports to the top of the file:
from dagster import AssetExecutionContext from dagster_dbt import dbt_assets, DbtCliResource from ..project import dbt_project
Next, we'll use the
@dbt_assets
decorator to create a new asset function and provide it with a reference to the project's manifest file:@dbt_assets( manifest=dbt_project.manifest_path, ) def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
Here, we used
dbt_project.manifest_path
to provide the reference to the project's manifest file. This is possible because thedbt_project
representation we created earlier contains the manifest path, accessible by using themanifest_path
attribute.Finally, add the following to the body of
dbt_analytics
function:yield from dbt.cli(["run"], context=context).stream()
Notice we provided two arguments here. The first argument is the
context
, which indicates which dbt models to run and any related configurations. The second refers to the dbt resource you’ll be using to run dbt.Let’s review what’s happening in this line in a bit more detail:
- We use the
dbt
argument (which is aDbtCliResource
) to execute a dbt command through its.cli
method. - The
.stream()
method fetches the events and results of this dbt execution.- This is one of multiple ways to get the Dagster events, such as what models materialized or tests passed. We recommend starting with this and exploring other methods in the future as your use cases grow (such as fetching the run artifacts after a run). In this case, the above line will execute
dbt run
.
- This is one of multiple ways to get the Dagster events, such as what models materialized or tests passed. We recommend starting with this and exploring other methods in the future as your use cases grow (such as fetching the run artifacts after a run). In this case, the above line will execute
- The results of the
stream
are a Python generator of what Dagster events happened. We usedyield from
(not justyield
!) to have Dagster track asset materializations.
- We use the
At this point, dbt.py
should look like this:
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
from ..project import dbt_project
@dbt_assets(
manifest=dbt_project.manifest_path,
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["run"], context=context).stream()