MergeIndividualsPipeline
What it is
A pipeline that merges RDF individuals in a triplestore by transferring triples from one URI (uri_to_merge) into another (uri_to_keep), then removing the original triples associated with uri_to_merge. It also persists insert/remove graphs to object storage as Turtle files.
Public API
- MergeIndividualsPipelineConfiguration(PipelineConfiguration)
  - Fields
    - triple_store: ITripleStoreService. Triplestore backend used for query, insert, and remove.
    - object_storage: IObjectStorageDomain. Storage backend used to save TTL outputs.
    - datastore_path: str = "datastore/ontology/merged_individual". Output directory prefix used by storage.
- MergeIndividualsPipelineParameters(PipelineParameters)
  - Fields
    - merge_pairs: list[tuple[str, str]]. List of (uri_to_keep, uri_to_merge) pairs.
  - Validation
    - Must be non-empty.
    - Each element must be a tuple of exactly 2 strings.
    - Each URI must match URI_REGEX.
    - URIs in a pair must be different.
- MergeIndividualsPipeline(Pipeline)
  - run(parameters: PipelineParameters) -> rdflib.Graph
    - Executes one or more merge operations from MergeIndividualsPipelineParameters.merge_pairs.
    - Returns a combined rdflib.Graph containing the resulting subject graphs for each uri_to_keep.
  - as_tools() -> list[langchain_core.tools.BaseTool]
    - Exposes a LangChain StructuredTool named merge_individuals using MergeIndividualsPipelineParameters as schema.
  - as_api(...) -> None
    - Present but does not register any routes (returns None).
  - get_all_triples_for_uri(uri: str)
    - Queries all triples where uri appears as subject or object.
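
The validation rules listed for merge_pairs can be sketched in plain Python as below. This is an illustration only, not the pipeline's actual validator: the function name validate_merge_pairs is hypothetical, and the URI_REGEX pattern here is a stand-in, since the real pattern is not shown in this document.

```python
import re

# Hypothetical stand-in: the real URI_REGEX is not shown in this document.
URI_REGEX = re.compile(r"^https?://\S+$")


def validate_merge_pairs(merge_pairs):
    """Raise ValueError if merge_pairs breaks any of the documented rules."""
    # Rule 1: the list must be non-empty.
    if not merge_pairs:
        raise ValueError("merge_pairs must be non-empty")
    for pair in merge_pairs:
        # Rule 2: each element is a tuple of exactly 2 strings.
        if not (
            isinstance(pair, tuple)
            and len(pair) == 2
            and all(isinstance(uri, str) for uri in pair)
        ):
            raise ValueError(f"expected a tuple of exactly 2 strings, got {pair!r}")
        uri_to_keep, uri_to_merge = pair
        # Rule 3: each URI must match the (assumed) pattern.
        for uri in pair:
            if not URI_REGEX.match(uri):
                raise ValueError(f"invalid URI: {uri!r}")
        # Rule 4: the two URIs in a pair must differ.
        if uri_to_keep == uri_to_merge:
            raise ValueError("uri_to_keep and uri_to_merge must be different")
    return merge_pairs
```

Checking pairs this way before constructing the parameters object can surface a bad pair early, with a message that names the offending value.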
Configuration/Dependencies
- Requires implementations of:
  - ITripleStoreService (used for query, insert, remove)
  - IObjectStorageDomain (used via StorageUtils.save_triples)
- Uses:
  - SPARQLUtils.get_subject_graph(uri) to return the final merged graph per kept URI.
  - rdflib for graph construction and serialization.
- Output files written (per merge pair) under datastore_path:
  - "{uri_to_keep_label}_{last_segment}_merged.ttl": inserted triples
  - "{uri_to_merge_label}_{last_segment}_removed.ttl": removed triples
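
To make the output filename patterns concrete, here is a minimal sketch of how such names could be built defensively. All three helpers (last_segment, safe_label, output_filename) are hypothetical and are not part of the pipeline. The sketch assumes that last_segment means the text after the final "/" or "#" of the URI, and that a missing label should fall back to a placeholder; the pipeline itself may behave differently.

```python
import re


def last_segment(uri):
    """Return the text after the last '/' or '#' (assumed meaning of last_segment)."""
    return re.split(r"[/#]", uri.rstrip("/#"))[-1]


def safe_label(label, fallback="unlabeled"):
    """Replace runs of characters outside [A-Za-z0-9._-] with '_'.

    A missing or empty label yields the fallback (an assumption of this sketch).
    """
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", label or "").strip("_")
    return cleaned or fallback


def output_filename(label, uri, suffix):
    """Build '{label}_{last_segment}_{suffix}.ttl'; suffix is 'merged' or 'removed'."""
    return f"{safe_label(label)}_{last_segment(uri)}_{suffix}.ttl"


# Example with the placeholder URI from the Usage section and a label containing
# characters that some storage backends reject.
print(output_filename("Acme / Corp: EU", "http://ontology.naas.ai/abi/KEEP_URI", "merged"))
```

Sanitizing the label before it reaches storage trades a little readability in the filename for predictable behavior across storage implementations.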
Usage
from naas_abi_core.engine.Engine import Engine
from naas_abi import ABIModule
from naas_abi_marketplace.domains.ontology_engineer.pipelines.MergeIndividualsPipeline import (
    MergeIndividualsPipeline,
    MergeIndividualsPipelineConfiguration,
    MergeIndividualsPipelineParameters,
)

# Boot engine/services (project-specific)
engine = Engine()
engine.load(module_names=["naas_abi_marketplace.domains.ontology_engineer"])
triple_store = ABIModule.get_instance().engine.services.triple_store
object_storage = ABIModule.get_instance().engine.services.object_storage

# datastore_path is omitted, so its default is used
config = MergeIndividualsPipelineConfiguration(
    triple_store=triple_store,
    object_storage=object_storage,
)
pipeline = MergeIndividualsPipeline(config)

# Each pair is (uri_to_keep, uri_to_merge)
result = pipeline.run(
    MergeIndividualsPipelineParameters(
        merge_pairs=[
            ("http://ontology.naas.ai/abi/KEEP_URI", "http://ontology.naas.ai/abi/MERGE_URI"),
        ]
    )
)

# Number of triples in the combined result graph
print(len(result))
Caveats
- Merges are done by:
  - Copying non-duplicate triples where uri_to_merge is the subject into uri_to_keep (excluding rdfs:label and abi:universal_name).
  - Converting rdfs:label and abi:universal_name from uri_to_merge into skos:altLabel on uri_to_keep.
  - Rewriting triples where uri_to_merge is the object to point to uri_to_keep (if not already present).
  - Removing all triples returned for uri_to_merge (both subject and object occurrences).
- Output filenames include labels; if labels are missing or contain filesystem-hostile characters, saving may fail depending on the storage implementation.
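
The merge steps listed above can be modeled on an in-memory set of triples, as in the sketch below. It is a simplified model, not the pipeline's code: triples are plain (subject, predicate, object) string tuples rather than rdflib terms, plan_merge is a hypothetical name, the IRI used for abi:universal_name is a guess because the abi namespace is not spelled out in this document, and the handling of a triple that has uri_to_merge as both subject and object is an assumption.

```python
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
SKOS_ALT_LABEL = "http://www.w3.org/2004/02/skos/core#altLabel"
# Hypothetical IRI: the abi namespace is not spelled out in this document.
ABI_UNIVERSAL_NAME = "http://ontology.naas.ai/abi/universal_name"
NAME_PREDICATES = {RDFS_LABEL, ABI_UNIVERSAL_NAME}


def plan_merge(triples, uri_to_keep, uri_to_merge):
    """Return (to_insert, to_remove) for one merge pair over a set of (s, p, o) tuples."""
    to_insert, to_remove = set(), set()
    for s, p, o in triples:
        if uri_to_merge not in (s, o):
            continue
        # Every triple touching uri_to_merge is removed (subject or object side).
        to_remove.add((s, p, o))
        # Names on the merged subject become alternative labels on the kept one.
        new_p = SKOS_ALT_LABEL if (s == uri_to_merge and p in NAME_PREDICATES) else p
        # Assumption: a self-referencing triple is rewritten on both ends.
        new_s = uri_to_keep if s == uri_to_merge else s
        new_o = uri_to_keep if o == uri_to_merge else o
        candidate = (new_s, new_p, new_o)
        # Skip triples the kept individual already has.
        if candidate not in triples:
            to_insert.add(candidate)
    return to_insert, to_remove


KEEP = "http://ontology.naas.ai/abi/KEEP_URI"
MERGE = "http://ontology.naas.ai/abi/MERGE_URI"
store = {
    (KEEP, RDFS_LABEL, "Acme"),
    (MERGE, RDFS_LABEL, "Acme Inc."),
    ("http://example.org/other", "http://example.org/knows", MERGE),
}
to_insert, to_remove = plan_merge(store, KEEP, MERGE)
merged_store = (store - to_remove) | to_insert
```

In this model, to_insert and to_remove play the role of the insert and remove graphs that the pipeline saves as Turtle files.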