Batch Processing¶
Process multiple requests efficiently with batch processing. Ideal for bulk operations, data processing pipelines, and scenarios where real-time responses aren't required.
Overview¶
Batch processing allows you to:
- Submit multiple requests at once
- Process them asynchronously in the background
- Retrieve results later
- Track job status
- Handle partial failures gracefully
When to Use Batch Processing
- Processing large datasets
- Non-interactive workloads
- Cost optimization (some providers offer batch discounts)
- Background tasks
Basic Usage¶
basic_batch.py

```python
from nagents import BatchManager, BatchRequest

batch_manager = BatchManager(provider=provider)

# Create batch requests
requests = [
    BatchRequest(
        id="req-1",
        messages=[{"role": "user", "content": "Summarize: AI is transforming..."}],
    ),
    BatchRequest(
        id="req-2",
        messages=[{"role": "user", "content": "Summarize: Machine learning..."}],
    ),
    BatchRequest(
        id="req-3",
        messages=[{"role": "user", "content": "Summarize: Neural networks..."}],
    ),
]

# Submit batch job
job = await batch_manager.submit(requests)
print(f"Job ID: {job.id}")

# Check status
status = await batch_manager.get_status(job.id)
print(f"Status: {status}")

# Get results when complete
results = await batch_manager.get_results(job.id)
for result in results:
    print(f"{result.id}: {result.response[:100]}...")
```
Batch Status¶
Jobs progress through these states:
```mermaid
stateDiagram-v2
    [*] --> pending: submit()
    pending --> processing: Job starts
    processing --> completed: All done
    processing --> failed: Fatal error
    pending --> cancelled: cancel()
    processing --> cancelled: cancel()
    completed --> [*]
    failed --> [*]
    cancelled --> [*]
```
| Status | Description |
|---|---|
| `pending` | Job submitted, waiting to start |
| `processing` | Job is being processed |
| `completed` | All requests finished |
| `failed` | Job failed (fatal error) |
| `cancelled` | Job was cancelled |
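The last three states are terminal: once a job reaches one of them, its status never changes again. A small helper makes that check explicit. This is an illustrative sketch, not part of `nagents` — the enum below mirrors the table above, but the actual members of `nagents.BatchStatus` may differ.

```python
from enum import Enum

# Hypothetical mirror of nagents.BatchStatus, for illustration only
class BatchStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

# A job in any of these states will never change again
TERMINAL_STATES = {BatchStatus.COMPLETED, BatchStatus.FAILED, BatchStatus.CANCELLED}

def is_terminal(status: BatchStatus) -> bool:
    """Return True if polling can stop for this status."""
    return status in TERMINAL_STATES
```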
Polling for Results¶
polling.py

```python
import asyncio

from nagents import BatchStatus

job = await batch_manager.submit(requests)

while True:
    status = await batch_manager.get_status(job.id)
    match status:
        case BatchStatus.COMPLETED:
            print("Job completed!")
            break
        case BatchStatus.FAILED:
            raise Exception("Batch job failed")
        case BatchStatus.CANCELLED:
            print("Job was cancelled")
            break
        case _:
            print(f"Status: {status}, waiting...")
            await asyncio.sleep(5)  # (1)!

results = await batch_manager.get_results(job.id)
```

1. Poll every 5 seconds. Adjust based on expected job duration.
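For long-running jobs, a fixed 5-second interval can waste API calls early and respond slowly late. One common refinement is exponential backoff. The helper below is a generic sketch, not part of `nagents`: it accepts any async status callable and assumes terminal statuses compare equal to the lowercase strings from the table above — adapt the check to your actual `BatchStatus` values.

```python
import asyncio

async def poll_until_done(get_status, job_id, *, base=1.0, cap=30.0, factor=2.0):
    """Poll get_status(job_id) with exponential backoff until a terminal status.

    Assumption: terminal statuses compare equal to the strings
    'completed', 'failed', 'cancelled'. Starts at `base` seconds between
    polls and doubles (by `factor`) up to `cap` seconds.
    """
    delay = base
    while True:
        status = await get_status(job_id)
        if status in ("completed", "failed", "cancelled"):
            return status
        await asyncio.sleep(delay)
        delay = min(delay * factor, cap)  # back off, but never past the cap
```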
Batch Store¶
Persist batch jobs and results for reliability:
persistent_batch.py

```python
from pathlib import Path

from nagents import BatchManager, BatchStore

# Create persistent store
store = BatchStore(Path("batches.db"))
await store.initialize()

batch_manager = BatchManager(
    provider=provider,
    store=store,  # Enable persistence
)

# Jobs and results are automatically persisted
job = await batch_manager.submit(requests)

# Even after a restart, you can retrieve results
results = await batch_manager.get_results(job.id)
```
Production Use
Always use a BatchStore in production to:
- Survive process restarts
- Track job history
- Enable debugging and auditing
Cancelling Jobs¶
cancel_job.py

```python
job = await batch_manager.submit(requests)

# Cancel if needed
await batch_manager.cancel(job.id)

status = await batch_manager.get_status(job.id)
assert status == BatchStatus.CANCELLED
```
Error Handling¶
Individual requests can fail without failing the entire batch:
error_handling.py

```python
results = await batch_manager.get_results(job.id)

successful = []
failed = []

for result in results:
    if result.error:
        failed.append(result)
        print(f"Request {result.id} failed: {result.error}")
    else:
        successful.append(result)
        print(f"Request {result.id}: {result.response[:50]}...")

print(f"\nSummary: {len(successful)} succeeded, {len(failed)} failed")
```
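The partitioning step above is easy to factor out and reuse wherever you inspect batch results. A minimal sketch follows; the `BatchResult` dataclass is a stand-in for illustration (the real `nagents` result objects expose at least `id`, `response`, and `error`, as used in the examples on this page).

```python
from dataclasses import dataclass
from typing import Optional

# Minimal stand-in for a nagents batch result, for illustration only
@dataclass
class BatchResult:
    id: str
    response: Optional[str] = None
    error: Optional[str] = None

def partition_results(results):
    """Split batch results into (successful, failed) lists by their error field."""
    successful = [r for r in results if not r.error]
    failed = [r for r in results if r.error]
    return successful, failed
```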
Complete Example¶
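Putting the pieces together — submit, poll, fetch, and partition — yields a reusable helper. This is a sketch under the assumptions established above: `batch_manager` exposes the `submit`, `get_status`, and `get_results` coroutines, results carry an `error` field, and terminal status values are the lowercase strings from the status table (the `getattr` dance handles both enum members and raw strings).

```python
import asyncio

async def run_batch(batch_manager, requests, poll_interval=5.0):
    """Submit a batch, wait for a terminal status, and return (successful, failed).

    Assumes the batch manager API shown on this page and lowercase string
    status values ('completed', 'failed', 'cancelled') for terminal states.
    """
    job = await batch_manager.submit(requests)
    print(f"Submitted job {job.id} with {len(requests)} requests")

    while True:
        status = await batch_manager.get_status(job.id)
        value = getattr(status, "value", status)  # enum member or raw string
        if value in ("completed", "failed", "cancelled"):
            break
        await asyncio.sleep(poll_interval)

    results = await batch_manager.get_results(job.id)
    successful = [r for r in results if not r.error]
    failed = [r for r in results if r.error]
    print(f"{len(successful)} succeeded, {len(failed)} failed")
    return successful, failed
```

Called as `successful, failed = await run_batch(batch_manager, requests)`, this handles the happy path and partial failures in one place; add cancellation or retry handling as your workload requires.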
Best Practices¶
Batch Processing Tips
- Use meaningful IDs - Makes tracking and debugging easier
- Implement retry logic - Re-submit failed requests
- Use persistent storage - Always use `BatchStore` in production
- Handle partial failures - Check each result for errors
- Set reasonable poll intervals - Balance responsiveness vs. API calls
- Consider rate limits - Large batches may have provider-specific limits
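For the retry-logic tip, the key step is mapping failed result IDs back to their original requests so they can be re-submitted as a new batch. A minimal sketch, assuming (as in the examples above) that both requests and results carry matching `id` attributes:

```python
def build_retry_requests(failed_results, original_requests):
    """Return the original requests whose results failed, ready for resubmission.

    Assumes request and result objects share matching `id` attributes.
    """
    by_id = {req.id: req for req in original_requests}
    return [by_id[res.id] for res in failed_results if res.id in by_id]
```

The returned list can be passed straight back to `batch_manager.submit(...)`; consider capping the number of retry rounds so a permanently failing request doesn't loop forever.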