Events
nagents uses an event-based streaming architecture. Events are emitted during agent execution to provide real-time updates on generation progress, tool calls, and completion.
Event Types Overview
| Event | Description | When Emitted |
|---|---|---|
| TextChunkEvent | Streaming text chunk | During streaming response |
| TextDoneEvent | Complete text | Non-streaming mode |
| ToolCallEvent | Model is calling a tool | Before tool execution |
| ToolResultEvent | Tool execution completed | After tool execution |
| ErrorEvent | Error occurred | On errors |
| DoneEvent | Generation complete | At the end |
Base Event
All events inherit from Event and include common fields:
@dataclass
class Event:
    type: EventType      # Type discriminator
    timestamp: datetime  # When the event occurred
    usage: Usage         # Token usage (always present)
Usage is Never None
The usage field is always present on events. During streaming, intermediate events may have zero counts until the final DoneEvent.
Usage Information
Track token usage for cost monitoring and optimization:
@dataclass
class TokenUsage:
    prompt_tokens: int      # Input tokens
    completion_tokens: int  # Output tokens
    total_tokens: int       # Sum of above

@dataclass
class Usage:
    prompt_tokens: int          # Current generation
    completion_tokens: int      # Current generation
    total_tokens: int           # Current generation
    session: TokenUsage | None  # Cumulative across tool rounds

async for event in agent.run("Hello"):
    if isinstance(event, DoneEvent):
        print(f"This request: {event.usage.total_tokens} tokens")
        if event.usage.session:
            print(f"Session total: {event.usage.session.total_tokens} tokens")
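The relationship between the per-generation counts and the cumulative session counts can be sketched as follows. This is a minimal illustration using stand-in copies of the two dataclasses above, so it runs without nagents installed. The helpers `billable` and `estimate_cost`, and the per-1,000-token prices passed to them, are hypothetical and not part of the library.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TokenUsage:  # stand-in mirroring the documented fields
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


@dataclass
class Usage:  # stand-in; the None default for session is an assumption
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    session: TokenUsage | None = None


def billable(usage: Usage) -> TokenUsage:
    """Prefer cumulative session counts; fall back to the current generation."""
    if usage.session is not None:
        return usage.session
    return TokenUsage(usage.prompt_tokens, usage.completion_tokens, usage.total_tokens)


def estimate_cost(usage: Usage, prompt_price: float, completion_price: float) -> float:
    """Hypothetical helper: both prices are per 1,000 tokens."""
    counts = billable(usage)
    return (
        counts.prompt_tokens * prompt_price
        + counts.completion_tokens * completion_price
    ) / 1000
```

Reading the session counts when they are present avoids under-counting requests that went through several tool rounds.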
TextChunkEvent
Emitted for each streaming text chunk:
async for event in agent.run("Tell me a story"):
    if isinstance(event, TextChunkEvent):
        print(event.chunk, end="", flush=True)
Use flush=True to ensure immediate output during streaming.
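If the full text is also needed after streaming, the chunks can be collected while they are printed. The sketch below assumes only the `chunk` field shown above; `TextChunkEvent` here is a stand-in class, and `fake_stream` is a hypothetical replacement for `agent.run(...)` so the example is self-contained.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class TextChunkEvent:  # stand-in carrying only the field used here
    chunk: str


async def fake_stream() -> AsyncIterator[TextChunkEvent]:
    """Hypothetical replacement for agent.run(...)."""
    for piece in ("Once ", "upon ", "a time"):
        yield TextChunkEvent(chunk=piece)


async def collect_text(events: AsyncIterator[object]) -> str:
    """Print chunks as they arrive and return the concatenated text."""
    parts: list[str] = []
    async for event in events:
        if isinstance(event, TextChunkEvent):
            print(event.chunk, end="", flush=True)
            parts.append(event.chunk)
    print()  # finish the streamed line
    return "".join(parts)
```

Appending to a list and joining once at the end avoids repeated string concatenation on long responses.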
TextDoneEvent
Emitted when complete text is available (non-streaming mode):
async for event in agent.run("Hello", stream=False):
    if isinstance(event, TextDoneEvent):
        print(event.text)
ToolCallEvent
Emitted when the model calls a tool:
@dataclass
class ToolCallEvent(Event):
    id: str          # Unique call identifier
    name: str        # Tool function name
    arguments: dict  # Tool arguments

if isinstance(event, ToolCallEvent):
    print(f"Calling {event.name}")
    print(f"  ID: {event.id}")
    print(f"  Args: {event.arguments}")
ToolResultEvent
Emitted after tool execution:
@dataclass
class ToolResultEvent(Event):
    id: str             # Matches ToolCallEvent.id
    name: str           # Tool function name
    result: Any         # Tool return value (if successful)
    error: str | None   # Error message (if failed)
    duration_ms: float  # Execution time in milliseconds

if isinstance(event, ToolResultEvent):
    if event.error:
        print(f"{event.name} failed: {event.error}")
    else:
        print(f"{event.name} returned: {event.result}")
    print(f"  Duration: {event.duration_ms:.1f}ms")
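Because `ToolResultEvent.id` matches `ToolCallEvent.id`, calls and results can be paired up, for example to log arguments next to outcomes. The following is a rough sketch with stand-in classes that carry only the documented fields; `pair_tool_events` is a hypothetical helper, not a nagents function.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Iterable


@dataclass
class ToolCallEvent:  # stand-in
    id: str
    name: str
    arguments: dict


@dataclass
class ToolResultEvent:  # stand-in
    id: str
    name: str
    result: Any
    error: str | None
    duration_ms: float


def pair_tool_events(events: Iterable[object]):
    """Return (completed pairs, calls still waiting for a result)."""
    pending: dict[str, ToolCallEvent] = {}
    completed: list[tuple[ToolCallEvent, ToolResultEvent]] = []
    for event in events:
        if isinstance(event, ToolCallEvent):
            pending[event.id] = event
        elif isinstance(event, ToolResultEvent):
            call = pending.pop(event.id, None)
            if call is not None:  # ignore results with no recorded call
                completed.append((call, event))
    return completed, list(pending.values())
```

Any call left in the second list had no result by the time the stream ended, which can help when diagnosing an interrupted run.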
ErrorEvent
Emitted when an error occurs:
@dataclass
class ErrorEvent(Event):
    message: str       # Error message
    code: str | None   # Error code (provider-specific)
    recoverable: bool  # Can the conversation continue?

if isinstance(event, ErrorEvent):
    print(f"Error: {event.message}")
    if event.code:
        print(f"Code: {event.code}")
    if not event.recoverable:
        print("Cannot continue conversation")
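One possible way to act on the `recoverable` flag is to retry the request on recoverable errors and abort otherwise. The sketch below is an assumption about how an application might do this, not behaviour provided by nagents: `run_with_retry` and `make_stream` are hypothetical names, the event classes are stand-ins, and `asyncio.sleep(0)` marks where a real backoff delay would go.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable


@dataclass
class ErrorEvent:  # stand-in
    message: str
    code: str | None
    recoverable: bool


@dataclass
class DoneEvent:  # stand-in carrying only the field used here
    final_text: str


async def run_with_retry(
    make_stream: Callable[[], AsyncIterator[object]],
    max_attempts: int = 3,
) -> str:
    """Restart the stream after recoverable errors; raise on fatal ones."""
    for _attempt in range(max_attempts):
        should_retry = False
        async for event in make_stream():
            if isinstance(event, ErrorEvent):
                if not event.recoverable:
                    raise RuntimeError(event.message)
                should_retry = True
                break
            if isinstance(event, DoneEvent):
                return event.final_text
        if not should_retry:
            break  # stream ended without DoneEvent or ErrorEvent
        await asyncio.sleep(0)  # placeholder for a real backoff delay
    raise RuntimeError("gave up without receiving a DoneEvent")
```

`make_stream` is a zero-argument callable so that each attempt starts a fresh stream, for instance `lambda: agent.run("Hello")`.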
DoneEvent
Emitted when generation is complete:
@dataclass
class DoneEvent(Event):
    final_text: str         # Complete response text
    session_id: str | None  # Session ID used

if isinstance(event, DoneEvent):
    print("\n--- Generation Complete ---")
    print(f"Response length: {len(event.final_text)} characters")
    print(f"Session: {event.session_id}")
    print(f"Tokens used: {event.usage.total_tokens}")
Complete Example
Here's a comprehensive example handling all event types:
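A minimal sketch of such a handler follows. It uses stand-in event classes that carry only the fields documented above (the real classes also include `type`, `timestamp`, and `usage`), so it runs without nagents installed. `RunSummary`, `handle_events`, and `fake_run` are hypothetical names introduced for illustration; in real code the events would come from `agent.run(...)`.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator


@dataclass
class TextChunkEvent:
    chunk: str

@dataclass
class TextDoneEvent:
    text: str

@dataclass
class ToolCallEvent:
    id: str
    name: str
    arguments: dict

@dataclass
class ToolResultEvent:
    id: str
    name: str
    result: Any
    error: str | None
    duration_ms: float

@dataclass
class ErrorEvent:
    message: str
    code: str | None
    recoverable: bool

@dataclass
class DoneEvent:
    final_text: str
    session_id: str | None


@dataclass
class RunSummary:  # hypothetical container for what the handler collected
    text: str = ""
    tool_calls: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    session_id: str | None = None


async def handle_events(events: AsyncIterator[object]) -> RunSummary:
    """Dispatch on every event type and collect a summary of the run."""
    summary = RunSummary()
    async for event in events:
        if isinstance(event, TextChunkEvent):
            print(event.chunk, end="", flush=True)
        elif isinstance(event, TextDoneEvent):
            print(event.text)
        elif isinstance(event, ToolCallEvent):
            summary.tool_calls.append(event.name)
            print(f"[tool] calling {event.name} with {event.arguments}")
        elif isinstance(event, ToolResultEvent):
            outcome = f"failed: {event.error}" if event.error else f"returned: {event.result}"
            print(f"[tool] {event.name} {outcome} ({event.duration_ms:.1f}ms)")
        elif isinstance(event, ErrorEvent):
            summary.errors.append(event.message)
            if not event.recoverable:
                break  # stop consuming after a fatal error
        elif isinstance(event, DoneEvent):
            summary.text = event.final_text
            summary.session_id = event.session_id
    return summary


async def fake_run() -> AsyncIterator[object]:
    """Hypothetical stand-in for agent.run(...)."""
    yield ToolCallEvent("call-1", "get_time", {"tz": "UTC"})
    yield ToolResultEvent("call-1", "get_time", "12:00", None, 3.2)
    yield TextChunkEvent("It is ")
    yield TextChunkEvent("12:00 UTC.")
    yield DoneEvent("It is 12:00 UTC.", "session-1")
```

Calling `asyncio.run(handle_events(fake_run()))` prints the tool activity and streamed text, then returns the collected summary.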
Pattern Matching
Python's structural pattern matching (the match statement, available from Python 3.10) keeps event handling concise:
Event Flow Diagram
stateDiagram-v2
    [*] --> Processing: agent.run()
    Processing --> TextChunk: Streaming response
    TextChunk --> TextChunk: More chunks
    TextChunk --> ToolCall: Tool needed
    Processing --> ToolCall: Tool needed
    ToolCall --> ToolResult: Tool executes
    ToolResult --> Processing: Continue generation
    TextChunk --> Done: Response complete
    Processing --> Done: Response complete
    Processing --> Error: Error occurs
    Done --> [*]
    Error --> [*]
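The diagram can also be read as a transition table, which is handy in tests that check the order of recorded events. The sketch below encodes only the edges drawn above; whether nagents can emit other orderings (for example an error in the middle of streaming) is not stated here, so treat the table as an assumption. `TRANSITIONS` and `is_valid_sequence` are hypothetical names.

```python
# Allowed next states, copied edge by edge from the diagram.
TRANSITIONS = {
    "Processing": {"TextChunk", "ToolCall", "Done", "Error"},
    "TextChunk": {"TextChunk", "ToolCall", "Done"},
    "ToolCall": {"ToolResult"},
    "ToolResult": {"Processing"},
    "Done": set(),
    "Error": set(),
}

TERMINAL = {"Done", "Error"}


def is_valid_sequence(event_names):
    """Check that a list of event names follows the diagram and ends in a terminal state."""
    state = "Processing"  # entered when agent.run() is called
    for name in event_names:
        if name not in TRANSITIONS[state]:
            return False
        state = name
        if state == "ToolResult":
            # Processing is a state, not an event: follow the only edge out.
            state = "Processing"
    return state in TERMINAL
```

For example, `["ToolCall", "ToolResult", "TextChunk", "Done"]` is accepted, while a `ToolCall` followed directly by `TextChunk` is rejected.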
Best Practices
Event Handling Tips
- Always handle ErrorEvent: don't let errors go unnoticed
- Use flush=True for streaming: ensures immediate output
- Track usage on DoneEvent: for cost monitoring
- Save session_id: if you want to continue the conversation later
- Handle the recoverable flag: know when to retry vs. abort