UpdateTickerPipeline
What it is
- A
Pipelinethat updates a ticker symbol individual in an RDF triple store. - Currently supports adding (if missing) the
ABI.isTickerSymbolOfrelationship from a ticker to an organization.
Public API
-
UpdateTickerPipelineConfiguration- Fields:
triple_store: ITripleStoreService- triple store service used to read and write RDF graphs.
- Fields:
-
UpdateTickerPipelineParameters(Pydantic model)individual_uri: str- URI of the ticker individual (validated byURI_REGEX).organization_uri: Optional[str]- URI of the organization to link (validated byURI_REGEX).
-
UpdateTickerPipeline(configuration)run(parameters: PipelineParameters) -> rdflib.Graph- Validates parameter type (
UpdateTickerPipelineParametersrequired). - Fetches the existing subject graph for
individual_uri. - If
organization_uriis provided and the triple does not already exist, inserts:(individual_uri, ABI.isTickerSymbolOf, organization_uri)
- Writes inserts via
triple_store.insert(...)and returns the updated graph.
- Validates parameter type (
as_tools() -> list[langchain_core.tools.BaseTool]- Exposes a LangChain
StructuredToolnamedupdate_tickerthat callsrun(...).
- Exposes a LangChain
as_api(...) -> None- Present but does nothing (returns
Noneand does not register routes).
- Present but does nothing (returns
Configuration/Dependencies
- Requires an implementation of
ITripleStoreServiceproviding at least:get_subject_graph(subject: rdflib.term.URIRef) -> rdflib.Graphinsert(graph: rdflib.Graph) -> None
- Uses:
rdflib.Graph,rdflib.URIRefnaas_abi_core.utils.Graph.ABI(forABI.isTickerSymbolOf)URI_REGEXfor URI validation in parameterslangchain_core.tools.StructuredToolfor tool exposure
Usage
from rdflib import URIRef
from naas_abi.pipelines.UpdateTickerPipeline import (
UpdateTickerPipeline,
UpdateTickerPipelineConfiguration,
UpdateTickerPipelineParameters,
)
# Provide a triple store service that implements ITripleStoreService
triple_store = ... # ITripleStoreService
pipeline = UpdateTickerPipeline(
UpdateTickerPipelineConfiguration(triple_store=triple_store)
)
result_graph = pipeline.run(
UpdateTickerPipelineParameters(
individual_uri="http://ontology.naas.ai/abi/ticker/ABC",
organization_uri="http://ontology.naas.ai/abi/org/SomeOrganization",
)
)
print(len(result_graph))
Caveats
run()raisesValueErrorifparametersis not anUpdateTickerPipelineParametersinstance.as_api()is a no-op; no HTTP routes are created.- The pipeline only adds the
isTickerSymbolOftriple when missing; it does not remove or replace existing relationships.