Let's Build
Before we connect our log drain, we need to create our processing function. The goal of our processing function is to:
- Filter out requests that are not important to us, like requests for fonts, CSS files, JavaScript files, etc.
- Classify the incoming user agents to determine if they are humans, bots or agents.
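To make those two goals concrete, here is a minimal sketch of what the filtering and classifying could look like. The function names `isStaticAssetPath` and `classifyUserAgent`, the extension list, the user-agent patterns, and the `"bot"` / `"agent"` / `null` return shape are all assumptions for illustration; the example repo may do this differently.

```typescript
// Hypothetical helpers: the extension list and user-agent patterns below are
// illustrative assumptions, not the list used in the example repo.
type AiCategory = "bot" | "agent";

interface Classification {
  category: AiCategory | null; // null means "not recognised as AI traffic"
  name: string | null;
}

const STATIC_EXTENSIONS = new Set([
  ".css", ".js", ".mjs", ".map", ".woff", ".woff2", ".ttf", ".otf", ".eot",
]);

// True when the path ends in a static-asset extension we do not want to store.
export function isStaticAssetPath(path: string | null): boolean {
  if (!path) return false; // keep rows without a path rather than guess
  const pathname = (path.split(/[?#]/)[0] ?? "").toLowerCase();
  const dot = pathname.lastIndexOf(".");
  // A dot that appears before the last "/" is part of a directory name.
  if (dot === -1 || dot < pathname.lastIndexOf("/")) return false;
  return STATIC_EXTENSIONS.has(pathname.slice(dot));
}

const AI_USER_AGENTS: Array<{ pattern: RegExp; category: AiCategory; name: string }> = [
  // Crawlers that fetch pages on their own schedule.
  { pattern: /GPTBot/i, category: "bot", name: "GPTBot" },
  { pattern: /ClaudeBot/i, category: "bot", name: "ClaudeBot" },
  { pattern: /PerplexityBot/i, category: "bot", name: "PerplexityBot" },
  // Fetches triggered by a user inside an assistant.
  { pattern: /ChatGPT-User/i, category: "agent", name: "ChatGPT-User" },
  { pattern: /Claude-User/i, category: "agent", name: "Claude-User" },
  { pattern: /Perplexity-User/i, category: "agent", name: "Perplexity-User" },
];

// Returns the first matching entry, or nulls for everything else (humans included).
export function classifyUserAgent(userAgent: string | null): Classification {
  if (!userAgent) return { category: null, name: null };
  for (const entry of AI_USER_AGENTS) {
    if (entry.pattern.test(userAgent)) {
      return { category: entry.category, name: entry.name };
    }
  }
  return { category: null, name: null };
}
```

A plain substring match like this is easy to extend, but user agents can be spoofed, so treat the result as a label rather than proof of who is calling.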
The processing function will live on its own path in our application and can be called with a POST request to my-site.com/api/drain. We start with the entry point api/drain.ts.
import type { IncomingMessage, ServerResponse } from "node:http";
import { handleDrain } from "../src/handler.js";

export const config = {
  runtime: "nodejs",
};

export default async function handler(
  req: IncomingMessage,
  res: ServerResponse
): Promise<void> {
  if (req.method !== "POST") {
    res.statusCode = 405;
    res.end("method not allowed");
    return;
  }

  const rawBody = await readBody(req);
  const sigHeader = req.headers["x-vercel-signature"];
  const signature = Array.isArray(sigHeader) ? sigHeader[0] : sigHeader;

  const { status, body } = await handleDrain(rawBody, signature);
  res.statusCode = status;
  res.end(body);
}

function readBody(req: IncomingMessage): Promise<string> {
  return new Promise((resolve, reject) => {
    const chunks: Buffer[] = [];
    req.on("data", (c: Buffer) => chunks.push(c));
    req.on("end", () => resolve(Buffer.concat(chunks).toString("utf8")));
    req.on("error", reject);
  });
}
Next up is the actual processing in the handleDrain function. We will skip over the signature verification here; for now, just set the VERCEL_DRAIN_SECRET environment variable to an empty string in the Vercel settings of your project. We'll get the real secret when we set up our log drain, so that nothing but our log drain can call this function.
The core logic of handleDrain is straightforward: we parse and classify the raw logs into rows to insert. If we only want AI-related rows, we filter out everything else. If any rows are left, we insert them.
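For readers who want a feel for the signature part anyway, here is a rough sketch of how such a check could look. It assumes the `x-vercel-signature` header carries a hex-encoded HMAC-SHA1 of the raw request body keyed with the drain secret, which is how I understand Vercel's documentation; please verify against the current docs. The names `signBody` and `isValidSignature` are hypothetical, and treating an empty secret as "verification off" is an assumption that matches the temporary empty-string setup, not necessarily what the example repo does.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hex HMAC-SHA1 of the raw body (assumed to be the scheme Vercel uses).
export function signBody(rawBody: string, secret: string): string {
  return createHmac("sha1", secret).update(rawBody, "utf8").digest("hex");
}

// Hypothetical check: compares the header value against our own computation.
export function isValidSignature(
  rawBody: string,
  signature: string | undefined,
  secret: string
): boolean {
  // Assumption: an empty secret means verification is disabled during setup.
  if (secret === "") return true;
  if (!signature) return false;

  const expected = Buffer.from(signBody(rawBody, secret), "utf8");
  const received = Buffer.from(signature, "utf8");

  // timingSafeEqual throws on length mismatch, so check lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

The important detail is that the hash is computed over the raw body string, before any JSON parsing, which is why the entry point reads the body itself instead of relying on a parsed object.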
const AI_ONLY = (process.env.AI_ONLY ?? "false").toLowerCase() === "true";

const rows = parseAndClassify(rawBody);
const toInsert = AI_ONLY ? rows.filter((r) => r.ai_category !== null) : rows;

if (toInsert.length === 0) {
  return { status: 200, body: `ok 0 of ${rows.length}` };
}

try {
  await insertRows(toInsert);
} catch (err) { ... }
The crucial part, of course, is the parsing and classifying. Again, you can see the full logic in our example repo, but I'll highlight a few things.
// Excerpt: `line`, `path`, `eventTs`, `now`, `referer` and `name` are
// defined in the full source and omitted here.
const rows = [];
for (const item of items) {
  if (shouldSkipPath(path)) {
    continue;
  }

  const userAgent = pickString(line, [
    "proxy.userAgent",
    "userAgent",
    "request.headers.user-agent",
  ]);
  const category = classify(userAgent);

  rows.push({
    event_id: asString(line.id),
    received_at: now,
    event_ts: eventTs,
    // Truncate the event timestamp to the start of its hour.
    event_hour: new Date(Math.floor(eventTs.getTime() / 3_600_000) * 3_600_000),
    project_id: asString(line.projectId),
    deployment_id: asString(line.deploymentId),
    source: asString(line.source),
    host: pickString(line, ["proxy.host", "host"]),
    path,
    method: pickString(line, ["proxy.method", "method"]),
    status_code: pickNumber(line, ["proxy.statusCode", "statusCode"]),
    user_agent: userAgent,
    referer,
    client_ip: anonymizeIp(pickString(line, ["proxy.clientIp", "clientIp"])),
    region: asString(line.region),
    request_id: pickString(line, ["proxy.requestId", "requestId"]),
    ai_category: category,
    ai_name: name,
    raw: JSON.stringify(line),
  });
}
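The excerpt leans on a few small helpers. As a minimal sketch of what they might do, assuming `pickString` and `pickNumber` take dotted paths and return the first usable value, and assuming `anonymizeIp` zeroes the last IPv4 octet (the example repo may behave differently, for instance for IPv6):

```typescript
type Json = Record<string, unknown>;

// Walk a dotted path such as "proxy.userAgent" through nested objects.
function getPath(obj: Json, path: string): unknown {
  let current: unknown = obj;
  for (const key of path.split(".")) {
    if (typeof current !== "object" || current === null) return undefined;
    current = (current as Json)[key];
  }
  return current;
}

// First non-empty string found at any of the candidate paths, else null.
export function pickString(obj: Json, paths: string[]): string | null {
  for (const p of paths) {
    const value = getPath(obj, p);
    if (typeof value === "string" && value.length > 0) return value;
  }
  return null;
}

// First finite number found at any of the candidate paths, else null.
export function pickNumber(obj: Json, paths: string[]): number | null {
  for (const p of paths) {
    const value = getPath(obj, p);
    if (typeof value === "number" && Number.isFinite(value)) return value;
  }
  return null;
}

// Assumption: keep the /24 network of an IPv4 address and drop the host part.
// Anything that is not a valid dotted IPv4 address is not stored at all.
export function anonymizeIp(ip: string | null): string | null {
  if (!ip) return null;
  const parts = ip.split(".");
  const valid =
    parts.length === 4 &&
    parts.every((p) => /^\d{1,3}$/.test(p) && Number(p) <= 255);
  if (!valid) return null;
  return `${parts[0]}.${parts[1]}.${parts[2]}.0`;
}
```

Trying several paths in order is what lets the same code handle log lines where a field sits under `proxy` and lines where it sits at the top level.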
The actual inserting of rows happens in db.ts. It makes sure the database and tables exist and a connection is ready, then inserts the remaining rows into MotherDuck.
A quick note on best practices for writing to MotherDuck. In general, single-row inserts into MotherDuck (or DuckDB) are bad practice. The good news is that Vercel already batches logs when possible, so we usually insert several rows at once. Depending on how much traffic your site gets, sometimes only one row will be available, and it's fine to write that row. At higher volumes, it might be worth switching to DuckLake, where data is first written to your data lake as small files and later compacted into more efficient file sizes.
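To illustrate the batching idea, here is a small sketch that turns a whole batch of rows into one multi-row INSERT statement with positional `?` placeholders, instead of issuing one statement per row. The helper name `buildBatchInsert` and the table and column names in the usage example are hypothetical, and how the statement and parameters get bound and executed depends on the DuckDB client you use; this is not necessarily how db.ts in the example repo does it.

```typescript
type Value = string | number | boolean | Date | null;
type Row = Record<string, Value>;

// Build one INSERT covering every row in the batch.
// Table and column names are interpolated directly, so they must be trusted
// constants from your own code, never user input.
export function buildBatchInsert(
  table: string,
  columns: string[],
  rows: Row[]
): { sql: string; params: Value[] } {
  if (rows.length === 0) {
    throw new Error("buildBatchInsert needs at least one row");
  }
  // One "(?, ?, ...)" group per row.
  const tuple = `(${columns.map(() => "?").join(", ")})`;
  const sql =
    `INSERT INTO ${table} (${columns.join(", ")}) ` +
    `VALUES ${rows.map(() => tuple).join(", ")}`;
  // Flatten values row by row, in column order; missing keys become NULL.
  const params = rows.flatMap((row) => columns.map((c) => row[c] ?? null));
  return { sql, params };
}

// Usage with made-up table and column names:
const { sql, params } = buildBatchInsert(
  "requests",
  ["path", "status_code"],
  [
    { path: "/a", status_code: 200 },
    { path: "/b", status_code: 404 },
  ]
);
console.log(sql, params);
```

With this shape, a batch of one row and a batch of fifty rows go through the same code path, which fits the situation described above where Vercel sometimes delivers a single log line.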