Custom Middleware
Problem
You need to write custom middleware (feature flags, audit logging, request enrichment, or third-party integrations), but ergo’s accumulator-based return-value model differs fundamentally from the event/mutation approach of Express or Fastify. To write middleware that composes cleanly with the pipeline, you need to understand the contract.
Solution
The Middleware Contract
Every middleware function receives four arguments (req, res, domainAcc, responseAcc)
and returns {value?, response?} or undefined:
JavaScript:

    function myMiddleware(options) {
      // Factory: runs once at startup
      const precomputed = buildConfig(options);

      // Inner function: runs per-request
      return (req, res, domainAcc, responseAcc) => {
        // Return {value?, response?} or undefined
        return { value: { /* domain data */ } };
      };
    }

TypeScript:

    import type { IncomingMessage, ServerResponse } from "node:http";

    interface MiddlewareResult {
      value?: Record<string, unknown>;
      response?: {
        statusCode?: number;
        headers?: [string, string | undefined][];
        body?: unknown;
      };
    }

    function myMiddleware(options: MyOptions) {
      const precomputed = buildConfig(options);

      return (
        req: IncomingMessage,
        res: ServerResponse,
        domainAcc: Record<string, unknown>,
        responseAcc: Record<string, unknown>,
      ): MiddlewareResult => {
        return { value: { /* domain data */ } };
      };
    }

Return value interpretation:

| Return Shape | Behavior |
|---|---|
| {value} | Merged into the domain accumulator |
| {response} | Merged into the response accumulator |
| {value, response} | Both merges applied |
| undefined or null | No-op; pipeline continues unchanged |
| Plain object (no value/response keys) | Treated as {value: returnedObject} |

When response.statusCode is set, the pipeline breaks immediately —
no subsequent middleware executes.
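
To make the table concrete, here is a minimal sketch of a runner that applies these rules. It is not ergo's implementation: `runPipeline`, `normalize`, and the shallow `Object.assign` merge for the response accumulator are assumptions made for illustration, and real header merging is likely more involved.

```javascript
// Hypothetical runner illustrating the return-value rules; not ergo's actual code.
function normalize(result) {
  if (result == null) return {}; // undefined or null: no-op
  const hasKeys = "value" in result || "response" in result;
  // A plain object without value/response keys is treated as {value: result}.
  return hasKeys ? result : { value: result };
}

async function runPipeline(steps, req = {}, res = {}) {
  const domainAcc = {};
  const responseAcc = {};
  for (const step of steps) {
    // A step is either a bare function or a {fn, setPath} config object.
    const fn = typeof step === "function" ? step : step.fn;
    const setPath = typeof step === "function" ? undefined : step.setPath;
    const { value, response } = normalize(await fn(req, res, domainAcc, responseAcc));
    if (value !== undefined) {
      if (setPath) domainAcc[setPath] = value;
      else Object.assign(domainAcc, value);
    }
    if (response !== undefined) {
      Object.assign(responseAcc, response); // assumption: shallow merge
      if (response.statusCode !== undefined) break; // status code set: stop the pipeline
    }
  }
  return { domainAcc, responseAcc };
}

const demo = runPipeline([
  () => ({ value: { a: 1 } }),
  () => undefined,
  () => ({ b: 2 }),
  { fn: () => ({ value: { on: true } }), setPath: "flags" },
  () => ({ response: { statusCode: 403 } }),
  () => ({ value: { unreachable: true } }),
]);

demo.then(({ domainAcc, responseAcc }) => console.log(domainAcc, responseAcc));
```

In this sketch the last step never runs, because the step before it sets a status code.
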
Domain Accumulator Contribution
Use a {fn, setPath} config object to set a named path on the domain
accumulator. This is the standard pattern for middleware that produces
a single named result:
JavaScript:

    import { compose, handler } from "@centralping/ergo";

    function featureFlags(options) {
      const flagService = createFlagClient(options);
      return async (req, res, acc) => {
        const flags = await flagService.evaluate(acc.auth?.sub);
        return { value: flags };
      };
    }

    const pipeline = compose(
      {fn: featureFlags({ endpoint: "https://flags.internal" }), setPath: "flags"},
      // acc.flags is now available to subsequent middleware
      (req, res, acc) => ({
        response: {
          body: { features: acc.flags, user: acc.auth?.sub },
        },
      }),
    );

    export default handler(pipeline);

TypeScript:

    import { compose, handler } from "@centralping/ergo";
    import type { IncomingMessage, ServerResponse } from "node:http";

    interface Flags { [flag: string]: boolean }

    function featureFlags(options: { endpoint: string }) {
      const flagService = createFlagClient(options);
      return async (
        req: IncomingMessage,
        res: ServerResponse,
        acc: { auth?: { sub?: string } },
      ): Promise<{ value: Flags }> => {
        const flags = await flagService.evaluate(acc.auth?.sub);
        return { value: flags };
      };
    }

    const pipeline = compose(
      {fn: featureFlags({ endpoint: "https://flags.internal" }), setPath: "flags"},
      (req: IncomingMessage, res: ServerResponse, acc: { flags: Flags; auth?: { sub?: string } }) => ({
        response: {
          body: { features: acc.flags, user: acc.auth?.sub },
        },
      }),
    );

    export default handler(pipeline);

Using ergo-router:

    function featureFlags(options) {
      const flagService = createFlagClient(options);
      return async (req, res, acc) => {
        const flags = await flagService.evaluate(acc.auth?.sub);
        return { value: flags };
      };
    }

    router.get("/dashboard", {
      use: [{fn: featureFlags({ endpoint: "https://flags.internal" }), setPath: "flags"}],
      execute: (req, res, acc) => ({
        response: {
          body: { features: acc.flags, user: acc.auth?.sub },
        },
      }),
    });

Without a setPath (bare function, no config object), the returned value is
merged via Object.assign(domainAcc, value) — use this when
contributing multiple keys:

    import { randomUUID } from "node:crypto";

    function requestMetadata() {
      return (req, res, acc) => ({
        value: {
          receivedAt: Date.now(),
          // Reuse the caller's correlation ID when present; otherwise generate one
          correlationId: req.headers["x-correlation-id"] ?? randomUUID(),
        },
      });
    }

Response Accumulator Contribution
Return {response: {...}} to contribute headers, status codes, or
other response metadata:

    function apiVersion(version) {
      // Built once in the factory and reused for every request
      const header = ["X-API-Version", version];
      return () => ({
        response: { headers: [header] },
      });
    }

Response Timing
Measuring response time via the accumulator requires a two-step pattern
because middleware return values are captured before send() writes
the response. Register the timer with setPath: "timing" so downstream
middleware reads acc.timing.startedAt:

    function responseTimer() {
      return (req, res, acc) => {
        return { value: { startedAt: performance.now() } };
      };
    }

    const pipeline = compose(
      {fn: responseTimer(), setPath: "timing"},
      // ... other middleware ...
      (req, res, acc) => {
        const elapsed = (performance.now() - acc.timing.startedAt).toFixed(2);
        return {
          response: {
            headers: [["X-Response-Time", `${elapsed}ms`]],
            body: { result: "ok" },
          },
        };
      },
    );

Shared Computation
When multiple middleware or your execute handler need the same expensive lookup, use a factory that creates a memoized resolver. The first middleware to call it performs the fetch; subsequent accesses return the cached result. Store the resolver on the domain accumulator so it lives only for the current request:
Using compose():

    import { compose, handler } from "@centralping/ergo";

    function tenantResolver(options) {
      const db = createDbClient(options);
      return (req, res, acc) => {
        // Per-request cache: these variables are created anew for every request
        let result;
        let resolved = false;
        const resolve = async () => {
          if (!resolved) {
            result = await db.tenants.findById(acc.auth?.tenantId);
            resolved = true;
          }
          return result;
        };
        return { value: { resolve } };
      };
    }

    const pipeline = compose(
      // ... auth middleware populates acc.auth ...
      {fn: tenantResolver({ connectionString: DB_URL }), setPath: "tenant"},
      // Both permissions check and execute can call acc.tenant.resolve()
      // without redundant DB queries
      async (req, res, acc) => {
        const tenant = await acc.tenant.resolve();
        return {
          response: { body: { name: tenant.name, plan: tenant.plan } },
        };
      },
    );

    export default handler(pipeline);

Using ergo-router:

    function tenantResolver(options) {
      const db = createDbClient(options);
      return (req, res, acc) => {
        let result;
        let resolved = false;
        const resolve = async () => {
          if (!resolved) {
            result = await db.tenants.findById(acc.auth?.tenantId);
            resolved = true;
          }
          return result;
        };
        return { value: { resolve } };
      };
    }

    router.get("/tenant/profile", {
      use: [{fn: tenantResolver({ connectionString: DB_URL }), setPath: "tenant"}],
      execute: async (req, res, acc) => {
        const tenant = await acc.tenant.resolve();
        return {
          response: { body: { name: tenant.name, plan: tenant.plan } },
        };
      },
    });

Audit Logging
Record request context for compliance or analytics without affecting the response. An audit middleware records a structured entry to a sink and stores it on the accumulator, where the execute handler (or a downstream logger) can reference it:
Using compose():

    import { compose, handler } from "@centralping/ergo";

    function auditLog(options) {
      const sink = createAuditSink(options);
      return (req, res, acc) => {
        const entry = {
          actor: acc.auth?.sub,
          action: `${req.method} ${acc.url?.pathname}`,
          timestamp: Date.now(),
          ip: req.socket.remoteAddress,
        };
        sink.record(entry);
        return { value: entry };
      };
    }

    const pipeline = compose(
      // ... auth + url middleware ...
      {fn: auditLog({ stream: "api-audit" }), setPath: "audit"},
      (req, res, acc) => ({
        response: {
          body: { result: "ok", auditId: acc.audit.timestamp },
        },
      }),
    );

    export default handler(pipeline);

Using ergo-router:

    function auditLog(options) {
      const sink = createAuditSink(options);
      return (req, res, acc) => {
        const entry = {
          actor: acc.auth?.sub,
          action: `${req.method} ${acc.url?.pathname}`,
          timestamp: Date.now(),
          ip: req.socket.remoteAddress,
        };
        sink.record(entry);
        return { value: entry };
      };
    }

    const router = createRouter({
      defaults: {
        use: [{fn: auditLog({ stream: "api-audit" }), setPath: "audit"}],
      },
    });

    router.post("/accounts", {
      execute: (req, res, acc) => ({
        response: {
          statusCode: 201,
          body: { created: true, auditId: acc.audit.timestamp },
        },
      }),
    });

Fast-Fail Rejection
Set response.statusCode to break the pipeline immediately. Use this
for middleware that rejects requests before they reach the execute
handler:
Using compose():

    import { compose, handler } from "@centralping/ergo";

    function requireAuditContext() {
      return (req, res, acc) => {
        if (!acc.auth?.sub) {
          // Setting statusCode breaks the pipeline
          return {
            response: {
              statusCode: 401,
              body: {
                type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401",
                title: "Unauthorized",
                detail: "Audit context requires authenticated identity.",
              },
            },
          };
        }
        return { value: { auditActor: acc.auth.sub } };
      };
    }

    const pipeline = compose(
      // ... auth middleware ...
      {fn: requireAuditContext(), setPath: "audit"},
      // Only reached if auth context exists
      (req, res, acc) => ({
        response: { body: { actor: acc.audit.auditActor } },
      }),
    );

    export default handler(pipeline);

Using ergo-router:

    function requireAuditContext() {
      return (req, res, acc) => {
        if (!acc.auth?.sub) {
          return {
            response: {
              statusCode: 401,
              body: {
                type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401",
                title: "Unauthorized",
                detail: "Audit context requires authenticated identity.",
              },
            },
          };
        }
        return { value: { auditActor: acc.auth.sub } };
      };
    }

    router.get("/admin/audit-log", {
      use: [{fn: requireAuditContext(), setPath: "audit"}],
      execute: (req, res, acc) => ({
        response: { body: { actor: acc.audit.auditActor } },
      }),
    });

Adapting Express Middleware
fromConnect() wraps Connect/Express-style (req, res, next)
middleware for use in an ergo pipeline:

    import { compose, handler, fromConnect } from "@centralping/ergo";
    import helmet from "helmet";

    const pipeline = compose(
      fromConnect(helmet()),
      // ... remaining middleware
    );

    export default handler(pipeline);

Hybrid pattern: use fromConnect() for side effects and a native
middleware for accumulator contribution:

    import { compose, handler, fromConnect } from "@centralping/ergo";
    import someExpressMiddleware from "some-express-middleware";

    const pipeline = compose(
      // Side effects (sets res headers)
      fromConnect(someExpressMiddleware()),
      // Native middleware reads/writes accumulators
      {fn: myNativeMiddleware(), setPath: "enrichment"},
    );

    export default handler(pipeline);

Explanation
Pipeline Placement via use
In ergo-router, the use config key inserts custom middleware after
Stage 3 (Validation) and before Stage 4 (Execution). This means
custom middleware has full access to everything produced by earlier
stages — URL parsing, body parsing, authentication, cookies, and
validated parameters:

    router.get("/resource/:id", {
      validate: { params: paramsSchema },
      use: [{fn: myMiddleware(), setPath: "custom"}],
      // acc.url, acc.body, acc.auth, acc.cookies, acc.route.params
      // are all available inside myMiddleware
      execute: (req, res, acc) => ({ /* ... */ }),
    });

Concatenation behavior: defaults.use and route-level use are
concatenated — defaults run first, then route-specific middleware:

    const router = createRouter({
      defaults: {
        use: [{fn: globalMiddleware(), setPath: "global"}],
      },
    });

    router.get("/endpoint", {
      // Pipeline runs: ...defaults.use → ...route.use → execute
      use: [{fn: routeMiddleware(), setPath: "route"}],
      execute: (req, res, acc) => ({ /* ... */ }),
    });

Disabling custom middleware: Set use: false on a route to skip
all custom middleware (both defaults and route-level):

    router.get("/health", {
      use: false,
      execute: () => ({ response: { body: { status: "ok" } } }),
    });

Accumulator Access
Custom middleware in use position receives the full domain
accumulator built by the Negotiation, Authorization, and Validation
stages. Available keys depend on which middleware ran before yours:
| Key | Source Stage | Description |
|---|---|---|
| acc.url | Negotiation | Parsed pathname, query (acc.url.query), search |
| acc.cookies | Negotiation | Parsed cookies |
| acc.auth | Authorization | Authentication result |
| acc.body | Validation | Parsed request body (acc.body.parsed for the parsed content) |
| acc.route.params | Router transport | Route path parameters (seeded by ergo-router before the pipeline runs) |

The response accumulator (responseAcc) contains any headers or
metadata set by earlier middleware (security headers, CORS, cache
directives).
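
Because the available keys depend on what ran earlier, middleware in use position can read the accumulators defensively. The sketch below assumes the key shapes listed in the table (`acc.url.pathname`, `acc.auth.sub`, `acc.route.params`). The `requestSummary` middleware is hypothetical, and so is the assumption that earlier header contributions appear on the response accumulator as an array of `[name, value]` pairs.

```javascript
// Hypothetical middleware: summarizes whatever earlier stages provided.
function requestSummary() {
  return (req, res, acc, responseAcc) => ({
    value: {
      path: acc.url?.pathname ?? null,
      actor: acc.auth?.sub ?? "anonymous",
      params: acc.route?.params ?? {},
      // Assumption: earlier header contributions are visible as [name, value] pairs.
      headerCount: Array.isArray(responseAcc?.headers) ? responseAcc.headers.length : 0,
    },
  });
}

const summarize = requestSummary();

// Full accumulator, as it might look after Negotiation, Authorization, and Validation.
const full = summarize(
  {},
  {},
  { url: { pathname: "/resource/42" }, auth: { sub: "user-1" }, route: { params: { id: "42" } } },
  { headers: [["X-API-Version", "2"]] },
);

// Sparse accumulator, e.g. a route with no authentication configured.
const sparse = summarize({}, {}, {}, {});

console.log(full.value, sparse.value);
```

Optional chaining with fallbacks keeps the middleware usable on routes where a stage was skipped, rather than throwing on a missing key.
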
Comparison with Express

| Concern | Express/Connect | ergo |
|---|---|---|
| Data passing | req.user = data (mutation) | return {value: {user: data}} (accumulator) |
| Pipeline control | next() / next(err) | Return undefined (continue) or {response: {statusCode}} (break) |
| Response headers | res.setHeader() (imperative) | {response: {headers: [...]}} (declarative) |
| Error handling | (err, req, res, next) | {response: {statusCode, detail}} |
| Side effects | Encouraged (mutate req/res) | Avoided (pure return values) |
| Testability | Requires mocking req/res/next | Call function, assert return value |
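
The testability row can be illustrated without a server or mocks. The sketch below exercises a simplified copy of the requireAuditContext middleware from the Fast-Fail Rejection section by calling it with plain objects. `console.assert` keeps the example dependency-free; a real suite would more likely use `node:assert` or a test runner.

```javascript
// Simplified copy of requireAuditContext from the Fast-Fail Rejection section.
function requireAuditContext() {
  return (req, res, acc) => {
    if (!acc.auth?.sub) {
      return { response: { statusCode: 401, body: { title: "Unauthorized" } } };
    }
    return { value: { auditActor: acc.auth.sub } };
  };
}

const middleware = requireAuditContext();

// Empty objects stand in for req and res because this middleware never touches them.
const rejected = middleware({}, {}, {});
const accepted = middleware({}, {}, { auth: { sub: "user-1" } });

console.assert(rejected.response.statusCode === 401, "missing identity should yield 401");
console.assert(rejected.value === undefined, "a rejection contributes no domain data");
console.assert(accepted.value.auditActor === "user-1", "actor should be copied from acc.auth");
console.assert(accepted.response === undefined, "success should not touch the response");
```
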
Anti-Patterns
Mutating req or res Directly

    // ❌ Express habit: mutates shared state
    function bad(req, res, acc) {
      req.user = acc.auth?.sub;
      res.setHeader("X-Custom", "value");
    }

    // ✅ Return values: composable and testable
    function good(req, res, acc) {
      return {
        value: { user: acc.auth?.sub },
        response: { headers: [["X-Custom", "value"]] },
      };
    }

Returning a Plain Object When You Mean Response Metadata
A plain object (without explicit value or response keys) is treated
as {value: returnedObject} and merged into the domain accumulator.
If you intend to contribute headers or a status code, wrap in response:

    // ❌ This merges {headers: [...]} into the DOMAIN accumulator
    function bad() {
      return { headers: [["X-Timing", "42ms"]] };
    }

    // ✅ Explicit response: headers go to the response accumulator
    function good() {
      return { response: { headers: [["X-Timing", "42ms"]] } };
    }

Ignoring the Pipeline Break Signal
When responseAcc.statusCode is set by an earlier middleware, the
pipeline breaks and your middleware will not execute. Do not design
middleware that depends on running “cleanup” after a rejection — use
the response accumulator or res event listeners instead:

    // ❌ Assumes this always runs; it won't after a 4xx/5xx break
    function bad() {
      return (req, res, acc) => {
        doSetup();
        // If earlier middleware set statusCode, this never executes
        return { value: { ready: true } };
      };
    }

    // ✅ If cleanup is needed, attach to res events at request time
    function good() {
      return (req, res) => {
        res.on("close", () => releaseResources());
        return undefined;
      };
    }

Using fromConnect() When You Need Accumulator Access
fromConnect() always returns undefined, so it cannot read the domain
accumulator or contribute data to it. If your middleware needs acc,
write a native ergo middleware:

    // ❌ Cannot access acc.auth inside helmet()
    compose(fromConnect(helmet()), /* ... */);

    // ✅ Native middleware with full accumulator access
    function rateLimitByUser() {
      return (req, res, acc) => {
        const key = acc.auth?.sub ?? req.socket.remoteAddress;
        // ... rate limit logic using the accumulator ...
        return undefined;
      };
    }

Module-Level Mutable State
Avoid storing per-request data in module-level variables. Each request must be independent; use factory closures or the domain accumulator:

    // ❌ Shared mutable state: race conditions, memory leaks
    const requestCache = new Map();
    function bad() {
      return (req, res, acc) => {
        requestCache.set(req, { /* data */ });
        // Who deletes this entry? When?
      };
    }

    // ✅ Closure-scoped per invocation: no shared state
    function good(options) {
      const db = createClient(options);
      return (req, res, acc) => {
        let result;
        let resolved = false;
        const resolve = async () => {
          if (!resolved) {
            result = await db.find(acc.auth?.tenantId);
            resolved = true;
          }
          return result;
        };
        return { value: { resolve } };
      };
    }
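
One possible refinement of the closure-scoped resolver above is to cache the promise instead of the resolved value, so that concurrent callers within one request share a single lookup. This is a sketch and not part of ergo: `fakeDb`, its `find` method, and the call counter are stand-ins for a real client.

```javascript
// Stand-in for a real database client; counts lookups so the effect is observable.
const fakeDb = {
  calls: 0,
  find(tenantId) {
    this.calls += 1;
    return new Promise((resolve) =>
      setTimeout(() => resolve({ id: tenantId, plan: "pro" }), 10),
    );
  },
};

function tenantResolver(db) {
  // Factory scope: shared across requests, so it holds only the client.
  return (req, res, acc) => {
    // Request scope: each invocation gets its own cache slot.
    let pending;
    const resolve = () => (pending ??= db.find(acc.auth?.tenantId));
    return { value: { resolve } };
  };
}

const resolverMiddleware = tenantResolver(fakeDb);

async function demo() {
  const first = resolverMiddleware({}, {}, { auth: { tenantId: "t1" } }).value;
  // Two concurrent callers in the same request share one lookup.
  const [a, b] = await Promise.all([first.resolve(), first.resolve()]);
  // A second request gets a fresh resolver and triggers its own lookup.
  const second = resolverMiddleware({}, {}, { auth: { tenantId: "t2" } }).value;
  const c = await second.resolve();
  return { a, b, c, calls: fakeDb.calls };
}

const result = demo();
result.then(({ calls }) => console.log(`lookups: ${calls}`)); // prints "lookups: 2"
```

One trade-off of this variant: a rejected promise is cached too, so a failed lookup stays failed for the rest of that request.
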