InsertDataSPARQLPipeline

What it is
- A pipeline that executes a SPARQL `INSERT DATA` statement against an in-memory `rdflib.Graph`, then inserts the resulting triples into a configured triple store (`ITripleStoreService`).
- Includes helper tooling to extract/clean an `INSERT DATA` statement from fenced code blocks (```sparql ... ```).
Public API

Classes

- `InsertDataSPARQLPipelineConfiguration(PipelineConfiguration)`
  - Fields:
    - `triple_store: ITripleStoreService` - target triple store service used for insertion.
- `InsertDataSPARQLPipelineParameters(PipelineParameters)`
  - Fields:
    - `sparql_statement: str` - SPARQL `INSERT DATA` statement (optionally wrapped in ```sparql fences).
- `InsertDataSPARQLPipeline(Pipeline)`
  - `__init__(configuration: InsertDataSPARQLPipelineConfiguration)` - stores the configuration (notably the triple store service).
  - `get_sparql_from_text(parameters: InsertDataSPARQLPipelineParameters) -> str`
    - Strips optional ```sparql fences and returns the text if it contains `"INSERT DATA"`.
    - Otherwise returns a message string indicating no statement was found.
  - `run(parameters: PipelineParameters) -> rdflib.Graph`
    - Validates the `parameters` type (`InsertDataSPARQLPipelineParameters` is required).
    - Creates a new `rdflib.Graph`, binds namespaces (`bfo`, `cco`, `abi`), and runs `graph.update(...)`.
    - If triples were inserted (`len(graph) > 0`), calls `configuration.triple_store.insert(graph)`.
    - Returns the resulting `Graph` (or an empty `Graph()` on SPARQL execution failure).
  - `as_tools() -> list[langchain_core.tools.BaseTool]` - exposes two LangChain `StructuredTool`s:
    - `insert_data_sparql`: runs the pipeline with `sparql_statement`.
    - `extract_sparql_from_text`: returns the extracted/cleaned SPARQL text.
  - `as_api(...) -> None` - present but does not register any endpoints (no-op; returns `None`).
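The fence-stripping behavior of `get_sparql_from_text` can be sketched as a small standalone helper. This is a hypothetical re-implementation for illustration only; the pipeline's actual code may differ in details:

```python
import re


def extract_insert_data(text: str) -> str:
    """Strip optional ```sparql fences; return the statement if it contains
    "INSERT DATA", otherwise a not-found message (mirroring the pipeline)."""
    # Remove a leading ```sparql (or bare ```) fence and a trailing ``` fence.
    cleaned = re.sub(r"^```(?:sparql)?\s*|\s*```$", "", text.strip())
    if "INSERT DATA" in cleaned:
        return cleaned.strip()
    return "No INSERT DATA statement found in the provided text."


fenced = "```sparql\nINSERT DATA { <urn:a> <urn:b> <urn:c> . }\n```"
print(extract_insert_data(fenced))
```

Note that, like the real helper, this is a simple substring check: any text mentioning "INSERT DATA" passes, whether or not it is valid SPARQL.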
Configuration/Dependencies
- Requires an implementation of `naas_abi_core.services.triple_store.TripleStorePorts.ITripleStoreService` with an `insert(graph: rdflib.Graph)` method.
- Uses:
  - `rdflib.Graph` for SPARQL update execution and triple storage before insertion.
  - `langchain_core.tools.StructuredTool` for tool exposure.
- Namespace bindings applied to the graph:
  - `bfo` = http://purl.obolibrary.org/obo/
  - `cco` = https://www.commoncoreontologies.org/
  - `abi` = http://ontology.naas.ai/abi/
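For unit tests, a minimal in-memory stand-in for `ITripleStoreService` can be wired into the configuration. This is a hypothetical test double, not the real service; it relies on duck typing, since the pipeline only calls `.insert(graph)`:

```python
class InMemoryTripleStore:
    """Duck-typed stand-in for ITripleStoreService: it only needs an
    insert(graph) method, which here just records each graph it receives."""

    def __init__(self):
        self.inserted = []  # every graph passed to insert()

    def insert(self, graph):
        # The pipeline calls this only when the resulting graph is non-empty.
        self.inserted.append(graph)


store = InMemoryTripleStore()
store.insert([("urn:s", "urn:p", "urn:o")])  # any graph-like object works here
print(len(store.inserted))  # prints 1
```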
Usage

```python
from naas_abi.pipelines.InsertDataSPARQLPipeline import (
    InsertDataSPARQLPipeline,
    InsertDataSPARQLPipelineConfiguration,
    InsertDataSPARQLPipelineParameters,
)

# Provide a real ITripleStoreService from your environment
triple_store_service = ...  # must implement .insert(rdflib.Graph)

pipeline = InsertDataSPARQLPipeline(
    InsertDataSPARQLPipelineConfiguration(triple_store=triple_store_service)
)

sparql = """
PREFIX abi: <http://ontology.naas.ai/abi/>
PREFIX owl: <http://www.w3.org/2002/07/owl#>

INSERT DATA {
    abi:john a owl:NamedIndividual .
}
"""

graph = pipeline.run(InsertDataSPARQLPipelineParameters(sparql_statement=sparql))
print(len(graph))
```
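Because `run()` returns an empty `Graph` on SPARQL failure rather than raising, callers can branch on the graph's length. A minimal sketch of that check (the helper name is hypothetical, and a plain list stands in for `rdflib.Graph` since both support `len()`):

```python
def describe_result(graph) -> str:
    # run() signals SPARQL failure by returning an empty graph,
    # so emptiness doubles as the failure/no-op indicator.
    if len(graph) == 0:
        return "no triples inserted (possible SPARQL failure)"
    return f"inserted {len(graph)} triple(s)"


print(describe_result([]))                 # empty stand-in graph
print(describe_result([("s", "p", "o")]))  # one-triple stand-in
```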
Caveats
- `run()` only accepts `InsertDataSPARQLPipelineParameters`; otherwise it raises a `ValueError`.
- `get_sparql_from_text()` performs a simple substring check for `"INSERT DATA"`; it may return a non-query message string, which will later cause `graph.update(...)` to fail and yield an empty graph.
- `as_api()` is a no-op (it does not expose HTTP endpoints).
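Given the substring-based extraction, one way to avoid the silent empty-graph failure path is to validate the statement before calling `run()`. A minimal guard, assuming nothing beyond the behavior described above (the function name is hypothetical):

```python
def require_insert_data(text: str) -> str:
    """Fail fast when no INSERT DATA statement is present, instead of
    letting graph.update(...) fail and return an empty graph later."""
    if "INSERT DATA" not in text:
        raise ValueError("No SPARQL INSERT DATA statement found in input")
    return text


try:
    require_insert_data("SELECT * WHERE { ?s ?p ?o }")
except ValueError as exc:
    print(exc)  # prints the guard's error message
```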