DuckDB & dbt | End-To-End Data Engineering Project (2/3)

2024/03/22 - 9 min read

BY

dbt is a great and straightforward tool for building production-ready data pipelines with SQL. It acts as a toolkit that assists in packaging, testing, and deploying your SQL pipelines. However, there's a common misconception that dbt itself processes data. In reality, it's a client that sends SQL commands to a cloud data warehouse, where the actual computing occurs. As a result, you always depend on this cloud service, and the development loop can sometimes be challenging.

In this blog, we'll explore how the development experience can be significantly improved through the use of DuckDB and dbt. We'll learn how to streamline your architecture, accelerate certain pipelines, and finally allow you to write genuine unit tests. We'll also cover some best practices for AWS S3 authentication and managing incremental pipelines.

All the source code is available on GitHub. And for those who prefer watching over reading, I've got a video for you.

Quick recap on part 1 : ingestion

In the first part of our end-to-end data engineering project, we gathered data from PyPi to obtain download statistics for a specific Python library, DuckDB, using Python. In this second part, we'll transform this raw data using dbt and DuckDB to prepare a dataset ready for data visualization, which will be the focus of part three in this series.

Don't worry if you haven't completed the first part of the project; we've got you covered. We have some sample raw data available in a public AWS S3 bucket that you can use as input for the transformation pipeline.

dbt & DuckDB Integration

In dbt, we connect to various databases through adapters, which are defined in a YAML file. These adapters make it easy to switch quickly between different environments. Typically, your Python process (dbt) would send the query to the target database.

However, since DuckDB is an embedded database and just another Python library to install (without any cloud dependency), we can run the computation within the same Python dbt process!

In this dbt project, we'll look at two setups (aka targets):

  1. Reading and writing from S3 when using dbt and DuckDB locally (our "dev" setup)
  2. Reading from S3 and pushing the result back to MotherDuck (our "prod" setup)
arch

Since MotherDuck is DuckDB in the cloud, you benefit from a seamless transition from working locally to scaling in the cloud. Moreover, for part 3, as we aim to create a dashboard with a BI tool, which mostly relies on a SQL engine to fetch data, MotherDuck will prove to be very useful.

Let's dive into the code.

Building the SQL pipeline

Setup

Our initial repository has a monolithic structure with the first part of the series located under /ingestion. We'll create a new folder under /transform for the code discussed in this blog.

First off, we need to add the dbt package dependency. As of now, MotherDuck supports only one version of DuckDB. We're using Poetry as our package manager, so to install dbt and the appropriate DuckDB version, simply execute:

Copy code

poetry add dbt-duckdb[md]

Next, initiate the dbt repository under ./transform with:

Copy code

dbt init pypi_metrics

You should now see a structure with some folders pre-created for you:

Copy code

. ├── analyses ├── dbt_project.yml ├── macros ├── models ├── package-lock.yml ├── packages.yml ├── profiles.yml ├── seeds ├── snapshots ├── target └── tests

Exploring the Data and Building the Model

To start, I want to explore the raw data. You can access a free public sample here: s3://us-prd-motherduck-open-datasets/pypi/sample_tutorial/pypi_file_downloads/*/*/*.parquet

A straightforward way to begin is by using the DuckDB CLI. You can find the installation steps online. A useful setup I recommend -if you are using VSCode- is opening a terminal in VSCode and configuring a shortcut to send commands from the editor to the terminal (the opened DuckDB CLI). I assigned the cmd+k shortcut to this specific command in my JSON Keyboard Shortcuts settings.

Copy code

{ "key": "cmd+k", "command": "workbench.action.terminal.runSelectedText" },

That way, you are building your SQL query directly at the right place, in a SQL file

As you can see on the above screenshot, you can easily describe a remote parquet file using :

Copy code

DESCRIBE TABLE 's3://us-prd-motherduck-open-datasets/pypi/sample_tutorial/pypi_file_downloads/*/*/*.parquet';

This data shows each row as a download of a specific Python project, already filtered for the duckdb project.

Our transformations should include:

  • Selecting only relevant columns and unnesting as necessary.
  • Converting the Python version to include only minor versions (e.g., 3.9.1 -> 3.9) for more meaningful aggregation.
  • Aggregating the download count per day to streamline our insights.
  • Adding a load_id (based on a hash) for incremental loading.

The final model is as follows:

Copy code

