API Reference

Complete reference for the AbstractCore API. All examples work with any supported provider.

Table of Contents

- Core Functions
- Classes
- Event System
- Retry Configuration
- Embeddings
- Exceptions
- Advanced Usage Patterns

Core Functions

create_llm()

Creates an LLM provider instance.

def create_llm(
    provider: str,
    model: Optional[str] = None,
    retry_config: Optional[RetryConfig] = None,
    **kwargs
) -> AbstractCoreInterface

Parameters:

- provider (str): Provider name ("openai", "anthropic", "ollama", "mlx", "lmstudio", "huggingface")
- model (str, optional): Model name. If not provided, uses the provider default
- retry_config (RetryConfig, optional): Custom retry configuration
- **kwargs: Provider-specific parameters

Provider-specific parameters:

- api_key (str): API key for cloud providers
- base_url (str): Custom endpoint URL
- temperature (float): Sampling temperature (0.0-1.0, controls creativity)
- seed (int): Random seed for deterministic outputs (✅ OpenAI, Ollama, MLX, HuggingFace, LMStudio; ⚠️ Anthropic issues warning)
- max_tokens (int): Maximum output tokens
- timeout (int): Request timeout in seconds
- top_p (float): Nucleus sampling parameter

Returns: AbstractCoreInterface instance

Example:

from abstractcore import create_llm

# Basic usage
llm = create_llm("openai", model="gpt-4o-mini")

# With configuration
llm = create_llm(
    "anthropic",
    model="claude-haiku-4-5",
    temperature=0.7,
    max_tokens=1000,
    timeout=30
)

# Local provider
llm = create_llm("ollama", model="qwen2.5-coder:7b", base_url="http://localhost:11434")

Classes

AbstractCoreInterface

Base interface for all LLM providers. All providers implement this interface.

generate()

Generate text response from the LLM.

def generate(
    self,
    prompt: str,
    messages: Optional[List[Dict]] = None,
    system_prompt: Optional[str] = None,
    tools: Optional[List[Dict]] = None,
    response_model: Optional[BaseModel] = None,
    retry_strategy: Optional[Retry] = None,
    stream: bool = False,
    thinking: Optional[bool | str] = None,
    **kwargs
) -> Union[GenerateResponse, Iterator[GenerateResponse]]

Parameters:

- prompt (str): Text prompt to generate from
- messages (List[Dict], optional): Conversation messages in OpenAI format
- system_prompt (str, optional): System prompt to set context
- tools (List[Dict], optional): Tools the LLM can call
- response_model (BaseModel, optional): Pydantic model for structured output
- retry_strategy (Retry, optional): Custom retry strategy for structured output
- stream (bool): Enable streaming response
- thinking (bool | str, optional): Unified thinking/reasoning control ("auto"|"on"|"off" or "low"|"medium"|"high" when supported)
- **kwargs: Additional generation parameters

Returns:

- If stream=False: GenerateResponse
- If stream=True: Iterator[GenerateResponse]

Examples:

Basic Generation:

response = llm.generate("What is machine learning?")
print(response.content)

With System Prompt:

response = llm.generate(
    "Explain Python decorators",
    system_prompt="You are a Python expert. Always provide code examples."
)

Structured Output:

from pydantic import BaseModel

class Person(BaseModel):
    name: str
    age: int

person = llm.generate(
    "Extract: John Doe is 25 years old",
    response_model=Person
)
print(f"{person.name}, age {person.age}")

See: Structured Output Guide for comprehensive documentation

Tool Calling:

def get_weather(city: str) -> str:
    return f"Weather in {city}: sunny, 22°C"

tools = [{
    "name": "get_weather",
    "description": "Get weather for a city",
    "parameters": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"]
    }
}]

response = llm.generate("What's the weather in Paris?", tools=tools)

Streaming:

print("AI: ", end="")
for chunk in llm.generate(
    "Create a Python function with a tool",
    stream=True,
    tools=tools
):
    # Real-time chunk processing
    print(chunk.content or "", end="", flush=True)

    # Tool calls are surfaced as structured dicts; execute them in your host/runtime.
    if chunk.tool_calls:
        print(f"\nTool calls: {chunk.tool_calls}")

Streaming notes:

- Streaming uses a unified processor across providers; exact chunking behavior depends on the backend.
- Tool calls are surfaced as structured dicts in chunk.tool_calls; execute them in your host/runtime (pass-through by default).
- If you need tool-call markup preserved/re-written in chunk.content, pass tool_call_tags=... (see Tool Call Syntax Rewriting).
- In streaming mode, AbstractCore records a best-effort TTFT metric in chunk.metadata["_timing"]["ttft_ms"] when available (for debugging/observability).
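The pass-through pattern above can be sketched without a live provider. `Chunk` below is a minimal stand-in for the streamed GenerateResponse objects (same field names, nothing else assumed):

```python
from dataclasses import dataclass
from typing import List, Optional

# Minimal stand-in for the streamed GenerateResponse objects, so the
# accumulate-text / collect-tool-calls pattern can be shown offline.
@dataclass
class Chunk:
    content: Optional[str] = None
    tool_calls: Optional[List[dict]] = None

def consume_stream(chunks):
    """Accumulate streamed text and collect tool calls for the host to execute."""
    text_parts, pending_tools = [], []
    for chunk in chunks:
        if chunk.content:
            text_parts.append(chunk.content)
        if chunk.tool_calls:
            pending_tools.extend(chunk.tool_calls)
    return "".join(text_parts), pending_tools

# Simulated stream: two text chunks, then a tool call.
stream = iter([
    Chunk(content="Checking the weather"),
    Chunk(content="..."),
    Chunk(tool_calls=[{"name": "get_weather", "arguments": {"city": "Paris"}}]),
])
text, tools = consume_stream(stream)
print(text)              # Checking the weather...
print(tools[0]["name"])  # get_weather
```

With a real stream, replace the simulated iterator with `llm.generate(..., stream=True)` and dispatch the collected tool calls in your runtime.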

agenerate()

Async version of generate() for concurrent request execution.

async def agenerate(
    self,
    prompt: str,
    messages: Optional[List[Dict]] = None,
    system_prompt: Optional[str] = None,
    tools: Optional[List[Dict]] = None,
    response_model: Optional[BaseModel] = None,
    stream: bool = False,
    **kwargs
) -> Union[GenerateResponse, AsyncIterator[GenerateResponse]]

Parameters: Same as generate() (the async signature above omits retry_strategy and thinking)

Returns:

- If stream=False: GenerateResponse
- If stream=True: AsyncIterator[GenerateResponse]

Examples:

Basic Async:

import asyncio

async def main():
    response = await llm.agenerate("What is quantum computing?")
    print(response.content)

asyncio.run(main())

Concurrent Requests:

async def batch_process():
    tasks = [
        llm.agenerate("Summarize Python"),
        llm.agenerate("Summarize JavaScript"),
        llm.agenerate("Summarize Rust")
    ]
    responses = await asyncio.gather(*tasks)

    for response in responses:
        print(response.content)

asyncio.run(batch_process())

Async Streaming:

async def stream_response():
    async for chunk in llm.agenerate("Tell me a story", stream=True):
        print(chunk.content or "", end="", flush=True)

asyncio.run(stream_response())

Multi-Provider Comparison:

async def compare_providers():
    openai = create_llm("openai", model="gpt-4o-mini")
    claude = create_llm("anthropic", model="claude-haiku-4-5")

    responses = await asyncio.gather(
        openai.agenerate("What is 2+2?"),
        claude.agenerate("What is 2+2?")
    )

    print(f"OpenAI: {responses[0].content}")
    print(f"Claude: {responses[1].content}")

asyncio.run(compare_providers())

Features:

- Works across AbstractCore providers (cloud + local); some use native async, others fall back to asyncio.to_thread()
- Faster batch operations via concurrent execution (depends on provider, network, and hardware)
- Full streaming support with AsyncIterator
- Compatible with FastAPI and async web frameworks
- Zero breaking changes to the sync API

get_capabilities()

Get provider capabilities.

def get_capabilities(self) -> List[str]

Returns: List of capability strings

Example:

capabilities = llm.get_capabilities()
print(capabilities)  # ['text_generation', 'tool_calling', 'streaming', 'vision']

unload_model(model_name)

Unload/cleanup resources for a specific model (best-effort).

def unload_model(self, model_name: str) -> None

For local providers (Ollama, MLX, HuggingFace, LMStudio), this explicitly frees model memory or releases client resources. For API providers (OpenAI, Anthropic), this is typically a no-op but safe to call.

Provider-specific behavior:

- Ollama: Sends keep_alive=0 to immediately unload from the server
- MLX: Clears model/tokenizer references and forces garbage collection
- HuggingFace: Closes llama.cpp resources (GGUF) or clears model references
- LMStudio: Closes the HTTP connection (server auto-manages via TTL)
- OpenAI/Anthropic: No-op (safe to call)

Example:

# Load and use a large model
llm = create_llm("ollama", model="qwen3-coder:30b")
response = llm.generate("Hello world")

# Explicitly free memory when done
llm.unload_model(llm.model)
del llm

# Now safe to load another large model
llm2 = create_llm("mlx", model="mlx-community/Qwen3-30B-4bit")

Use cases:

- Test suites that load multiple models sequentially
- Memory-constrained environments (<32GB RAM)
- Sequential model loading in production systems

GenerateResponse

Response object from LLM generation with consistent token terminology and generation time tracking.

@dataclass
class GenerateResponse:
    content: Optional[str]
    raw_response: Any
    model: Optional[str]
    finish_reason: Optional[str]
    usage: Optional[Dict[str, int]]
    tool_calls: Optional[List[Dict]]
    metadata: Optional[Dict]
    gen_time: Optional[float]  # Generation time in milliseconds

    # Consistent token access properties
    @property
    def input_tokens(self) -> Optional[int]:
        """Get input tokens with consistent terminology."""

    @property
    def output_tokens(self) -> Optional[int]:
        """Get output tokens with consistent terminology."""

    @property
    def total_tokens(self) -> Optional[int]:
        """Get total tokens."""

Attributes:

- content (str): Generated text content
- raw_response (Any): Raw provider response
- model (str): Model used for generation
- finish_reason (str): Why generation stopped ("stop", "length", "tool_calls")
- usage (Dict): Token usage information
- tool_calls (List[Dict]): Tools called by the LLM
- metadata (Dict): Additional metadata (notably metadata["reasoning"] when a provider/model exposes thinking/reasoning)
- gen_time (float): Generation time in milliseconds, rounded to 1 decimal place

Token and Timing Access Examples:

response = llm.generate("Explain quantum computing")

# Best-effort access across supported providers (may be None depending on backend/config)
print(f"Input tokens: {response.input_tokens}")      # None if usage isn't reported/estimated
print(f"Output tokens: {response.output_tokens}")    # None if usage isn't reported/estimated
print(f"Total tokens: {response.total_tokens}")      # None if usage isn't reported/estimated
print(f"Generation time: {response.gen_time}ms")     # None if timing wasn't captured

# Comprehensive summary
print(f"Summary: {response.get_summary()}")  # Model | Tokens | Time | Tools

# Raw usage dictionary (provider-specific format)
print(f"Usage details: {response.usage}")

Token Count Sources:

- Provider APIs: OpenAI, Anthropic, LMStudio (native API token counts)
- AbstractCore calculation: MLX, HuggingFace (using token_utils.py)
- Mixed sources: Ollama (combination of provider and calculated tokens)

Backward Compatibility: The legacy prompt_tokens and completion_tokens keys remain available in the response.usage dictionary.
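A hypothetical helper sketching how the fallback between current and legacy key names presumably works (the real logic lives in the input_tokens/output_tokens properties):

```python
from typing import Optional

def input_tokens(usage: Optional[dict]) -> Optional[int]:
    """Resolve input tokens from either the current or the legacy key name.

    Hypothetical sketch of the fallback between "input_tokens" (current)
    and "prompt_tokens" (legacy); not the library's actual implementation.
    """
    if not usage:
        return None
    return usage.get("input_tokens", usage.get("prompt_tokens"))

# Both key styles resolve to the same value.
print(input_tokens({"input_tokens": 12, "output_tokens": 30}))       # 12
print(input_tokens({"prompt_tokens": 12, "completion_tokens": 30}))  # 12
print(input_tokens(None))                                            # None
```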

Methods:

has_tool_calls()

def has_tool_calls(self) -> bool

Returns True if the response contains tool calls.

get_tools_executed()

def get_tools_executed(self) -> List[str]

Returns list of tool names that were executed.

Example:

response = llm.generate("What's 2+2?", tools=[calculator_tool])

print(f"Content: {response.content}")
print(f"Model: {response.model}")
print(f"Tokens: {response.usage}")

if response.has_tool_calls():
    print(f"Tools used: {response.get_tools_executed()}")

BasicSession

Manages conversation context and history.

class BasicSession:
    def __init__(
        self,
        provider: AbstractCoreInterface,
        system_prompt: Optional[str] = None,
        temperature: Optional[float] = None,
        seed: Optional[int] = None,
        **kwargs
    ):

Parameters:

- provider (AbstractCoreInterface): LLM provider instance
- system_prompt (str, optional): System prompt for the conversation
- temperature (float, optional): Default temperature for all generations (0.0-1.0)
- seed (int, optional): Default seed for deterministic outputs (provider support varies)
- **kwargs: Additional session parameters (tools, timeouts, etc.)

Attributes:

- messages (List[Message]): Conversation history
- provider (AbstractCoreInterface): LLM provider
- system_prompt (str): System prompt

Methods:

generate()

def generate(self, prompt: str, **kwargs) -> GenerateResponse

Generate response and add to conversation history.

agenerate()

async def agenerate(
    self,
    prompt: str,
    name: Optional[str] = None,
    location: Optional[str] = None,
    **kwargs
) -> Union[GenerateResponse, AsyncIterator[GenerateResponse]]

Async version of generate(). Maintains conversation history with async execution.

Example:

import asyncio

async def chat():
    session = BasicSession(provider=llm)

    # Async conversation
    response1 = await session.agenerate("My name is Alice")
    response2 = await session.agenerate("What's my name?")

    print(response2.content)  # References Alice

asyncio.run(chat())

add_message()

def add_message(self, role: str, content: str, **metadata) -> Message

Add message to conversation history.

clear_history()

def clear_history(self, keep_system: bool = True) -> None

Clear conversation history, optionally keeping system prompt.

save()

def save(self, filepath: Path) -> None

Save session to JSON file.

load()

@classmethod
def load(cls, filepath: Path, provider: AbstractCoreInterface) -> "BasicSession"

Load session from JSON file.

Example:

from pathlib import Path

from abstractcore import create_llm, BasicSession

llm = create_llm("openai", model="gpt-4o-mini")
session = BasicSession(
    provider=llm,
    system_prompt="You are a helpful coding tutor.",
    temperature=0.3,  # Focused responses
    seed=42          # Consistent outputs
)

# Multi-turn conversation
response1 = session.generate("What are Python decorators?")
response2 = session.generate("Show me an example", temperature=0.7)  # Override for this call

print(f"Conversation has {len(session.messages)} messages")

# Save session
session.save(Path("conversation.json"))

# Load later
loaded_session = BasicSession.load(Path("conversation.json"), llm)

Message

Represents a conversation message.

@dataclass
class Message:
    role: str
    content: str
    timestamp: Optional[datetime] = None
    name: Optional[str] = None
    metadata: Optional[Dict] = None

Methods:

to_dict()

def to_dict(self) -> Dict

Convert message to dictionary.

from_dict()

@classmethod
def from_dict(cls, data: Dict) -> "Message"

Create message from dictionary.
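The to_dict()/from_dict() pair is a serialization round trip. A minimal local mirror of the dataclass (hypothetical; the real class lives in abstractcore) illustrates the pattern:

```python
from dataclasses import dataclass, asdict
from typing import Dict, Optional

# Minimal mirror of Message, just to demonstrate the round trip offline.
@dataclass
class Msg:
    role: str
    content: str
    name: Optional[str] = None
    metadata: Optional[Dict] = None

    def to_dict(self) -> Dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> "Msg":
        return cls(**data)

original = Msg(role="user", content="Hello", metadata={"lang": "en"})
restored = Msg.from_dict(original.to_dict())
print(restored == original)  # True
```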

Event System

EventType

Available event types for monitoring.

class EventType(Enum):
    # Generation events
    GENERATION_STARTED = "generation_started"
    GENERATION_COMPLETED = "generation_completed"

    # Tool events
    TOOL_STARTED = "tool_started"
    TOOL_PROGRESS = "tool_progress"
    TOOL_COMPLETED = "tool_completed"

    # Error handling
    ERROR = "error"

    # Retry and resilience events
    RETRY_ATTEMPTED = "retry_attempted"
    RETRY_EXHAUSTED = "retry_exhausted"

    # Useful events
    VALIDATION_FAILED = "validation_failed"
    SESSION_CREATED = "session_created"
    SESSION_CLEARED = "session_cleared"
    COMPACTION_STARTED = "compaction_started"
    COMPACTION_COMPLETED = "compaction_completed"

    # Runtime/workflow events
    WORKFLOW_STEP_STARTED = "workflow_step_started"
    WORKFLOW_STEP_COMPLETED = "workflow_step_completed"
    WORKFLOW_STEP_WAITING = "workflow_step_waiting"
    WORKFLOW_STEP_FAILED = "workflow_step_failed"

on_global()

Register global event handler.

def on_global(event_type: EventType, handler: Callable[[Event], None]) -> None

Parameters:

- event_type (EventType): Event type to listen for
- handler (Callable): Function to call when the event occurs

Example:

from abstractcore.events import EventType, on_global

def cost_monitor(event):
    cost = event.data.get("cost_usd")
    if cost:
        # NOTE: `cost_usd` is a best-effort estimate based on token usage.
        print(f"Estimated cost: ${cost:.4f}")

def tool_monitor(event):
    # Tool event payload shape varies by emitter.
    # - Single-tool execution: {"tool_name": ..., "success": ..., ...}
    # - Batch execution: {"tool_results": [{"name": ..., "success": ...}, ...], ...}
    tool_name = event.data.get("tool_name")
    if tool_name:
        print(f"Tool completed: {tool_name} success={event.data.get('success')}")
        return

    for r in event.data.get("tool_results", []) or []:
        print(f"Tool completed: {r.get('name')} success={r.get('success')} error={r.get('error')}")

# Register handlers
on_global(EventType.GENERATION_COMPLETED, cost_monitor)
on_global(EventType.TOOL_COMPLETED, tool_monitor)

# Now all LLM operations will trigger these handlers
llm = create_llm("openai", model="gpt-4o-mini")
response = llm.generate("Hello world")

Event

Event object passed to handlers.

@dataclass
class Event:
    type: EventType
    timestamp: datetime
    data: Dict[str, Any]
    source: Optional[str] = None

Retry Configuration

RetryConfig

Configuration for provider-level retry behavior.

@dataclass
class RetryConfig:
    max_attempts: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    use_jitter: bool = True
    failure_threshold: int = 5
    recovery_timeout: float = 60.0
    half_open_max_calls: int = 2

Parameters:

- max_attempts (int): Maximum retry attempts
- initial_delay (float): Initial delay in seconds
- max_delay (float): Maximum delay in seconds
- exponential_base (float): Base for exponential backoff
- use_jitter (bool): Add randomness to delays
- failure_threshold (int): Circuit breaker failure threshold
- recovery_timeout (float): Circuit breaker recovery timeout
- half_open_max_calls (int): Max calls in half-open state
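Under a common reading of these fields, the delay before retry N is initial_delay * exponential_base**N, capped at max_delay (jitter then adds randomness on top). A sketch of that schedule, which is an interpretation rather than the library's exact implementation:

```python
def backoff_delays(max_attempts=3, initial_delay=1.0, max_delay=60.0,
                   exponential_base=2.0):
    """Delay before each retry attempt under capped exponential backoff.

    Jitter is omitted for clarity; use_jitter would perturb each value.
    """
    return [min(initial_delay * exponential_base ** attempt, max_delay)
            for attempt in range(max_attempts)]

print(backoff_delays())                                   # [1.0, 2.0, 4.0]
print(backoff_delays(max_attempts=5, initial_delay=2.0))  # [2.0, 4.0, 8.0, 16.0, 32.0]
```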

Example:

from abstractcore import create_llm
from abstractcore.core.retry import RetryConfig

config = RetryConfig(
    max_attempts=5,
    initial_delay=2.0,
    use_jitter=True,
    failure_threshold=3
)

llm = create_llm("openai", model="gpt-4o-mini", retry_config=config)

FeedbackRetry

Retry strategy for structured output validation failures.

class FeedbackRetry:
    def __init__(self, max_attempts: int = 3):
        self.max_attempts = max_attempts

Example:

from abstractcore.structured import FeedbackRetry
from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int

custom_retry = FeedbackRetry(max_attempts=5)

user = llm.generate(
    "Extract user: John Doe, 25",
    response_model=User,
    retry_strategy=custom_retry
)

Embeddings

EmbeddingManager

Manages text embeddings using SOTA models.

class EmbeddingManager:
    def __init__(
        self,
        model: str = "embeddinggemma",
        backend: str = "auto",
        output_dims: Optional[int] = None,
        cache_size: int = 1000,
        cache_dir: Optional[str] = None
    ):

Parameters:

- model (str): Model name ("embeddinggemma", "granite", "stella-400m")
- backend (str): Backend ("auto", "pytorch", "onnx")
- output_dims (int, optional): Truncate output dimensions
- cache_size (int): In-memory cache size
- cache_dir (str, optional): Disk cache directory

Methods:

embed()

def embed(self, text: str) -> List[float]

Generate embedding for single text.

embed_batch()

def embed_batch(self, texts: List[str]) -> List[List[float]]

Generate embeddings for multiple texts (more efficient).

compute_similarity()

def compute_similarity(self, text1: str, text2: str) -> float

Compute cosine similarity between two texts.
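On the underlying embedding vectors, cosine similarity is dot(a, b) / (||a|| * ||b||). A plain-Python version of that metric (illustrative; compute_similarity handles the embedding step for you):

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors:
    dot(a, b) / (||a|| * ||b||), ranging from -1.0 to 1.0."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0 (identical direction)
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0 (orthogonal)
```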

Example:

from abstractcore.embeddings import EmbeddingManager

embedder = EmbeddingManager(model="embeddinggemma")

# Single embedding
embedding = embedder.embed("Hello world")
print(f"Embedding dimension: {len(embedding)}")

# Batch embeddings
embeddings = embedder.embed_batch(["Hello", "World", "AI"])

# Similarity
similarity = embedder.compute_similarity("cat", "kitten")
print(f"Similarity: {similarity:.3f}")

Exceptions

Base Exceptions

AbstractCoreError

class AbstractCoreError(Exception):
    """Base exception for AbstractCore."""

ProviderAPIError

class ProviderAPIError(AbstractCoreError):
    """Provider API error."""

ModelNotFoundError

class ModelNotFoundError(AbstractCoreError):
    """Model not found error."""

AuthenticationError

class AuthenticationError(ProviderAPIError):
    """Authentication error."""

RateLimitError

class RateLimitError(ProviderAPIError):
    """Rate limit error."""

Usage

from abstractcore.exceptions import ProviderAPIError, RateLimitError

try:
    response = llm.generate("Hello world")
except RateLimitError:
    print("Rate limited, wait and retry")
except ProviderAPIError as e:
    print(f"API error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
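For callers who want manual control beyond RetryConfig, rate-limit errors can be retried with a simple backoff loop. A self-contained sketch (the RateLimitError and flaky_generate below are stand-ins so the loop can run offline):

```python
import time

class RateLimitError(Exception):
    """Stand-in for abstractcore.exceptions.RateLimitError."""

def call_with_backoff(fn, max_attempts=3, initial_delay=0.01):
    """Retry fn on rate limits with exponential backoff; re-raise when exhausted."""
    delay = initial_delay
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise
            time.sleep(delay)
            delay *= 2

# Simulated call that rate-limits once, then succeeds.
calls = {"n": 0}
def flaky_generate():
    calls["n"] += 1
    if calls["n"] < 2:
        raise RateLimitError("slow down")
    return "ok"

result = call_with_backoff(flaky_generate)
print(result)  # ok
```

In real code, `fn` would wrap `llm.generate(...)` and catch the library's own RateLimitError.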

Advanced Usage Patterns

Custom Provider Configuration

# Provider with all options
llm = create_llm(
    provider="openai",
    model="gpt-4o-mini",
    api_key="your-key",
    temperature=0.7,
    max_tokens=1000,
    top_p=0.9,
    timeout=30,
    retry_config=RetryConfig(max_attempts=5)
)

Multi-Provider Setup

providers = {
    "fast": create_llm("openai", model="gpt-4o-mini"),
    "smart": create_llm("openai", model="gpt-4o"),
    "long_context": create_llm("anthropic", model="claude-haiku-4-5"),
    "local": create_llm("ollama", model="qwen2.5-coder:7b")
}

def route_request(prompt, task_type="general"):
    if task_type == "simple":
        return providers["fast"].generate(prompt)
    elif task_type == "complex":
        return providers["smart"].generate(prompt)
    elif len(prompt) > 50000:
        return providers["long_context"].generate(prompt)
    else:
        return providers["local"].generate(prompt)

Production Monitoring

from abstractcore.events import EventType, on_global
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Cost tracking
total_cost = 0.0

def production_monitor(event):
    global total_cost

    if event.type == EventType.GENERATION_COMPLETED:
        cost = event.data.get("cost_usd")
        if cost:
            # NOTE: `cost_usd` is a best-effort estimate based on token usage.
            total_cost += float(cost)
            logger.info(f"Estimated cost: ${float(cost):.4f}, Total: ${total_cost:.4f}")

        duration_ms = event.data.get("duration_ms")
        if isinstance(duration_ms, (int, float)) and duration_ms > 10_000:
            logger.warning(f"Slow request: {float(duration_ms):.0f}ms")

    elif event.type == EventType.ERROR:
        logger.error(f"Error: {event.data.get('error')}")

    elif event.type == EventType.RETRY_ATTEMPTED:
        logger.info(f"Retrying due to: {event.data.get('error_type')}")

on_global(EventType.GENERATION_COMPLETED, production_monitor)
on_global(EventType.ERROR, production_monitor)
on_global(EventType.RETRY_ATTEMPTED, production_monitor)

For more examples and use cases, see: - Getting Started - Basic setup and usage - Examples - Practical use cases - Prerequisites - Provider setup and configuration - Capabilities - What AbstractCore can do