TripleStoreService
What it is
- A service facade over an
ITripleStorePortadapter that:- Inserts/removes RDF triples into named graphs.
- Runs SPARQL queries (optionally against a “view”).
- Manages a dedicated internal “schema graph” that tracks loaded Turtle schema files and their metadata (hash, file path, last update time, base64 content).
- Optionally publishes triple insert/delete events on a service bus when
services_wiredis enabled.
Public API
Class: TripleStoreService(ServiceBase, ITripleStoreService)
Constructor:
__init__(triple_store_adapter: ITripleStorePort)- Stores the adapter.
- Initializes an internal schema graph at
http://ontology.naas.ai/graph/schema. - Inserts
SCHEMA_TTL(internal ontology describing schema metadata) into that schema graph.
Core triple-store operations:
insert(triples: rdflib.Graph, graph_name: rdflib.term.URIRef) -> None- Inserts all triples into
graph_namevia the adapter. - If
services_wiredisTrue, publishes one bus message per triple on a hashed topicts.insert....
- Inserts all triples into
remove(triples: rdflib.Graph, graph_name: rdflib.term.URIRef) -> None- Removes all triples from
graph_namevia the adapter. - If
services_wiredisTrue, publishes one bus message per triple on a hashed topicts.delete....
- Removes all triples from
get() -> rdflib.Graph- Returns the adapter’s graph (implementation-defined by adapter).
query(query: str) -> rdflib.query.Result- Executes a SPARQL query through the adapter.
query_view(view: str, query: str) -> rdflib.query.Result- Executes a SPARQL query against a named “view” through the adapter.
Graph management:
create_graph(graph_name: URIRef) -> Noneclear_graph(graph_name: URIRef) -> Nonedrop_graph(graph_name: URIRef) -> Nonelist_graphs() -> list[URIRef]
Subscriptions / notifications:
subscribe(topic: tuple[URIRef|None, URIRef|None, URIRef|None], callback: Callable[[bytes], None], event_type: OntologyEvent|None = None, graph_name: URIRef|str = "*") -> None- Subscribes to bus topics for triple insert/delete events.
Nonein the(s, p, o)tuple acts as a wildcard.event_typesupportsOntologyEvent.INSERT,OntologyEvent.DELETE, orNonefor wildcard.graph_name="*"matches any graph; otherwise it is hashed into the topic.
get_subject_graph(subject: str, graph_name: str = "*") -> rdflib.Graph- Delegates to adapter to retrieve a subject-focused graph.
Schema management (stored in the internal schema graph):
load_schemas(filepaths: List[str]) -> None- Preloads existing schema metadata into a local cache graph, then calls
load_schema()for each filepath.
- Preloads existing schema metadata into a local cache graph, then calls
load_schema(filepath: str, schema_cache: Graph | None = None) -> None- If metadata for
filepathexists:- Compares current file hash vs stored hash.
- If changed, computes added/removed triples by parsing old vs new Turtle content and applies
insert/removeto the schema graph. - Updates the stored metadata (hash, last update time, base64 content).
- Removes duplicate metadata subjects for the same
filepath.
- If metadata does not exist:
- Parses the Turtle file and inserts its triples into the schema graph.
- Inserts a new
internal:Schemametadata node with hash, filepath, mtime, and base64 content.
- Exceptions are caught; errors are logged and a traceback is printed.
- If metadata for
remove_schema(filepath: str) -> None- Finds schema metadata entries by
internal:filePath. - Reconstructs schema triples from stored base64 content and removes:
- The schema triples from the schema graph.
- The associated metadata triples.
- Exceptions are caught; errors are logged and a traceback is printed.
- Finds schema metadata entries by
get_schema_graph() -> rdflib.Graph- Rebuilds and returns a merged
rdflib.Graphof all loaded schema contents by decoding each storedinternal:content(base64 Turtle) and parsing it.
- Rebuilds and returns a merged
Configuration/Dependencies
- Requires an adapter implementing
ITripleStorePortwith methods used here:insert,remove,get,query,query_view,create_graph,clear_graph,drop_graph,list_graphs,get_subject_graph.
- Uses
rdflibforGraph,URIRef, parsing Turtle, and SPARQL result types. - Event publishing/subscription relies on
ServiceBase.services.bus:- Publishes to/consumes from bus namespace
"triple_store". - Only active when
services_wiredisTrue.
- Publishes to/consumes from bus namespace
- Internal schema graph IRI:
http://ontology.naas.ai/graph/schema.
Usage
from rdflib import Graph, URIRef
from naas_abi_core.services.triple_store.TripleStoreService import TripleStoreService
# Provide an adapter that implements ITripleStorePort (not shown here).
adapter = ... # e.g., a filesystem/db-backed triple store adapter
ts = TripleStoreService(adapter)
g = Graph()
g.parse(data='@prefix ex: <http://ex/> . ex:a ex:b ex:c .', format="turtle")
graph_name = URIRef("http://example.org/graph/main")
ts.create_graph(graph_name)
ts.insert(g, graph_name=graph_name)
res = ts.query("SELECT * WHERE { ?s ?p ?o }")
for row in res:
print(row)
# Load a Turtle schema file into the internal schema graph and track metadata.
ts.load_schema("path/to/schema.ttl")
# Get the merged graph of all loaded schema contents.
schema_graph = ts.get_schema_graph()
print(len(schema_graph))
Caveats
insert()/remove()assertgraph_nameis notNone;create_graph()/clear_graph()also assert it is aURIRef.- Schema loading assumes Turtle content:
- Existing schemas are parsed using
format="turtle"from stored base64 content. - New schemas are parsed via
Graph().parse(filepath)(rdflib will infer format; typically.ttlis expected).
- Existing schemas are parsed using
load_schema()andremove_schema()swallow exceptions (log + print traceback) rather than raising.- Event topics are hash-based (SHA-256 truncated) per graph/s/p/o, so subscribers must use
subscribe()to construct matching patterns rather than guessing raw topic strings.