TripleStoreService
What it is
A service façade over a triple store adapter (ITripleStorePort) that:
- Inserts/removes RDF triples (
rdflib.Graph) into a triple store. - Runs SPARQL queries (including adapter-defined “views”).
- Manages named graphs (create/clear/drop/list).
- Publishes insert/delete events to a message bus when the service is wired.
- Loads and tracks Turtle schema files with change detection via stored metadata.
On initialization it also inserts an internal ontology (SCHEMA_TTL) describing schema metadata (internal:Schema, internal:hash, internal:filePath, etc.).
Public API
Class: TripleStoreService(triple_store_adapter: ITripleStorePort)
- Purpose: Primary entry point for triple store CRUD, querying, subscriptions, and schema loading.
Methods
-
insert(triples: Graph, graph_name: URIRef | None = None) -> None- Inserts all triples into the store via the adapter.
- If
services_wiredisTrue, publishes one bus message per triple on a topic derived from graph/s/p/o hashes.
-
remove(triples: Graph, graph_name: URIRef | None = None) -> None- Removes all triples via the adapter.
- If
services_wiredisTrue, publishes delete events similarly toinsert.
-
get() -> Graph- Returns the full graph from the adapter.
-
query(query: str) -> rdflib.query.Result- Executes a SPARQL query via the adapter.
-
query_view(view: str, query: str) -> rdflib.query.Result- Executes a view-specific query via the adapter.
-
create_graph(graph_name: URIRef) -> None- Creates a named graph via the adapter.
-
clear_graph(graph_name: URIRef | None = None) -> None- Clears a named graph (or default graph if
None) via the adapter.
- Clears a named graph (or default graph if
-
drop_graph(graph_name: URIRef) -> None- Drops a named graph via the adapter.
-
list_graphs() -> list[URIRef]- Lists graphs via the adapter.
-
subscribe(topic: tuple[URIRef | None, URIRef | None, URIRef | None], callback: Callable[[bytes], None], event_type: OntologyEvent | None = None, graph_name: URIRef | str | None = "*") -> None- Subscribes to bus topics for triple insert/delete events.
topicis an (s, p, o) pattern;Noneacts as wildcard.event_type:INSERT,DELETE, orNonefor all.graph_name:*for all graphs,Nonefor default graph, or a specific graph identifier.
-
get_subject_graph(subject: str) -> Graph- Returns a subgraph for a given subject (converted to
URIRef) via the adapter.
- Returns a subgraph for a given subject (converted to
Schema management
-
load_schemas(filepaths: List[str]) -> None- Loads multiple schema files, first building a cache of existing schema metadata to speed checks.
-
load_schema(filepath: str, schema_cache: Graph | None = None) -> None- Loads a Turtle schema file into the triple store if not present.
- If already present, compares stored content hash vs current file content hash and:
- If unchanged: optionally cleans up duplicate metadata entries and returns.
- If changed: computes triple additions/deletions between old and new Turtle graphs, updates the store, and updates stored metadata.
- Stores schema metadata as
internal:Schemainstances with:internal:hash,internal:filePath,internal:fileLastUpdateTime,internal:content(base64).
-
get_schema_graph() -> Graph- Reconstructs and returns a merged RDF graph of all stored schema contents (
internal:content) by base64-decoding and parsing Turtle.
- Reconstructs and returns a merged RDF graph of all stored schema contents (
Configuration/Dependencies
- Adapter dependency: Requires an
ITripleStorePortimplementation passed to the constructor. The adapter provides storage operations (insert,remove,get,query, graph management, etc.). - Messaging dependency (optional at runtime):
- When
services_wiredisTrue, usesself.services.bus.topic_publish(...)andself.services.bus.topic_consume(...).
- When
- Libraries:
rdflibfor RDF graphs and SPARQL results.
- Internal schema ontology:
SCHEMA_TTLis parsed and inserted into the store on service initialization.
Usage
Basic insert/query (requires a real ITripleStorePort implementation)
from rdflib import Graph, URIRef, Literal
from rdflib.namespace import RDF
# triple_store_adapter must implement ITripleStorePort
store = TripleStoreService(triple_store_adapter)
g = Graph()
s = URIRef("http://example.org/Alice")
g.add((s, RDF.type, URIRef("http://example.org/Person")))
g.add((s, URIRef("http://example.org/name"), Literal("Alice")))
store.insert(g)
res = store.query("SELECT ?s WHERE { ?s a <http://example.org/Person> }")
for row in res:
print(row)
Subscribe to inserts of any triple (when services are wired)
from rdflib.namespace import RDF
def on_msg(payload: bytes) -> None:
print(payload.decode("utf-8").strip())
store.subscribe(
topic=(None, None, None), # wildcard s/p/o
event_type=OntologyEvent.INSERT, # only inserts
graph_name="*", # all graphs
callback=on_msg,
)
Load and track Turtle schemas from files
store.load_schemas(["/path/to/schema1.ttl", "/path/to/schema2.ttl"])
schema_graph = store.get_schema_graph()
print(len(schema_graph))
Caveats
- Per-triple bus publishing:
insert()/remove()publish one message per triple whenservices_wiredisTrue, which can be expensive for large graphs. - Schema timestamp stored as
os.path.getmtime:internal:fileLastUpdateTimeis stored as a numeric timestamp string (not an RDFxsd:dateTimeliteral). load_schema()error handling: Exceptions are caught, logged, and a traceback is printed; errors are not re-raised.