providers
What it is
Provider-facing service functions and data models used by the Nexus API to run chat completions (sync or streaming) against multiple AI backends:
- Anthropic (Claude)
- OpenAI
- Ollama (local)
- Cloudflare Workers AI
- Custom OpenAI-compatible HTTP endpoints
- ABI server streaming and in-process ABI agent streaming
- Optional Ollama tool-calling loop via a local
search_webAPI
Public API
Data models
-
class ProviderConfig(BaseModel)- Provider configuration (typically supplied by the frontend).
- Fields:
id,name,type,enabled,endpoint,api_key,account_id,model.
-
class Message(BaseModel)- Chat message representation.
- Fields:
role("user"|"assistant"|"system"),content, optionalimages(list of base64 strings).
Chat completion (non-streaming)
-
async def complete_chat(messages, config, system_prompt=None) -> str- Routes to the correct provider based on
config.type. - Supported
type:anthropic,openai,ollama,cloudflare,custom.
- Routes to the correct provider based on
-
async def complete_with_anthropic(messages, config, system_prompt=None) -> str- Uses
anthropicSDK; mergessystem_promptwith any"system"messages.
- Uses
-
async def complete_with_openai(messages, config, system_prompt=None) -> str- Uses
openaiSDK; prependssystem_promptas a"system"message if provided.
- Uses
-
async def complete_with_ollama(messages, config, system_prompt=None) -> str- Calls Ollama
POST {endpoint}/api/chatwithstream=False. - Supports
Message.imagesfor multimodal models.
- Calls Ollama
-
async def complete_with_cloudflare(messages, config, system_prompt=None) -> str- Calls Cloudflare Workers AI run endpoint; returns
data["result"]["response"]when present.
- Calls Cloudflare Workers AI run endpoint; returns
-
async def complete_with_custom(messages, config, system_prompt=None) -> str- Calls a custom OpenAI-compatible endpoint:
POST {endpoint}/v1/chat/completions.
- Calls a custom OpenAI-compatible endpoint:
Chat completion (streaming)
-
async def stream_with_ollama(messages, config, system_prompt=None) -> AsyncGenerator[str, None]- Streams Ollama JSON lines from
POST {endpoint}/api/chatwithstream=True. - Yields
message.contenttokens as they arrive.
- Streams Ollama JSON lines from
-
async def stream_with_openai_compatible(messages, config, system_prompt=None) -> AsyncGenerator[str, None]- Streams Server-Sent Events-like lines from
POST {endpoint}/chat/completions(expectsdata: ...lines). - Extracts and yields
choices[0].delta.content.
- Streams Server-Sent Events-like lines from
-
async def stream_with_cloudflare(messages, config, system_prompt=None) -> AsyncGenerator[str, None]- Streams Cloudflare Workers AI with
stream=Trueand yieldsdata["response"]fromdata: ...SSE lines.
- Streams Cloudflare Workers AI with
-
async def stream_with_ollama_tools(messages, config, system_prompt=None, tools=None) -> AsyncGenerator[str, None]- Streams Ollama, detects tool calls in
message.tool_calls, executes them viaexecute_tool, then continues streaming without tools. - Yields a
*Searching...*marker when tool execution begins.
- Streams Ollama, detects tool calls in
-
async def stream_with_abi(messages, config, system_prompt=None) -> AsyncGenerator[str, None]- Streams from an ABI server using SSE:
POST {endpoint}/agents/{agent_name}/stream-completion?token={token}
- Uses the latest
"user"message asprompt, and a hash-basedthread_id. - Prefers SSE event
ai_message; falls back tomessageifai_messagenever appears; suppresses consecutive duplicates.
- Streams from an ABI server using SSE:
-
async def stream_with_abi_inprocess(messages, config, thread_id=None) -> AsyncGenerator[str, None]- Invokes an ABI agent directly in-process via
agent.stream_invoke(...). - Resolves agent from the ABI module registry (and a local fallback set) using
_resolve_inprocess_abi_agent. - If
thread_idis provided and agent supports it, sets agent thread id.
- Invokes an ABI agent directly in-process via
Utilities
-
def is_multimodal_model(model_name: str) -> bool- Returns
Trueifmodel_namecontains any substring inMULTIMODAL_MODEL_PATTERNS.
- Returns
-
async def check_ollama_status(endpoint: str = "http://localhost:11434") -> dict- Calls
GET {endpoint}/api/tags, returns:status:"online"/"offline"models: list of model namesmultimodal_models: filtered list usingis_multimodal_model
- Calls
Tools
-
AVAILABLE_TOOLS: list[dict]- Currently defines one function tool:
search_webwith parameters{query, engine}.
- Currently defines one function tool:
-
async def execute_tool(tool_name: str, arguments: dict) -> str- Implements
search_webby calling local API:POST http://localhost:8000/api/search/webwith{query, engine, limit: 5}
- Returns formatted bullet results, or an error/empty message.
- Implements
Configuration/Dependencies
-
settings(fromnaas_abi.apps.nexus.apps.api.app.core.config) provides fallback credentials:settings.anthropic_api_keysettings.openai_api_keysettings.cloudflare_api_tokensettings.cloudflare_account_id
-
Optional SDK dependencies:
anthropic(required forcomplete_with_anthropic)openai(required forcomplete_with_openai)
-
HTTP client:
httpx.AsyncClientused for Ollama, Cloudflare, custom endpoints, tool execution, and ABI streaming.
-
Provider-specific expectations:
- Ollama default endpoint:
http://localhost:11434ifconfig.endpointis unset. - OpenAI-compatible streaming expects
data: ...lines and[DONE]termination. - Cloudflare model path is derived from
config.model.lstrip("@cf/").
- Ollama default endpoint:
Usage
Route a non-streaming completion (Ollama)
import asyncio
from naas_abi.apps.nexus.apps.api.app.services.providers import (
Message, ProviderConfig, complete_chat
)
async def main():
cfg = ProviderConfig(
id="ollama1",
name="Local Ollama",
type="ollama",
enabled=True,
endpoint="http://localhost:11434",
model="llama3",
)
msgs = [Message(role="user", content="Say hello.")]
text = await complete_chat(msgs, cfg)
print(text)
asyncio.run(main())Stream tokens (OpenAI-compatible endpoint)
import asyncio
from naas_abi.apps.nexus.apps.api.app.services.providers import (
Message, ProviderConfig, stream_with_openai_compatible
)
async def main():
cfg = ProviderConfig(
id="compat1",
name="Compat",
type="custom",
enabled=True,
endpoint="https://example.com/v1", # base URL that serves /chat/completions
api_key="YOUR_KEY",
model="gpt-4o-mini",
)
msgs = [Message(role="user", content="Stream a short poem.")]
async for chunk in stream_with_openai_compatible(msgs, cfg):
print(chunk, end="")
asyncio.run(main())Caveats
complete_chat()does not route to streaming functions or ABI streaming; streaming must be called explicitly.stream_with_openai_compatible()usesconfig.endpoint.rstrip("/")and posts to"{endpoint}/chat/completions"(note: not.../v1/...unless yourendpointalready includes it).stream_with_openai_compatible()assumes SSE-likedata: ...lines; non-SSE streaming formats will be ignored.complete_with_custom()posts to"{config.endpoint}/v1/chat/completions"; include the correct base URL accordingly.- Tool execution (
execute_tool) depends on a local service athttp://localhost:8000/api/search/web; failures are returned as text to the model stream. stream_with_abi()andstream_with_abi_inprocess()only send the latest user message as the prompt (they do not serialize the full message history into the request body).