WITH pre_aggregated_data AS ( SELECT timestamp :: date as download_date, details.system.name AS system_name, details.system.release AS system_release, file.version AS version, project, country_code, details.cpu, CASE WHEN details.python IS NULL THEN NULL ELSE CONCAT( SPLIT_PART(details.python, '.', 1), '.', SPLIT_PART(details.python, '.', 2) ) END AS python_version FROM {{ dbt_unit_testing.source('external_source', 'pypi_file_downloads') }} WHERE download_date >= '{{ var("start_date") }}' AND download_date < '{{ var("end_date") }}' ) SELECT MD5(CONCAT_WS('|', download_date, system_name, system_release, version, project, country_code, cpu, python_version)) AS load_id, download_date, system_name, system_release, version, project, country_code, cpu, python_version, COUNT(*) AS daily_download_sum FROM pre_aggregated_data GROUP BY ALL

Notable points include:

  • Filtering is always done between a dbt variable including start_date and end_date for easy data reprocessing.
  • The source table is abstracted with {{ dbt_unit_testing.source('external_source', 'pypi_file_downloads') }} for unit testing purposes (more on that further in the blog).

Before we get to unit testing, let's review our configuration files, mainly sources.yml and dbt_project.yml and profiles.yml.

YAML configurations files

Sources are defined in sources.yml in /transform/pypi_metrics/models/sources.yml

Copy code

version: 2 sources: - name: external_source meta: external_location: "{{ env_var('TRANSFORM_S3_PATH_INPUT') }}" tables: - name: pypi_file_downloads

We're using an external location (AWS S3) with a nickname that we referred to in our model's FROM statement earlier.

We've also made the S3 path flexible so it can be provided through environment variables.

To manage these environment variables smoothly, we use a Makefile along with a .env file. At the beginning of the Makefile, you'll see:

Copy code

include .env export

In the code repository, there's an env.template file. You can copy this to create a .env file and enter the necessary values.

Next, we initiate the dbt run through an entry in the Makefile named pypi-transform:

Copy code

pypi-transform: cd $$DBT_FOLDER && \ dbt run \ --target $$DBT_TARGET \ --vars '{"start_date": "$(START_DATE)", "end_date": "$(END_DATE)"}'

Let's have a look now on our dbt_project.yml

Copy code

models: pypi_metrics: pypi_daily_stats: +materialized: "{{ 'incremental' if target.name == 'prod' else 'table' }}" +unique_key: load_id +pre-hook: "{% if target.name == 'dev' %}CALL load_aws_credentials(){% endif %}" +post-hook: "{% if target.name == 'dev' %}{{ export_partition_data('download_date', this.name ) }}{% endif %}"

As mentioned before, we have two setups: one for local running and read/writing to AWS S3, and another using MotherDuck, designated as dev and prod targets, respectively.

These settings are outlined in our profiles.yml:

Copy code

pypi_metrics: outputs: dev: type: duckdb path: /tmp/dbt.duckdb prod: type: duckdb path: "md:" target: dev

The only difference between running locally and using MotherDuck is the path setting. Using md: triggers authentication with MotherDuck, which checks for a token in the motherduck_token environment variable. You can get this token from your MotherDuck account settings page.

We face a few challenges:

  • dbt doesn't support incremental loading when writing to an external source like AWS S3.
  • We need to authenticate with AWS S3.

Thankfully, DuckDB offers extensions that simplify authentication and read/write operations to AWS S3. To address the first challenge, we write to AWS S3 with partitions, allowing us to process within a specific time frame and overwrite any existing partitions.

We use a simple macro, export_partition_data.sql, for this:

Copy code

{% macro export_partition_data(date_column, table) %} {% set s3_path = env_var('TRANSFORM_S3_PATH_OUTPUT', 'my-bucket-path') %} COPY ( SELECT *, YEAR({{ date_column }}) AS year, MONTH({{ date_column }}) AS month FROM {{ table }} ) TO '{{ s3_path }}/{{ table }}' (FORMAT PARQUET, PARTITION_BY (year, month), OVERWRITE_OR_IGNORE 1, COMPRESSION 'ZSTD', ROW_GROUP_SIZE 1000000); {% endmacro %}

With dbt running DuckDB, it creates an internal table from the model, allowing us to easily export this data to any format and remote storage (AWS S3/GCP Cloud storage) using the COPY command.

Notable points include:

  • The AWS S3 path is set as an environment variable.
  • We use a date column for partition generation. For instance, our data will be stored as s3://my-bucket/my_data/year=2024/month=04.

