MergeIndividualsPipeline
What it is
A `Pipeline` that merges two RDF individuals in a triplestore by:
- Copying non-duplicate triples from `uri_to_merge` to `uri_to_keep` (with special handling for labels)
- Rewriting references to `uri_to_merge` so they point to `uri_to_keep`
- Removing all triples involving `uri_to_merge` (both as subject and as object)
- Returning the resulting subject graph for `uri_to_keep`
Public API
Classes
- `MergeIndividualsPipelineConfiguration(PipelineConfiguration)`
  - Purpose: Provide dependencies and output settings.
  - Fields:
    - `triple_store: ITripleStoreService`: triplestore used for query/insert/remove
    - `datastore_path: str = "datastore/ontology/merged_individual"`: where TTL snapshots are saved
- `MergeIndividualsPipelineParameters(PipelineParameters)`
  - Purpose: Input parameters for the merge.
  - Fields:
    - `uri_to_keep: str`: URI that remains (validated by `URI_REGEX`)
    - `uri_to_merge: str`: URI to merge into `uri_to_keep` and then remove (validated by `URI_REGEX`)
- `MergeIndividualsPipeline(Pipeline)`
  - Purpose: Executes the merge in the triplestore.
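Both parameter fields are validated by `URI_REGEX`, whose pattern is not given here. The sketch below shows what such validation might look like, assuming a simple pattern that accepts absolute `http`/`https` URIs without whitespace, quotes, or angle brackets. `HYPOTHETICAL_URI_REGEX` and `MergeParamsSketch` are illustrative names, not part of `naas_abi`, and the `uri_to_keep != uri_to_merge` check is an extra guard this sketch adds that the real class may not perform.

```python
import re
from dataclasses import dataclass

# Hypothetical stand-in for URI_REGEX; the real pattern is not shown in this doc.
HYPOTHETICAL_URI_REGEX = re.compile(r"^https?://[^\s<>\"]+$")


@dataclass(frozen=True)
class MergeParamsSketch:
    """Hypothetical stand-in for MergeIndividualsPipelineParameters."""

    uri_to_keep: str
    uri_to_merge: str

    def __post_init__(self) -> None:
        # Reject values that do not look like absolute http(s) URIs.
        for name in ("uri_to_keep", "uri_to_merge"):
            value = getattr(self, name)
            if not HYPOTHETICAL_URI_REGEX.match(value):
                raise ValueError(f"{name} is not a valid URI: {value!r}")
        # Extra guard added by this sketch only (not documented for the
        # real class): merging a URI into itself is rejected.
        if self.uri_to_keep == self.uri_to_merge:
            raise ValueError("uri_to_keep and uri_to_merge must differ")
```

Validating early is useful because the URIs end up inside SPARQL `<...>` IRI references; rejecting `<`, `>` and whitespace keeps a value from breaking out of that position.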
Methods (MergeIndividualsPipeline)
- `get_all_triples_for_uri(uri: str)`
  - Purpose: SPARQL query for all triples where `uri` appears as subject or object.
  - Returns: triplestore query result rows of `(?s, ?p, ?o)`.
- `run(parameters: PipelineParameters) -> rdflib.Graph`
  - Purpose: Performs the merge transaction:
    - Inserts:
      - For triples where `uri_to_merge` is subject:
        - Copies predicates/objects to `uri_to_keep` unless the predicate is `rdfs:label` or `abi:universal_name`
        - Skips if `(uri_to_keep, p, o)` already exists
        - Preserves `Literal` datatype and language tags
      - For `rdfs:label` and `abi:universal_name` on `uri_to_merge`:
        - Adds them to `uri_to_keep` as `skos:altLabel`
      - For triples where `uri_to_merge` is object:
        - Rewrites them to use `uri_to_keep` as object
        - Skips if `(s, p, uri_to_keep)` already exists
    - Removes:
      - Removes all triples where `uri_to_merge` was subject or object
    - Persists TTL snapshots via `StorageUtils.save_triples(...)`
    - Applies changes using `triple_store.insert(...)` and `triple_store.remove(...)`
  - Returns: `SPARQLUtils.get_subject_graph(uri_to_keep)`
- `as_tools() -> list[langchain_core.tools.BaseTool]`
  - Purpose: Exposes the pipeline as a LangChain `StructuredTool` named `merge_individuals`.
- `as_api(...) -> None`
  - Purpose: API exposure hook; currently a no-op that returns `None`.
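The exact SPARQL text used by `get_all_triples_for_uri` is not documented. One plausible shape, assuming a standard SPARQL 1.1 endpoint, is a `UNION` of a subject branch and an object branch, with `BIND` filling in the fixed position so every row carries `?s`, `?p` and `?o`. `build_all_triples_query` is a hypothetical helper, not the pipeline's code, and it assumes `uri` was already validated because it is interpolated directly into the query.

```python
def build_all_triples_query(uri: str) -> str:
    """Hypothetical query text for get_all_triples_for_uri; the real one may differ."""
    # Branch 1: `uri` as subject. Branch 2: `uri` as object.
    # BIND fills the fixed position so both branches return ?s ?p ?o rows.
    return f"""
SELECT ?s ?p ?o
WHERE {{
  {{ <{uri}> ?p ?o . BIND(<{uri}> AS ?s) }}
  UNION
  {{ ?s ?p <{uri}> . BIND(<{uri}> AS ?o) }}
}}
"""
```

A triple such as `(uri, p, uri)` matches both branches, so it would appear twice in the result rows; that is harmless when the rows are collected into a graph.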
Configuration/Dependencies
- Requires an `ITripleStoreService` implementation with:
  - `query(sparql: str)`
  - `insert(graph: rdflib.Graph)`
  - `remove(graph: rdflib.Graph)`
- Uses `ABIModule.get_instance().engine.services` for:
  - `triple_store` (indirectly via `SPARQLUtils`)
  - `object_storage` (via `StorageUtils`)
- Writes snapshot TTL files under `MergeIndividualsPipelineConfiguration.datastore_path`.
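For unit tests it can help to pass a stand-in triplestore. The sketch below is a hypothetical `typing.Protocol` mirroring the three methods listed above, plus a recording fake that returns canned rows. Graph arguments are typed as `Any` to avoid depending on `rdflib`. Whether `MergeIndividualsPipelineConfiguration` accepts such a duck-typed object depends on how it validates `triple_store`, and `run()` would still reach the real engine services through `SPARQLUtils` and `StorageUtils`, so this only isolates the configured store.

```python
from typing import Any, List, Optional, Protocol, Tuple, runtime_checkable

Row = Tuple[str, str, str]


@runtime_checkable
class TripleStoreLike(Protocol):
    """Hypothetical structural type with the three methods the pipeline needs."""

    def query(self, sparql: str) -> Any: ...

    def insert(self, graph: Any) -> None: ...

    def remove(self, graph: Any) -> None: ...


class RecordingTripleStore:
    """Hypothetical test double: records calls instead of touching a real store."""

    def __init__(self, rows: Optional[List[Row]] = None) -> None:
        self.rows: List[Row] = list(rows or [])
        self.queries: List[str] = []
        self.inserted: List[Any] = []
        self.removed: List[Any] = []

    def query(self, sparql: str) -> List[Row]:
        # Ignores the query text and returns the canned (?s, ?p, ?o) rows.
        self.queries.append(sparql)
        return list(self.rows)

    def insert(self, graph: Any) -> None:
        # Records the graph that would have been inserted.
        self.inserted.append(graph)

    def remove(self, graph: Any) -> None:
        # Records the graph that would have been removed.
        self.removed.append(graph)
```

Real query results may expose rows differently from plain tuples, so code that reads result rows by attribute would need the fake adjusted accordingly.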
Usage
Minimal example (script-style)
```python
from naas_abi import ABIModule
from naas_abi_core.engine.Engine import Engine
from naas_abi.pipelines.MergeIndividualsPipeline import (
    MergeIndividualsPipeline,
    MergeIndividualsPipelineConfiguration,
    MergeIndividualsPipelineParameters,
)

# Load the naas_abi module before calling ABIModule.get_instance().
engine = Engine()
engine.load(module_names=["naas_abi"])

triple_store = ABIModule.get_instance().engine.services.triple_store

pipeline = MergeIndividualsPipeline(
    MergeIndividualsPipelineConfiguration(triple_store=triple_store)
)

# Merge <merge-id> into <keep-id>; all triples involving <merge-id> are removed.
result_graph = pipeline.run(
    MergeIndividualsPipelineParameters(
        uri_to_keep="http://ontology.naas.ai/abi/<keep-id>",
        uri_to_merge="http://ontology.naas.ai/abi/<merge-id>",
    )
)
print(result_graph.serialize(format="turtle"))
```
As a LangChain tool
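```python
# Reuses `pipeline` from the example above; as_tools() returns the merge_individuals tool.
tool = pipeline.as_tools()[0]
tool.invoke(
    {
        "uri_to_keep": "http://ontology.naas.ai/abi/<keep-id>",
        "uri_to_merge": "http://ontology.naas.ai/abi/<merge-id>",
    }
)
```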
Caveats
- `run()` requires `MergeIndividualsPipelineParameters`; otherwise it raises `ValueError`.
- All triples involving `uri_to_merge` are removed (both where it is subject and where it is object).
- `rdfs:label` and `abi:universal_name` from `uri_to_merge` are not copied as-is; they are added as `skos:altLabel` on `uri_to_keep`.
- Duplicate checks are performed against the pre-merge snapshot of `uri_to_keep` triples (i.e., duplicates created by earlier inserts in the same run are not re-checked against `graph_insert`).
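The insert and remove rules listed for `run()`, together with the caveats above, can be sketched over plain Python sets. This is a toy model, not the pipeline's code: triples are tuples, URIs are strings, and literals are a small frozen `Lit` dataclass instead of `rdflib.Literal`. `plan_merge`, `Lit`, and the expansion of `abi:universal_name` to `http://ontology.naas.ai/abi/universal_name` are assumptions; `rdfs:label` and `skos:altLabel` use their standard W3C IRIs. The sketch also assumes the label literal is reused unchanged (language tag included) when it becomes a `skos:altLabel`.

```python
from dataclasses import dataclass
from typing import Optional, Set, Tuple, Union

RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
SKOS_ALT_LABEL = "http://www.w3.org/2004/02/skos/core#altLabel"
# Assumption: abi: expands to http://ontology.naas.ai/abi/
ABI_UNIVERSAL_NAME = "http://ontology.naas.ai/abi/universal_name"
LABEL_PREDICATES = {RDFS_LABEL, ABI_UNIVERSAL_NAME}


@dataclass(frozen=True)
class Lit:
    """Toy literal; copying the object unchanged keeps datatype and language."""

    value: str
    datatype: Optional[str] = None
    lang: Optional[str] = None


Term = Union[str, Lit]  # str stands for a URI
Triple = Tuple[Term, Term, Term]


def plan_merge(
    triples: Set[Triple], uri_to_keep: str, uri_to_merge: str
) -> Tuple[Set[Triple], Set[Triple]]:
    """Toy model: return (graph_insert, graph_remove) for the merge."""
    # Pre-merge snapshot of uri_to_keep's triples; duplicate checks use only this.
    keep_snapshot = {t for t in triples if uri_to_keep in (t[0], t[2])}
    graph_insert: Set[Triple] = set()
    graph_remove: Set[Triple] = set()

    for s, p, o in triples:
        if s == uri_to_merge:
            graph_remove.add((s, p, o))
            if p in LABEL_PREDICATES:
                # Labels of the merged individual become alternative labels.
                graph_insert.add((uri_to_keep, SKOS_ALT_LABEL, o))
            elif (uri_to_keep, p, o) not in keep_snapshot:
                # Copy the object as-is so literal datatype/lang tags survive.
                graph_insert.add((uri_to_keep, p, o))
        if o == uri_to_merge:
            graph_remove.add((s, p, o))
            if (s, p, uri_to_keep) not in keep_snapshot:
                # Redirect incoming references to uri_to_keep.
                graph_insert.add((s, p, uri_to_keep))

    return graph_insert, graph_remove
```

Because the duplicate check only consults `keep_snapshot`, two sources that yield the same new triple (for example an `rdfs:label` and an `abi:universal_name` with the same value) both reach `graph_insert`; a set, like an `rdflib.Graph`, simply stores that triple once.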