UpdateCommercialOrganizationPipeline
What it is
A pipeline that updates RDF properties for an existing commercial organization individual in a triple store. It conditionally inserts missing triples (does not duplicate existing statements) and returns the updated RDF graph.
Public API
-
UpdateCommercialOrganizationPipelineConfiguration(PipelineConfiguration)- Holds dependencies for the pipeline.
- Fields:
triple_store: ITripleStoreService- service used to read/insert RDF triples.
-
UpdateCommercialOrganizationPipelineParameters(PipelineParameters)- Input schema (Pydantic-style annotations) for updates:
individual_uri: str(required) - URI of the commercial organization (validated byURI_REGEX).legal_uri: Optional[str]- URI for legal name individual.ticker_uri: Optional[str]- URI for ticker individual.website_uri: Optional[str]- URI for website individual.linkedin_page_uri: Optional[str]- URI for LinkedIn organization page individual.logo_url: Optional[str]- logo URL literal (patternhttps?://.*).
- Input schema (Pydantic-style annotations) for updates:
-
UpdateCommercialOrganizationPipeline(Pipeline)__init__(configuration: UpdateCommercialOrganizationPipelineConfiguration)- Constructs the pipeline with a triple store dependency.
run(parameters: PipelineParameters) -> rdflib.Graph- Validates parameter type (
UpdateCommercialOrganizationPipelineParameters). - Loads the subject graph for
individual_urifrom the triple store. - For each provided optional field, checks if the corresponding triple already exists; if not, adds it to an insert graph and inserts it into the triple store.
- Returns the original graph merged with inserted triples.
- Validates parameter type (
as_tools() -> list[langchain_core.tools.BaseTool]- Exposes a LangChain
StructuredToolnamedupdate_commercial_organizationthat callsrun(...).
- Exposes a LangChain
as_api(router: APIRouter, ...) -> None- Present but currently does nothing (returns
Nonewithout registering routes).
- Present but currently does nothing (returns
Configuration/Dependencies
- Requires an implementation of
ITripleStoreServicewith at least:get_subject_graph(subject: rdflib.term.URIRef) -> rdflib.Graphinsert(graph: rdflib.Graph) -> None
- Uses RDF predicates from
naas_abi_core.utils.Graph.ABI:ABI.hasLegalName,ABI.hasTickerSymbol,ABI.hasWebsite,ABI.hasLinkedInPage,ABI.logo
- Input URI validation uses
URI_REGEX.
Usage
from naas_abi.pipelines.UpdateCommercialOrganizationPipeline import (
UpdateCommercialOrganizationPipeline,
UpdateCommercialOrganizationPipelineConfiguration,
UpdateCommercialOrganizationPipelineParameters,
)
# triple_store must implement ITripleStoreService
config = UpdateCommercialOrganizationPipelineConfiguration(triple_store=triple_store)
pipeline = UpdateCommercialOrganizationPipeline(config)
params = UpdateCommercialOrganizationPipelineParameters(
individual_uri="http://example.org/org/123",
legal_uri="http://example.org/legalname/abc",
website_uri="http://example.org/website/xyz",
logo_url="https://cdn.example.org/logo.png",
)
updated_graph = pipeline.run(params)
Caveats
run()only inserts missing triples; it does not remove or overwrite existing values.as_api(...)is a no-op in this implementation (no HTTP endpoints are registered).run()raisesValueErrorifparametersis not anUpdateCommercialOrganizationPipelineParametersinstance.