Custom Middleware

Custom middleware in ergo (feature flags, audit logging, request enrichment, third-party integrations) follows an accumulator-based return-value model that differs fundamentally from the event/mutation approach of Express or Fastify. Understanding that contract is what lets your middleware compose cleanly with the rest of the pipeline.

Every middleware function receives four arguments (req, res, the domain accumulator, and the response accumulator) and returns {value?, response?} or nothing:

function myMiddleware(options) {
  // Factory: runs once at startup
  const precomputed = buildConfig(options);
  // Inner function: runs per-request
  return (req, res, domainAcc, responseAcc) => {
    // Return {value?, response?} or undefined
    return { value: { /* domain data */ } };
  };
}

Return value interpretation:

| Return Shape | Behavior |
| --- | --- |
| {value} | Merged into the domain accumulator |
| {response} | Merged into the response accumulator |
| {value, response} | Both merges applied |
| undefined or null | No-op; pipeline continues unchanged |
| Plain object (no value/response keys) | Treated as {value: returnedObject} |
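
The table above can be read as a small normalization step. The sketch below is not ergo's source; `normalizeResult` is a hypothetical helper that only illustrates how each return shape could map onto the two accumulators.

```javascript
// Hypothetical helper: illustrates the return-shape table, not ergo's internals.
function normalizeResult(result) {
  // undefined or null: nothing to merge
  if (result === undefined || result === null) return {};

  const hasValue = Object.hasOwn(result, "value");
  const hasResponse = Object.hasOwn(result, "response");

  // Plain object without value/response keys: the whole object is the value
  if (!hasValue && !hasResponse) return { value: result };

  const normalized = {};
  if (hasValue) normalized.value = result.value;
  if (hasResponse) normalized.response = result.response;
  return normalized;
}
```

For example, `normalizeResult({ flags: { beta: true } })` wraps the object under `value`, while `normalizeResult(undefined)` yields an empty object with nothing to merge.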

When response.statusCode is set, the pipeline breaks immediately — no subsequent middleware executes.

Use a {fn, setPath} config object to set a named path on the domain accumulator. This is the standard pattern for middleware that produces a single named result:

import { compose, handler } from "@centralping/ergo";

function featureFlags(options) {
  const flagService = createFlagClient(options);
  return async (req, res, acc) => {
    const flags = await flagService.evaluate(acc.auth?.sub);
    return { value: flags };
  };
}

const pipeline = compose(
  {fn: featureFlags({ endpoint: "https://flags.internal" }), setPath: "flags"},
  // acc.flags is now available to subsequent middleware
  (req, res, acc) => ({
    response: {
      body: { features: acc.flags, user: acc.auth?.sub },
    },
  }),
);

export default handler(pipeline);

Without a setPath (bare function, no config object), the returned value is merged via Object.assign(domainAcc, value) — use this when contributing multiple keys:

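import { randomUUID } from "node:crypto";

function requestMetadata() {
  return (req, res, acc) => ({
    value: {
      receivedAt: Date.now(),
      // Reuse the caller's correlation ID when present, otherwise mint one
      correlationId: req.headers["x-correlation-id"] ?? randomUUID(),
    },
  });
}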
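
The difference between the two registration styles can be pictured as follows. `mergeValue` is a hypothetical helper written for illustration, and it assumes setPath names a single top-level key; ergo's actual merge logic may differ.

```javascript
// Hypothetical helper, assuming setPath names one top-level key.
function mergeValue(domainAcc, value, setPath) {
  if (value === undefined) return domainAcc;
  if (setPath) {
    // {fn, setPath}: the whole value lands under one named key
    domainAcc[setPath] = value;
  } else {
    // Bare function: each key of the value is copied onto the accumulator
    Object.assign(domainAcc, value);
  }
  return domainAcc;
}

const acc = { auth: { sub: "user-1" } };
// With setPath, the value is reachable as acc.flags
mergeValue(acc, { beta: true }, "flags");
// Without setPath, the keys sit directly on acc
mergeValue(acc, { receivedAt: 0, correlationId: "abc" });
```

A practical consequence is that bare functions share one namespace, so two of them returning the same key would overwrite each other, whereas setPath keeps each contribution under its own name.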

Return {response: {...}} to contribute headers, status codes, or other response metadata:

function apiVersion(version) {
  const header = ["X-API-Version", version];
  return () => ({
    response: { headers: [header] },
  });
}

Measuring response time via the accumulator requires a two-step pattern because middleware return values are captured before send() writes the response. Register the timer with setPath: "timing" so downstream middleware reads acc.timing.startedAt:

function responseTimer() {
  return (req, res, acc) => {
    return { value: { startedAt: performance.now() } };
  };
}

const pipeline = compose(
  {fn: responseTimer(), setPath: "timing"},
  // ... other middleware ...
  (req, res, acc) => {
    const elapsed = (performance.now() - acc.timing.startedAt).toFixed(2);
    return {
      response: {
        headers: [["X-Response-Time", `${elapsed}ms`]],
        body: { result: "ok" },
      },
    };
  },
);

When multiple middleware or your execute handler need the same expensive lookup, use a factory that creates a memoized resolver. The first middleware to call it performs the fetch; subsequent accesses return the cached result. Store the resolver on the domain accumulator so it lives only for the current request:

import { compose, handler } from "@centralping/ergo";

function tenantResolver(options) {
  const db = createDbClient(options);
  return (req, res, acc) => {
    // Cache the promise rather than the result, so callers that overlap
    // in time share a single query (a rejection is cached as well)
    let pending;
    const resolve = () => {
      pending ??= db.tenants.findById(acc.auth?.tenantId);
      return pending;
    };
    return { value: { resolve } };
  };
}

const pipeline = compose(
  // ... auth middleware populates acc.auth ...
  {fn: tenantResolver({ connectionString: DB_URL }), setPath: "tenant"},
  // Both permissions check and execute can call acc.tenant.resolve()
  // without redundant DB queries
  async (req, res, acc) => {
    const tenant = await acc.tenant.resolve();
    return {
      response: { body: { name: tenant.name, plan: tenant.plan } },
    };
  },
);

export default handler(pipeline);
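
One way to check that a resolver performs a single lookup per request is to call it against a stub client. The sketch below caches the in-flight promise, so even concurrent callers share one query; `createStubDb` and the tenant data are hypothetical stand-ins for a real database client.

```javascript
// Hypothetical stub that counts how often the "database" is queried.
function createStubDb() {
  const db = {
    calls: 0,
    tenants: {
      async findById(id) {
        db.calls += 1;
        return { id, name: "Acme", plan: "pro" };
      },
    },
  };
  return db;
}

function tenantResolver(db) {
  return (req, res, acc) => {
    let pending; // per-request cache of the in-flight promise
    const resolve = () => {
      pending ??= db.tenants.findById(acc.auth?.tenantId);
      return pending;
    };
    return { value: { resolve } };
  };
}

async function demo() {
  const db = createStubDb();
  const { value } = tenantResolver(db)({}, {}, { auth: { tenantId: "t-1" } });
  // Two overlapping calls, as a permissions check and execute might make
  const [a, b] = await Promise.all([value.resolve(), value.resolve()]);
  return { calls: db.calls, same: a === b };
}
```

Awaiting `demo()` should report one call and the same tenant object for both callers. Each invocation of the inner function gets its own `pending`, so nothing is cached across requests.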

Record request context for compliance or analytics without changing how the request is handled. The audit middleware below writes a structured entry to its sink and also stores it on the accumulator, where the execute handler (or a downstream logger) can read it:

import { compose, handler } from "@centralping/ergo";

function auditLog(options) {
  const sink = createAuditSink(options);
  return (req, res, acc) => {
    const entry = {
      actor: acc.auth?.sub,
      action: `${req.method} ${acc.url?.pathname}`,
      timestamp: Date.now(),
      ip: req.socket.remoteAddress,
    };
    sink.record(entry);
    return { value: entry };
  };
}

const pipeline = compose(
  // ... auth + url middleware ...
  {fn: auditLog({ stream: "api-audit" }), setPath: "audit"},
  (req, res, acc) => ({
    response: {
      body: { result: "ok", auditId: acc.audit.timestamp },
    },
  }),
);

export default handler(pipeline);

Set response.statusCode to break the pipeline immediately. Use this for middleware that rejects requests before they reach the execute handler:

import { compose, handler } from "@centralping/ergo";

function requireAuditContext() {
  return (req, res, acc) => {
    if (!acc.auth?.sub) {
      return {
        response: {
          statusCode: 401,
          body: {
            type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401",
            title: "Unauthorized",
            detail: "Audit context requires authenticated identity.",
          },
        },
      };
    }
    return { value: { auditActor: acc.auth.sub } };
  };
}

const pipeline = compose(
  // ... auth middleware ...
  {fn: requireAuditContext(), setPath: "audit"},
  // Only reached if auth context exists
  (req, res, acc) => ({
    response: { body: { actor: acc.audit.auditActor } },
  }),
);

export default handler(pipeline);
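
To make the break behavior concrete, the toy runner below stops as soon as the response accumulator carries a statusCode. `runPipeline` is a hypothetical stand-in written for illustration; it ignores setPath and async middleware, merges shallowly, and is not how ergo's compose() is implemented.

```javascript
// Toy runner (hypothetical): synchronous, shallow merges, no setPath support.
function runPipeline(middlewares, req = {}, res = {}) {
  const domainAcc = {};
  const responseAcc = {};
  const executed = [];
  for (const mw of middlewares) {
    const result = mw(req, res, domainAcc, responseAcc) ?? {};
    executed.push(mw.name);
    if (result.value) Object.assign(domainAcc, result.value);
    if (result.response) Object.assign(responseAcc, result.response);
    // A status code ends the pipeline; later middleware never runs
    if (responseAcc.statusCode !== undefined) break;
  }
  return { domainAcc, responseAcc, executed };
}

function rejectAnonymous(req, res, acc) {
  if (!acc.auth?.sub) return { response: { statusCode: 401 } };
}

function execute() {
  return { response: { body: { result: "ok" } } };
}

// No auth middleware ran, so acc.auth is missing and the request is rejected
const outcome = runPipeline([rejectAnonymous, execute]);
// outcome.executed contains only "rejectAnonymous"; execute was skipped
```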

fromConnect() wraps Connect/Express-style (req, res, next) middleware for use in an ergo pipeline:

import { compose, handler, fromConnect } from "@centralping/ergo";
import helmet from "helmet";

const pipeline = compose(
  fromConnect(helmet()),
  // ... remaining middleware
);

export default handler(pipeline);

Hybrid pattern — use fromConnect() for side effects and a native middleware for accumulator contribution:

import { compose, handler, fromConnect } from "@centralping/ergo";
import someExpressMiddleware from "some-express-middleware";

const pipeline = compose(
  // Side effects (sets res headers)
  fromConnect(someExpressMiddleware()),
  // Native middleware reads/writes accumulators
  {fn: myNativeMiddleware(), setPath: "enrichment"},
);

export default handler(pipeline);
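
Conceptually, an adapter of this kind turns the next callback into a promise. The sketch below shows that general technique under stated assumptions; it is not the source of ergo's fromConnect(). `fromConnectSketch` is a hypothetical name, and treating next(err) as a rejection is an assumption, since this page does not describe how the real adapter surfaces errors.

```javascript
// Hypothetical adapter illustrating the callback-to-promise technique.
function fromConnectSketch(connectMiddleware) {
  return (req, res) =>
    new Promise((resolve, reject) => {
      connectMiddleware(req, res, (err) => {
        if (err) reject(err); // assumption: next(err) becomes a rejection
        else resolve(undefined); // nothing is contributed to the accumulators
      });
    });
}

// A tiny Connect-style middleware that only sets a header
function poweredBy(req, res, next) {
  res.setHeader("X-Powered-By", "sketch");
  next();
}

const wrapped = fromConnectSketch(poweredBy);
```

Because the wrapped function is only handed req and res, any effect it has must go through those objects, which is why it suits header-setting middleware and not accumulator contributions.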

In ergo-router, the use config key inserts custom middleware after Stage 3 (Validation) and before Stage 4 (Execution). This means custom middleware has full access to everything produced by earlier stages — URL parsing, body parsing, authentication, cookies, and validated parameters:

router.get("/resource/:id", {
  validate: { params: paramsSchema },
  use: [{fn: myMiddleware(), setPath: "custom"}],
  // acc.url, acc.body, acc.auth, acc.cookies, acc.route.params
  // are all available inside myMiddleware
  execute: (req, res, acc) => ({ /* ... */ }),
});

Concatenation behavior: defaults.use and route-level use are concatenated — defaults run first, then route-specific middleware:

const router = createRouter({
  defaults: {
    use: [{fn: globalMiddleware(), setPath: "global"}],
  },
});

router.get("/endpoint", {
  // Pipeline runs: ...defaults.use → ...route.use → execute
  use: [{fn: routeMiddleware(), setPath: "route"}],
  execute: (req, res, acc) => ({ /* ... */ }),
});

Disabling custom middleware: Set use: false on a route to skip all custom middleware (both defaults and route-level):

router.get("/health", {
  use: false,
  execute: () => ({ response: { body: { status: "ok" } } }),
});
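
Taken together, the concatenation and opt-out rules amount to the following. `resolveUse` is a hypothetical function used only to illustrate the documented behavior; ergo-router's internals may be organized differently.

```javascript
// Hypothetical illustration of how defaults.use and route-level use combine.
function resolveUse(defaultsUse = [], routeUse = []) {
  // use: false disables defaults and route-level middleware alike
  if (routeUse === false) return [];
  // Defaults run first, then route-specific middleware
  return [...defaultsUse, ...routeUse];
}

const both = resolveUse(["global"], ["route"]); // defaults, then route
const none = resolveUse(["global"], false); // everything skipped
const inherited = resolveUse(["global"]); // route adds nothing of its own
```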

Custom middleware in use position receives the full domain accumulator built by the Negotiation, Authorization, and Validation stages. Available keys depend on which middleware ran before yours:

| Key | Source Stage | Description |
| --- | --- | --- |
| acc.url | Negotiation | Parsed pathname, query (acc.url.query), search |
| acc.cookies | Negotiation | Parsed cookies |
| acc.auth | Authorization | Authentication result |
| acc.body | Validation | Parsed request body (acc.body.parsed for the parsed content) |
| acc.route.params | Router transport | Route path parameters (seeded by ergo-router before the pipeline runs) |

The response accumulator (responseAcc) contains any headers or metadata set by earlier middleware (security headers, CORS, cache directives).

| Concern | Express/Connect | ergo |
| --- | --- | --- |
| Data passing | req.user = data (mutation) | return {value: {user: data}} (accumulator) |
| Pipeline control | next() / next(err) | Return undefined (continue) or {response: {statusCode}} (break) |
| Response headers | res.setHeader() (imperative) | {response: {headers: [...]}} (declarative) |
| Error handling | (err, req, res, next) | {response: {statusCode, detail}} |
| Side effects | Encouraged (mutate req/res) | Avoided (pure return values) |
| Testability | Requires mocking req/res/next | Call function, assert return value |

// ❌ Express habit: mutates shared state
function bad(req, res, acc) {
  req.user = acc.auth?.sub;
  res.setHeader("X-Custom", "value");
}

// ✅ Return values: composable and testable
function good(req, res, acc) {
  return {
    value: { user: acc.auth?.sub },
    response: { headers: [["X-Custom", "value"]] },
  };
}
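
The testability point is easy to demonstrate: because the middleware is a function of its arguments, a test can pass plain objects and compare the result. The check below compares JSON strings for brevity; a real suite would more likely use node:assert or a test framework.

```javascript
function good(req, res, acc) {
  return {
    value: { user: acc.auth?.sub },
    response: { headers: [["X-Custom", "value"]] },
  };
}

// No server or mock objects needed: plain objects stand in for req, res, and acc
const result = good({}, {}, { auth: { sub: "user-42" } });
const expected = {
  value: { user: "user-42" },
  response: { headers: [["X-Custom", "value"]] },
};
const passed = JSON.stringify(result) === JSON.stringify(expected);
```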

Returning a Plain Object When You Mean Response Metadata

A plain object (without explicit value or response keys) is treated as {value: returnedObject} and merged into the domain accumulator. If you intend to contribute headers or a status code, wrap in response:

// ❌ This merges {headers: [...]} into the DOMAIN accumulator
function bad() {
  return { headers: [["X-Timing", "42ms"]] };
}

// ✅ Explicit response: headers go to the response accumulator
function good() {
  return { response: { headers: [["X-Timing", "42ms"]] } };
}

When responseAcc.statusCode is set by an earlier middleware, the pipeline breaks and your middleware will not execute. Do not design middleware that depends on running “cleanup” after a rejection — use the response accumulator or res event listeners instead:

// ❌ Assumes this always runs; it won't after a 4xx/5xx break
function bad() {
  return (req, res, acc) => {
    doSetup();
    // If earlier middleware set statusCode, this never executes
    return { value: { ready: true } };
  };
}

// ✅ If cleanup is needed, attach to res events at request time.
// Register this ahead of any middleware that can reject, or the
// listener itself is never attached.
function good() {
  return (req, res) => {
    res.on("close", () => releaseResources());
    return undefined;
  };
}

Using fromConnect() When You Need Accumulator Access

Middleware wrapped with fromConnect() always returns undefined: it cannot read the domain accumulator or contribute data to it. If your middleware needs acc, write a native ergo middleware:

// ❌ Cannot access acc.auth inside helmet()
compose(fromConnect(helmet()), /* ... */);

// ✅ Native middleware with full accumulator access
function rateLimitByUser() {
  return (req, res, acc) => {
    const key = acc.auth?.sub ?? req.socket.remoteAddress;
    // ... rate limit logic using the accumulator ...
    return undefined;
  };
}

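Avoid storing per-request data in module-level variables. Each request must be independent, so keep per-request state inside the inner function (which runs once per request) or on the domain accumulator. The factory's outer scope runs once at startup and is shared by every request, so reserve it for clients and configuration: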

// ❌ Shared mutable state: race conditions, memory leaks
const requestCache = new Map();
function bad() {
  return (req, res, acc) => {
    requestCache.set(req, { /* data */ });
    // Who deletes this entry? When?
  };
}

// ✅ State scoped to each call of the inner function: nothing shared across requests
function good(options) {
  const db = createClient(options); // shared client, created once at startup
  return (req, res, acc) => {
    // Per-request cache of the in-flight promise, so overlapping callers
    // within one request share a single lookup
    let pending;
    const resolve = () => {
      pending ??= db.find(acc.auth?.tenantId);
      return pending;
    };
    return { value: { resolve } };
  };
}