Request Interception

Wrap every LLM call as a structured event without blocking the request path.

What it does

LeanLLM.chat() and LeanLLM.completion() route the call through LiteLLM, build a structured LLMEvent afterwards (success, error, or streaming), and enqueue it for a background worker thread. The request thread returns as soon as the provider response arrives — capture, persistence, retries and migrations all happen off the hot path.

Capture never blocks the chat path by design. Hooks, sampling decisions, content capture, and the queue enqueue all stay synchronous and bounded. DB I/O lives entirely on the worker.
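
For intuition, the hand-off looks roughly like the sketch below. This is an illustration of the enqueue-and-worker pattern only, not LeanLLM's actual internals; the names (record, _worker_loop, _flush) are hypothetical.

import queue
import threading

# Illustrative sketch only; these names are hypothetical, not LeanLLM internals.
events: "queue.Queue[dict]" = queue.Queue(maxsize=10_000)

def _flush(batch: list[dict]) -> None:
    pass  # placeholder for the persistence step (DB writes, retries)

def _worker_loop() -> None:
    # All DB I/O lives here, off the request thread.
    batch: list[dict] = []
    while True:
        batch.append(events.get())
        if len(batch) >= 100:
            _flush(batch)
            batch.clear()

threading.Thread(target=_worker_loop, daemon=True).start()

def record(event: dict) -> None:
    # The request thread only enqueues; on overflow the oldest event is dropped
    # (races are ignored in this sketch).
    try:
        events.put_nowait(event)
    except queue.Full:
        try:
            events.get_nowait()
            events.put_nowait(event)
        except (queue.Empty, queue.Full):
            pass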

When to use

  • You want token usage, cost, latency, prompt, and response captured for every model call.
  • You need streaming support with time-to-first-token measurement.
  • You want errors recorded as events too — not just successes.
  • You want to plug in observability without coupling your code to a vendor.

API

leanllm.LeanLLM — the client. Everything goes through chat() / completion().

Signatures

client.chat(
    model: str,
    messages: list[dict[str, str]],
    labels: dict[str, str] | None = None,
    *,
    request_id: str | None = None,
    correlation_id: str | None = None,
    parent_request_id: str | None = None,
    context: LeanLLMContext | None = None,
    log: bool = True,
    sample: float | None = None,
    redaction_mode: RedactionMode | None = None,
    **kwargs,           # forwarded to LiteLLM (temperature, tools, stream, etc.)
)

client.completion(
    model: str,
    prompt: str,
    labels: dict[str, str] | None = None,
    *,
    # same keyword args as chat()
    **kwargs,
)

chat() returns whatever LiteLLM returns: a ModelResponse for non-streaming calls, or an iterator of chunks when stream=True.
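
For completeness, a completion() call is sketched below with the same client setup as the examples further down. The exact shape of the return value is whatever LiteLLM hands back for the model, so it is printed as-is rather than indexed into.

from leanllm import LeanLLM, LeanLLMConfig

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

response = client.completion(
    model="gpt-4o-mini",
    prompt="Say hi.",
    labels={"feature": "demo"},
)
print(response)  # shape depends on what LiteLLM returns for this model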

Hooks

LeanLLM(
    api_key="sk-...",
    pre_call_hook=lambda snapshot: ...,   # before LiteLLM is invoked
    post_call_hook=lambda event: ...,     # after a successful event is built
    error_hook=lambda exc, snapshot: ..., # after an error event is built
    on_dropped_events=lambda count, reason: ...,
)

Hooks must not raise into the caller — exceptions inside a hook are logged and swallowed.
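
A small sketch of wiring the hooks into the standard logging module. The only field read from the snapshot is "model", which the error example below also uses; the event object is logged as-is rather than assuming its field layout.

import logging

from leanllm import LeanLLM

logger = logging.getLogger("llm-telemetry")

def before_call(snapshot):
    # Runs synchronously before LiteLLM is invoked; keep it cheap.
    logger.info("calling %s", snapshot["model"])

def after_call(event):
    # Runs after a successful event is built; the event's field layout is
    # not assumed here, so the whole object goes to the logger.
    logger.info("captured event: %r", event)

def dropped(count, reason):
    logger.warning("dropped %d events (%s)", count, reason)

client = LeanLLM(
    api_key="sk-...",
    pre_call_hook=before_call,
    post_call_hook=after_call,
    on_dropped_events=dropped,
)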

Examples

Basic call

from leanllm import LeanLLM, LeanLLMConfig

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

response = client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hi."}],
    labels={"feature": "demo"},
)
print(response.choices[0].message.content)

Streaming with time-to-first-token

from leanllm import LeanLLM, LeanLLMConfig

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

stream = client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Stream a haiku."}],
    stream=True,
)
for chunk in stream:
    print(chunk.choices[0].delta.content or "", end="", flush=True)

# After the stream completes, the worker has an event with
# time_to_first_token_ms and total_stream_time_ms populated.
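
To look at those timings from the same process, client.last_event (described under Edge cases & gotchas) should carry them once the iterator is exhausted. Reading the fields as attributes is an assumption here, hence the defensive getattr.

# Attribute access on the event is an assumption; getattr keeps this tolerant.
event = client.last_event
if event is not None:
    print("time to first token (ms):", getattr(event, "time_to_first_token_ms", None))
    print("total stream time (ms):", getattr(event, "total_stream_time_ms", None))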

Capturing errors as events

from leanllm import LeanLLM, LeanLLMConfig

def on_error(exc, snapshot):
    print("LLM call failed:", snapshot["model"], type(exc).__name__)

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
    error_hook=on_error,
)

try:
    client.chat(model="invalid/model", messages=[{"role": "user", "content": "x"}])
except Exception:
    pass

# An event with error_kind set is in the queue, regardless of sampling.

Configuration

Field               Env var                       Default   What it does
enable_persistence  LEANLLM_ENABLE_PERSISTENCE    true      Master switch for queue + worker.
queue_max_size      LEANLLM_QUEUE_MAX_SIZE        10000     Capacity of the in-memory queue; overflow drops oldest.
batch_size          LEANLLM_BATCH_SIZE            100       Worker flush trigger (events).
flush_interval_ms   LEANLLM_FLUSH_INTERVAL_MS     180000    Worker flush trigger (ms).

Full list: configuration.
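
A sketch of tightening the flush behaviour in code, assuming LeanLLMConfig accepts these fields as constructor keywords; the environment variables in the table are the equivalent deployment-side knobs.

from leanllm import LeanLLM, LeanLLMConfig

# Assuming LeanLLMConfig takes these fields as keyword arguments.
client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(
        database_url="sqlite:///events.db",
        queue_max_size=5_000,       # drop-oldest kicks in sooner
        batch_size=50,              # flush smaller batches
        flush_interval_ms=30_000,   # or at least every 30 seconds
    ),
)
# Equivalently: export LEANLLM_BATCH_SIZE=50, LEANLLM_FLUSH_INTERVAL_MS=30000, ...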

Edge cases & gotchas

  • log=False is a hard bypass. No event is built, no hooks fire, the call is a pure pass-through to LiteLLM. Use it for health checks and hot-path probes.
  • Errors always emit. Sampling does not apply to error events — they are operational signal.
  • Streaming events are emitted on stream close. total_tokens and content are aggregated across chunks; the event lands in the queue when the iterator is exhausted (or raises).
  • api_key is the provider key. LiteLLM picks the right provider based on the model string. The LeanLLM-specific token is LEANLLM_API_KEY, used only for the remote backend.
  • Persistence disabled = events stay in memory. Without database_url or leanllm_api_key, events still flow through client.last_event and client.recent_events() — useful for notebooks; see the sketch below.
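
A sketch of the in-memory mode and the log=False bypass described above. Building a LeanLLMConfig without a database_url is the assumption here; the earlier examples always pass one.

from leanllm import LeanLLM, LeanLLMConfig

# Assuming LeanLLMConfig() can be built without a database_url:
# no persistence, events stay in memory only.
client = LeanLLM(api_key="sk-...", config=LeanLLMConfig())

client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hi."}],
)
print(client.last_event)       # most recent captured event
print(client.recent_events())  # recent events, straight from memory

# Hard bypass: no event is built and no hooks fire.
client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "ping"}],
    log=False,
)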

See also