Request Interception
Capture every LLM call as a structured event without blocking the request path.
What it does
LeanLLM.chat() and LeanLLM.completion() route the call through LiteLLM, build a structured LLMEvent afterwards (success, error, or streaming), and enqueue it for a background worker thread. The request thread returns as soon as the provider response arrives — capture, persistence, retries and migrations all happen off the hot path.
By design, observability never blocks the chat path. Hooks, sampling decisions, content capture, and queue enqueue all stay synchronous and bounded. DB I/O lives entirely on the worker.
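This is the standard bounded-queue-plus-worker pattern. The sketch below is purely illustrative of that pattern, not LeanLLM's actual internals; the names record, _flush, and _worker_loop are hypothetical.

import queue
import threading

events: queue.Queue = queue.Queue(maxsize=10_000)  # bounded, like queue_max_size

def record(event: dict) -> None:
    # Runs on the request thread: O(1), never touches the database.
    try:
        events.put_nowait(event)
    except queue.Full:
        pass  # drop rather than block the hot path

def _flush(batch: list[dict]) -> None:
    # Placeholder for the real DB write (SQLite/Postgres/remote backend).
    pass

def _worker_loop() -> None:
    # Background thread: the only place persistence I/O happens.
    while True:
        batch = [events.get()]
        while len(batch) < 100 and not events.empty():
            batch.append(events.get_nowait())
        _flush(batch)

threading.Thread(target=_worker_loop, daemon=True).start()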
When to use
- You want token, cost, latency, prompt and response captured for every model call.
- You need streaming support with time-to-first-token measurement.
- You want errors recorded as events too — not just successes.
- You want to plug in observability without coupling your code to a vendor.
API
leanllm.LeanLLM — the client. Everything goes through chat() / completion().
Signatures
client.chat(
    model: str,
    messages: list[dict[str, str]],
    labels: dict[str, str] | None = None,
    *,
    request_id: str | None = None,
    correlation_id: str | None = None,
    parent_request_id: str | None = None,
    context: LeanLLMContext | None = None,
    log: bool = True,
    sample: float | None = None,
    redaction_mode: RedactionMode | None = None,
    **kwargs,  # forwarded to LiteLLM (temperature, tools, stream, etc.)
)
client.completion(
    model: str,
    prompt: str,
    labels: dict[str, str] | None = None,
    *,
    # same keyword args as chat()
    **kwargs,
)
chat() returns whatever LiteLLM returns: a ModelResponse for non-streaming calls, or an iterator of chunks when stream=True.
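The identity kwargs are what tie related events together. A sketch using only the keywords listed above (the model and label values are arbitrary):

import uuid
from leanllm import LeanLLM, LeanLLMConfig

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)
workflow_id = str(uuid.uuid4())  # shared across every call in this workflow

draft = client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Draft a tagline."}],
    correlation_id=workflow_id,
    labels={"feature": "tagline", "step": "draft"},
)
client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": f"Critique this: {draft.choices[0].message.content}"}],
    correlation_id=workflow_id,
    labels={"feature": "tagline", "step": "review"},
)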
Hooks
LeanLLM(
    api_key="sk-...",
    pre_call_hook=lambda snapshot: ...,     # before LiteLLM is invoked
    post_call_hook=lambda event: ...,       # after a successful event is built
    error_hook=lambda exc, snapshot: ...,   # after an error event is built
    on_dropped_events=lambda count, reason: ...,
)
Hooks must not raise into the caller — exceptions inside a hook are logged and swallowed.
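A minimal wiring sketch where the hooks just log. The only field access assumed here is snapshot["model"], which the error example further down also uses:

from leanllm import LeanLLM, LeanLLMConfig

def before(snapshot):
    # Runs synchronously before LiteLLM is invoked: keep it cheap.
    print("calling", snapshot["model"])

def after(event):
    # Receives the built LLMEvent; a raise here is logged and swallowed.
    print("captured:", event)

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
    pre_call_hook=before,
    post_call_hook=after,
    on_dropped_events=lambda count, reason: print("dropped", count, reason),
)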
Examples
Basic call
from leanllm import LeanLLM, LeanLLMConfig
client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)
response = client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hi."}],
    labels={"feature": "demo"},
)
print(response.choices[0].message.content)
Streaming with time-to-first-token
from leanllm import LeanLLM, LeanLLMConfig
client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)
stream = client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Stream a haiku."}],
    stream=True,
)
for chunk in stream:
    print(chunk.choices[0].delta.content or "", end="", flush=True)
# After the stream completes, the worker has an event with
# time_to_first_token_ms and total_stream_time_ms populated.
Capturing errors as events
from leanllm import LeanLLM, LeanLLMConfig
def on_error(exc, snapshot):
    print("LLM call failed:", snapshot["model"], type(exc).__name__)

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
    error_hook=on_error,
)
try:
    client.chat(model="invalid/model", messages=[{"role": "user", "content": "x"}])
except Exception:
    pass
# An event with error_kind set is in the queue, regardless of sampling.
Configuration
| Field | Env var | Default | What it does |
|---|---|---|---|
| enable_persistence | LEANLLM_ENABLE_PERSISTENCE | true | Master switch for queue + worker. |
| queue_max_size | LEANLLM_QUEUE_MAX_SIZE | 10000 | Capacity of the in-memory queue; overflow drops oldest. |
| batch_size | LEANLLM_BATCH_SIZE | 100 | Worker flush trigger (events). |
| flush_interval_ms | LEANLLM_FLUSH_INTERVAL_MS | 180000 | Worker flush trigger (ms). |
Full list: configuration.
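A tuned setup might look like the sketch below. It assumes the table's fields are accepted as LeanLLMConfig keyword arguments of the same name (only database_url is shown elsewhere on this page); the environment variables from the table are the alternative.

from leanllm import LeanLLM, LeanLLMConfig

# Assumption: the table's fields map to LeanLLMConfig kwargs of the same name.
config = LeanLLMConfig(
    database_url="sqlite:///events.db",
    queue_max_size=50_000,     # more headroom before oldest events are dropped
    batch_size=500,            # flush in larger batches
    flush_interval_ms=30_000,  # or at least every 30 seconds
)
client = LeanLLM(api_key="sk-...", config=config)

# Equivalent via environment: LEANLLM_QUEUE_MAX_SIZE=50000, LEANLLM_BATCH_SIZE=500, ...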
Edge cases & gotchas
- log=False is a hard bypass. No event is built, no hooks fire, the call is a pure pass-through to LiteLLM. Use it for health checks and hot-path probes.
- Errors always emit. Sampling does not apply to error events — they are operational signal.
- Streaming events are emitted on stream close. total_tokens and content are aggregated across chunks; the event lands in the queue when the iterator is exhausted (or raises).
- api_key is the provider key. LiteLLM picks the right provider based on the model string. The LeanLLM-specific token is LEANLLM_API_KEY, used only for the remote backend.
- Persistence disabled = events stay in memory. Without database_url or leanllm_api_key, events still flow through client.last_event and client.recent_events() — useful for notebooks.
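A sketch of the in-memory case. It assumes the client can be constructed without a config (no database_url is needed for this mode); beyond that it uses only the log kwarg, last_event, and recent_events() described above.

from leanllm import LeanLLM

# Assumption: no config is required when you only want in-memory inspection.
client = LeanLLM(api_key="sk-...")

# Hard bypass: no event is built and no hooks fire for this call.
client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "ping"}],
    log=False,
)

# Normal call: captured in memory even though nothing is persisted.
client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hi."}],
)

print(client.last_event)
print(len(client.recent_events()))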