AddIndividualPipeline
What it is
A pipeline that adds a named individual (instance) to an RDF triple store, or returns an existing matching individual if one is already present (based on a search workflow and score threshold).
Public API
- AddIndividualPipelineConfiguration(PipelineConfiguration) (dataclass)
  - Purpose: Provide dependencies for the pipeline.
  - Fields:
    - triple_store: ITripleStoreService - triple store service used to insert and fetch graphs.
    - search_individual_configuration: SearchIndividualWorkflowConfiguration - configuration passed to SearchIndividualWorkflow.
- AddIndividualPipelineParameters(PipelineParameters) (Pydantic model)
  - Purpose: Input parameters for adding/searching an individual.
  - Fields:
    - individual_label: str - label for the individual (stored as rdfs:label).
    - class_uri: str - class URI the individual will be typed as.
    - threshold: Optional[int] = 80 - score threshold (0–100) to accept an existing individual from search results.
- AddIndividualPipeline(Pipeline)
  - __init__(configuration: AddIndividualPipelineConfiguration)
    - Purpose: Initialize the pipeline and its internal SearchIndividualWorkflow.
  - run(parameters: PipelineParameters) -> rdflib.Graph
    - Purpose:
      - Searches for an existing individual matching individual_label within class_uri.
      - If a match is found with score >= threshold, returns the subject graph from the triple store.
      - Otherwise, creates a new individual with a UUID-based ABI URI, inserts it into the triple store, and returns the created graph.
    - Raises:
      - ValueError if parameters is not AddIndividualPipelineParameters.
  - as_tools() -> list[langchain_core.tools.BaseTool]
    - Purpose: Exposes the pipeline as LangChain StructuredTools:
      - add_individual_to_triple_store (generic: requires class_uri and individual_label)
      - Convenience tools that pre-fill class_uri:
        - add_commercial_organization
        - add_person
        - add_website
        - add_skill
        - add_legal_name
        - add_ticker_symbol
        - add_linkedin_page
  - as_api(...) -> None
    - Purpose: Present but not implemented (always returns None and does not register routes).
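The search-or-create behavior of run() described above can be sketched without the real services. This is a minimal illustration, not the pipeline's actual code: find_or_create, search, fetch_graph, and insert_graph are hypothetical names standing in for SearchIndividualWorkflow and the triple store, graphs are modeled as plain lists of triples rather than rdflib.Graph, and the exact shape of the minted URI is an assumption (the description only says it is UUID-based under the ABI namespace).

```python
import uuid
from typing import Callable, Optional

ABI = "http://ontology.naas.ai/abi/"  # namespace listed in this document

Triple = tuple[str, str, str]


def find_or_create(
    label: str,
    class_uri: str,
    threshold: Optional[int],
    search: Callable[[str, str], list[dict]],
    fetch_graph: Callable[[str], list[Triple]],
    insert_graph: Callable[[list[Triple]], None],
) -> list[Triple]:
    # Search results are only considered when a threshold is set.
    if threshold is not None:
        for result in search(class_uri, label):
            if result["score"] >= threshold:
                # The first qualifying result wins.
                return fetch_graph(result["individual_uri"])

    # No acceptable match: mint a UUID-based URI (assumed format).
    individual_uri = ABI + str(uuid.uuid4())
    graph = [
        (individual_uri, "rdf:type", "owl:NamedIndividual"),
        (individual_uri, "rdf:type", class_uri),
        (individual_uri, "rdfs:label", label),
    ]
    insert_graph(graph)
    return graph


# Tiny in-memory stand-ins (hypothetical data) to exercise both branches.
store: list[Triple] = [("urn:existing", "rdfs:label", "Naas.ai")]
hits = [{"individual_uri": "urn:existing", "score": 92}]


def fetch(uri: str) -> list[Triple]:
    return [t for t in store if t[0] == uri]


# Score 92 >= 80, so the existing individual's triples are returned.
found = find_or_create(
    "Naas.ai", "urn:Org", 80,
    search=lambda cls, lbl: hits, fetch_graph=fetch, insert_graph=store.extend,
)

# threshold=None skips matching entirely, so a new individual is created.
created = find_or_create(
    "Naas.ai", "urn:Org", None,
    search=lambda cls, lbl: hits, fetch_graph=fetch, insert_graph=store.extend,
)
```

Passing the search and store operations in as callables keeps the sketch independent of any concrete ITripleStoreService implementation.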
Configuration/Dependencies
- Requires an ITripleStoreService implementation providing:
  - insert(graph: rdflib.Graph) -> ...
  - get_subject_graph(subject_uri: str) -> rdflib.Graph
- Uses SearchIndividualWorkflow from naas_abi.workflows.SearchIndividualWorkflow:
  - Called via search_individual(...) with class_uri and search_label.
  - Expects results containing score and individual_uri.
- RDF libraries/terms:
  - Builds an rdflib.Graph, binds namespaces (bfo, cco, abi, dcterms), and adds triples:
    - (individual_uri, rdf:type, owl:NamedIndividual)
    - (individual_uri, rdf:type, <class_uri>)
    - (individual_uri, rdfs:label, "label")
- Namespaces used:
  - ABI = "http://ontology.naas.ai/abi/"
  - CCO = "https://www.commoncoreontologies.org/"
  - BFO = "http://purl.obolibrary.org/obo/"
Usage
from naas_abi.pipelines.AddIndividualPipeline import (
    AddIndividualPipeline,
    AddIndividualPipelineConfiguration,
    AddIndividualPipelineParameters,
)
from naas_abi.workflows.SearchIndividualWorkflow import SearchIndividualWorkflowConfiguration

# Provide concrete implementations/configuration from your environment:
triple_store = ...  # must implement ITripleStoreService
search_cfg = SearchIndividualWorkflowConfiguration(...)

pipeline = AddIndividualPipeline(
    AddIndividualPipelineConfiguration(
        triple_store=triple_store,
        search_individual_configuration=search_cfg,
    )
)

g = pipeline.run(
    AddIndividualPipelineParameters(
        individual_label="Naas.ai",
        class_uri="https://www.commoncoreontologies.org/ont00000443",
        threshold=80,
    )
)
print(len(g), "triples")

Using the LangChain tools:
tools = pipeline.as_tools()
add_person = next(t for t in tools if t.name == "add_person")
result_graph = add_person.run({"individual_label": "Ada Lovelace"})
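
One way to think about the convenience tools is partial application: each one fixes class_uri and leaves only individual_label to the caller. The sketch below illustrates that idea with functools.partial. It is not how as_tools() is actually implemented; add_individual is a hypothetical stand-in for the generic tool, and the example.org class URIs are placeholders rather than the real ontology URIs.

```python
from functools import partial


def add_individual(class_uri: str, individual_label: str) -> dict:
    # Hypothetical stand-in for the generic add_individual_to_triple_store tool.
    return {"class_uri": class_uri, "individual_label": individual_label}


# Placeholder class URIs; the real tools use ontology URIs not listed here.
PLACEHOLDER_CLASSES = {
    "add_person": "http://example.org/Person",
    "add_website": "http://example.org/Website",
}

# Each convenience callable has class_uri bound in advance.
convenience_tools = {
    name: partial(add_individual, class_uri)
    for name, class_uri in PLACEHOLDER_CLASSES.items()
}

result = convenience_tools["add_person"](individual_label="Ada Lovelace")
print(result)
```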
Caveats
- Search filtering only runs when threshold is not None. If threshold=None, no search result will be accepted and a new individual will always be created.
- When multiple matches meet or exceed the threshold, the first result is used.
- as_api is a no-op (no routes are exposed).