For authentication, we use another extension and invoke CALL load_aws_credentials() as a pre-hook in the dbt_project.yml, looking for the default profile under ~/.aws.

With all configurations set for different environments, let's dive into unit testing.

Unit Testing the Model

DuckDB operates in-process, allowing us to iterate quickly on our model since computation occurs locally within the same dbt process. dbt is improving unit tests in its April 1.8 release, but currently, it's challenging to run tests without cloud dependencies. While you could install Postgres locally, it's an additional step.

For unit testing, we use the dbt-unit-testing dbt package, added to a packages.yml file at the root of your dbt directory:

Copy code

packages: - git: "https://github.com/EqualExperts/dbt-unit-testing" revision: v0.4.12

First, install the package by running dbt deps. This step allows us to use SQL for defining our mock data, both the input and the expected outcome, and then run the model using dbt-duckdb right on our local machine.

Next, dive into the tests folder and craft a new SQL file named test_pypi_daily_stats.sql:

Copy code

{{ config(tags=['unit-test']) }} {% call dbt_unit_testing.test ('pypi_daily_stats','check_duckdb_downloads_on_20230402') %} {% call dbt_unit_testing.mock_source('external_source', 'pypi_file_downloads') %} SELECT '2023-04-02 14:49:15+02'::timestamp AS timestamp, 'US' AS country_code, '/packages/38/5b/...' AS url, 'duckdb' AS project, NULL AS file, -- Assuming the 'file' struct is not essential for this test STRUCT_PACK( installer := NULL, python := '3.8.2', implementation := NULL, distro := NULL, system := STRUCT_PACK(name := 'Linux', release := '4.15.0-66-generic'), cpu := 'x86_64', openssl_version := NULL, setuptools_version := NULL, rustc_version := NULL ) AS details, 'TLSv1.2' AS tls_protocol, 'ECDHE-RSA-AES128-GCM-SHA256' AS tls_cipher UNION ALL SELECT '2023-04-02 14:49:15+02'::timestamp AS timestamp, 'US' AS country_code, '/packages/38/5b/...' AS url, 'duckdb' AS project, NULL AS file, -- Assuming the 'file' struct is not essential for this test STRUCT_PACK( installer := NULL, python := '3.9.1', implementation := NULL, distro := NULL, system := STRUCT_PACK(name := 'Linux', release := '4.15.0-66-generic'), cpu := 'x86_64', openssl_version := NULL, setuptools_version := NULL, rustc_version := NULL ) AS details, 'TLSv1.2' AS tls_protocol, 'ECDHE-RSA-AES128-GCM-SHA256' AS tls_cipher -- Add more rows as needed for your test {% endcall %} {% call dbt_unit_testing.expect() %} SELECT '2023-04-02'::date AS download_date, 'duckdb' AS project, '3.8' AS python_version, 'x86_64' AS cpu, 'Linux' AS system_name, 2 AS daily_download_sum -- Adjust this based on the expected outcome of your test {% endcall %} {% endcall %}

This test is structured in three key parts:

  1. Specifying which model we're testing with {% call dbt_unit_testing.test('pypi_daily_stats', 'check_duckdb_downloads_on_20230402') %}.
  2. Creating mock source data using {% call dbt_unit_testing.mock_source('external_source', 'pypi_file_downloads') %}, which uses SQL to simulate the data. This method allows for the easy definition of complex data structures, perfect for working with DuckDB.
  3. Defining the expected results with {% call dbt_unit_testing.expect() %} to verify our model's output.

Run the test by executing:

Copy code

dbt test

Or, use the Makefile shortcut make pypi-transform-test to initiate testing directly from the project's root folder.

The testing process is swift, typically taking less than two seconds!

A New Developer Experience

This blog has highlighted the dbt-duckdb adapter's contributions, showcasing it as more than a new dbt destination. It introduces a revitalized developer experience, enabling local prototyping, cloud-independent unit testing, and smooth transitions to cloud deployments with MotherDuck. Up next in this series, we'll breathe life into our PyPi dataset by creating a dashboard.

In the meantimes, keep quacking and keep coding.

CONTENT
  1. Quick recap on part 1 : ingestion
  2. dbt & DuckDB Integration
  3. Building the SQL pipeline
  4. Unit Testing the Model
  5. A New Developer Experience

Start using MotherDuck now!

blog subscription icon

Subscribe to motherduck blog