Storage Query API
Async read API for fetching, filtering, and counting captured events.
What it does
Once persistence is enabled, the client exposes three async read methods: get_event, list_events, and count_events. They run on the worker's asyncio loop via run_coroutine_threadsafe, so the connection pool stays bound to one loop and your calling thread isn't blocked on DB I/O.
All three methods share the same filter set: correlation_id, model, since / until, and errors_only.
When to use
- You need to fetch one event by ID for replay or inspection.
- You want to list events for a correlation, time window, or model.
- You want to count events matching a filter (errors per hour, calls per model, etc.) without loading them.
API
Methods on LeanLLM:
async def get_event(self, *, event_id: str) -> LLMEvent | None: ...
async def list_events(
self,
*,
correlation_id: str | None = None,
model: str | None = None,
since: datetime | None = None,
until: datetime | None = None,
errors_only: bool = False,
limit: int = 100,
offset: int = 0,
) -> list[LLMEvent]: ...
async def count_events(
self,
*,
correlation_id: str | None = None,
model: str | None = None,
since: datetime | None = None,
until: datetime | None = None,
errors_only: bool = False,
) -> int: ...
list_events returns events ordered by timestamp descending (most recent first).
Examples
Fetch a specific event
import asyncio
from leanllm import LeanLLM, LeanLLMConfig
client = LeanLLM(
api_key="sk-...",
config=LeanLLMConfig(database_url="sqlite:///events.db"),
)
async def main() -> None:
event = await client.get_event(event_id="evt-123")
if event is None:
print("not found")
else:
print(event.model, event.cost, event.latency_ms)
asyncio.run(main())
List events for a correlation
import asyncio
from leanllm import LeanLLM, LeanLLMConfig
client = LeanLLM(
api_key="sk-...",
config=LeanLLMConfig(database_url="sqlite:///events.db"),
)
async def main() -> None:
events = await client.list_events(
correlation_id="flow-42",
limit=200,
)
for ev in events:
print(ev.timestamp.isoformat(), ev.model, f"${ev.cost:.4f}")
asyncio.run(main())
Count errors in the last hour
import asyncio
from datetime import datetime, timedelta, timezone
from leanllm import LeanLLM, LeanLLMConfig
client = LeanLLM(
api_key="sk-...",
config=LeanLLMConfig(database_url="sqlite:///events.db"),
)
async def main() -> None:
since = datetime.now(timezone.utc) - timedelta(hours=1)
n = await client.count_events(since=since, errors_only=True)
print(f"{n} errors in the last hour")
asyncio.run(main())
Pagination
import asyncio
from leanllm import LeanLLM, LeanLLMConfig
client = LeanLLM(
api_key="sk-...",
config=LeanLLMConfig(database_url="sqlite:///events.db"),
)
async def main() -> None:
page_size = 50
offset = 0
while True:
page = await client.list_events(limit=page_size, offset=offset)
if not page:
break
for ev in page:
print(ev.event_id)
offset += page_size
asyncio.run(main())
Configuration
The query API reuses the persistence configuration:
| Field | Env var | Default | What it does |
|---|---|---|---|
database_url | LEANLLM_DATABASE_URL | None | Required — query API is unavailable without a self-hosted store. |
enable_persistence | LEANLLM_ENABLE_PERSISTENCE | true | If false, query methods raise RuntimeError. |
The remote backend (LEANLLM_API_KEY) does not currently support read queries — the methods raise NotImplementedError. Use the SaaS UI for remote inspection, or query a self-hosted Postgres / SQLite store.
Edge cases & gotchas
- Methods raise when persistence is disabled. If neither
database_urlnorleanllm_api_keyis set,get_event/list_events/count_eventsraiseRuntimeError("LeanLLM persistence is disabled — ..."). since/untilare timezone-aware datetimes. Pass UTCdatetimeinstances, not strings. Naive datetimes will be treated according to backend conventions and may surprise you.limitis capped by the backend. The default is100; very large values may be slow on Postgres without an index ontimestamp.- Order is descending by timestamp. Stable for the same query; not stable for events sharing the same timestamp at sub-second precision.