Agent
What it is
A LangGraph/LangChain-based orchestration layer that runs a chat model with tool calling, supports sub-agent handoff via tools, persists conversation state via a checkpointer (in-memory or PostgreSQL), and can stream events (tool usage/response, AI messages) including SSE-friendly output.
Public API
Functions
- `create_checkpointer() -> BaseCheckpointSaver`
  Creates a conversation checkpointer:
  - Uses a PostgreSQL-backed checkpointer when `POSTGRES_URL` is set (shared across instances).
  - Falls back to the in-memory `MemorySaver` otherwise (or on PostgreSQL init failure).
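The selection and fallback behavior can be sketched with the decision logic alone. This is a simplified stand-in — the real function returns a `BaseCheckpointSaver`, and `connect_postgres` here is a hypothetical placeholder for the PostgreSQL setup:

```python
import os


def choose_checkpointer(connect_postgres):
    # Prefer PostgreSQL when POSTGRES_URL is set; fall back to the
    # in-memory saver when it is unset or initialization fails.
    if os.environ.get("POSTGRES_URL"):
        try:
            return connect_postgres()
        except Exception:
            pass  # init failure: fall through to the in-memory saver
    return "MemorySaver"
```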
- `close_shared_checkpointer() -> None`
  Closes the shared PostgreSQL connection (if used) and resets the shared checkpointer.
- `make_handoff_tool(agent: Agent, parent_graph: bool = False) -> BaseTool`
  Creates a tool named `transfer_to_<agent_name>` that, when called, returns a `langgraph.types.Command` routing execution to the target agent node and injects a `ToolMessage` indicating the handoff.
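In simplified form, the handoff pattern looks like this — a sketch only, where `Command` is a minimal stand-in for `langgraph.types.Command`, not the real class:

```python
from dataclasses import dataclass, field


@dataclass
class Command:
    # Minimal stand-in for langgraph.types.Command.
    goto: str
    update: dict = field(default_factory=dict)


def make_handoff_tool(agent_name: str):
    tool_name = f"transfer_to_{agent_name}"

    def handoff() -> Command:
        # Route execution to the target agent node and inject a
        # tool-message-style note recording the handoff.
        return Command(
            goto=agent_name,
            update={"messages": [f"Transferred to {agent_name}."]},
        )

    handoff.__name__ = tool_name
    return handoff
```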
Data / Configuration classes
- `class AgentSharedState`
  Shared mutable routing state:
  - `thread_id: str`
  - `current_active_agent: Optional[str]`
  - `supervisor_agent: Optional[str]`
  - `requesting_help: bool`
  Setters validate names via `Agent.validate_name`.
- `class AgentConfiguration` (dataclass)
  Hooks and system prompt configuration:
  - `on_tool_usage(message)`
  - `on_tool_response(message)`
  - `on_ai_message(message, agent_name)`
  - `system_prompt: str | Callable[[list[AnyMessage]], str]`
  - `get_system_prompt(messages)` returns the computed prompt.
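The `system_prompt` dispatch can be sketched as follows — an illustrative re-implementation, not the library code:

```python
from typing import Any, Callable, List, Union

SystemPrompt = Union[str, Callable[[List[Any]], str]]


def get_system_prompt(system_prompt: SystemPrompt, messages: List[Any]) -> str:
    # A system_prompt is either a plain string, returned as-is, or a
    # callable that computes the prompt from the message history.
    if callable(system_prompt):
        return system_prompt(messages)
    return system_prompt
```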
- `class CompletionQuery(pydantic.BaseModel)`
  Request model used by the FastAPI endpoints:
  - `prompt: str`
  - `thread_id: str | int`
Events (dataclasses)
- `Event(payload)`
- `ToolUsageEvent`
- `ToolResponseEvent`
- `AIMessageEvent(payload, agent_name)`
- `FinalStateEvent`
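One plausible shape for this hierarchy, inferred from the signatures listed above (an assumption, not the actual source):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Event:
    payload: Any


@dataclass
class ToolUsageEvent(Event):
    pass


@dataclass
class ToolResponseEvent(Event):
    pass


@dataclass
class AIMessageEvent(Event):
    agent_name: str


@dataclass
class FinalStateEvent(Event):
    pass
```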
Main class: class Agent(Expose)
Key constructor and methods:
- `Agent.__init__(...)`
  Creates an agent with:
  - tool binding (if the model supports it),
  - optional sub-agents (added as graph nodes and exposed via `transfer_to_*` tools),
  - optional default tools injection (`enable_default_tools=True`),
  - memory via the provided `memory` or `create_checkpointer()`,
  - a compiled LangGraph `graph` using the checkpointer.
- `Agent.validate_name(name: str) -> str` (static)
  Ensures node/tool-safe names (`^[a-zA-Z0-9_-]+$`), replacing invalid characters with `_`.
- `Agent.prepare_tools(tools, agents) -> (list[Tool | BaseTool], list[Agent])`
  Normalizes tools and agents:
  - Converts `Agent` instances into handoff tools (via `as_tools()`).
  - Deduplicates by tool/agent name.
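The name sanitization can be reproduced with a single regex substitution — a hedged sketch of the behavior described above:

```python
import re


def validate_name(name: str) -> str:
    # Replace every character outside [a-zA-Z0-9_-] with "_",
    # yielding a string that matches ^[a-zA-Z0-9_-]+$.
    return re.sub(r"[^a-zA-Z0-9_-]", "_", name)
```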
- `Agent.as_tools(parent_graph: bool = False) -> list[BaseTool]`
  Exposes this agent as a single handoff tool: `transfer_to_<agent_name>`.
- `Agent.build_graph(patcher: Optional[Callable] = None) -> None`
  Builds and compiles the internal LangGraph state machine. If provided, `patcher` can modify the graph before compilation.
- `Agent.stream(prompt: str) -> Generator[tuple | dict | Any, None, None]`
  Streams LangGraph execution chunks (`graph.stream(..., subgraphs=True)`) and emits internal callbacks/events for:
  - tool usage (`AIMessage` with tool calls),
  - tool responses (`ToolMessage`),
  - AI messages (from specific nodes).
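The event dispatch can be illustrated with simplified message dicts standing in for `AIMessage`/`ToolMessage` — a sketch of the routing logic only, not the library's types:

```python
def classify(message: dict) -> str:
    # Mirror the callbacks fired while streaming: an AI message that
    # carries tool calls triggers on_tool_usage, a tool result triggers
    # on_tool_response, and a plain AI message triggers on_ai_message.
    if message.get("type") == "ai" and message.get("tool_calls"):
        return "tool_usage"
    if message.get("type") == "tool":
        return "tool_response"
    if message.get("type") == "ai":
        return "ai_message"
    return "other"
```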
- `Agent.invoke(prompt: str) -> str`
  Runs `stream()` to completion and returns the final message content (best-effort extraction from the last chunk).
- `Agent.stream_invoke(prompt: str) -> Generator[dict, None, None]`
  Runs `invoke()` in a background thread and yields SSE-style events: `tool_usage`, `tool_response`, `ai_message`, then line-buffered `message` events, and a final `done`.
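The line-buffered `message` events can be sketched like this — an illustrative generator; the real method also interleaves `tool_usage`/`tool_response`/`ai_message` events:

```python
def sse_messages(chunks):
    # Accumulate streamed text and emit one "message" event per
    # completed line, followed by a final "done" event.
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield {"event": "message", "data": line}
    if buffer:
        yield {"event": "message", "data": buffer}
    yield {"event": "done", "data": ""}
```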
- `Agent.reset() -> None`
  Starts a new conversation thread by incrementing a numeric `thread_id` or generating a new UUID.
- `Agent.duplicate(queue: Queue | None = None, agent_shared_state: AgentSharedState | None = None) -> Agent`
  Clones the agent configuration into a new instance with:
  - the shared checkpointer,
  - a new (or the provided shared) `AgentSharedState`,
  - the shared event queue if provided,
  - recursively duplicated sub-agents.
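The `reset()` thread-id rollover can be sketched as follows (illustrative only; the real method mutates `state.thread_id` in place):

```python
import uuid


def next_thread_id(thread_id: str) -> str:
    # Numeric thread ids are incremented; anything else starts a
    # fresh UUID-based thread.
    if thread_id.isdigit():
        return str(int(thread_id) + 1)
    return str(uuid.uuid4())
```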
- `Agent.as_api(router: APIRouter, ...) -> None`
  Registers FastAPI endpoints:
  - `POST /<route_name>/completion` → returns `invoke(prompt)` for the provided `thread_id`.
  - `POST /<route_name>/stream-completion` → returns an `EventSourceResponse` using `stream_invoke(prompt)`.
- Callback registration:
  - `on_tool_usage(callback)`
  - `on_tool_response(callback)`
  - `on_ai_message(callback)` (also propagates to sub-agents)
- Properties:
  - `name: str`, `description: str`
  - `tools: list[Tool | BaseTool]` (structured tools), `structured_tools`
  - `agents: list[Agent]`
  - `state: AgentSharedState`
  - `chat_model: BaseChatModel`
  - `configuration: AgentConfiguration`
- Misc:
  - `hello() -> str` returns `"Hello"`.
Configuration/Dependencies
Environment variables
- `POSTGRES_URL`
  When set, `create_checkpointer()` attempts PostgreSQL-backed persistence via `langgraph.checkpoint.postgres.PostgresSaver` + `psycopg`.
  - Uses a shared global connection/checkpointer per process.
  - Retries the connection up to 3 times with a 2-second delay.
- `ENV=dev`
  Randomizes `state.thread_id` to a UUID during agent initialization.
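The connection retry described for `POSTGRES_URL` can be sketched as a generic retry loop (an illustration of the described behavior; `connect` is a hypothetical callable standing in for the PostgreSQL setup):

```python
import time


def connect_with_retry(connect, attempts: int = 3, delay: float = 2.0):
    # Try the connection up to `attempts` times, sleeping `delay`
    # seconds between failures; re-raise the last error if all fail.
    last_error = None
    for attempt in range(attempts):
        try:
            return connect()
        except Exception as exc:
            last_error = exc
            if attempt < attempts - 1:
                time.sleep(delay)
    raise last_error
```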
Key runtime dependencies
- LangChain core: `BaseChatModel`, tools (`tool`, `BaseTool`, `StructuredTool`), messages (`HumanMessage`, `AIMessage`, `ToolMessage`, etc.)
- LangGraph: `StateGraph`, `Command`, `MemorySaver` (and the optional Postgres saver)
- SSE: `sse_starlette.sse.EventSourceResponse` (for API streaming)
- Internal: `default_tools(self)` (optional injection), `can_bind_tools(model)` (tool-binding support check), `naas_abi_core.models.Model.ChatModel` (wrapper type accepted by the constructor)
Usage
Minimal invocation
```python
from langchain_core.language_models import BaseChatModel

from naas_abi_core.services.agent.Agent import Agent

# You must provide a concrete BaseChatModel implementation.
model: BaseChatModel = ...  # e.g., a LangChain chat model instance

agent = Agent(
    name="main",
    description="My agent",
    chat_model=model,
)
print(agent.invoke("Hello!"))
```
Register callbacks (tool usage / responses / AI messages)
```python
from langchain_core.language_models import BaseChatModel

from naas_abi_core.services.agent.Agent import Agent

model: BaseChatModel = ...
agent = Agent(name="main", description="Demo", chat_model=model)

agent.on_tool_usage(lambda msg: print("TOOL USAGE:", msg.tool_calls))
agent.on_tool_response(lambda msg: print("TOOL RESPONSE:", getattr(msg, "content", msg)))
agent.on_ai_message(lambda msg, who: print(f"AI ({who}):", msg.content))

agent.invoke("Do something that calls tools.")
```
FastAPI endpoints
```python
from fastapi import FastAPI
from langchain_core.language_models import BaseChatModel

from naas_abi_core.services.agent.Agent import Agent

app = FastAPI()
model: BaseChatModel = ...
agent = Agent(name="main", description="API agent", chat_model=model)
agent.as_api(app.router, route_name="agent")

# POST /agent/completion with JSON: {"prompt": "...", "thread_id": "1"}
# POST /agent/stream-completion with the same payload (SSE)
```
Caveats
- `Agent.New(...)` is declared but not implemented (raises `NotImplementedError`).
- Tool calling depends on the chat model supporting `bind_tools()`; when unsupported, tools are not available and a warning is logged.
- Name validation replaces invalid characters (including `:`) with `_`; this affects agent/tool names used as LangGraph node IDs.
- When a sub-agent has a supervisor set, a `request_help` tool is injected; it can route control back to the supervisor by setting `current_active_agent` and returning a `Command` to the parent graph.
- The `app` property returns `self._app`, but `_app` is not defined in this file (access may fail unless it is set elsewhere).