Agent
What it is
A LangGraph/LangChain-based orchestration layer that:
- Wraps a `BaseChatModel` (or `ChatModel`) and binds tools (including sub-agents exposed as "handoff" tools).
- Maintains conversation state via a checkpointer (in-memory by default, PostgreSQL if configured).
- Exposes synchronous (`invoke`) and streaming (`stream`, `stream_invoke`) execution, plus optional FastAPI endpoints (`as_api`).
Public API
Module-level functions
- `create_checkpointer() -> BaseCheckpointSaver`
  - Creates a checkpointer:
    - PostgreSQL-backed when `POSTGRES_URL` is set (shared connection, initialized once).
    - Otherwise `MemorySaver` (in-memory).
- `close_shared_checkpointer() -> None`
  - Closes and clears the shared PostgreSQL connection/checkpointer (if used).
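The selection rule described above (PostgreSQL when `POSTGRES_URL` is set, in-memory otherwise, with one shared PostgreSQL instance) can be sketched with the standard library alone. This is an illustration of the pattern, not the module's implementation: `InMemorySaver`, `PostgresBackedSaver`, and the `*_sketch` function names are hypothetical stand-ins, and no database connection is opened.

```python
import os
from typing import Optional


class InMemorySaver:
    """Hypothetical stand-in for an in-memory checkpointer."""


class PostgresBackedSaver:
    """Hypothetical stand-in for a PostgreSQL-backed checkpointer."""

    def __init__(self, url: str) -> None:
        self.url = url
        self.closed = False

    def close(self) -> None:
        self.closed = True


# Module-level cache so every caller shares one PostgreSQL-backed instance.
_shared_saver: Optional[PostgresBackedSaver] = None


def create_checkpointer_sketch():
    """Return the shared PostgreSQL saver if POSTGRES_URL is set, else a new in-memory one."""
    global _shared_saver
    url = os.environ.get("POSTGRES_URL")
    if not url:
        return InMemorySaver()
    if _shared_saver is None:
        _shared_saver = PostgresBackedSaver(url)  # initialized once, then reused
    return _shared_saver


def close_shared_checkpointer_sketch() -> None:
    """Close and clear the shared saver, if one was created."""
    global _shared_saver
    if _shared_saver is not None:
        _shared_saver.close()
        _shared_saver = None
```

Caching the PostgreSQL-backed instance at module level is what lets several agents reuse one connection; clearing the cache on close means a later call builds a fresh instance.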
State / configuration models
- `class AgentSharedState`
  - Shared cross-agent state:
    - `thread_id: str`
    - `current_active_agent: Optional[str]`
    - `supervisor_agent: Optional[str]`
    - `requesting_help: bool`
  - Methods: getters/setters for all fields.
- `class ABIAgentState(MessagesState)`
  - LangGraph state: `messages` plus `system_prompt: str`.
- `class AgentConfiguration`
  - Callbacks and system prompt configuration:
    - `on_tool_usage(message)`
    - `on_tool_response(message)`
    - `on_ai_message(message, agent_name)`
    - `system_prompt: str | Callable[[list[AnyMessage]], str]`
  - `get_system_prompt(messages) -> str`
- `class CompletionQuery(BaseModel)`
  - FastAPI request schema:
    - `prompt: str`
    - `thread_id: str | int`
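Because `system_prompt` may be either a string or a callable over the message history, `get_system_prompt(messages)` presumably has to resolve both forms. A minimal sketch of that resolution follows; `ConfigurationSketch` is a hypothetical stand-in and plain dicts replace LangChain message objects.

```python
from dataclasses import dataclass
from typing import Callable, Union


@dataclass
class ConfigurationSketch:
    """Hypothetical stand-in for the system-prompt part of AgentConfiguration."""

    system_prompt: Union[str, Callable[[list], str]]

    def get_system_prompt(self, messages: list) -> str:
        # A callable prompt is evaluated against the current history;
        # a plain string is returned unchanged.
        if callable(self.system_prompt):
            return self.system_prompt(messages)
        return self.system_prompt


# Usage: a static prompt and a prompt derived from the history.
static = ConfigurationSketch(system_prompt="Be brief.")
dynamic = ConfigurationSketch(
    system_prompt=lambda msgs: f"History length: {len(msgs)}"
)
```

The callable form is useful when the prompt should depend on the conversation so far, for example to summarize or to switch persona after a handoff.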
Events (used for SSE streaming)
- `Event(payload)`
- `ToolUsageEvent(payload)`
- `ToolResponseEvent(payload)`
- `AIMessageEvent(payload, agent_name)`
- `FinalStateEvent(payload)`
Main class: Agent
Constructor:
- `Agent(...)`
  - Key parameters:
    - `name: str`, `description: str`
    - `chat_model: BaseChatModel | ChatModel`
    - `tools: list[Tool | BaseTool | Agent]` (default tools are injected automatically)
    - `agents: list[Agent]` (sub-agents; also exposed as handoff tools)
    - `memory: BaseCheckpointSaver | None` (defaults to `create_checkpointer()`)
    - `state: AgentSharedState`
    - `configuration: AgentConfiguration`
    - `event_queue: Queue | None` (used by `stream_invoke`)
    - `native_tools: list[dict]` (passed through to model tool binding)
Core methods/properties:
- `invoke(prompt: str) -> str`
  - Runs a prompt through the compiled graph and returns the final message content (if any).
- `stream(prompt: str) -> Generator[dict[str, Any] | Any, None, None]`
  - Streams LangGraph execution chunks; also triggers callbacks/events for tool usage/response and AI messages.
- `stream_invoke(prompt: str) -> Generator[dict, None, None]`
  - Server-Sent Events (SSE) style generator using the internal event queue:
    - emits `tool_usage`, `tool_response`, `ai_message`, `message`, `done`.
- `reset() -> None`
  - Advances `thread_id` (numeric increment if possible, else new UUID).
- `duplicate(queue: Queue | None = None, agent_shared_state: AgentSharedState | None = None) -> Agent`
  - Creates a new agent instance with duplicated sub-agents and shared memory backend; uses provided/shared state and event queue.
- `as_api(router: APIRouter, ...) -> None`
  - Registers FastAPI endpoints:
    - `POST /{route}/completion`
    - `POST /{route}/stream-completion` (SSE)
- Callback registration:
  - `on_tool_usage(callback)`
  - `on_tool_response(callback)`
  - `on_ai_message(callback)` (propagates to sub-agents)
- Tool/sub-agent exposure:
  - `as_tools(parent_graph: bool = False) -> list[BaseTool]` (handoff tool(s))
  - `tools` (property): structured tools available to the agent
  - `agents` (property): configured sub-agents
- Introspection/config:
  - `name`, `description`, `state`, `chat_model`, `configuration`, `structured_tools`
- `hello() -> str`
  - Returns `"Hello"`.
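The `reset()` rule for advancing `thread_id` (numeric increment if possible, else a new UUID) can be sketched as a small pure function. `next_thread_id` is a hypothetical helper name used only for illustration; the real method mutates the agent's shared state rather than returning a value.

```python
import uuid


def next_thread_id(thread_id: str) -> str:
    """Increment a numeric thread id; otherwise return a fresh UUID string."""
    try:
        return str(int(thread_id) + 1)
    except ValueError:
        # Not a number (for example an existing UUID): start a new thread id.
        return str(uuid.uuid4())


numeric = next_thread_id("41")
fresh = next_thread_id("not-a-number")
```

Either branch yields an id that no earlier checkpoint is stored under, so the next invocation starts with an empty history.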
Default injected tools (via default_tools()):
- `get_time_date(timezone="Europe/Paris")` (`return_direct=False`)
- `list_tools_available()` (`return_direct=True`)
- `list_subagents_available()` (`return_direct=True`)
- `list_intents_available()` (`return_direct=True`; depends on `_intents` being present elsewhere)
- Conditionally:
  - `request_help(reason)` if `state.supervisor_agent` exists and differs from `self.name`
  - `get_current_active_agent()`, `get_supervisor_agent()` if a supervisor exists or there are sub-agents
  - `read_makefile()` only when `state.supervisor_agent == self.name` and `ENV=dev`
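The conditions above are easier to check when written as a single decision function. The sketch below returns tool names only and is not the body of `default_tools()`; `default_tool_names` and its parameters are hypothetical, and `list_intents_available` is included unconditionally even though it depends on `_intents` being present.

```python
from typing import Optional


def default_tool_names(
    agent_name: str,
    supervisor_agent: Optional[str],
    has_subagents: bool,
    env: str = "",
) -> list[str]:
    """Return the names of the default tools an agent would receive."""
    names = [
        "get_time_date",
        "list_tools_available",
        "list_subagents_available",
        "list_intents_available",
    ]
    # Only a supervised agent (not the supervisor itself) can ask for help.
    if supervisor_agent and supervisor_agent != agent_name:
        names.append("request_help")
    # Routing introspection is useful whenever more than one agent is involved.
    if supervisor_agent or has_subagents:
        names += ["get_current_active_agent", "get_supervisor_agent"]
    # Development-only tool, limited to the agent that is its own supervisor.
    if supervisor_agent == agent_name and env == "dev":
        names.append("read_makefile")
    return names
```

For example, a `math` agent supervised by `supervisor` receives `request_help`, while the supervisor itself does not.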
Handoff tool factory
- `make_handoff_tool(agent: Agent, parent_graph: bool = False) -> BaseTool`
  - Creates a `transfer_to_{agent.name}` tool that returns a `Command` routing execution to the target agent node, appending a `ToolMessage` to the message history.
Configuration/Dependencies
Environment variables
- `POSTGRES_URL`
  - If set, `create_checkpointer()` attempts to use PostgreSQL-backed persistence via `langgraph.checkpoint.postgres.PostgresSaver` and `psycopg`.
  - Uses a shared connection/checkpointer across instances; closed at process exit (and via `close_shared_checkpointer()`).
- `ENV`
  - If `ENV == "dev"`:
    - Agent constructor randomizes `thread_id` with a UUID.
    - Adds `read_makefile` tool when the agent is its own supervisor (`state.supervisor_agent == self.name`).
Runtime dependencies (non-exhaustive)
- LangChain core (`langchain_core.*`) for models/messages/tools.
- LangGraph (`langgraph.*`) for `StateGraph`, `Command`, checkpointers.
- `sse_starlette` for SSE streaming response (`EventSourceResponse`).
- Optional for PostgreSQL memory: `psycopg`, `langgraph.checkpoint.postgres`.
Usage
Minimal invocation (requires a LangChain BaseChatModel implementation)
from langchain_core.language_models import BaseChatModel
from naas_abi_core.services.agent.Agent import Agent

# Provide a concrete BaseChatModel instance from your LangChain provider
chat_model: BaseChatModel = ...

agent = Agent(
    name="assistant",
    description="General assistant",
    chat_model=chat_model,
)

print(agent.invoke("What time is it in Europe/Paris?"))
With a sub-agent (handoff tool is auto-created)
from naas_abi_core.services.agent.Agent import Agent, AgentSharedState

chat_model = ...  # BaseChatModel

shared_state = AgentSharedState(supervisor_agent="supervisor")

math_agent = Agent(
    name="math",
    description="Handles math questions",
    chat_model=chat_model,
    state=AgentSharedState(supervisor_agent="supervisor"),
)

supervisor = Agent(
    name="supervisor",
    description="Routes to specialists",
    chat_model=chat_model,
    agents=[math_agent],
    state=shared_state,
)

print(supervisor.invoke("@math What is 2+2?"))
Caveats
- Tool binding is attempted only if the model supports `.bind_tools(...)`; otherwise tools are not available (a warning is logged).
- `call_tools()` assumes tool names exist in `_tools_by_name`; missing tool names will raise a `KeyError`.
- `invoke()` extracts the final response by inspecting the last streamed chunk; if no messages are present it returns `""`.
- `stream_invoke()` uses a shared `Queue`; if reused across concurrent requests without isolating queues, events may interleave.