FastAPI Request-ID Middleware: Correlation Across Upstream Services

Build a request-id + structured logging middleware in FastAPI using contextvars, a JSON formatter, and propagation rules that survive async hops and outbound httpx calls.

A request without an ID is a needle you can't find. Once your FastAPI service starts fanning out to a database, a cache, and two or three upstream HTTP services, the only way to reconstruct what happened during a 503 is to thread one identifier through every log line, every span, and every outbound request. That identifier is the request-id, and the cleanest place to mint and propagate it is a small ASGI-aware middleware backed by contextvars.

This piece walks through a production-shaped pattern: accept an inbound X-Request-ID if a caller already set one, otherwise mint a fresh UUID, stash it in a ContextVar, attach it to every log record via a JSON formatter, return it on the response, and forward it on outbound httpx calls. The whole thing fits in roughly 80 lines and survives async def hops, background tasks, and concurrent requests without leaking state between them.

Why contextvars over a request-scoped dependency

FastAPI gives you two obvious ways to carry a value through a request: pass it explicitly as a dependency, or stash it on request.state. Both work for handler code, but neither reaches the logging layer. Your logger is invoked from utility modules, ORM hooks, and background tasks that never see the Request object. Threading request into every helper is the kind of refactor that loses the war against scope creep within a week.

contextvars.ContextVar, introduced by PEP 567, solves this. asyncio runs every task in its own copy of the context that was current when the task was created, and a coroutine awaited directly runs in its caller's context, so a value set at the top of a request is visible inside any await-reachable code, including a logging.Formatter invoked three modules deep. Compared with thread-locals, contextvars are async-safe by construction; compared with request.state, they reach code that doesn't take a Request.

The tradeoff: background work spawned with asyncio.create_task from inside a request still inherits a copy of the context, which is usually what you want, but a long-lived worker started at app boot has no request context unless you set one explicitly. The middleware below sets the var on every request and resets it on the way out, so one request's ID does not bleed into another.
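The isolation described above can be checked with nothing but the standard library. The sketch below is illustrative only: fake_request and helper are hypothetical stand-ins for the middleware and for code buried deep in a service module, and the two "requests" are plain coroutines run concurrently.

```python
import asyncio
from contextvars import ContextVar

# Same shape as the article's variable; "-" means "no request in flight".
request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


async def helper() -> str:
    # Deep helper code reads the ID without being handed a Request object.
    await asyncio.sleep(0)  # yield so the two fake requests interleave
    return request_id_var.get()


async def fake_request(rid: str) -> str:
    # Hypothetical stand-in for what the middleware does around one request.
    token = request_id_var.set(rid)
    try:
        return await helper()
    finally:
        request_id_var.reset(token)


async def main() -> tuple[list[str], str]:
    # gather() wraps each coroutine in its own Task, and each Task runs in
    # its own copy of the context, so the two set() calls cannot collide.
    seen = await asyncio.gather(fake_request("req-a"), fake_request("req-b"))
    # The caller's own context was never modified, so it still sees the default.
    return list(seen), request_id_var.get()


results, outer = asyncio.run(main())
print(results, outer)
```

Each fake request reads back its own ID even though the two interleave on one event loop, and the code that launched them still sees the default.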

The middleware

Use BaseHTTPMiddleware if you want a small surface area, or write a pure ASGI middleware if you care about the extra 1-3ms that BaseHTTPMiddleware adds per request. For most services the readability wins.

import logging
import uuid
from contextvars import ContextVar
from typing import Awaitable, Callable

from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

request_id_var: ContextVar[str] = ContextVar("request_id", default="-")

HEADER = "X-Request-ID"


class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        incoming = request.headers.get(HEADER)
        rid = incoming if _looks_safe(incoming) else uuid.uuid4().hex
        token = request_id_var.set(rid)
        try:
            response = await call_next(request)
        finally:
            request_id_var.reset(token)
        response.headers[HEADER] = rid
        return response


def _looks_safe(value: str | None) -> bool:
    if not value:
        return False
    if len(value) > 128:
        return False
    # isascii() keeps non-ASCII "alphanumeric" characters out of the ID.
    return value.isascii() and all(c.isalnum() or c in "-_" for c in value)

Two details matter here. First, you validate any inbound header before trusting it: log injection through a malicious X-Request-ID that smuggles in a line break and a forged FAKE-LINE: evil entry is a real attack, and _looks_safe keeps you from echoing arbitrary bytes into your log stream. Second, you reset(token) in a finally block, so the previous value is restored even when call_next raises and a stale ID cannot linger in a context that outlives the request.
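For the pure ASGI route, a minimal sketch might look like the following. It is an assumption about how one could port the same logic, not a drop-in replacement: the class name PureASGIRequestIDMiddleware is made up for illustration, and the block redefines request_id_var and _looks_safe so it stands alone. It relies only on the ASGI conventions that header names arrive as lowercase bytes and that response headers travel in the http.response.start message.

```python
import uuid
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="-")
HEADER = b"x-request-id"  # ASGI delivers header names as lowercase bytes


def _looks_safe(value: str) -> bool:
    if not value or len(value) > 128 or not value.isascii():
        return False
    return all(c.isalnum() or c in "-_" for c in value)


class PureASGIRequestIDMiddleware:
    """Hypothetical pure ASGI variant of the request-id middleware."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Lifespan and WebSocket traffic pass through untouched.
            await self.app(scope, receive, send)
            return

        raw = dict(scope["headers"]).get(HEADER, b"")
        incoming = raw.decode("latin-1")
        rid = incoming if _looks_safe(incoming) else uuid.uuid4().hex

        async def send_with_id(message):
            # Add the header to the message that opens the response.
            if message["type"] == "http.response.start":
                headers = [
                    (k, v) for k, v in message.get("headers", [])
                    if k.lower() != HEADER
                ]
                headers.append((HEADER, rid.encode("latin-1")))
                message = {**message, "headers": headers}
            await send(message)

        token = request_id_var.set(rid)
        try:
            await self.app(scope, receive, send_with_id)
        finally:
            request_id_var.reset(token)
```

It registers the same way, via app.add_middleware(PureASGIRequestIDMiddleware). Because the downstream app is awaited directly rather than handed to a separate task, the ContextVar is set and reset in the same context the handlers run in.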

The JSON formatter

Plain text logs are fine for a tutorial. In production you want one JSON object per line so Loki, Elasticsearch, or CloudWatch can index by request_id and reconstruct a trace.

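import json
import logging

# request_id_var is the ContextVar defined next to the middleware above.


class JSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            # record.created is the epoch time of the log call itself.
            "ts": record.created,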
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
            "request_id": request_id_var.get(),
        }
        if record.exc_info:
            payload["exc"] = self.formatException(record.exc_info)
        for key, value in getattr(record, "extra", {}).items():
            payload[key] = value
        return json.dumps(payload, default=str)


def configure_logging() -> None:
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    root = logging.getLogger()
    root.handlers = [handler]
    root.setLevel(logging.INFO)

The formatter reads request_id_var.get() itself at format time instead of relying on each call site to pass the ID. That matters: every logger.info(...) call inside a request, including ones in libraries that know nothing about your middleware, gets the right ID for free, as long as the handler formats the record in the same context as the log call, which a plain StreamHandler does. default="-" on the ContextVar ensures app-boot logs that fire before any request still render valid JSON.
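One case where a format-time lookup falls short is a handler that formats on another thread, for example one sitting behind logging.handlers.QueueListener: by then the request's context is gone. A possible workaround, sketched here under that assumption, is a logging.Filter that copies the ID onto the record at call time. RequestIDFilter, StampedJSONFormatter, and ListHandler are illustrative names, not part of the article's code.

```python
import json
import logging
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


class RequestIDFilter(logging.Filter):
    """Copy the current request ID onto the record when the log call happens."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True  # never drop the record, only annotate it


class StampedJSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "msg": record.getMessage(),
            # Fall back to "-" for records that never passed through the filter.
            "request_id": getattr(record, "request_id", "-"),
        }
        return json.dumps(payload, default=str)


class ListHandler(logging.Handler):
    """Demo handler that stores records so they can be formatted later."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


handler = ListHandler()
handler.addFilter(RequestIDFilter())  # runs in the caller's context
demo_log = logging.getLogger("stamp_demo")
demo_log.propagate = False
demo_log.setLevel(logging.INFO)
demo_log.addHandler(handler)

token = request_id_var.set("abc123")  # what the middleware would do
demo_log.info("fetching order")
request_id_var.reset(token)

# Formatting happens after the ContextVar was reset, yet the ID survives.
line = StampedJSONFormatter().format(handler.records[0])
print(line)
```

The filter is attached to the handler rather than to one logger so that it sees every record the handler receives, including those from third-party loggers.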

Propagating to upstream services

The whole point of a correlation ID collapses if your service calls a payments API and the payments team's logs show a different ID. Forward the header on every outbound httpx request via an event hook:

import httpx


async def _attach_request_id(request: httpx.Request) -> None:
    request.headers.setdefault(HEADER, request_id_var.get())


def make_http_client() -> httpx.AsyncClient:
    return httpx.AsyncClient(
        timeout=httpx.Timeout(5.0, connect=2.0),
        event_hooks={"request": [_attach_request_id]},
    )

Wire one shared AsyncClient into your app via lifespan and inject it into handlers as a dependency. Now any call your code makes, whether it's at the route level or buried in a service module, carries the inbound ID outward without ceremony. If the upstream service also runs this middleware, you get end-to-end correlation across two hops with zero extra plumbing.

For an alternative that gives you W3C-standard trace context instead of a custom header, OpenTelemetry's FastAPI instrumentation propagates traceparent automatically. Pick OTel when you've committed to a tracing backend like Tempo or Honeycomb; pick the manual middleware when you want one extra header in your logs without standing up a collector.

Wiring it together

def create_app() -> FastAPI:
    configure_logging()
    app = FastAPI()
    app.add_middleware(RequestIDMiddleware)
    return app


app = create_app()
log = logging.getLogger("api")


@app.get("/orders/{order_id}")
async def get_order(order_id: str) -> dict:
    log.info("fetching order", extra={"extra": {"order_id": order_id}})
    return {"order_id": order_id}

A request like curl -H 'X-Request-ID: abc123' http://localhost:8000/orders/42 produces a log line like {"ts": ..., "level": "INFO", "logger": "api", "msg": "fetching order", "request_id": "abc123", "order_id": "42"} and a response with X-Request-ID: abc123 echoed back. A request with no header gets a fresh 32-character hex ID, returned to the client so they can quote it in a support ticket.

What to watch for

Three failure modes show up in real deployments. First, background tasks started via BackgroundTasks inherit context correctly, but ones scheduled through Celery or arq do not: you must serialize the ID into the task payload and re-set the ContextVar on the worker side. Second, WebSocket connections live longer than one request; if you set the ID once at connect time it stays for the whole connection, which is usually fine but means a per-message ID needs a different scheme. Third, BaseHTTPMiddleware has a cost: exceptions raised inside it are wrapped, and a benchmark in encode/starlette#1715 measured 2-3× the latency of pure ASGI middleware under load. For services handling 500+ rps, switch to pure ASGI; below that the difference is noise.
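The task-queue case can be sketched without committing to any particular queue library. Everything below is hypothetical scaffolding: build_task_message, run_task_message, and send_receipt are made-up names, and a real Celery or arq setup would carry the ID through that library's own payload or header mechanism instead.

```python
import json
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


def build_task_message(name: str, **kwargs) -> str:
    """Producer side: serialize the job together with the caller's request ID."""
    return json.dumps(
        {"task": name, "kwargs": kwargs, "request_id": request_id_var.get()}
    )


def run_task_message(raw: str, handlers: dict) -> object:
    """Worker side: restore the ID before running the job, reset it afterwards."""
    message = json.loads(raw)
    token = request_id_var.set(message.get("request_id", "-"))
    try:
        return handlers[message["task"]](**message["kwargs"])
    finally:
        request_id_var.reset(token)


def send_receipt(order_id: str) -> str:
    # Any logging done here would pick up the restored ID.
    return f"{request_id_var.get()}:{order_id}"


# Inside a request: the middleware has already set the ID.
token = request_id_var.set("abc123")
raw = build_task_message("send_receipt", order_id="42")
request_id_var.reset(token)

# Later, on the worker: no request context exists until it is restored.
result = run_task_message(raw, {"send_receipt": send_receipt})
print(result)
```

The worker resets the variable in a finally block for the same reason the middleware does: a worker process handles many jobs, and each should start from the default.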

The pattern above gives you correlation IDs that survive await, structured JSON that ships straight to a log aggregator, and propagation that crosses service boundaries, all without touching a single route handler. Adding it to a fresh service takes about 15 minutes. Adding it to a logging-spaghetti legacy service takes longer, but pays for itself the first time you need to reconstruct what happened during a 4 AM incident.