Dagster + dbt: Referencing a Dagster Asset as a Source in a dbt Model

3 min read 05-10-2024


Orchestrating Data Pipelines: Dagster Assets as dbt Model Sources

Modern data pipelines often involve complex workflows that span data extraction, transformation, and loading (ETL). Dagster and dbt cover different stages of this process: Dagster orchestrates the pipeline and produces data assets, while dbt transforms data that is already in the warehouse with SQL models. This article shows how to reference Dagster assets as sources in your dbt models.

The Scenario: Connecting Dagster Outputs to dbt Models

Imagine you're building a data pipeline for a retail company. You use Dagster to ingest sales data from multiple sources, clean it, and aggregate it into a structured format. Now, you want to use this processed data in dbt to build the models that feed business intelligence dashboards and further analysis.

Here's a simplified example of Dagster assets that extract sales data, clean it, and aggregate it by product category:

import pandas as pd
from dagster import asset


@asset
def sales_data() -> pd.DataFrame:
    # Placeholder rows; replace with code that extracts sales data from your sources.
    return pd.DataFrame(
        {
            "product_category": ["toys", "toys", "books", None],
            "sales_amount": [10.0, 5.0, 7.5, 3.0],
        }
    )


@asset
def cleaned_sales_data(sales_data: pd.DataFrame) -> pd.DataFrame:
    # Drop rows that are missing a category or an amount.
    return sales_data.dropna(subset=["product_category", "sales_amount"])


@asset
def aggregated_sales_data(cleaned_sales_data: pd.DataFrame) -> pd.DataFrame:
    # Sum sales per product category; the asset key is the function name.
    return cleaned_sales_data.groupby("product_category", as_index=False)["sales_amount"].sum()

This code defines three assets: sales_data, cleaned_sales_data, and aggregated_sales_data. Dagster infers the dependencies from the parameter names, so aggregated_sales_data depends on cleaned_sales_data, which in turn depends on sales_data. The final asset returns a DataFrame of sales aggregated by category. For dbt to read that result, the DataFrame must be stored as a table in the warehouse that dbt connects to, for example through an I/O manager that writes DataFrames to the database.

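Now, let's create a dbt model that uses this asset:

{{ config(materialized='view') }}

SELECT
    product_category,
    SUM(sales_amount) AS total_sales
FROM {{ source('dagster', 'aggregated_sales_data') }}
GROUP BY product_category
ORDER BY total_sales DESC

This dbt model uses the source function, not ref, to read the aggregated_sales_data table. ref only resolves nodes defined inside the dbt project (models, seeds, and snapshots), whereas source points at a table created outside dbt, which is what a Dagster asset is from dbt's perspective. The source name 'dagster' is an illustrative choice; it has to match a source declared in the project's YAML files. dbt does not fetch data from Dagster: it compiles source() to the table's fully qualified name, and the warehouse runs the query against that table.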
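
A table produced by Dagster is external to dbt, so the dbt project has to declare it as a source in a YAML file before a model can select from it with {{ source('dagster', 'aggregated_sales_data') }}. The fragment below is a minimal sketch under stated assumptions: the source name dagster, the file name, and the schema analytics are illustrative, and the meta.dagster.asset_key entry only has an effect when the project is loaded into Dagster through the dagster-dbt integration.

```yaml
# models/sources.yml (hypothetical file name; any .yml file under models/ can hold sources)
version: 2

sources:
  - name: dagster                 # illustrative source name used in source('dagster', ...)
    schema: analytics             # hypothetical schema where Dagster writes the table
    tables:
      - name: aggregated_sales_data
        meta:
          dagster:
            # Tells dagster-dbt which Dagster asset this source corresponds to
            asset_key: ["aggregated_sales_data"]
```

With this in place, the asset key on the dbt side matches the key of the aggregated_sales_data asset on the Dagster side, which is what lets Dagster draw the dependency between them.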

Why This Approach is Beneficial:

  1. Decoupling: Dagster focuses on data processing, while dbt excels in modeling and analysis. This integration promotes a clear separation of concerns.
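  2. Data Lineage: When the dbt project is loaded into Dagster with the dagster-dbt integration, each dbt source is matched to its upstream Dagster asset, so the asset graph shows lineage from ingestion through to the final dbt model. This helps track the origins of your data and ensures traceability.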
  3. Reusability: Dagster assets can be reused across multiple dbt models, streamlining your data workflows and eliminating redundant code.

Setting up the Integration:

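  1. Dagster Configuration: Persist aggregated_sales_data as a table in the warehouse that dbt connects to, for example with an I/O manager that writes DataFrames to the database. To see both tools in one asset graph, load the dbt project into Dagster with the dagster-dbt integration.
  2. dbt Project Setup: Declare the table as a source in a YAML file in the dbt project, using the database and schema where Dagster writes it. dagster-dbt matches each dbt source to a Dagster asset by asset key; if the default key does not match your asset, set it explicitly under meta.dagster.asset_key on the source table.
  3. dbt Model Definition: Use the source function in your dbt models to select from the table behind the Dagster asset.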
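
Whether a dbt source lines up with a Dagster asset comes down to the asset key. The sketch below uses only the standard library to list the key each source in a dbt manifest would resolve to. It assumes that an explicit meta.dagster.asset_key wins and that the fallback is the source name followed by the table name; that is an approximation of the matching convention, not dagster-dbt's own implementation, and the helper names source_asset_keys and load_manifest are hypothetical.

```python
import json
from pathlib import Path
from typing import Any


def source_asset_keys(manifest: dict[str, Any]) -> dict[str, list[str]]:
    """Map each dbt source's unique_id to the asset key it is expected to match."""
    keys: dict[str, list[str]] = {}
    for unique_id, props in manifest.get("sources", {}).items():
        meta = props.get("meta") or {}
        override = (meta.get("dagster") or {}).get("asset_key")
        if override:
            # Explicit mapping declared in the source's YAML (assumed to take priority).
            keys[unique_id] = list(override)
        else:
            # Assumed fallback: source name followed by table name.
            keys[unique_id] = [props["source_name"], props["name"]]
    return keys


def load_manifest(path: str = "target/manifest.json") -> dict[str, Any]:
    """Read the manifest dbt writes when it parses or compiles the project."""
    return json.loads(Path(path).read_text())


if __name__ == "__main__":
    # Hand-written stand-in for the "sources" section of a real manifest.
    example = {
        "sources": {
            "source.retail.dagster.aggregated_sales_data": {
                "source_name": "dagster",
                "name": "aggregated_sales_data",
                "meta": {"dagster": {"asset_key": ["aggregated_sales_data"]}},
            }
        }
    }
    for unique_id, key in source_asset_keys(example).items():
        print(unique_id, "->", key)
```

Running the function against load_manifest() after dbt parse gives a quick way to spot a source whose key does not match any asset defined in Dagster.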

Additional Insights:

  • Dynamic Source Selection: Because dbt models are Jinja templates, a model can choose between sources or filter rows based on conditions, for example with a project variable read through var(). For instance, you could create different dbt models that cover different time periods of the aggregated sales data.
  • Data Versioning: Consider using Dagster's versioning capabilities, such as the code_version argument on assets, to track when the processed data your dbt models read is out of date.
  • Testing and Observability: Use Dagster's testing and observability tools (asset checks, run logs, and the asset graph) to verify the pipeline, and add dbt tests and freshness checks on the source so that dbt models fail early when the upstream data is missing or stale.
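
As one way to act on the testing point, a small check can confirm that the rows Dagster produces carry the columns the dbt model selects. The sketch below is plain Python with hypothetical names (REQUIRED_COLUMNS, find_problems) and takes rows as a list of dictionaries; it could be called from a unit test or wrapped in a Dagster asset check.

```python
from typing import Any

# Columns the dbt model in this article selects from the source table.
REQUIRED_COLUMNS = ("product_category", "sales_amount")


def find_problems(rows: list[dict[str, Any]]) -> list[str]:
    """Return one message per missing value; an empty list means no problems found."""
    problems: list[str] = []
    for i, row in enumerate(rows):
        for col in REQUIRED_COLUMNS:
            # A key that is absent and a key set to None are both treated as missing.
            if row.get(col) is None:
                problems.append(f"row {i}: missing {col}")
    return problems
```

Returning messages instead of raising lets the caller decide whether a problem should fail the run or only be logged.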

By leveraging the strengths of both Dagster and dbt, you can create efficient, scalable, and well-documented data pipelines. This integration empowers you to build data products and insights with greater confidence and agility.
