API Reference
Complete reference for the AbstractCore API. All examples work across any provider.
Table of Contents
Table of Contents Core Functions Classes Event System Retry Configuration Embeddings Exceptions Advanced Usage PatternsTable of Contents
- Core Functions
- Classes
- AbstractCoreInterface
- BasicSession
- Event System
- Retry Configuration
- Embeddings
- Exceptions
Core Functions
create_llm()
Creates an LLM provider instance.
def create_llm(
provider: str,
model: Optional[str] = None,
retry_config: Optional[RetryConfig] = None,
**kwargs
) -> AbstractCoreInterface
Parameters:
- provider (str): Provider name ("openai", "anthropic", "ollama", "mlx", "lmstudio", "huggingface")
- model (str, optional): Model name. If not provided, uses provider default
- retry_config (RetryConfig, optional): Custom retry configuration
- **kwargs: Provider-specific parameters
Provider-specific parameters:
- api_key (str): API key for cloud providers
- base_url (str): Custom endpoint URL
- temperature (float): Sampling temperature (0.0-1.0, controls creativity)
- seed (int): Random seed for deterministic outputs (✅ OpenAI, Ollama, MLX, HuggingFace, LMStudio; ⚠️ Anthropic issues warning)
- max_tokens (int): Maximum output tokens
- timeout (int): Request timeout in seconds
- top_p (float): Nucleus sampling parameter
Returns: AbstractCoreInterface instance
Example:
from abstractcore import create_llm
# Basic usage
llm = create_llm("openai", model="gpt-4o-mini")
# With configuration
llm = create_llm(
"anthropic",
model="claude-haiku-4-5",
temperature=0.7,
max_tokens=1000,
timeout=30
)
# Local provider
llm = create_llm("ollama", model="qwen2.5-coder:7b", base_url="http://localhost:11434")
Classes
AbstractCoreInterface
Base interface for all LLM providers. All providers implement this interface.
generate()
Generate text response from the LLM.
def generate(
self,
prompt: str,
messages: Optional[List[Dict]] = None,
system_prompt: Optional[str] = None,
tools: Optional[List[Dict]] = None,
response_model: Optional[BaseModel] = None,
retry_strategy: Optional[Retry] = None,
stream: bool = False,
thinking: Optional[bool | str] = None,
**kwargs
) -> Union[GenerateResponse, Iterator[GenerateResponse]]
Parameters:
- prompt (str): Text prompt to generate from
- messages (List[Dict], optional): Conversation messages in OpenAI format
- system_prompt (str, optional): System prompt to set context
- tools (List[Dict], optional): Tools the LLM can call
- response_model (BaseModel, optional): Pydantic model for structured output
- retry_strategy (Retry, optional): Custom retry strategy for structured output
- stream (bool): Enable streaming response
- thinking (bool | str, optional): Unified thinking/reasoning control ("auto"|"on"|"off" or "low"|"medium"|"high" when supported)
- **kwargs: Additional generation parameters
Returns:
- If stream=False: GenerateResponse
- If stream=True: Iterator[GenerateResponse]
Examples:
Basic Generation:
response = llm.generate("What is machine learning?")
print(response.content)
With System Prompt:
response = llm.generate(
"Explain Python decorators",
system_prompt="You are a Python expert. Always provide code examples."
)
Structured Output:
from pydantic import BaseModel
class Person(BaseModel):
name: str
age: int
person = llm.generate(
"Extract: John Doe is 25 years old",
response_model=Person
)
print(f"{person.name}, age {person.age}")
See: Structured Output Guide for comprehensive documentation
Tool Calling:
def get_weather(city: str) -> str:
return f"Weather in {city}: sunny, 22°C"
tools = [{
"name": "get_weather",
"description": "Get weather for a city",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"]
}
}]
response = llm.generate("What's the weather in Paris?", tools=tools)
Streaming:
print("AI: ", end="")
for chunk in llm.generate(
"Create a Python function with a tool",
stream=True,
tools=tools
):
# Real-time chunk processing
print(chunk.content or "", end="", flush=True)
# Tool calls are surfaced as structured dicts; execute them in your host/runtime.
if chunk.tool_calls:
print(f"\nTool calls: {chunk.tool_calls}")
Streaming notes:
- Streaming uses a unified processor across providers; exact chunking behavior depends on the backend.
- Tool calls are surfaced as structured dicts in chunk.tool_calls; execute them in your host/runtime (pass-through by default).
- If you need tool-call markup preserved/re-written in chunk.content, pass tool_call_tags=... (see Tool Call Syntax Rewriting).
- In streaming mode, AbstractCore records a best-effort TTFT metric in chunk.metadata["_timing"]["ttft_ms"] when available (for debugging/observability).
agenerate()
Async version of generate() for concurrent request execution.
async def agenerate(
self,
prompt: str,
messages: Optional[List[Dict]] = None,
system_prompt: Optional[str] = None,
tools: Optional[List[Dict]] = None,
response_model: Optional[BaseModel] = None,
stream: bool = False,
**kwargs
) -> Union[GenerateResponse, AsyncIterator[GenerateResponse]]
Parameters: Same as generate()
Returns:
- If stream=False: GenerateResponse
- If stream=True: AsyncIterator[GenerateResponse]
Examples:
Basic Async:
import asyncio
async def main():
response = await llm.agenerate("What is quantum computing?")
print(response.content)
asyncio.run(main())
Concurrent Requests:
async def batch_process():
tasks = [
llm.agenerate("Summarize Python"),
llm.agenerate("Summarize JavaScript"),
llm.agenerate("Summarize Rust")
]
responses = await asyncio.gather(*tasks)
for response in responses:
print(response.content)
asyncio.run(batch_process())
Async Streaming:
async def stream_response():
async for chunk in llm.agenerate("Tell me a story", stream=True):
print(chunk.content, end='', flush=True)
asyncio.run(stream_response())
Multi-Provider Comparison:
async def compare_providers():
openai = create_llm("openai", model="gpt-4o-mini")
claude = create_llm("anthropic", model="claude-haiku-4-5")
responses = await asyncio.gather(
openai.agenerate("What is 2+2?"),
claude.agenerate("What is 2+2?")
)
print(f"OpenAI: {responses[0].content}")
print(f"Claude: {responses[1].content}")
asyncio.run(compare_providers())
Features:
- Works across AbstractCore providers (cloud + local); some use native async, others fall back to asyncio.to_thread()
- Faster batch operations via concurrent execution (depends on provider, network, and hardware)
- Full streaming support with AsyncIterator
- Compatible with FastAPI and async web frameworks
- Zero breaking changes to sync API
get_capabilities()
Get provider capabilities.
def get_capabilities(self) -> List[str]
Returns: List of capability strings
Example:
capabilities = llm.get_capabilities()
print(capabilities) # ['text_generation', 'tool_calling', 'streaming', 'vision']
unload_model(model_name)
Unload/cleanup resources for a specific model (best-effort).
def unload_model(self, model_name: str) -> None
For local providers (Ollama, MLX, HuggingFace, LMStudio), this explicitly frees model memory or releases client resources. For API providers (OpenAI, Anthropic), this is typically a no-op but safe to call.
Provider-specific behavior:
- Ollama: Sends keep_alive=0 to immediately unload from server
- MLX: Clears model/tokenizer references and forces garbage collection
- HuggingFace: Closes llama.cpp resources (GGUF) or clears model references
- LMStudio: Closes HTTP connection (server auto-manages via TTL)
- OpenAI/Anthropic: No-op (safe to call)
Example:
# Load and use a large model
llm = create_llm("ollama", model="qwen3-coder:30b")
response = llm.generate("Hello world")
# Explicitly free memory when done
llm.unload_model(llm.model)
del llm
# Now safe to load another large model
llm2 = create_llm("mlx", model="mlx-community/Qwen3-30B-4bit")
Use cases: - Test suites testing multiple models sequentially - Memory-constrained environments (<32GB RAM) - Sequential model loading in production systems
GenerateResponse
Response object from LLM generation with consistent token terminology and generation time tracking.
@dataclass
class GenerateResponse:
content: Optional[str]
raw_response: Any
model: Optional[str]
finish_reason: Optional[str]
usage: Optional[Dict[str, int]]
tool_calls: Optional[List[Dict]]
metadata: Optional[Dict]
gen_time: Optional[float] # Generation time in milliseconds
# Consistent token access properties
@property
def input_tokens(self) -> Optional[int]:
"""Get input tokens with consistent terminology."""
@property
def output_tokens(self) -> Optional[int]:
"""Get output tokens with consistent terminology."""
@property
def total_tokens(self) -> Optional[int]:
"""Get total tokens."""
Attributes:
- content (str): Generated text content
- raw_response (Any): Raw provider response
- model (str): Model used for generation
- finish_reason (str): Why generation stopped ("stop", "length", "tool_calls")
- usage (Dict): Token usage information
- tool_calls (List[Dict]): Tools called by the LLM
- metadata (Dict): Additional metadata (notably metadata["reasoning"] when a provider/model exposes thinking/reasoning)
- gen_time (float): Generation time in milliseconds, rounded to 1 decimal place
Token and Timing Access Examples:
response = llm.generate("Explain quantum computing")
# Best-effort access across supported providers (may be None depending on backend/config)
print(f"Input tokens: {response.input_tokens}") # None if usage isn't reported/estimated
print(f"Output tokens: {response.output_tokens}") # None if usage isn't reported/estimated
print(f"Total tokens: {response.total_tokens}") # None if usage isn't reported/estimated
print(f"Generation time: {response.gen_time}ms") # None if timing wasn't captured
# Comprehensive summary
print(f"Summary: {response.get_summary()}") # Model | Tokens | Time | Tools
# Raw usage dictionary (provider-specific format)
print(f"Usage details: {response.usage}")
Token Count Sources:
- Provider APIs: OpenAI, Anthropic, LMStudio (native API token counts)
- AbstractCore Calculation: MLX, HuggingFace (using token_utils.py)
- Mixed Sources: Ollama (combination of provider and calculated tokens)
Backward Compatibility: Legacy prompt_tokens and completion_tokens keys remain available in response.usage dictionary.
Methods:
has_tool_calls()
def has_tool_calls(self) -> bool
Returns True if the response contains tool calls.
get_tools_executed()
def get_tools_executed(self) -> List[str]
Returns list of tool names that were executed.
Example:
response = llm.generate("What's 2+2?", tools=[calculator_tool])
print(f"Content: {response.content}")
print(f"Model: {response.model}")
print(f"Tokens: {response.usage}")
if response.has_tool_calls():
print(f"Tools used: {response.get_tools_executed()}")
BasicSession
Manages conversation context and history.
class BasicSession:
def __init__(
self,
provider: AbstractCoreInterface,
system_prompt: Optional[str] = None,
temperature: Optional[float] = None,
seed: Optional[int] = None,
**kwargs
):
Parameters:
- provider (AbstractCoreInterface): LLM provider instance
- system_prompt (str, optional): System prompt for the conversation
- temperature (float, optional): Default temperature for all generations (0.0-1.0)
- seed (int, optional): Default seed for deterministic outputs (provider support varies)
- **kwargs: Additional session parameters (tools, timeouts, etc.)
Attributes:
- messages (List[Message]): Conversation history
- provider (AbstractCoreInterface): LLM provider
- system_prompt (str): System prompt
Methods:
generate()
def generate(self, prompt: str, **kwargs) -> GenerateResponse
Generate response and add to conversation history.
agenerate()
async def agenerate(
self,
prompt: str,
name: Optional[str] = None,
location: Optional[str] = None,
**kwargs
) -> Union[GenerateResponse, AsyncIterator[GenerateResponse]]
Async version of generate(). Maintains conversation history with async execution.
Example:
import asyncio
async def chat():
session = BasicSession(provider=llm)
# Async conversation
response1 = await session.agenerate("My name is Alice")
response2 = await session.agenerate("What's my name?")
print(response2.content) # References Alice
asyncio.run(chat())
add_message()
def add_message(self, role: str, content: str, **metadata) -> Message
Add message to conversation history.
clear_history()
def clear_history(self, keep_system: bool = True) -> None
Clear conversation history, optionally keeping system prompt.
save()
def save(self, filepath: Path) -> None
Save session to JSON file.
load()
@classmethod
def load(cls, filepath: Path, provider: AbstractCoreInterface) -> "BasicSession"
Load session from JSON file.
Example:
from abstractcore import create_llm, BasicSession
llm = create_llm("openai", model="gpt-4o-mini")
session = BasicSession(
provider=llm,
system_prompt="You are a helpful coding tutor.",
temperature=0.3, # Focused responses
seed=42 # Consistent outputs
)
# Multi-turn conversation
response1 = session.generate("What are Python decorators?")
response2 = session.generate("Show me an example", temperature=0.7) # Override for this call
print(f"Conversation has {len(session.messages)} messages")
# Save session
session.save(Path("conversation.json"))
# Load later
loaded_session = BasicSession.load(Path("conversation.json"), llm)
Message
Represents a conversation message.
@dataclass
class Message:
role: str
content: str
timestamp: Optional[datetime] = None
name: Optional[str] = None
metadata: Optional[Dict] = None
Methods:
to_dict()
def to_dict(self) -> Dict
Convert message to dictionary.
from_dict()
@classmethod
def from_dict(cls, data: Dict) -> "Message"
Create message from dictionary.
Event System
EventType
Available event types for monitoring.
class EventType(Enum):
# Generation events
GENERATION_STARTED = "generation_started"
GENERATION_COMPLETED = "generation_completed"
# Tool events
TOOL_STARTED = "tool_started"
TOOL_PROGRESS = "tool_progress"
TOOL_COMPLETED = "tool_completed"
# Error handling
ERROR = "error"
# Retry and resilience events
RETRY_ATTEMPTED = "retry_attempted"
RETRY_EXHAUSTED = "retry_exhausted"
# Useful events
VALIDATION_FAILED = "validation_failed"
SESSION_CREATED = "session_created"
SESSION_CLEARED = "session_cleared"
COMPACTION_STARTED = "compaction_started"
COMPACTION_COMPLETED = "compaction_completed"
# Runtime/workflow events
WORKFLOW_STEP_STARTED = "workflow_step_started"
WORKFLOW_STEP_COMPLETED = "workflow_step_completed"
WORKFLOW_STEP_WAITING = "workflow_step_waiting"
WORKFLOW_STEP_FAILED = "workflow_step_failed"
on_global()
Register global event handler.
def on_global(event_type: EventType, handler: Callable[[Event], None]) -> None
Parameters:
- event_type (EventType): Event type to listen for
- handler (Callable): Function to call when event occurs
Example:
from abstractcore.events import EventType, on_global
def cost_monitor(event):
cost = event.data.get("cost_usd")
if cost:
# NOTE: `cost_usd` is a best-effort estimate based on token usage.
print(f"Estimated cost: ${cost:.4f}")
def tool_monitor(event):
# Tool event payload shape varies by emitter.
# - Single-tool execution: {"tool_name": ..., "success": ..., ...}
# - Batch execution: {"tool_results": [{"name": ..., "success": ...}, ...], ...}
tool_name = event.data.get("tool_name")
if tool_name:
print(f"Tool completed: {tool_name} success={event.data.get('success')}")
return
for r in event.data.get("tool_results", []) or []:
print(f"Tool completed: {r.get('name')} success={r.get('success')} error={r.get('error')}")
# Register handlers
on_global(EventType.GENERATION_COMPLETED, cost_monitor)
on_global(EventType.TOOL_COMPLETED, tool_monitor)
# Now all LLM operations will trigger these handlers
llm = create_llm("openai", model="gpt-4o-mini")
response = llm.generate("Hello world")
Event
Event object passed to handlers.
@dataclass
class Event:
type: EventType
timestamp: datetime
data: Dict[str, Any]
source: Optional[str] = None
Retry Configuration
RetryConfig
Configuration for provider-level retry behavior.
@dataclass
class RetryConfig:
max_attempts: int = 3
initial_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
use_jitter: bool = True
failure_threshold: int = 5
recovery_timeout: float = 60.0
half_open_max_calls: int = 2
Parameters:
- max_attempts (int): Maximum retry attempts
- initial_delay (float): Initial delay in seconds
- max_delay (float): Maximum delay in seconds
- exponential_base (float): Base for exponential backoff
- use_jitter (bool): Add randomness to delays
- failure_threshold (int): Circuit breaker failure threshold
- recovery_timeout (float): Circuit breaker recovery timeout
- half_open_max_calls (int): Max calls in half-open state
Example:
from abstractcore import create_llm
from abstractcore.core.retry import RetryConfig
config = RetryConfig(
max_attempts=5,
initial_delay=2.0,
use_jitter=True,
failure_threshold=3
)
llm = create_llm("openai", model="gpt-4o-mini", retry_config=config)
FeedbackRetry
Retry strategy for structured output validation failures.
class FeedbackRetry:
def __init__(self, max_attempts: int = 3):
self.max_attempts = max_attempts
Example:
from abstractcore.structured import FeedbackRetry
from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
custom_retry = FeedbackRetry(max_attempts=5)
user = llm.generate(
"Extract user: John Doe, 25",
response_model=User,
retry_strategy=custom_retry
)
Embeddings
EmbeddingManager
Manages text embeddings using SOTA models.
class EmbeddingManager:
def __init__(
self,
model: str = "embeddinggemma",
backend: str = "auto",
output_dims: Optional[int] = None,
cache_size: int = 1000,
cache_dir: Optional[str] = None
):
Parameters:
- model (str): Model name ("embeddinggemma", "granite", "stella-400m")
- backend (str): Backend ("auto", "pytorch", "onnx")
- output_dims (int, optional): Truncate output dimensions
- cache_size (int): Memory cache size
- cache_dir (str, optional): Disk cache directory
Methods:
embed()
def embed(self, text: str) -> List[float]
Generate embedding for single text.
embed_batch()
def embed_batch(self, texts: List[str]) -> List[List[float]]
Generate embeddings for multiple texts (more efficient).
compute_similarity()
def compute_similarity(self, text1: str, text2: str) -> float
Compute cosine similarity between two texts.
Example:
from abstractcore.embeddings import EmbeddingManager
embedder = EmbeddingManager(model="embeddinggemma")
# Single embedding
embedding = embedder.embed("Hello world")
print(f"Embedding dimension: {len(embedding)}")
# Batch embeddings
embeddings = embedder.embed_batch(["Hello", "World", "AI"])
# Similarity
similarity = embedder.compute_similarity("cat", "kitten")
print(f"Similarity: {similarity:.3f}")
Exceptions
Base Exceptions
AbstractCoreError
class AbstractCoreError(Exception):
"""Base exception for AbstractCore."""
ProviderAPIError
class ProviderAPIError(AbstractCoreError):
"""Provider API error."""
ModelNotFoundError
class ModelNotFoundError(AbstractCoreError):
"""Model not found error."""
AuthenticationError
class AuthenticationError(ProviderAPIError):
"""Authentication error."""
RateLimitError
class RateLimitError(ProviderAPIError):
"""Rate limit error."""
Usage
from abstractcore.exceptions import ProviderAPIError, RateLimitError
try:
response = llm.generate("Hello world")
except RateLimitError:
print("Rate limited, wait and retry")
except ProviderAPIError as e:
print(f"API error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Advanced Usage Patterns
Custom Provider Configuration
# Provider with all options
llm = create_llm(
provider="openai",
model="gpt-4o-mini",
api_key="your-key",
temperature=0.7,
max_tokens=1000,
top_p=0.9,
timeout=30,
retry_config=RetryConfig(max_attempts=5)
)
Multi-Provider Setup
providers = {
"fast": create_llm("openai", model="gpt-4o-mini"),
"smart": create_llm("openai", model="gpt-4o"),
"long_context": create_llm("anthropic", model="claude-haiku-4-5"),
"local": create_llm("ollama", model="qwen2.5-coder:7b")
}
def route_request(prompt, task_type="general"):
if task_type == "simple":
return providers["fast"].generate(prompt)
elif task_type == "complex":
return providers["smart"].generate(prompt)
elif len(prompt) > 50000:
return providers["long_context"].generate(prompt)
else:
return providers["local"].generate(prompt)
Production Monitoring
from abstractcore.events import EventType, on_global
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Cost tracking
total_cost = 0.0
def production_monitor(event):
global total_cost
if event.type == EventType.GENERATION_COMPLETED:
cost = event.data.get("cost_usd")
if cost:
# NOTE: `cost_usd` is a best-effort estimate based on token usage.
total_cost += float(cost)
logger.info(f"Estimated cost: ${float(cost):.4f}, Total: ${total_cost:.4f}")
duration_ms = event.data.get("duration_ms")
if isinstance(duration_ms, (int, float)) and duration_ms > 10_000:
logger.warning(f"Slow request: {float(duration_ms):.0f}ms")
elif event.type == EventType.ERROR:
logger.error(f"Error: {event.data.get('error')}")
elif event.type == EventType.RETRY_ATTEMPTED:
logger.info(f"Retrying due to: {event.data.get('error_type')}")
on_global(EventType.GENERATION_COMPLETED, production_monitor)
on_global(EventType.ERROR, production_monitor)
on_global(EventType.RETRY_ATTEMPTED, production_monitor)
For more examples and use cases, see: - Getting Started - Basic setup and usage - Examples - Practical use cases - Prerequisites - Provider setup and configuration - Capabilities - What AbstractCore can do