The Code: Concrete Implementation
Here's the implementation we use in production. You can run this code yourself to test the approach.
This is simply SQL wrapped in Python. The SQL comes first, followed by the Python that runs it.
$ duckdb md:
INSTALL postgres;
LOAD postgres;
ATTACH 'PG_CONNECTION_STRING' AS pg (TYPE POSTGRES, READ_ONLY);
ATTACH 'md:MD_DATABASE';
USE MD_DATABASE;
CREATE OR REPLACE TABLE first_table AS SELECT * FROM pg.first_table;
Step 1: Attach both databases
First, we establish connections to both Postgres and MotherDuck:
import time

import duckdb

def run():
    pg_connection_string = "postgresql://username:password@hostname:5432/dbname"
    md_database = "analytics_replica"
    duck_con = duckdb.connect()
    # Attach Postgres read-only so the sync cannot modify the source.
    duck_con.sql(f"ATTACH '{pg_connection_string}' AS pg (TYPE POSTGRES, READ_ONLY);")
    # Attach the MotherDuck database and make it the default target.
    duck_con.sql(f"ATTACH 'md:{md_database}'; USE {md_database}")
    ctas_from_diff_db(duck_con)
    last_sync_time(duck_con)
You can test this by replacing the connection strings with your own and running the script.
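Hardcoded credentials are fine for a quick test, but a scheduled job usually reads them from its environment. Here is a minimal sketch of that, under some assumptions: the environment variable names `PG_CONNECTION_STRING` and `MD_DATABASE` simply mirror the placeholders in the SQL above, and `build_attach_statements` is a hypothetical helper, not part of DuckDB.

```python
import os


def build_attach_statements(env=os.environ):
    """Build the ATTACH and USE statements from environment variables.

    PG_CONNECTION_STRING and MD_DATABASE are assumed variable names.
    A missing variable raises KeyError, so the job fails before connecting.
    """
    pg_connection_string = env["PG_CONNECTION_STRING"]
    md_database = env["MD_DATABASE"]
    # Double any single quotes so the connection string stays inside the SQL literal.
    pg_literal = pg_connection_string.replace("'", "''")
    return [
        f"ATTACH '{pg_literal}' AS pg (TYPE POSTGRES, READ_ONLY);",
        f"ATTACH 'md:{md_database}';",
        f"USE {md_database};",
    ]
```

Each returned statement can then be passed to `duck_con.sql(...)` in order, in place of the two hardcoded values in `run()`.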
Step 2: Replicate the tables
Here's the exact function that handles the table replication:
def ctas_from_diff_db(duck_con):
    start_time = time.time()
    duck_con.sql("CREATE OR REPLACE TABLE first_table AS SELECT * FROM pg.first_table;")
    print(f"Replicated first_table table in {time.time() - start_time:.2f} seconds")
    ...
The output will show you exactly how long each table takes to replicate.
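The function above replicates a single table and elides the rest. One way to apply the same statement to several tables is to loop over a list of names. This is a sketch under assumptions: `replicate_tables` and its `table_names` argument are hypothetical, and it expects Postgres to be attached as `pg`, as in Step 1.

```python
import time


def replicate_tables(duck_con, table_names):
    """Run one CREATE OR REPLACE TABLE ... AS SELECT per table and time it.

    duck_con is any object with a .sql(query) method, such as a DuckDB
    connection with Postgres attached as `pg`. Returns {table: seconds}.
    """
    durations = {}
    for table in table_names:
        # Double-quote the identifier so unusual table names stay valid SQL.
        quoted = '"' + table.replace('"', '""') + '"'
        start_time = time.time()
        duck_con.sql(f"CREATE OR REPLACE TABLE {quoted} AS SELECT * FROM pg.{quoted};")
        durations[table] = time.time() - start_time
        print(f"Replicated {table} table in {durations[table]:.2f} seconds")
    return durations
```

Calling `replicate_tables(duck_con, ["first_table"])` reproduces the single-table behavior shown earlier, and the returned durations can be logged or compared between runs.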
Step 3: Track the sync timestamp
To keep track of sync operations, we record the time at which each sync completes. Because the table is replaced on every run, it always holds the most recent sync time. This lets end consumers gauge the freshness of the data before they use it to make decisions, and it supports automated freshness checks.
def last_sync_time(duck_con):
    duck_con.sql(
        "CREATE OR REPLACE TABLE last_sync_time AS SELECT current_timestamp AS last_sync_time;"
    )
    result = duck_con.sql("SELECT * FROM last_sync_time").fetchall()
    print(f"Sync completed and recorded at: {result[0][0]}")
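An automated freshness check can be as small as comparing the recorded timestamp against a threshold. The sketch below assumes the value read from `last_sync_time` comes back as a timezone-aware `datetime`. The `is_fresh` helper and the `max_age` threshold are hypothetical names chosen for illustration.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(last_sync, max_age, now=None):
    """Return True if last_sync is no older than max_age.

    last_sync is the timezone-aware datetime read from last_sync_time.
    max_age is a timedelta threshold chosen by the consumer (an assumption).
    now can be injected for testing and defaults to the current UTC time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now - last_sync <= max_age
```

A consumer could call `is_fresh(result[0][0], timedelta(hours=1))` and raise an alert when it returns `False`.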
You can verify the synchronization by comparing data in your source and destination:
source_count = duck_con.sql("SELECT COUNT(*) FROM pg.first_table").fetchone()[0]
dest_count = duck_con.sql("SELECT COUNT(*) FROM first_table").fetchone()[0]
print(f"Source has {source_count} rows")
print(f"Destination has {dest_count} rows")
assert source_count == dest_count, "Row counts don't match!"
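The same comparison can be repeated for every replicated table. Here is a minimal sketch, assuming Postgres is attached as `pg` and the MotherDuck database is the current default. The `find_row_count_mismatches` helper is hypothetical.

```python
def find_row_count_mismatches(duck_con, table_names):
    """Compare COUNT(*) between pg.<table> and the replicated <table>.

    Returns {table: (source_count, dest_count)} for tables whose counts differ.
    An empty dict means every table matched.
    """
    mismatches = {}
    for table in table_names:
        # Double-quote the identifier so unusual table names stay valid SQL.
        quoted = '"' + table.replace('"', '""') + '"'
        source_count = duck_con.sql(f"SELECT COUNT(*) FROM pg.{quoted}").fetchone()[0]
        dest_count = duck_con.sql(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
        if source_count != dest_count:
            mismatches[table] = (source_count, dest_count)
    return mismatches
```

Counts can legitimately differ if the source receives writes between the sync and the check, so a mismatch is a prompt to investigate rather than proof of a failed replication.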