
Dagster

Use Dagster when you want asset lineage, schedules, retries, and run history around a Python data loading job. This guide builds a minimum viable Dagster asset that reads Parquet data from S3, loads rows newer than the last successful run, upserts them into MotherDuck, and stores a watermark for the next run.

The example uses a public S3 Parquet file from the MotherDuck sample data bucket. Replace the S3 path and column mapping with your own bucket layout when you move from the demo to your pipeline.

How the pipeline works

The asset keeps all of its pipeline state in MotherDuck:

  • taxi_trips is the target table.
  • ingestion_watermarks stores the latest pickup_at value loaded by this pipeline.
  • Each run reads only rows where tpep_pickup_datetime is greater than the stored watermark.
  • The target table has a primary key, so reprocessing the same row updates the existing row instead of creating a duplicate.
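The watermark-plus-upsert pattern in the list above can be tried without a MotherDuck account. The following is a minimal sketch, not the guide's pipeline: it swaps in Python's built-in sqlite3 module as a stand-in for MotherDuck and an in-memory list as a stand-in for the S3 Parquet file. The function name load_increment, the pipeline name 'demo', and the sample rows are hypothetical.

```python
import sqlite3

# Hypothetical source rows standing in for the S3 Parquet file:
# (trip_id, pickup_at, total_amount). ISO-8601 strings sort chronologically.
SOURCE = [
    ("a", "2022-11-01 08:00:00", 12.5),
    ("b", "2022-11-01 09:30:00", 7.0),
]

con = sqlite3.connect(":memory:")  # stand-in for the MotherDuck connection
con.execute(
    "CREATE TABLE taxi_trips ("
    "trip_id TEXT PRIMARY KEY, pickup_at TEXT, total_amount REAL)"
)
con.execute(
    "CREATE TABLE ingestion_watermarks ("
    "pipeline_name TEXT PRIMARY KEY, last_pickup_at TEXT)"
)
con.execute("INSERT INTO ingestion_watermarks VALUES ('demo', '1970-01-01 00:00:00')")


def load_increment(rows):
    """Upsert rows newer than the stored watermark, then advance the watermark."""
    (watermark,) = con.execute(
        "SELECT last_pickup_at FROM ingestion_watermarks WHERE pipeline_name = 'demo'"
    ).fetchone()
    new_rows = [r for r in rows if r[1] > watermark]
    # The primary key turns a repeated trip_id into an update, not a duplicate.
    con.executemany(
        """
        INSERT INTO taxi_trips VALUES (?, ?, ?)
        ON CONFLICT (trip_id) DO UPDATE SET
            pickup_at = excluded.pickup_at,
            total_amount = excluded.total_amount
        """,
        new_rows,
    )
    if new_rows:
        con.execute(
            "UPDATE ingestion_watermarks SET last_pickup_at = ? "
            "WHERE pipeline_name = 'demo'",
            [max(r[1] for r in new_rows)],
        )
    return len(new_rows)


first = load_increment(SOURCE)
second = load_increment(SOURCE)
print(first, second)
```

The first call loads both rows and moves the watermark to the latest pickup time, so the second call finds nothing newer and loads zero rows.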

Prerequisites

Before you start, ensure you have:

  • Python 3.10 or later.
  • uv for Python project and dependency management.
  • A MotherDuck access token in MOTHERDUCK_TOKEN.
  • A MotherDuck database name for the pipeline. The example creates the database if it doesn't exist.
  • For private S3 buckets, a MotherDuck S3 secret. See Amazon S3 credentials for setup.

Tip: Use a dedicated MotherDuck service account for scheduled ingestion jobs. This keeps ingestion compute, permissions, and cost attribution separate from analyst and application workloads. See Hypertenancy for the compute isolation model.

Create the Dagster project

Create a small Python project and add Dagster, the Dagster web server, and DuckDB:

> uv init dagster-motherduck-s3
> cd dagster-motherduck-s3
> uv add dagster dagster-webserver duckdb

Create definitions.py:

import os
import re

import dagster as dg
import duckdb

S3_URI = os.getenv(
    "S3_URI",
    "s3://us-prd-motherduck-open-datasets/nyc_taxi/parquet/yellow_cab_nyc_2022_11.parquet",
)
MOTHERDUCK_DATABASE = os.getenv("MOTHERDUCK_DATABASE", "dagster_s3_demo")
PIPELINE_NAME = "dagster_s3_taxi_trips"

# Optional cap for running the demo quickly. Leave unset for a real pipeline.
INGESTION_END_TS = os.getenv("MOTHERDUCK_INGESTION_END_TS")

PUBLIC_DEMO_SCOPE = "s3://us-prd-motherduck-open-datasets/"


def database_identifier(name: str) -> str:
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name):
        raise ValueError("Use a database name with letters, numbers, and underscores.")
    return name


def open_motherduck_connection() -> duckdb.DuckDBPyConnection:
    database = database_identifier(MOTHERDUCK_DATABASE)
    con = duckdb.connect("md:")
    con.execute(f"CREATE DATABASE IF NOT EXISTS {database}")
    con.execute(f"USE {database}")

    if S3_URI.startswith(PUBLIC_DEMO_SCOPE):
        con.execute("""
            CREATE OR REPLACE TEMPORARY SECRET public_motherduck_open_data (
                TYPE S3,
                PROVIDER config,
                REGION 'us-east-1',
                SCOPE 's3://us-prd-motherduck-open-datasets/'
            )
        """)

    return con


@dg.asset
def taxi_trips(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    con = open_motherduck_connection()
    try:
        con.execute("""
            CREATE TABLE IF NOT EXISTS taxi_trips (
                trip_id VARCHAR PRIMARY KEY,
                pickup_at TIMESTAMP,
                dropoff_at TIMESTAMP,
                passenger_count DOUBLE,
                trip_distance DOUBLE,
                total_amount DOUBLE,
                source_file VARCHAR,
                loaded_at TIMESTAMP DEFAULT now()
            )
        """)
        con.execute("""
            CREATE TABLE IF NOT EXISTS ingestion_watermarks (
                pipeline_name VARCHAR PRIMARY KEY,
                last_pickup_at TIMESTAMP
            )
        """)
        con.execute("""
            INSERT INTO ingestion_watermarks
            VALUES (?, TIMESTAMP '1970-01-01')
            ON CONFLICT (pipeline_name) DO NOTHING
        """, [PIPELINE_NAME])

        last_pickup_at = con.execute(
            "SELECT last_pickup_at FROM ingestion_watermarks WHERE pipeline_name = ?",
            [PIPELINE_NAME],
        ).fetchone()[0]

        con.execute("""
            CREATE OR REPLACE TEMP TABLE new_taxi_trips AS
            SELECT
                md5(concat_ws('|',
                    VendorID::VARCHAR,
                    tpep_pickup_datetime::VARCHAR,
                    tpep_dropoff_datetime::VARCHAR,
                    PULocationID::VARCHAR,
                    DOLocationID::VARCHAR,
                    total_amount::VARCHAR
                )) AS trip_id,
                tpep_pickup_datetime AS pickup_at,
                tpep_dropoff_datetime AS dropoff_at,
                passenger_count,
                trip_distance,
                total_amount,
                filename AS source_file,
                now() AS loaded_at
            FROM read_parquet(?, filename = true)
            WHERE tpep_pickup_datetime > ?
                AND (? IS NULL OR tpep_pickup_datetime < ?::TIMESTAMP)
        """, [S3_URI, last_pickup_at, INGESTION_END_TS, INGESTION_END_TS])

        rows_loaded = con.execute("SELECT count(*) FROM new_taxi_trips").fetchone()[0]

        con.execute("""
            INSERT INTO taxi_trips BY NAME
            SELECT * FROM new_taxi_trips
            ON CONFLICT (trip_id) DO UPDATE SET
                pickup_at = excluded.pickup_at,
                dropoff_at = excluded.dropoff_at,
                passenger_count = excluded.passenger_count,
                trip_distance = excluded.trip_distance,
                total_amount = excluded.total_amount,
                source_file = excluded.source_file,
                loaded_at = excluded.loaded_at
        """)

        max_pickup_at = con.execute(
            "SELECT max(pickup_at) FROM new_taxi_trips"
        ).fetchone()[0]
        if max_pickup_at is not None:
            con.execute(
                "UPDATE ingestion_watermarks SET last_pickup_at = ? WHERE pipeline_name = ?",
                [max_pickup_at, PIPELINE_NAME],
            )

        total_rows = con.execute("SELECT count(*) FROM taxi_trips").fetchone()[0]
        context.log.info("Loaded %s rows into taxi_trips", rows_loaded)

        return dg.MaterializeResult(
            metadata={
                "rows_loaded": rows_loaded,
                "total_rows": total_rows,
                "last_pickup_at": str(max_pickup_at or last_pickup_at),
            }
        )
    finally:
        con.close()


daily_s3_ingestion = dg.ScheduleDefinition(
    name="daily_s3_taxi_trips",
    cron_schedule="0 2 * * *",
    target=[taxi_trips],
)

defs = dg.Definitions(
    assets=[taxi_trips],
    schedules=[daily_s3_ingestion],
)


if __name__ == "__main__":
    result = dg.materialize([taxi_trips])
    if not result.success:
        raise RuntimeError("Dagster materialization failed.")
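
definitions.py interpolates the database name directly into the CREATE DATABASE and USE statements, so it relies on database_identifier to reject anything that is not a plain identifier. The standalone sketch below repeats that check so you can see which names pass. The is_valid helper and the sample names are hypothetical and exist only for illustration.

```python
import re

# Same pattern as in definitions.py: a letter or underscore first,
# then letters, digits, or underscores.
IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def database_identifier(name: str) -> str:
    """Return the name unchanged if it is a plain identifier, else raise."""
    if not IDENTIFIER.fullmatch(name):
        raise ValueError("Use a database name with letters, numbers, and underscores.")
    return name


def is_valid(name: str) -> bool:
    """Hypothetical convenience wrapper that reports validity as a boolean."""
    try:
        database_identifier(name)
        return True
    except ValueError:
        return False


for candidate in ["dagster_s3_demo", "2024_loads", "my-db", "demo; DROP DATABASE prod"]:
    print(f"{candidate!r}: {'ok' if is_valid(candidate) else 'rejected'}")
```

Only the first name passes. The others start with a digit or contain characters outside the allowed set, so they never reach the SQL string.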

Run the ingestion

Set the MotherDuck token and database name:

> export MOTHERDUCK_TOKEN="<motherduck_token>"
> export MOTHERDUCK_DATABASE="dagster_s3_demo"

For the public demo file, you can cap the first run to one day of taxi trips so the example finishes quickly:

> export MOTHERDUCK_INGESTION_END_TS="2022-11-02"

Run the asset once from Python:

> uv run python definitions.py

Run the same command again. The second run should load 0 rows because the first run advanced the watermark.

Verify the loaded rows in MotherDuck:

SELECT count(*) FROM taxi_trips;

SELECT pipeline_name, last_pickup_at
FROM ingestion_watermarks;

When you use your own S3 data, unset MOTHERDUCK_INGESTION_END_TS and replace:

  • S3_URI with your s3://<bucket>/<prefix>/*.parquet path.
  • The SELECT list in new_taxi_trips with your source columns.
  • The watermark column with a stable source timestamp, such as updated_at or created_at.
  • The primary key expression with the source system's durable row key.
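
The demo derives trip_id by hashing several source columns, which is why the last item recommends a durable source key. The sketch below imitates that derivation in plain Python with hashlib. The helper name hashed_trip_id and the sample values are hypothetical, and the digests are not guaranteed to match DuckDB's md5(concat_ws(...)) output because the two format values as strings differently.

```python
import hashlib


def hashed_trip_id(*fields: object) -> str:
    """Join fields with '|' and return the MD5 hex digest.

    None values are skipped, similar to how SQL concat_ws skips NULLs.
    """
    joined = "|".join(str(f) for f in fields if f is not None)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


# Arguments: vendor, pickup, dropoff, pickup zone, dropoff zone, total amount.
original = hashed_trip_id(1, "2022-11-01 08:00:00", "2022-11-01 08:20:00", 142, 236, 18.3)
replayed = hashed_trip_id(1, "2022-11-01 08:00:00", "2022-11-01 08:20:00", 142, 236, 18.3)
corrected = hashed_trip_id(1, "2022-11-01 08:00:00", "2022-11-01 08:20:00", 142, 236, 19.3)

print(original == replayed)   # identical inputs give the same key
print(original == corrected)  # a corrected amount gives a different key
```

Replaying the same row produces the same key, so the upsert stays idempotent. A source-side correction to any hashed field produces a new key, so the corrected row would be inserted next to the old one instead of replacing it.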

Run it in Dagster

Start the Dagster UI from the same directory:

> uv run dagster dev -f definitions.py

Open http://localhost:3000, select the taxi_trips asset, and materialize it. Dagster records the asset materialization, metadata, logs, and schedule definition.

To use the schedule in a long-running Dagster deployment, turn on the daily_s3_taxi_trips schedule and keep the Dagster daemon running. For local one-off testing, uv run python definitions.py is enough.

Production considerations

This example is intentionally small. Before using the pattern in production:

  • Use a dedicated service account token with only the permissions needed for ingestion.
  • Store private bucket credentials as a MotherDuck S3 secret instead of embedding AWS keys in code.
  • Keep S3 files in Parquet and avoid very small files. See S3 import best practices.
  • Use a source-provided primary key for upserts. Hashing source fields is useful for demos but less stable than a real key.
  • Use a source timestamp that only moves forward for watermarking. If your source sends late-arriving records, add a small overlap window and deduplicate by primary key.
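
For the late-arriving records case in the last item, one option is to subtract a fixed overlap from the stored watermark before filtering and let the primary-key upsert absorb the rows that are read twice. This is a minimal sketch, assuming a hypothetical helper effective_watermark and an arbitrary 15-minute overlap. Pick a window that matches how late your source can deliver records.

```python
from datetime import datetime, timedelta

OVERLAP = timedelta(minutes=15)  # assumed value; tune it for your source


def effective_watermark(stored: datetime, overlap: timedelta = OVERLAP) -> datetime:
    """Return the lower bound to filter on: the stored watermark minus the overlap."""
    return stored - overlap


stored = datetime(2022, 11, 1, 23, 59, 58)
lower_bound = effective_watermark(stored)

# Hypothetical source rows: (row key, source timestamp).
rows = [
    ("late", datetime(2022, 11, 1, 23, 50, 0)),  # older than the watermark, inside the window
    ("old", datetime(2022, 11, 1, 22, 0, 0)),    # outside the window, skipped
    ("new", datetime(2022, 11, 2, 0, 5, 0)),     # newer than the watermark
]
to_load = [key for key, ts in rows if ts > lower_bound]
print(lower_bound, to_load)
```

In definitions.py, the equivalent change would be to pass the lowered bound instead of last_pickup_at to the WHERE clause while still storing the true maximum pickup_at as the watermark. Rows inside the window are re-read on every run, so this only works with a stable primary key.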