PostgresIntegration
What it is
- A PostgreSQL integration client built on `psycopg2` that:
  - Executes SQL queries (optionally with parameters)
  - Returns results as Python dicts or a pandas `DataFrame`
  - Lists tables and fetches table schema from `information_schema`
- Includes `as_tools()` to expose the integration as LangChain `StructuredTool` tools.
Public API
PostgresIntegrationConfiguration
Dataclass configuration for connecting to PostgreSQL.
- Fields:
  - `host: str`
  - `port: int`
  - `database: str` (mapped to `dbname` in psycopg2)
  - `user: str`
  - `password: str`
  - `sslmode: str = "require"`
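One common way to fill these fields is from environment variables. The sketch below is an assumption, not part of the integration: it reads the standard libpq variable names (`PGHOST`, `PGPORT`, `PGDATABASE`, `PGUSER`, `PGPASSWORD`, `PGSSLMODE`) and returns keyword arguments named after the fields listed above. The helper name `config_kwargs_from_env` and the fallback values are hypothetical.

```python
import os
from typing import Any, Dict, Mapping, Optional


def config_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build keyword arguments for PostgresIntegrationConfiguration.

    Hypothetical helper: the integration itself does not read these variables.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("PGHOST", "localhost"),
        "port": int(env.get("PGPORT", "5432")),  # the `port` field is an int
        "database": env.get("PGDATABASE", "postgres"),
        "user": env.get("PGUSER", "postgres"),
        "password": env.get("PGPASSWORD", ""),
        "sslmode": env.get("PGSSLMODE", "require"),  # same default as the dataclass
    }


# Example with an explicit mapping instead of the real environment.
kwargs = config_kwargs_from_env({"PGHOST": "db.internal", "PGPORT": "6543"})
print(kwargs["host"], kwargs["port"], kwargs["sslmode"])
```

The resulting dict could then be passed as `PostgresIntegrationConfiguration(**kwargs)`.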
PostgresIntegration
Integration client.
- `execute_pandas_query(query: str) -> pd.DataFrame`
  - Runs a SQL query and returns a pandas `DataFrame` via `pd.read_sql_query`.
- `execute_query(query: str, params: Optional[Union[Tuple, Dict]] = None, fetch: bool = True) -> Union[List[Dict], int]`
  - Executes a SQL statement.
  - If `fetch=True`, returns `List[Dict]` rows (cursor uses `RealDictCursor`).
  - If `fetch=False`, commits and returns `rowcount`.
- `list_tables() -> List[str]`
  - Returns table names in the `public` schema from `information_schema.tables`.
- `get_table_schema(table_name: str) -> List[Dict[str, Any]]`
  - Returns column info (`column_name`, `data_type`, `is_nullable`) from `information_schema.columns` for the given table.
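The `params` argument of `execute_query` accepts either a tuple or a dict. Assuming it is handed to psycopg2's `cursor.execute` unchanged, a tuple pairs with positional `%s` placeholders and a dict with named `%(name)s` placeholders. The sketch below illustrates both call shapes; the `users` table, its columns, and the helper names are hypothetical, and `RecordingStub` is a stand-in so the example runs without a database.

```python
from typing import Any, Dict, List, Optional, Tuple, Union

Params = Optional[Union[Tuple, Dict]]


def active_users(pg: Any, min_age: int) -> List[Dict]:
    # Positional placeholders: values travel separately from the SQL text.
    return pg.execute_query(
        "SELECT id, name FROM users WHERE active = %s AND age >= %s",
        params=(True, min_age),
    )


def rename_user(pg: Any, user_id: int, name: str) -> int:
    # Named placeholders with a dict; fetch=False commits and returns rowcount.
    return pg.execute_query(
        "UPDATE users SET name = %(name)s WHERE id = %(id)s",
        params={"name": name, "id": user_id},
        fetch=False,
    )


class RecordingStub:
    """Stand-in for PostgresIntegration that only records the calls it gets."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Params, bool]] = []

    def execute_query(self, query: str, params: Params = None, fetch: bool = True):
        self.calls.append((query, params, fetch))
        return [] if fetch else 0


stub = RecordingStub()
active_users(stub, 18)
rename_user(stub, 7, "Ada")
for query, params, fetch in stub.calls:
    print(fetch, params, query)
```

With a real `PostgresIntegration` instance in place of the stub, the same two functions would be called unchanged.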
as_tools(configuration: PostgresIntegrationConfiguration) -> list
Converts the integration into LangChain tools (`langchain_core.tools.StructuredTool`):
- `postgres_execute_query` → `PostgresIntegration.execute_query`
- `postgres_list_tables` → `PostgresIntegration.list_tables`
- `postgres_get_table_schema` → `PostgresIntegration.get_table_schema`
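Because each tool carries a `name`, the returned list can be indexed by tool name when only one tool is needed. A minimal sketch, where `index_tools_by_name` is a hypothetical helper and the `SimpleNamespace` objects are stand-ins that expose only a `name` attribute, so the example runs without LangChain installed:

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable


def index_tools_by_name(tools: Iterable[Any]) -> Dict[str, Any]:
    """Map each tool's `name` attribute to the tool object."""
    return {tool.name: tool for tool in tools}


# Stand-ins for the StructuredTool instances; with the real integration,
# `tools` would come from `as_tools(cfg)`.
tools = [
    SimpleNamespace(name="postgres_execute_query"),
    SimpleNamespace(name="postgres_list_tables"),
    SimpleNamespace(name="postgres_get_table_schema"),
]
by_name = index_tools_by_name(tools)
print(sorted(by_name))
```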
Configuration/Dependencies
- Requires:
  - `psycopg2`
  - `pandas`
  - `naas_abi_core` (`Integration`, `IntegrationConfiguration`, `IntegrationConnectionError`)
- For `as_tools()`:
  - `langchain_core`
  - `pydantic`
- Connection parameters are taken from `PostgresIntegrationConfiguration` and passed to `psycopg2.connect(...)`, including `sslmode`.
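The field-to-keyword mapping described above can be sketched as follows. This is an illustration under stated assumptions, not the integration's actual code: `ConfigStandIn` is a local stand-in that mirrors the documented fields, and `connect_kwargs` is a hypothetical helper showing how `database` would end up as psycopg2's `dbname`.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class ConfigStandIn:
    """Local stand-in mirroring the documented fields; not the real class."""

    host: str
    port: int
    database: str
    user: str
    password: str
    sslmode: str = "require"


def connect_kwargs(cfg: ConfigStandIn) -> Dict[str, Any]:
    """Translate configuration fields into psycopg2.connect keyword arguments."""
    return {
        "host": cfg.host,
        "port": cfg.port,
        "dbname": cfg.database,  # `database` is passed as `dbname`
        "user": cfg.user,
        "password": cfg.password,
        "sslmode": cfg.sslmode,
    }


cfg = ConfigStandIn("localhost", 5432, "postgres", "postgres", "postgres")
print(connect_kwargs(cfg))
```

Note that with the default `sslmode="require"`, a connection to a server that does not offer SSL will be refused by libpq; a local development server may need a different value such as `"prefer"` or `"disable"`.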
Usage
Basic usage
from naas_abi_marketplace.applications.postgres.integrations.PostgresIntegration import (
    PostgresIntegration,
    PostgresIntegrationConfiguration,
)

cfg = PostgresIntegrationConfiguration(
    host="localhost",
    port=5432,
    database="postgres",
    user="postgres",
    password="postgres",
    sslmode="require",
)
pg = PostgresIntegration(cfg)

# List table names in the public schema.
tables = pg.list_tables()
print(tables)

# fetch=True returns the rows as a list of dicts.
rows = pg.execute_query("SELECT 1 AS value", fetch=True)
print(rows)

# fetch=False commits and returns the cursor's rowcount.
affected = pg.execute_query("CREATE TABLE IF NOT EXISTS t(x int)", fetch=False)
print(affected)

# Same query, returned as a pandas DataFrame.
df = pg.execute_pandas_query("SELECT 1 AS value")
print(df)
LangChain tools
from naas_abi_marketplace.applications.postgres.integrations.PostgresIntegration import (
    as_tools,
    PostgresIntegrationConfiguration,
)

cfg = PostgresIntegrationConfiguration(
    host="localhost", port=5432, database="postgres", user="postgres", password="postgres"
)
tools = as_tools(cfg)
# tools is a list of StructuredTool instances
Caveats
- `get_table_schema(table_name)` interpolates `table_name` directly into SQL (no parameterization), so callers must ensure `table_name` is trusted.
- All failures are wrapped and re-raised as `IntegrationConnectionError` (including query errors), with a prefixed message.
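Given the first caveat, one defensive option is to validate a table name before passing it to `get_table_schema`. The helper below is a hypothetical sketch, not part of the integration: it accepts only plain unquoted identifiers (letters, digits, underscores, at most PostgreSQL's default 63 characters) and rejects everything else, which also rejects some legitimate quoted names.

```python
import re

# Conservative pattern: a letter or underscore, then letters, digits, or
# underscores, up to PostgreSQL's default 63-character identifier limit.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")


def require_safe_table_name(table_name: str) -> str:
    """Return table_name unchanged, or raise ValueError if it looks unsafe."""
    if not _IDENTIFIER_RE.fullmatch(table_name):
        raise ValueError(f"unsafe table name: {table_name!r}")
    return table_name


print(require_safe_table_name("orders"))
try:
    require_safe_table_name("orders'; DROP TABLE orders; --")
except ValueError as exc:
    print("rejected:", exc)
```

A call site would then look like `pg.get_table_schema(require_safe_table_name(name))`. Checking the name against the result of `list_tables()` is another option when only existing public tables should be accepted.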