---
sidebar_position: 2
title: Dagster
description: Orchestrate an incremental S3-to-MotherDuck data loading pipeline with Dagster and Python.
---

Use Dagster when you want asset lineage, schedules, retries, and run history around a Python data loading job. This guide builds a minimum viable Dagster asset that reads Parquet data from S3, loads rows newer than the last successful run, upserts them into MotherDuck, and stores a watermark for the next run.

The example uses a public S3 Parquet file from the MotherDuck sample data bucket. Replace the S3 path and column mapping to match your own bucket layout when you move from the demo to your own pipeline.

## How the pipeline works

```mermaid
graph LR
    S3[("S3 Parquet file")]:::yellow
    A["Dagster asset<br/>taxi_trips"]:::watermelon
    W[("ingestion_watermarks")]:::yellow
    T[("taxi_trips")]:::yellow

    W --> A
    S3 --> A
    A --> T
    A --> W
```

The asset keeps its state in MotherDuck:

- `taxi_trips` is the target table.
- `ingestion_watermarks` stores the latest `pickup_at` value loaded by this pipeline.
- Each run reads only rows where `tpep_pickup_datetime` is greater than the stored watermark.
- The target table has a primary key, so reprocessing the same row updates the existing row instead of creating a duplicate, as the sketch below demonstrates.
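
To see the duplicate-safe behavior in isolation, here is a minimal sketch that swaps MotherDuck for an in-memory DuckDB database; the table name and values are illustrative only:

```python
# Minimal idempotency sketch: the same upsert runs twice, but the primary
# key keeps the table at one row. Table name and values are illustrative.
import duckdb

con = duckdb.connect()  # in-memory DuckDB, no MotherDuck token needed
con.execute("CREATE TABLE trips (trip_id VARCHAR PRIMARY KEY, total_amount DOUBLE)")
for _ in range(2):
    con.execute("""
        INSERT INTO trips VALUES ('trip-1', 9.5)
        ON CONFLICT (trip_id) DO UPDATE SET total_amount = excluded.total_amount
    """)
print(con.execute("SELECT count(*) FROM trips").fetchone()[0])  # prints 1
```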

## Prerequisites

Before you start, ensure you have:

- Python 3.10 or later.
- `uv` for Python project and dependency management.
- A MotherDuck access token in `MOTHERDUCK_TOKEN`.
- A MotherDuck database name for the pipeline. The example creates the database if it doesn't exist.
- For private S3 buckets, a MotherDuck S3 secret. See [Amazon S3 credentials](/integrations/cloud-storage/amazon-s3/) for setup.

:::tip
Use a dedicated MotherDuck service account for scheduled ingestion jobs. This keeps ingestion compute, permissions, and cost attribution separate from analyst and application workloads. See [Hypertenancy](/concepts/hypertenancy/) for the compute isolation model.
:::
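
Before building the project, you can optionally confirm the token works with a quick connection check; `duckdb.connect("md:")` reads `MOTHERDUCK_TOKEN` from the environment:

```python
# Optional sanity check: open a MotherDuck connection using the token in
# MOTHERDUCK_TOKEN and print the database the session starts in.
import duckdb

con = duckdb.connect("md:")
print(con.execute("SELECT current_database()").fetchone()[0])
con.close()
```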

## Create the Dagster project

Create a small Python project and add Dagster with DuckDB:

```bash
> uv init dagster-motherduck-s3
> cd dagster-motherduck-s3
> uv add dagster dagster-webserver duckdb
```

Create `definitions.py`:

```python
import os
import re

import dagster as dg
import duckdb

S3_URI = os.getenv(
    "S3_URI",
    "s3://us-prd-motherduck-open-datasets/nyc_taxi/parquet/yellow_cab_nyc_2022_11.parquet",
)
MOTHERDUCK_DATABASE = os.getenv("MOTHERDUCK_DATABASE", "dagster_s3_demo")
PIPELINE_NAME = "dagster_s3_taxi_trips"

# Optional cap for running the demo quickly. Leave unset for a real pipeline.
INGESTION_END_TS = os.getenv("MOTHERDUCK_INGESTION_END_TS")

PUBLIC_DEMO_SCOPE = "s3://us-prd-motherduck-open-datasets/"


def database_identifier(name: str) -> str:
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name):
        raise ValueError("Use a database name with letters, numbers, and underscores.")
    return name


def open_motherduck_connection() -> duckdb.DuckDBPyConnection:
    database = database_identifier(MOTHERDUCK_DATABASE)
    con = duckdb.connect("md:")
    con.execute(f"CREATE DATABASE IF NOT EXISTS {database}")
    con.execute(f"USE {database}")

    if S3_URI.startswith(PUBLIC_DEMO_SCOPE):
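        # The demo file is public, so a temporary secret scoped to the demo
        # bucket only pins the region; no AWS keys are required.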
        con.execute("""
            CREATE OR REPLACE TEMPORARY SECRET public_motherduck_open_data (
                TYPE S3,
                PROVIDER config,
                REGION 'us-east-1',
                SCOPE 's3://us-prd-motherduck-open-datasets/'
            )
        """)

    return con


@dg.asset
def taxi_trips(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    con = open_motherduck_connection()
    try:
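        # Idempotent setup: both tables use CREATE TABLE IF NOT EXISTS, so
        # repeated runs leave existing data in place.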
        con.execute("""
            CREATE TABLE IF NOT EXISTS taxi_trips (
                trip_id VARCHAR PRIMARY KEY,
                pickup_at TIMESTAMP,
                dropoff_at TIMESTAMP,
                passenger_count DOUBLE,
                trip_distance DOUBLE,
                total_amount DOUBLE,
                source_file VARCHAR,
                loaded_at TIMESTAMP DEFAULT now()
            )
        """)
        con.execute("""
            CREATE TABLE IF NOT EXISTS ingestion_watermarks (
                pipeline_name VARCHAR PRIMARY KEY,
                last_pickup_at TIMESTAMP
            )
        """)
        con.execute("""
            INSERT INTO ingestion_watermarks
            VALUES (?, TIMESTAMP '1970-01-01')
            ON CONFLICT (pipeline_name) DO NOTHING
        """, [PIPELINE_NAME])

        last_pickup_at = con.execute(
            "SELECT last_pickup_at FROM ingestion_watermarks WHERE pipeline_name = ?",
            [PIPELINE_NAME],
        ).fetchone()[0]

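        # Stage only rows newer than the watermark. The hashed trip_id is a
        # stand-in key because the demo data has no natural row identifier.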
        con.execute("""
            CREATE OR REPLACE TEMP TABLE new_taxi_trips AS
            SELECT
                md5(concat_ws('|',
                    VendorID::VARCHAR,
                    tpep_pickup_datetime::VARCHAR,
                    tpep_dropoff_datetime::VARCHAR,
                    PULocationID::VARCHAR,
                    DOLocationID::VARCHAR,
                    total_amount::VARCHAR
                )) AS trip_id,
                tpep_pickup_datetime AS pickup_at,
                tpep_dropoff_datetime AS dropoff_at,
                passenger_count,
                trip_distance,
                total_amount,
                filename AS source_file,
                now() AS loaded_at
            FROM read_parquet(?, filename = true)
            WHERE tpep_pickup_datetime > ?
              AND (? IS NULL OR tpep_pickup_datetime < ?::TIMESTAMP)
        """, [S3_URI, last_pickup_at, INGESTION_END_TS, INGESTION_END_TS])

        rows_loaded = con.execute("SELECT count(*) FROM new_taxi_trips").fetchone()[0]

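        # Upsert the staged rows: conflicts on trip_id update in place, so
        # reprocessed rows never create duplicates.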
        con.execute("""
            INSERT INTO taxi_trips BY NAME
            SELECT * FROM new_taxi_trips
            ON CONFLICT (trip_id) DO UPDATE SET
                pickup_at = excluded.pickup_at,
                dropoff_at = excluded.dropoff_at,
                passenger_count = excluded.passenger_count,
                trip_distance = excluded.trip_distance,
                total_amount = excluded.total_amount,
                source_file = excluded.source_file,
                loaded_at = excluded.loaded_at
        """)

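        # Advance the watermark only when this run actually staged new rows.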
        max_pickup_at = con.execute(
            "SELECT max(pickup_at) FROM new_taxi_trips"
        ).fetchone()[0]
        if max_pickup_at is not None:
            con.execute(
                "UPDATE ingestion_watermarks SET last_pickup_at = ? WHERE pipeline_name = ?",
                [max_pickup_at, PIPELINE_NAME],
            )

        total_rows = con.execute("SELECT count(*) FROM taxi_trips").fetchone()[0]
        context.log.info("Loaded %s rows into taxi_trips", rows_loaded)

        return dg.MaterializeResult(
            metadata={
                "rows_loaded": rows_loaded,
                "total_rows": total_rows,
                "last_pickup_at": str(max_pickup_at or last_pickup_at),
            }
        )
    finally:
        con.close()


daily_s3_ingestion = dg.ScheduleDefinition(
    name="daily_s3_taxi_trips",
    cron_schedule="0 2 * * *",
    target=[taxi_trips],
)

defs = dg.Definitions(
    assets=[taxi_trips],
    schedules=[daily_s3_ingestion],
)


if __name__ == "__main__":
    result = dg.materialize([taxi_trips])
    if not result.success:
        raise RuntimeError("Dagster materialization failed.")
```

## Run the ingestion

Set the MotherDuck token and database name:

```bash
> export MOTHERDUCK_TOKEN="<motherduck_token>"
> export MOTHERDUCK_DATABASE="dagster_s3_demo"
```

For the public demo file, you can cap the first run to one day of taxi trips so the example finishes quickly:

```bash
> export MOTHERDUCK_INGESTION_END_TS="2022-11-02"
```

Run the asset once from Python:

```bash
> uv run python definitions.py
```

Run the same command again. The second run should load `0` rows because the first run advanced the watermark.

Verify the loaded rows in MotherDuck:

```sql
SELECT count(*) FROM taxi_trips;

SELECT pipeline_name, last_pickup_at
FROM ingestion_watermarks;
```

When you use your own S3 data, remove `MOTHERDUCK_INGESTION_END_TS` and replace the following; a sketch of an adapted staging query appears after the list:

- `S3_URI` with your `s3://<bucket>/<prefix>/*.parquet` path.
- The `SELECT` list in `new_taxi_trips` with your source columns.
- The watermark column with a stable source timestamp, such as `updated_at` or `created_at`.
- The primary key expression with the source system's durable row key.
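
As a sketch of those replacements, here is how the staging query might look for a hypothetical `orders` source with a durable `order_id` key and an `updated_at` watermark column; every column name is illustrative, not part of the demo dataset:

```python
# Illustrative adaptation only: order_id, updated_at, customer_id, and
# order_total are hypothetical source columns, not demo dataset fields.
STAGING_SQL = """
    CREATE OR REPLACE TEMP TABLE new_orders AS
    SELECT
        order_id,                      -- durable key from the source system
        updated_at,                    -- forward-moving watermark column
        customer_id,
        order_total,
        filename AS source_file,
        now() AS loaded_at
    FROM read_parquet(?, filename = true)
    WHERE updated_at > ?               -- the stored watermark
"""
```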

## Run it in Dagster

Start the Dagster UI from the same directory:

```bash
> uv run dagster dev -f definitions.py
```

Open `http://localhost:3000`, select the `taxi_trips` asset, and materialize it. Dagster records the materialization, its metadata, and the run logs, and shows the schedule definition.

To use the schedule in a long-running Dagster deployment, keep the `daily_s3_taxi_trips` schedule enabled and run a Dagster daemon. For local one-off testing, `uv run python definitions.py` is enough.

## Production considerations

This example is intentionally small. Before using the pattern in production:

- Use a dedicated service account token with only the permissions needed for ingestion.
- Store private bucket credentials as a MotherDuck S3 secret instead of embedding AWS keys in code.
- Keep S3 files in Parquet and avoid very small files. See [S3 import best practices](/key-tasks/cloud-storage/s3-import-best-practices/).
- Use a source-provided primary key for upserts. Hashing source fields is useful for demos but less stable than a real key.
- Use a source timestamp that only moves forward for watermarking. If your source sends late-arriving records, add a small overlap window and deduplicate by primary key, as sketched below.
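
A minimal sketch of that overlap window, assuming an `updated_at` watermark column; the one-hour interval is an arbitrary example and should match how late your source delivers records:

```python
# Hypothetical overlap variant of the staging filter. Rows inside the
# window are re-read on every run; the primary-key upsert deduplicates them.
STAGING_WITH_OVERLAP = """
    CREATE OR REPLACE TEMP TABLE new_rows AS
    SELECT *
    FROM read_parquet(?, filename = true)
    WHERE updated_at > ?::TIMESTAMP - INTERVAL 1 HOUR
"""
```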

## Related content

- [Amazon S3 credentials](/integrations/cloud-storage/amazon-s3/)
- [S3 import best practices](/key-tasks/cloud-storage/s3-import-best-practices/)
- [Connecting to MotherDuck](/key-tasks/authenticating-and-connecting-to-motherduck/connecting-to-motherduck/)
- [Hypertenancy](/concepts/hypertenancy/)


---

## Docs feedback

MotherDuck accepts optional user-submitted feedback about this page at `POST https://motherduck.com/docs/api/feedback/agent`.
For agents and automated tools, feedback submission should be user-confirmed before sending.

Payload:

```json
{
  "page_path": "/key-tasks/data-warehousing/orchestration/dagster/",
  "page_title": "Dagster",
  "text": "<the user's feedback, max 2000 characters>",
  "source": "<optional identifier for your interface, for example 'claude.ai' or 'chatgpt'>"
}
```

`page_path` and `text` are required; `page_title` and `source` are optional. Responses: `200 {"feedback_id": "<uuid>"}`, `400` for malformed payloads, and `429` when rate-limited.
