MergeIndividualsPipeline
What it is
A Pipeline that merges two RDF individuals in a triplestore by:
- Copying selected triples from uri_to_merge to uri_to_keep (avoiding duplicates).
- Rewriting references where uri_to_merge appears as an object to instead point to uri_to_keep.
- Removing all original triples involving uri_to_merge (as subject or object).
- Returning the resulting subject graph for uri_to_keep.
It also writes the inserted/removed triples to Turtle files under a configured datastore path.
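The merge steps above can be sketched without a triplestore by modelling a graph as a set of (subject, predicate, object) tuples. This is a minimal illustration under that assumption, not the pipeline's actual implementation: merge_individuals and the ex: identifiers are hypothetical, and the label handling described under Configuration/Dependencies is left out.

```python
# Triples are plain (subject, predicate, object) tuples. This stands in for a
# real triplestore and is an assumption made purely for illustration.
Triple = tuple[str, str, str]


def merge_individuals(
    graph: set[Triple], uri_to_keep: str, uri_to_merge: str
) -> tuple[set[Triple], set[Triple], set[Triple]]:
    """Return (merged graph, inserted triples, removed triples)."""
    inserted: set[Triple] = set()
    removed: set[Triple] = set()

    for s, p, o in graph:
        if uri_to_merge not in (s, o):
            continue  # triple does not involve the merged individual

        # Every triple that mentions uri_to_merge is removed.
        removed.add((s, p, o))

        # Copy outgoing triples and re-point incoming references to uri_to_keep.
        new_s = uri_to_keep if s == uri_to_merge else s
        new_o = uri_to_keep if o == uri_to_merge else o
        candidate = (new_s, p, new_o)

        # Skip duplicates: only insert what the graph does not already hold.
        if candidate not in graph:
            inserted.add(candidate)

    merged = (graph - removed) | inserted
    return merged, inserted, removed


graph = {
    ("ex:alice", "ex:knows", "ex:bob"),
    ("ex:bob2", "ex:email", "bob@example.org"),
    ("ex:carol", "ex:knows", "ex:bob2"),
}
merged, inserted, removed = merge_individuals(graph, "ex:bob", "ex:bob2")
```

The inserted and removed sets correspond to the two graphs the pipeline saves as Turtle files; filtering merged for triples whose subject is uri_to_keep gives the equivalent of the returned subject graph.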
Public API
Classes
- MergeIndividualsPipelineConfiguration(PipelineConfiguration)
  - Fields:
    - triple_store: ITripleStoreService - triplestore port used for querying/inserting/removing.
    - datastore_path: str = "datastore/ontology/merged_individual" - folder used to store .ttl audit files.
- MergeIndividualsPipelineParameters(PipelineParameters)
  - Fields (validated against URI_REGEX):
    - uri_to_keep: str - URI that remains.
    - uri_to_merge: str - URI that is merged into uri_to_keep and then removed.
- MergeIndividualsPipeline(Pipeline)
  - get_all_triples_for_uri(uri: str)
    - Queries the triplestore for all triples where uri appears as subject or object.
  - run(parameters: PipelineParameters) -> rdflib.Graph
    - Executes the merge and returns the subject graph for uri_to_keep.
  - as_tools() -> list[langchain_core.tools.BaseTool]
    - Exposes the pipeline as a LangChain StructuredTool named merge_individuals.
  - as_api(...) -> None
    - Present but does not register any routes (returns None).
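To illustrate what get_all_triples_for_uri has to ask of the triplestore, a SPARQL query covering both the subject and the object position could be built as follows. The query text and the build_all_triples_query helper are assumptions made for illustration; this document does not show the query the pipeline actually sends.

```python
def build_all_triples_query(uri: str) -> str:
    """Build a SPARQL query for triples with `uri` as subject or object."""
    # Hypothetical helper, not part of the pipeline's public API.
    # It assumes `uri` has already been validated (the parameters class
    # validates against URI_REGEX), since it is interpolated as-is.
    return (
        "SELECT ?s ?p ?o WHERE {\n"
        f"  {{ <{uri}> ?p ?o . BIND(<{uri}> AS ?s) }}\n"
        "  UNION\n"
        f"  {{ ?s ?p <{uri}> . BIND(<{uri}> AS ?o) }}\n"
        "}"
    )


query = build_all_triples_query("http://ontology.naas.ai/abi/example-id")
print(query)
```

The UNION keeps the two cases separate, so each result row carries a full (s, p, o) binding regardless of which side the URI appeared on.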
Configuration/Dependencies
- Requires an ITripleStoreService implementation provided via MergeIndividualsPipelineConfiguration.triple_store.
- Uses ABIModule.get_instance().engine.services.triple_store and .object_storage internally to initialize:
  - SPARQLUtils (for get_subject_graph).
  - StorageUtils (to save inserted/removed graphs as Turtle files).
- RDF processing uses rdflib (Graph, URIRef, Literal, and vocabularies like RDFS, SKOS).
- Special handling:
  - rdfs:label and abi:universal_name from the merged URI become skos:altLabel on the kept URI.
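The special handling of names can be pictured as a predicate mapping applied while copying triples from the merged URI. The sketch below is an illustration only: predicate_on_kept_uri is a hypothetical helper, and the full IRI used for abi:universal_name is an assumption, since this document does not spell out the abi prefix.

```python
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
SKOS_ALT_LABEL = "http://www.w3.org/2004/02/skos/core#altLabel"
# Assumed expansion of abi:universal_name; the real namespace may differ.
ABI_UNIVERSAL_NAME = "http://ontology.naas.ai/abi/universal_name"

# Predicates whose values are kept as alternative labels after the merge.
NAME_PREDICATES = {RDFS_LABEL, ABI_UNIVERSAL_NAME}


def predicate_on_kept_uri(predicate: str) -> str:
    """Map a predicate on the merged URI to the one used on the kept URI."""
    return SKOS_ALT_LABEL if predicate in NAME_PREDICATES else predicate
```

Demoting the merged individual's names to skos:altLabel keeps them searchable without giving the kept individual a second primary label.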
Usage
from naas_abi import ABIModule
from naas_abi_core.engine.Engine import Engine
from naas_abi.pipelines.MergeIndividualsPipeline import (
    MergeIndividualsPipeline,
    MergeIndividualsPipelineConfiguration,
    MergeIndividualsPipelineParameters,
)

engine = Engine()
engine.load(module_names=["naas_abi"])

triple_store_service = ABIModule.get_instance().engine.services.triple_store

pipeline = MergeIndividualsPipeline(
    MergeIndividualsPipelineConfiguration(triple_store=triple_store_service)
)

result_graph = pipeline.run(
    MergeIndividualsPipelineParameters(
        uri_to_keep="http://ontology.naas.ai/abi/<kept-id>",
        uri_to_merge="http://ontology.naas.ai/abi/<merged-id>",
    )
)

print(result_graph.serialize(format="turtle"))
Caveats
- run() requires MergeIndividualsPipelineParameters; any other PipelineParameters type raises ValueError.
- The pipeline removes all triples where uri_to_merge is subject or object (after inserting replacements where applicable).
- Only triples where the merged URI is the subject are considered for copying to the kept URI; duplicates (same predicate/object) are skipped.
- as_api() is a no-op (no HTTP endpoints are exposed by this class).
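The first caveat amounts to a type guard at the top of run(). A minimal sketch of that pattern follows; the two classes here are local stand-ins that only borrow the library's class names, and the error message and return value are invented for illustration.

```python
from dataclasses import dataclass


@dataclass
class PipelineParameters:
    """Stand-in for the library's base parameters class."""


@dataclass
class MergeIndividualsPipelineParameters(PipelineParameters):
    """Stand-in carrying the two URIs the merge needs."""

    uri_to_keep: str
    uri_to_merge: str


def run(parameters: PipelineParameters) -> str:
    # Reject any parameters object that is not the merge-specific type.
    if not isinstance(parameters, MergeIndividualsPipelineParameters):
        raise ValueError(
            "Parameters must be of type MergeIndividualsPipelineParameters"
        )
    return f"merge {parameters.uri_to_merge} into {parameters.uri_to_keep}"
```

Callers that route generic PipelineParameters objects to several pipelines should therefore expect a ValueError rather than a silent no-op when the wrong type reaches this one.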