# Async/Await Guide

Complete guide to using async/await with AbstractCore for concurrent LLM operations.
## Table of Contents

- Overview
- Provider support
- Basic Usage
- Async Streaming
- Session Async
- Multi-Provider Comparisons
- FastAPI Integration
- Batch Document Processing
- Error Handling
- Practical tips
- Common Patterns
- Why MLX/HuggingFace Use Fallback
- Best Practices
- Learning Resources
- Summary

## Overview

AbstractCore exposes `agenerate()` for async generation across providers.

- HTTP-based providers (OpenAI-compatible endpoints, OpenRouter, Ollama, LMStudio, vLLM, etc.) implement native async I/O.
- In-process local inference providers (MLX, HuggingFace) use an `asyncio.to_thread()` fallback to avoid blocking the event loop.

Concurrency can improve throughput when requests are I/O-bound (network calls). For local inference, throughput is limited by your hardware and the model runtime.
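As a quick illustration, the sketch below times the same five requests run sequentially and then concurrently with `asyncio.gather()`; the `openai`/`gpt-4o-mini` setup is only an example, and any configured provider works.

```python
import asyncio
import time

from abstractcore import create_llm

async def main():
    llm = create_llm("openai", model="gpt-4o-mini")
    prompts = [f"Define 'coroutine' in one sentence. (request {i})" for i in range(5)]

    # Sequential: each request waits for the previous one to finish
    start = time.perf_counter()
    for prompt in prompts:
        await llm.agenerate(prompt)
    print(f"Sequential: {time.perf_counter() - start:.1f}s")

    # Concurrent: all five requests are in flight at the same time
    start = time.perf_counter()
    await asyncio.gather(*[llm.agenerate(p) for p in prompts])
    print(f"Concurrent: {time.perf_counter() - start:.1f}s")

asyncio.run(main())
```
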
## Provider support

| Provider | Async implementation |
|---|---|
| `openai`, `anthropic` | Native async SDK clients (when installed) |
| HTTP-based providers (`ollama`, `lmstudio`, `openrouter`, `vllm`, `openai-compatible`, …) | `httpx.AsyncClient` (native async HTTP) |
| `mlx`, `huggingface` | `asyncio.to_thread()` fallback (keeps the event loop responsive) |

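The thread fallback is conceptually just the standard `asyncio.to_thread()` pattern. The sketch below is illustrative only (it is not AbstractCore's internal code); `run_blocking_inference` is a hypothetical stand-in for a synchronous `mlx_lm`/`transformers` call.

```python
import asyncio

def run_blocking_inference(prompt: str) -> str:
    """Hypothetical stand-in for a blocking, in-process inference call."""
    ...

async def agenerate_via_thread(prompt: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays responsive
    return await asyncio.to_thread(run_blocking_inference, prompt)
```
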
## Basic Usage

### Single Async Request

```python
import asyncio
from abstractcore import create_llm

async def main():
    llm = create_llm("openai", model="gpt-4o-mini")

    # Single async request
    response = await llm.agenerate("What is Python?")
    print(response.content)

asyncio.run(main())
```

### Concurrent Requests

```python
import asyncio
from abstractcore import create_llm

async def main():
    llm = create_llm("ollama", model="qwen3:4b")

    # Execute 3 requests concurrently
    tasks = [
        llm.agenerate(f"Summarize {topic}")
        for topic in ["Python", "JavaScript", "Rust"]
    ]

    # gather runs all tasks concurrently
    responses = await asyncio.gather(*tasks)

    for i, response in enumerate(responses):
        print(f"\n{['Python', 'JavaScript', 'Rust'][i]}:")
        print(response.content)

asyncio.run(main())
```

## Async Streaming

### Basic Streaming

```python
import asyncio
from abstractcore import create_llm

async def main():
    llm = create_llm("anthropic", model="claude-haiku-4-5")

    # Step 1: await the generator
    stream_gen = await llm.agenerate(
        "Write a haiku about coding",
        stream=True
    )

    # Step 2: async for over the chunks
    async for chunk in stream_gen:
        if chunk.content:
            print(chunk.content, end="", flush=True)
    print()

asyncio.run(main())
```

### Concurrent Streaming

```python
import asyncio
from abstractcore import create_llm

async def stream_response(llm, topic, label):
    """Stream a single response with label."""
    print(f"\n{label}:")
    stream_gen = await llm.agenerate(f"Explain {topic} in one sentence", stream=True)
    async for chunk in stream_gen:
        if chunk.content:
            print(chunk.content, end="", flush=True)
    print()

async def main():
    llm = create_llm("openai", model="gpt-4o-mini")

    # Stream 3 responses concurrently
    # (note: chunks from the three streams may interleave on stdout)
    await asyncio.gather(
        stream_response(llm, "Python", "Python"),
        stream_response(llm, "JavaScript", "JavaScript"),
        stream_response(llm, "Rust", "Rust")
    )

asyncio.run(main())
```

## Session Async

### Async Conversation Management

```python
import asyncio
from abstractcore import create_llm
from abstractcore.core.session import BasicSession

async def main():
    llm = create_llm("openai", model="gpt-4o-mini")
    session = BasicSession(provider=llm)

    # Maintain conversation history with async
    response1 = await session.agenerate("What is Python?")
    print(response1.content)

    response2 = await session.agenerate("What are its main use cases?")
    print(response2.content)

    # Session tracks full conversation history
    print(f"\nConversation length: {len(session.conversation_history)} messages")

asyncio.run(main())
```

### Concurrent Sessions

```python
import asyncio
from abstractcore import create_llm
from abstractcore.core.session import BasicSession

async def chat_session(llm, topic, name):
    """Run an independent chat session."""
    session = BasicSession(provider=llm)
    response1 = await session.agenerate(f"What is {topic}?")
    response2 = await session.agenerate("Give me a simple example")
    print(f"\n{name}:")
    print(f"  Question 1: {response1.content[:50]}...")
    print(f"  Question 2: {response2.content[:50]}...")

async def main():
    llm = create_llm("anthropic", model="claude-haiku-4-5")

    # Run 3 independent conversations concurrently
    await asyncio.gather(
        chat_session(llm, "Python", "Session 1"),
        chat_session(llm, "JavaScript", "Session 2"),
        chat_session(llm, "Rust", "Session 3")
    )

asyncio.run(main())
```

## Multi-Provider Comparisons

### Concurrent Provider Queries

```python
import asyncio
from abstractcore import create_llm

async def query_provider(provider_name, model, prompt):
    """Query a single provider."""
    llm = create_llm(provider_name, model=model)
    response = await llm.agenerate(prompt)
    return {
        "provider": provider_name,
        "model": model,
        "response": response.content
    }

async def main():
    prompt = "What is the capital of France?"

    # Query multiple providers simultaneously
    results = await asyncio.gather(
        query_provider("openai", "gpt-4o-mini", prompt),
        query_provider("anthropic", "claude-haiku-4-5", prompt),
        query_provider("ollama", "qwen3:4b", prompt)
    )

    for result in results:
        print(f"\n{result['provider']} ({result['model']}):")
        print(result['response'])

asyncio.run(main())
```

### Provider Consensus

```python
import asyncio
from abstractcore import create_llm

async def main():
    prompt = "Is the Earth flat? Answer yes or no."

    # Ask 3 providers the same question
    llm_openai = create_llm("openai", model="gpt-4o-mini")
    llm_anthropic = create_llm("anthropic", model="claude-haiku-4-5")
    llm_ollama = create_llm("ollama", model="qwen3:4b")

    responses = await asyncio.gather(
        llm_openai.agenerate(prompt),
        llm_anthropic.agenerate(prompt),
        llm_ollama.agenerate(prompt)
    )

    answers = [r.content.strip().lower() for r in responses]
    no_votes = sum(1 for a in answers if a.startswith("no"))

    print(f"Answers: {answers}")
    print(f"Majority answered 'no': {'yes' if no_votes >= 2 else 'no'}")

asyncio.run(main())
```

## FastAPI Integration

### Async HTTP Endpoints

```python
import asyncio

from fastapi import FastAPI
from abstractcore import create_llm

app = FastAPI()
llm = create_llm("openai", model="gpt-4o-mini")

@app.post("/generate")
async def generate(prompt: str):
    """Non-blocking LLM generation endpoint."""
    response = await llm.agenerate(prompt)
    return {"response": response.content}

@app.post("/batch")
async def batch_generate(prompts: list[str]):
    """Process multiple prompts concurrently."""
    tasks = [llm.agenerate(p) for p in prompts]
    responses = await asyncio.gather(*tasks)
    return {
        "responses": [r.content for r in responses]
    }

# Run with: uvicorn your_app:app --reload
```

### Streaming Endpoint

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from abstractcore import create_llm

app = FastAPI()
llm = create_llm("anthropic", model="claude-haiku-4-5")

async def stream_response(prompt: str):
    """Generate streaming response."""
    stream_gen = await llm.agenerate(prompt, stream=True)
    async for chunk in stream_gen:
        if chunk.content:
            yield f"data: {chunk.content}\n\n"

@app.post("/stream")
async def stream_generate(prompt: str):
    """Streaming LLM generation endpoint."""
    return StreamingResponse(
        stream_response(prompt),
        media_type="text/event-stream"
    )
```

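To consume the stream, any SSE-capable HTTP client works. The sketch below uses `httpx` and assumes the app above is running on `http://localhost:8000`; with this minimal endpoint, `prompt` is passed as a query parameter.

```python
import asyncio
import httpx

async def consume_stream():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/stream",
            params={"prompt": "Write a haiku about coding"},
        ) as response:
            # Each SSE event arrives as a "data: ..." line
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):], end="", flush=True)

asyncio.run(consume_stream())
```
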
## Batch Document Processing

### Concurrent Document Summaries

```python
import asyncio
from abstractcore import create_llm
from abstractcore.processing import Summarizer

async def summarize_document(summarizer, doc_path):
    """Summarize a single document."""
    # summarize() is synchronous, so run it in a worker thread
    # to keep the event loop free for the other documents
    result = await asyncio.to_thread(
        summarizer.summarize,
        input_source=doc_path,
        style="executive",
        length="brief"
    )
    return {
        "path": doc_path,
        "summary": result.summary
    }

async def main():
    llm = create_llm("openai", model="gpt-4o-mini")
    summarizer = Summarizer(llm)

    documents = [
        "report1.pdf",
        "report2.pdf",
        "report3.pdf"
    ]

    # Summarize all documents concurrently
    tasks = [summarize_document(summarizer, doc) for doc in documents]
    results = await asyncio.gather(*tasks)

    for result in results:
        print(f"\n{result['path']}:")
        print(result['summary'])

asyncio.run(main())
```

## Error Handling

### Graceful Error Recovery

```python
import asyncio
from abstractcore import create_llm
from abstractcore.exceptions import ProviderAPIError

async def safe_generate(llm, prompt, label):
    """Generate with error handling."""
    try:
        response = await llm.agenerate(prompt)
        return {"label": label, "content": response.content, "error": None}
    except ProviderAPIError as e:
        return {"label": label, "content": None, "error": str(e)}

async def main():
    llm = create_llm("openai", model="gpt-4o-mini")

    # Some requests may fail - continue processing the others
    results = await asyncio.gather(
        safe_generate(llm, "Valid prompt 1", "Task 1"),
        safe_generate(llm, "Valid prompt 2", "Task 2"),
        safe_generate(llm, "Valid prompt 3", "Task 3")
    )

    for result in results:
        if result["error"]:
            print(f"{result['label']}: ERROR - {result['error']}")
        else:
            print(f"{result['label']}: {result['content']}")

asyncio.run(main())
```

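If you prefer not to wrap every task, `asyncio.gather()` can also collect failures directly via `return_exceptions=True`, which returns exceptions as results instead of raising the first one (fragment below assumes an `llm` instance as in the example above):

```python
results = await asyncio.gather(
    llm.agenerate("Valid prompt 1"),
    llm.agenerate("Valid prompt 2"),
    llm.agenerate("Valid prompt 3"),
    return_exceptions=True,
)

for result in results:
    if isinstance(result, Exception):
        print(f"ERROR - {result}")
    else:
        print(result.content)
```
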
## Practical tips

### 1. Prefer native-async providers when possible

```python
# ✅ Native async HTTP (I/O-bound)
llm = create_llm("ollama", model="qwen3:4b")

# ✅ Native async SDK (cloud APIs)
llm = create_llm("openai", model="gpt-4o-mini")

# ⚠️ Fallback: runs sync generation in a thread (keeps the event loop responsive)
llm = create_llm("mlx", model="mlx-community/Qwen3-4B-4bit")
```

### 2. Batch Similar Operations

```python
# ✅ GOOD: Single gather for all tasks
tasks = [llm.agenerate(f"Task {i}") for i in range(10)]
results = await asyncio.gather(*tasks)

# ❌ BAD: Sequential awaits lose the concurrency benefit
results = []
for i in range(10):
    result = await llm.agenerate(f"Task {i}")
    results.append(result)
```

### 3. Mix Async with Sync I/O

```python
import asyncio
from abstractcore import create_llm

async def main():
    llm = create_llm("anthropic", model="claude-haiku-4-5")

    # Concurrent: LLM generation + file I/O
    # (read_large_file is a placeholder for any blocking helper)
    llm_task = llm.agenerate("Explain async")
    file_task = asyncio.to_thread(read_large_file, "data.txt")

    response, data = await asyncio.gather(llm_task, file_task)
    # Both completed concurrently!
```

## Common Patterns

### Retry with Exponential Backoff

```python
import asyncio
from abstractcore import create_llm

async def generate_with_retry(llm, prompt, max_retries=3):
    """Generate with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            return await llm.agenerate(prompt)
        except Exception:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Retry {attempt + 1} after {wait_time}s...")
            await asyncio.sleep(wait_time)

async def main():
    llm = create_llm("openai", model="gpt-4o-mini")
    response = await generate_with_retry(llm, "What is Python?")
    print(response.content)

asyncio.run(main())
```

### Rate Limiting

```python
import asyncio
from abstractcore import create_llm

class RateLimiter:
    """Allow at most max_per_second acquisitions in any 1-second window."""

    def __init__(self, max_per_second):
        self.semaphore = asyncio.Semaphore(max_per_second)
        self._pending = set()

    async def acquire(self):
        await self.semaphore.acquire()
        # Return the permit one second later so the rate stays bounded
        task = asyncio.create_task(self._release_after_delay())
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def _release_after_delay(self):
        await asyncio.sleep(1.0)
        self.semaphore.release()

async def main():
    llm = create_llm("openai", model="gpt-4o-mini")
    limiter = RateLimiter(max_per_second=5)

    # Process 20 requests with a 5 requests/second limit
    async def limited_generate(prompt):
        await limiter.acquire()
        return await llm.agenerate(prompt)

    tasks = [limited_generate(f"Task {i}") for i in range(20)]
    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} requests")

asyncio.run(main())
```

### Progress Tracking

```python
import asyncio
from abstractcore import create_llm

async def generate_with_progress(llm, prompts):
    """Generate with real-time progress tracking."""
    completed = 0
    total = len(prompts)

    async def track_task(prompt):
        nonlocal completed
        response = await llm.agenerate(prompt)
        completed += 1
        print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")
        return response

    tasks = [track_task(p) for p in prompts]
    return await asyncio.gather(*tasks)

async def main():
    llm = create_llm("ollama", model="qwen3:4b")
    prompts = [f"Task {i}" for i in range(10)]
    results = await generate_with_progress(llm, prompts)
    print(f"\nCompleted {len(results)} tasks!")

asyncio.run(main())
```

## Why MLX/HuggingFace Use Fallback

MLX and HuggingFace providers use an `asyncio.to_thread()` fallback because:

- **No async library APIs**: Neither `mlx_lm` nor `transformers` exposes an async Python API
- **Direct function calls**: There is no HTTP layer to enable concurrent I/O
- **Industry standard**: The same pattern is used by LangChain and Pydantic-AI for CPU-bound operations
- **Event loop responsive**: The fallback keeps the event loop responsive for mixing with async I/O

```python
# MLX/HF async example (the fallback keeps the event loop responsive)
import asyncio
from abstractcore import create_llm

async def main():
    llm = create_llm("mlx", model="mlx-community/Qwen3-4B-4bit")

    # Mix MLX inference with async I/O
    # (fetch_data_from_api is a placeholder for any awaitable I/O call)
    inference_task = llm.agenerate("What is Python?")
    io_task = fetch_data_from_api()

    # Both run concurrently - the event loop is not blocked!
    response, data = await asyncio.gather(inference_task, io_task)

asyncio.run(main())
```

If you run local inference behind an OpenAI-compatible HTTP server (for example, via LM Studio), you can use the `lmstudio` (or `openai-compatible`) provider for native async I/O to the server:

```python
llm = create_llm("lmstudio", model="local-model", base_url="http://localhost:1234/v1")
```

## Best Practices

### 1. Always Use `asyncio.gather()` for Concurrent Tasks

```python
# ✅ CORRECT: All tasks run concurrently
results = await asyncio.gather(*[llm.agenerate(p) for p in prompts])

# ❌ WRONG: Sequential execution (no concurrency)
results = [await llm.agenerate(p) for p in prompts]
```

### 2. Await Stream Generator First

```python
# ✅ CORRECT: Two-step pattern
stream_gen = await llm.agenerate(prompt, stream=True)
async for chunk in stream_gen:
    print(chunk.content, end="")

# ❌ WRONG: Missing await before async for
async for chunk in llm.agenerate(prompt, stream=True):  # Error!
    print(chunk.content, end="")
```

### 3. Close Resources Properly

```python
# ✅ GOOD: Clean shutdown
llm = create_llm("openai", model="gpt-4o-mini")
try:
    response = await llm.agenerate("Test")
finally:
    llm.unload_model(llm.model)  # Closes async client
```

### 4. Handle Errors in Concurrent Operations

```python
# ✅ GOOD: Catch errors per-task
async def safe_task(prompt):
    try:
        return await llm.agenerate(prompt)
    except Exception as e:
        return f"Error: {e}"

results = await asyncio.gather(*[safe_task(p) for p in prompts])
```

## Learning Resources

- **Educational Demo**: `examples/async_cli_demo.py` - 8 core async/await patterns
- **Test Suite**: `tests/async/test_async_providers.py` - real implementation examples
- **Concurrency & Throughput**: `concurrency.md` - practical guidance for local inference

## Summary

- ✅ `agenerate()` works across providers
- ✅ Use `asyncio.gather()` for concurrent (I/O-bound) requests
- ✅ HTTP-based providers use native async; MLX/HuggingFace use a thread fallback to keep the event loop responsive
- ✅ Async streaming uses a 2-step pattern: `stream_gen = await llm.agenerate(..., stream=True)` then `async for ...`
- ✅ Works well in FastAPI and other async frameworks

**Get Started:**

```bash
pip install abstractcore

# Try the educational async demo
python examples/async_cli_demo.py --provider ollama --model qwen3:4b
```