Lineage & Execution Graph

Reconstruct multi-step LLM flows from captured events with subtree cost, latency, and tokens.

What it does

Given a list of LLMEvents, build_execution_graphs(events=...) groups them by correlation_id, links each child to its parent_request_id, and returns one ExecutionGraph per correlation. Each ExecutionNode carries its own metrics plus aggregated subtree_cost, subtree_latency_ms, and subtree_tokens.
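The grouping-and-rollup pass can be sketched as follows. This is a minimal illustration of the idea, not the library's implementation: `Node`, `build`, and `subtree_cost` are hypothetical stand-ins for `ExecutionNode` and `build_execution_graphs`, using plain dataclasses instead of Pydantic models.

```python
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class Node:
    # Stand-in for ExecutionNode, trimmed to the fields the pass needs.
    event_id: str
    parent_request_id: str | None
    correlation_id: str | None
    cost: float
    children: list["Node"] = field(default_factory=list)

def subtree_cost(node: Node) -> float:
    # Roll up the node's own cost plus every descendant's.
    return node.cost + sum(subtree_cost(c) for c in node.children)

def build(nodes: list[Node]) -> dict[str | None, list[Node]]:
    # Group by correlation_id, then link each child to its in-group parent;
    # a node whose parent is missing (or in another group) becomes a root.
    groups: dict[str | None, list[Node]] = defaultdict(list)
    for n in nodes:
        groups[n.correlation_id].append(n)
    roots: dict[str | None, list[Node]] = {}
    for corr, group in groups.items():
        by_id = {n.event_id: n for n in group}
        roots[corr] = []
        for n in group:
            parent = by_id.get(n.parent_request_id)
            if parent is not None:
                parent.children.append(n)
            else:
                roots[corr].append(n)
    return roots
```

The real builder also aggregates latency and tokens the same way as cost.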

parse_tool_calls(raw=...) converts LLMEvent.tool_calls (raw provider dicts) into typed ToolCallRecords with parsed arguments.

Chain is a small helper that auto-advances parent_request_id across a sequence of calls: pass it as post_call_hook and it records each event automatically.

When to use

  • You ran a multi-step agent and want to see the call tree with cost roll-ups.
  • You need to render a flame chart or DAG of an LLM workflow.
  • You want typed tool-call records (id, name, arguments dict) instead of raw dicts.
  • You want effortless parent linking across calls without threading IDs through your code.

API

Re-exported from leanllm:

  • ExecutionGraph, ExecutionNode — Pydantic models.
  • ToolCallRecord — typed tool call.
  • build_execution_graphs(events=...) — graph builder.
  • parse_tool_calls(raw=...) — raw dicts → typed records.
  • Chain — auto-advancing parent linker.

Signatures

class ExecutionNode(BaseModel):
    event_id: str
    parent_request_id: str | None
    correlation_id: str | None
    model: str
    provider: str
    cost: float
    latency_ms: int
    input_tokens: int
    output_tokens: int
    total_tokens: int
    tool_calls: list[ToolCallRecord]
    children: list[ExecutionNode]
    subtree_cost: float
    subtree_latency_ms: int
    subtree_tokens: int

class ExecutionGraph(BaseModel):
    correlation_id: str | None
    roots: list[ExecutionNode]

    def flatten(self) -> list[ExecutionNode]: ...
    def to_ordered_steps(self) -> list[ExecutionNode]: ...
    def total_cost(self) -> float: ...
    def total_latency_ms(self) -> int: ...
    def total_tokens(self) -> int: ...

def build_execution_graphs(*, events: list[LLMEvent]) -> list[ExecutionGraph]: ...
def parse_tool_calls(*, raw: list[dict] | None) -> list[ToolCallRecord]: ...

class Chain:
    def __init__(self, *, correlation_id: str | None = None) -> None: ...
    @property
    def correlation_id(self) -> str: ...
    @property
    def last_request_id(self) -> str | None: ...
    def kwargs(self) -> dict[str, Any]: ...
    def record(self, *, event: LLMEvent) -> None: ...
    def reset(self) -> None: ...
    def __call__(self, event: LLMEvent) -> None: ...   # post_call_hook signature

Examples

Build a graph from a correlation

import asyncio
from leanllm import LeanLLM, LeanLLMConfig, build_execution_graphs

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

async def main() -> None:
    events = await client.list_events(correlation_id="flow-42", limit=200)
    graphs = build_execution_graphs(events=events)
    for graph in graphs:
        print(graph.correlation_id, "→",
              f"${graph.total_cost():.4f}",
              f"{graph.total_latency_ms()}ms",
              f"{graph.total_tokens()} tokens")
        for node in graph.flatten():
            # flatten() loses depth; the tree shape lives in node.children
            print(node.event_id, node.model, f"${node.cost:.4f}")

asyncio.run(main())

Auto-link parents with Chain

from leanllm import LeanLLM, LeanLLMConfig, Chain

chain = Chain()
client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
    post_call_hook=chain,   # `Chain.__call__(event)` advances the pointer
)

r1 = client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Plan."}],
    **chain.kwargs(),
)
r2 = client.chat(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Execute."}],
    **chain.kwargs(),
)
# r2's stored event has parent_request_id == r1's event_id, both share the
# same correlation_id (chain.correlation_id).

Parse tool calls from a stored event

import asyncio
from leanllm import LeanLLM, LeanLLMConfig, parse_tool_calls

client = LeanLLM(
    api_key="sk-...",
    config=LeanLLMConfig(database_url="sqlite:///events.db"),
)

async def main() -> None:
    event = await client.get_event(event_id="<event with tool calls>")
    records = parse_tool_calls(raw=event.tool_calls)
    for tc in records:
        print(tc.tool_name, tc.arguments)

asyncio.run(main())

Configuration

Lineage uses parent_request_id and correlation_id from LLMEvent. Two ways to set them:

  • explicitly per call (client.chat(..., correlation_id=..., parent_request_id=...)),
  • automatically, with auto_chain=True (see DX helpers) or trace() (see context).

Edge cases & gotchas

  • Cross-correlation parents are dropped. If a parent_request_id points to an event in a different correlation, the child becomes a root in its own group.
  • Missing parents become roots. Events whose parent isn't in the input list are treated as top-level for the graph build.
  • Sampling can break chains. Sampled-out events don't reach storage, so a parent_request_id may point to nothing. Use sampling sparingly when lineage matters.
  • Chain only records events with the matching correlation. Events emitted with a different correlation_id (e.g. nested trace()) won't advance the chain.

See also