Orchestrating Data Pipelines: Dagster Assets as dbt Model Sources
Modern data pipelines often involve complex workflows that span data extraction, transformation, and loading (ETL). Tools like Dagster and dbt excel in different stages of this process, making them powerful allies in building robust data infrastructure. This article delves into a seamless integration between these tools, demonstrating how to reference Dagster assets as sources for your dbt models.
The Scenario: Connecting Dagster Outputs to dbt Models
Imagine you're building a data pipeline for a retail company. You use Dagster to ingest sales data from multiple sources, clean it, and aggregate it into a structured format. Now, you want to leverage this processed data within dbt to build business intelligence dashboards and perform further analysis.
Here's a simplified example of Dagster assets that extract sales data, clean it, and aggregate it by product category:
import pandas as pd
from dagster import asset


@asset
def sales_data() -> pd.DataFrame:
    # Placeholder rows; replace with extraction from your real sales sources.
    return pd.DataFrame(
        {
            "product_category": ["toys", "toys", "books", None],
            "sales_amount": [10.0, 5.0, 7.5, 3.0],
        }
    )


@asset
def cleaned_sales_data(sales_data: pd.DataFrame) -> pd.DataFrame:
    # Drop rows that are missing a category or an amount.
    return sales_data.dropna(subset=["product_category", "sales_amount"])


@asset
def aggregated_sales_data(cleaned_sales_data: pd.DataFrame) -> pd.DataFrame:
    # Sum sales per product category; the asset key defaults to the function name.
    return cleaned_sales_data.groupby("product_category", as_index=False)[
        "sales_amount"
    ].sum()
This code defines three Dagster assets: sales_data, cleaned_sales_data, and aggregated_sales_data. Dagster infers the dependencies between them from the function parameter names, and each asset's key defaults to its function name. The final asset, aggregated_sales_data, produces a DataFrame of sales totals by category. For dbt to read it, the asset's output must be stored as a table in the warehouse that dbt connects to, for example by an I/O manager that writes DataFrames to tables.
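Now, let's create a dbt model that reads the table behind this asset:
{{ config(materialized='view') }}
SELECT
    product_category,
    SUM(sales_amount) AS total_sales
FROM {{ source('dagster', 'aggregated_sales_data') }}
GROUP BY product_category
ORDER BY total_sales DESC
This dbt model uses the source function rather than ref. The ref function only resolves other dbt models, while aggregated_sales_data is a table produced outside dbt, so it has to be declared as a source in the dbt project. The source name dagster is illustrative and must match the name used in that declaration. dbt does not fetch data from Dagster directly; it queries the warehouse table that the Dagster asset wrote, and the view is built on top of that table.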
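The source declaration that such a model depends on lives in a YAML file inside the dbt project. As a rough sketch, the helper below renders one from a list of table names. The helper name render_sources_yaml and the source name dagster are assumptions made for illustration. The meta.dagster.asset_key entry follows the convention described in the dagster-dbt documentation for linking a dbt source to an upstream Dagster asset, and is worth checking against the version you run.

```python
def render_sources_yaml(source_name: str, table_names: list[str]) -> str:
    """Render a dbt sources file that links each table to a Dagster asset key.

    Hypothetical helper: the output is meant to be saved as, e.g.,
    models/sources.yml in the dbt project.
    """
    lines = [
        "version: 2",
        "",
        "sources:",
        f"  - name: {source_name}",
        "    tables:",
    ]
    for table in table_names:
        lines += [
            f"      - name: {table}",
            "        meta:",
            "          dagster:",
            # Assumes the Dagster asset key is a single segment equal to the table name.
            f'            asset_key: ["{table}"]',
        ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_sources_yaml("dagster", ["aggregated_sales_data"]))
```

Generating the file is optional; for a single table, writing the same YAML by hand is just as reasonable.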
Why This Approach is Beneficial:
- Decoupling: Dagster focuses on data processing, while dbt excels in modeling and analysis. This integration promotes a clear separation of concerns.
- Data Lineage: By referencing Dagster assets, your dbt models inherit valuable data lineage information. This helps track the origins of your data and ensures traceability.
- Reusability: Dagster assets can be reused across multiple dbt models, streamlining your data workflows and eliminating redundant code.
Setting up the Integration:
- Dagster Configuration: Define aggregated_sales_data as an asset in your Dagster code and make sure its output is written to a table in the warehouse that dbt connects to, for example through an I/O manager.
- dbt Project Setup: Declare that table as a source in your dbt project, and load the dbt project into Dagster with the dagster-dbt integration so that both tools share one asset graph. Environment variables are a common way to pass the dbt project location and warehouse credentials.
- dbt Model Definition: Use the source function in your dbt models to read the table produced by the Dagster asset, and keep ref for dependencies between dbt models.
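One way to sanity-check the steps above is to confirm that every dbt source resolves to an asset that Dagster actually defines. The sketch below reads the sources section of a dbt manifest and compares the resulting keys with a set of known Dagster asset keys. It assumes the default dagster-dbt mapping as I understand it (a meta.dagster.asset_key override if present, otherwise source name plus table name). The project name retail, the returns table, and both function names are hypothetical.

```python
def source_asset_keys(manifest: dict) -> dict[str, list[str]]:
    """Map each dbt source unique_id to the Dagster asset key it should resolve to."""
    keys = {}
    for unique_id, node in manifest.get("sources", {}).items():
        override = node.get("meta", {}).get("dagster", {}).get("asset_key")
        # Assumed default: [source_name, table_name] when no override is set.
        keys[unique_id] = list(override) if override else [node["source_name"], node["name"]]
    return keys


def missing_upstreams(manifest: dict, dagster_asset_keys: set[tuple[str, ...]]) -> list[str]:
    """Return the dbt sources whose asset key matches no known Dagster asset."""
    return sorted(
        uid
        for uid, key in source_asset_keys(manifest).items()
        if tuple(key) not in dagster_asset_keys
    )


# Trimmed, hand-written stand-in for the "sources" section of target/manifest.json.
manifest = {
    "sources": {
        "source.retail.dagster.aggregated_sales_data": {
            "source_name": "dagster",
            "name": "aggregated_sales_data",
            "meta": {"dagster": {"asset_key": ["aggregated_sales_data"]}},
        },
        "source.retail.dagster.returns": {
            "source_name": "dagster",
            "name": "returns",
            "meta": {},
        },
    }
}

# Only aggregated_sales_data is defined on the Dagster side in this example.
print(missing_upstreams(manifest, {("aggregated_sales_data",)}))
# prints ['source.retail.dagster.returns']
```

A check like this could run in CI after dbt parse, so a renamed asset or a missing override is caught before a scheduled run.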
Additional Insights:
- Dynamic Asset Selection: Because source calls sit inside Jinja templates, a dbt model can in principle choose which Dagster-produced table to read based on a condition such as the target environment. More simply, you could create different dbt models that read different time periods of the aggregated sales data.
- Data Versioning: Consider using Dagster's versioning capabilities to ensure your dbt models are using the correct versions of the processed data.
- Testing and Observability: Leverage Dagster's testing framework and observability tools to ensure the data pipeline is robust and the dbt models are correctly consuming the data.
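To make the testing point concrete: the aggregation logic is ordinary Python, so it can be unit-tested before dbt ever reads its output. The sketch below uses plain dictionaries instead of DataFrames to stay dependency-free. The function names and sample rows are illustrative assumptions, and in a real project the sanity check would more likely live in a Dagster asset check or a dbt test.

```python
from collections import defaultdict


def aggregate_by_category(rows: list[dict]) -> dict[str, float]:
    """Sum sales_amount per product_category, skipping rows without a category."""
    totals: dict[str, float] = defaultdict(float)
    for row in rows:
        category = row.get("product_category")
        if category is None:
            continue
        totals[category] += row["sales_amount"]
    return dict(totals)


def check_aggregates(totals: dict[str, float]) -> list[str]:
    """Return a list of problems; an empty list means the data looks sane."""
    problems = []
    if not totals:
        problems.append("no categories produced")
    problems += [f"negative total for {c}" for c, t in totals.items() if t < 0]
    return problems


# Illustrative sample rows, not real sales data.
rows = [
    {"product_category": "toys", "sales_amount": 10.0},
    {"product_category": "toys", "sales_amount": 5.0},
    {"product_category": "books", "sales_amount": 7.5},
    {"product_category": None, "sales_amount": 3.0},
]

totals = aggregate_by_category(rows)
assert totals == {"toys": 15.0, "books": 7.5}
assert check_aggregates(totals) == []
```

Keeping the check separate from the aggregation makes it easy to reuse the same rule against the warehouse table later.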
By leveraging the strengths of both Dagster and dbt, you can create efficient, scalable, and well-documented data pipelines. This integration empowers you to build data products and insights with greater confidence and agility.
Resources:
- Dagster Documentation: https://docs.dagster.io/
- dbt Documentation: https://docs.getdbt.com/
- Dagster and dbt Integration: https://docs.dagster.io/integrations/dbt