UpdateTickerPipeline
What it is
- A Pipeline that updates a ticker symbol individual in an RDF triple store.
- Currently supports adding (if missing) the ABI.isTickerSymbolOf relationship from a ticker to an organization.
Public API
- UpdateTickerPipelineConfiguration
  - Fields:
    - triple_store: ITripleStoreService - triple store service used to read and write RDF graphs.
- UpdateTickerPipelineParameters (Pydantic model)
  - individual_uri: str - URI of the ticker individual (validated by URI_REGEX).
  - organization_uri: Optional[str] - URI of the organization to link (validated by URI_REGEX).
- UpdateTickerPipeline(configuration)
  - run(parameters: PipelineParameters) -> rdflib.Graph
    - Validates the parameter type (UpdateTickerPipelineParameters required).
    - Fetches the existing subject graph for individual_uri.
    - If organization_uri is provided and the triple does not already exist, inserts: (individual_uri, ABI.isTickerSymbolOf, organization_uri)
    - Writes inserts via triple_store.insert(...) and returns the updated graph.
  - as_tools() -> list[langchain_core.tools.BaseTool]
    - Exposes a LangChain StructuredTool named update_ticker that calls run(...).
  - as_api(...) -> None
    - Present but does nothing (returns None and does not register routes).
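The steps that run(...) performs can be sketched without any dependencies. This is a minimal illustration, not the pipeline's actual code: triples are plain tuples standing in for rdflib terms, and InMemoryStore, Params, run_sketch, and the predicate string are hypothetical names. Only get_subject_graph, insert, and the insert-if-missing behavior come from the description above.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical stand-in for the ABI.isTickerSymbolOf predicate URI.
IS_TICKER_SYMBOL_OF = "abi:isTickerSymbolOf"

Triple = tuple[str, str, str]


@dataclass
class Params:
    """Hypothetical stand-in for UpdateTickerPipelineParameters."""
    individual_uri: str
    organization_uri: Optional[str] = None


class InMemoryStore:
    """Hypothetical stand-in for an ITripleStoreService implementation."""

    def __init__(self) -> None:
        self.triples: set[Triple] = set()

    def get_subject_graph(self, subject: str) -> set[Triple]:
        # Return a new set holding every triple whose subject matches.
        return {t for t in self.triples if t[0] == subject}

    def insert(self, graph: set[Triple]) -> None:
        self.triples |= graph


def run_sketch(store: InMemoryStore, parameters: object) -> set[Triple]:
    # 1. Validate the parameter type.
    if not isinstance(parameters, Params):
        raise ValueError("parameters must be a Params instance")
    # 2. Fetch the existing subject graph.
    graph = store.get_subject_graph(parameters.individual_uri)
    # 3. Collect the link only if it was requested and is missing.
    inserts: set[Triple] = set()
    if parameters.organization_uri:
        triple = (
            parameters.individual_uri,
            IS_TICKER_SYMBOL_OF,
            parameters.organization_uri,
        )
        if triple not in graph:
            inserts.add(triple)
    # 4. Write the inserts (skipping empty writes is a choice of this
    #    sketch) and return the updated graph.
    if inserts:
        store.insert(inserts)
    return graph | inserts
```

Calling run_sketch twice with the same Params leaves a single triple in the store, which mirrors the "if missing" wording above.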
Configuration/Dependencies
- Requires an implementation of ITripleStoreService providing at least:
  - get_subject_graph(subject: rdflib.term.URIRef) -> rdflib.Graph
  - insert(graph: rdflib.Graph) -> None
- Uses:
  - rdflib.Graph, rdflib.URIRef
  - naas_abi_core.utils.Graph.ABI (for ABI.isTickerSymbolOf)
  - URI_REGEX for URI validation in parameters
  - langchain_core.tools.StructuredTool for tool exposure
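The definition of URI_REGEX is not shown here, so the following is only a sketch of how regex-based URI validation on the parameters could behave. The pattern is an assumed stand-in, not the project's URI_REGEX, and a standard-library dataclass replaces the Pydantic model so the snippet runs without dependencies. ParamsSketch and check_uri are hypothetical names.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Assumed pattern for illustration only; the real URI_REGEX may differ.
ASSUMED_URI_REGEX = re.compile(r'^https?://[^\s<>"]+$')


def check_uri(value: str) -> str:
    """Return the value unchanged, or raise ValueError if it does not match."""
    if not ASSUMED_URI_REGEX.match(value):
        raise ValueError(f"Not a valid URI: {value!r}")
    return value


@dataclass
class ParamsSketch:
    """Hypothetical stand-in for UpdateTickerPipelineParameters."""
    individual_uri: str
    organization_uri: Optional[str] = None

    def __post_init__(self) -> None:
        check_uri(self.individual_uri)
        # The organization is optional, so validate it only when given.
        if self.organization_uri is not None:
            check_uri(self.organization_uri)
```

With this stand-in, ParamsSketch("http://ontology.naas.ai/abi/ticker/ABC") is accepted, while a value such as "not a uri" raises ValueError at construction time, before any triple store call is made.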
Usage
from rdflib import URIRef

from naas_abi.pipelines.UpdateTickerPipeline import (
    UpdateTickerPipeline,
    UpdateTickerPipelineConfiguration,
    UpdateTickerPipelineParameters,
)

# Provide a triple store service that implements ITripleStoreService
triple_store = ...  # ITripleStoreService

pipeline = UpdateTickerPipeline(
    UpdateTickerPipelineConfiguration(triple_store=triple_store)
)

result_graph = pipeline.run(
    UpdateTickerPipelineParameters(
        individual_uri="http://ontology.naas.ai/abi/ticker/ABC",
        organization_uri="http://ontology.naas.ai/abi/org/SomeOrganization",
    )
)
print(len(result_graph))
Caveats
- run() raises ValueError if parameters is not an UpdateTickerPipelineParameters instance.
- as_api() is a no-op; no HTTP routes are created.
- The pipeline only adds the isTickerSymbolOf triple when missing; it does not remove or replace existing relationships.
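One consequence of the add-only behavior is that linking the same ticker to a second organization leaves both links in place. The sketch below illustrates this with plain tuples in a set standing in for the triple store. add_link_if_missing, the predicate string, and the two organization URIs are hypothetical and only mimic the behavior described above.

```python
# Hypothetical stand-in for the ABI.isTickerSymbolOf predicate.
IS_TICKER_SYMBOL_OF = "abi:isTickerSymbolOf"


def add_link_if_missing(triples: set, ticker: str, organization: str) -> bool:
    """Add the link when absent; return True if a triple was added."""
    triple = (ticker, IS_TICKER_SYMBOL_OF, organization)
    if triple in triples:
        return False
    triples.add(triple)
    return True


triples: set = set()
ticker = "http://ontology.naas.ai/abi/ticker/ABC"
add_link_if_missing(triples, ticker, "http://ontology.naas.ai/abi/org/OldOrganization")
add_link_if_missing(triples, ticker, "http://ontology.naas.ai/abi/org/NewOrganization")

# Both links now exist; dropping the old one would need a separate step.
linked = sorted(o for s, p, o in triples if s == ticker and p == IS_TICKER_SYMBOL_OF)
print(linked)
```

If a ticker should point at exactly one organization, the caller has to remove the stale triple by other means, since this pipeline never deletes.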