Storage Query API

Async read API for fetching, filtering, and counting captured events.

What it does

Once persistence is enabled, the client exposes three async read methods: get_event, list_events, and count_events. They run on the worker's asyncio loop via run_coroutine_threadsafe, so the connection pool stays bound to one loop and your calling thread isn't blocked on DB I/O.
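The bridging pattern described above is standard asyncio; a minimal, self-contained sketch of it (with a hypothetical `query` coroutine standing in for a real read method, not taken from the library) looks like this:

```python
import asyncio
import threading

# A dedicated worker thread owns one event loop; callers on any thread
# submit coroutines to it instead of running their own loop.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def query() -> str:
    # Stand-in for a read that awaits the pool bound to `loop`.
    await asyncio.sleep(0)
    return "evt-123"

# run_coroutine_threadsafe schedules the coroutine on the worker loop and
# hands back a concurrent.futures.Future the calling thread can block on.
future = asyncio.run_coroutine_threadsafe(query(), loop)
print(future.result(timeout=5))  # prints "evt-123"
```

Because the pool only ever sees one loop, there is no cross-loop sharing of connections; the calling thread blocks on the future, not on the database.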

All three methods share the same filter set: correlation_id, model, since / until, and errors_only.

When to use

  • You need to fetch one event by ID for replay or inspection.
  • You want to list events for a correlation, time window, or model.
  • You want to count events matching a filter (errors per hour, calls per model, etc.) without loading them.

API

Methods on LeanLLM:

async def get_event(self, *, event_id: str) -> LLMEvent | None: ...

async def list_events(
    self,
    *,
    correlation_id: str | None = None,
    model: str | None = None,
    since: datetime | None = None,
    until: datetime | None = None,
    errors_only: bool = False,
    limit: int = 100,
    offset: int = 0,
) -> list[LLMEvent]: ...

async def count_events(
    self,
    *,
    correlation_id: str | None = None,
    model: str | None = None,
    since: datetime | None = None,
    until: datetime | None = None,
    errors_only: bool = False,
) -> int: ...

list_events returns events ordered by timestamp descending (most recent first).

Examples

Fetch a specific event

import asyncio
from leanllm import LeanLLM, LeanLLMConfig

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

async def main() -> None:
    event = await client.get_event(event_id="evt-123")
    if event is None:
        print("not found")
    else:
        print(event.model, event.cost, event.latency_ms)

asyncio.run(main())

List events for a correlation

import asyncio
from leanllm import LeanLLM, LeanLLMConfig

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

async def main() -> None:
    events = await client.list_events(
        correlation_id="flow-42",
        limit=200,
    )
    for ev in events:
        print(ev.timestamp.isoformat(), ev.model, f"${ev.cost:.4f}")

asyncio.run(main())

Count errors in the last hour

import asyncio
from datetime import datetime, timedelta, timezone
from leanllm import LeanLLM, LeanLLMConfig

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

async def main() -> None:
    since = datetime.now(timezone.utc) - timedelta(hours=1)
    n = await client.count_events(since=since, errors_only=True)
    print(f"{n} errors in the last hour")

asyncio.run(main())

Pagination

import asyncio
from leanllm import LeanLLM, LeanLLMConfig

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

async def main() -> None:
    page_size = 50
    offset = 0
    while True:
        page = await client.list_events(limit=page_size, offset=offset)
        if not page:
            break
        for ev in page:
            print(ev.event_id)
        offset += page_size

asyncio.run(main())

Configuration

The query API reuses the persistence configuration:

Field                 Env var                       Default   What it does
database_url          LEANLLM_DATABASE_URL          None      Required; the query API is unavailable without a self-hosted store.
enable_persistence    LEANLLM_ENABLE_PERSISTENCE    true      If false, query methods raise RuntimeError.

The remote backend (LEANLLM_API_KEY) does not currently support read queries — the methods raise NotImplementedError. Use the SaaS UI for remote inspection, or query a self-hosted Postgres / SQLite store.
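Code that may run against either backend can guard reads with a try/except. A minimal sketch, using a stub coroutine in place of a real client method on a SaaS-only configuration:

```python
import asyncio

# Stand-in for count_events on a remote-only client, which (per the note
# above) raises NotImplementedError instead of performing a read.
async def count_events_remote_only() -> int:
    raise NotImplementedError("remote backend does not support read queries")

async def main() -> str:
    try:
        await count_events_remote_only()
    except NotImplementedError:
        return "remote backend: use the SaaS UI or a self-hosted store"
    return "local store available"

result = asyncio.run(main())
print(result)
```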

Edge cases & gotchas

  • Methods raise when persistence is disabled. If neither database_url nor leanllm_api_key is set, get_event / list_events / count_events raise RuntimeError("LeanLLM persistence is disabled — ...").
  • since / until are timezone-aware datetimes. Pass UTC datetime instances, not strings. Naive datetimes will be treated according to backend conventions and may surprise you.
  • limit is capped by the backend. The default is 100; very large values may be slow on Postgres without an index on timestamp.
  • Order is descending by timestamp. Ordering is stable across repeats of the same query, but ties between events sharing a timestamp at sub-second precision may resolve in any order.
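The timezone gotcha above is easy to sidestep: attach an explicit UTC offset to any naive datetime before using it as a since / until filter. A stdlib-only sketch:

```python
from datetime import datetime, timezone

# A naive datetime carries no offset, so the backend must guess its zone.
naive = datetime(2024, 5, 1, 12, 0, 0)

# replace(tzinfo=...) pins the interpretation: "this wall-clock time is UTC".
aware = naive.replace(tzinfo=timezone.utc)

assert aware.tzinfo is not None
print(aware.isoformat())  # prints "2024-05-01T12:00:00+00:00"
```

If the naive value came from the local clock rather than UTC, use `naive.astimezone(timezone.utc)` instead, which converts rather than relabels.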

See also