Creating a Pipeline
Pipelines transform raw data from integrations into OWL/RDF triples and store them in the knowledge graph. They are the semantic ingestion layer.
Role in the stack
Integration (raw API data) → Pipeline (transforms to RDF) → Triple Store
A pipeline:
- Calls an integration to fetch raw data.
- Maps that data to ontology classes and properties.
- Writes RDF triples to the triple store.
- Is exposed as a tool for agents and as an API endpoint.
Pipeline class
# pipelines/MyPipeline.py
from naas_abi_core.pipeline.pipeline import Pipeline, PipelineConfiguration
from naas_abi_core.pipeline.pipeline import PipelineParameters
from naas_abi_core.services.triple_store.TripleStoreService import TripleStoreService
from naas_abi.modules.custom.my_module.integrations.MyIntegration import (
MyIntegration, MyIntegrationConfiguration,
)
from dataclasses import dataclass
from pydantic import BaseModel, Field
from rdflib import Graph, URIRef, Literal, Namespace
from rdflib.namespace import RDF, RDFS, OWL
ABI = Namespace("http://ontology.naas.ai/abi/")
CCO = Namespace("https://www.commoncoreontologies.org/")
@dataclass
class MyPipelineConfiguration(PipelineConfiguration):
integration_config: MyIntegrationConfiguration
triple_store_service: TripleStoreService
class MyPipelineParameters(PipelineParameters):
filter: str | None = Field(None, description="Optional filter for items to ingest")
class MyPipeline(Pipeline):
__configuration: MyPipelineConfiguration
def __init__(self, configuration: MyPipelineConfiguration):
self.__configuration = configuration
self.__integration = MyIntegration(configuration.integration_config)
def run(self, parameters: MyPipelineParameters) -> dict:
# 1. Fetch raw data
items = self.__integration.list_items(filter=parameters.filter)
# 2. Build an RDF graph
g = Graph()
for item in items:
item_uri = URIRef(f"http://ontology.naas.ai/abi/item/{item['id']}")
# Map to ontology class
g.add((item_uri, RDF.type, CCO.InformationContentEntity))
# Map properties
if item.get("name"):
g.add((item_uri, RDFS.label, Literal(item["name"])))
if item.get("url"):
g.add((item_uri, ABI.url, Literal(item["url"])))
# 3. Store in triple store
triples_added = len(g)
self.__configuration.triple_store_service.insert_graph(g)
return {"triples_added": triples_added, "items_processed": len(items)}
def as_tools(self) -> list:
from langchain_core.tools import StructuredTool
return [
StructuredTool(
name="ingest_myservice_items",
description="Fetch items from MyService and store them in the knowledge graph.",
func=lambda **kwargs: self.run(MyPipelineParameters(**kwargs)),
args_schema=MyPipelineParameters,
)
]
def as_api(self, router) -> None:
from fastapi import APIRouter
@router.post("/pipelines/myservice/ingest")
def run_pipeline(parameters: MyPipelineParameters):
return self.run(parameters)
Using onto2py generated classes
If your ontology has been processed by onto2py, you can use typed Python classes instead of raw RDF construction:
# onto2py generates this from MyOntology.ttl:
from naas_abi.modules.custom.my_module.ontologies.MyOntology import MyItem
# Use it in the pipeline:
item_obj = MyItem(
uri=f"http://ontology.naas.ai/abi/item/{item['id']}",
label=item["name"],
url=item["url"],
)
g += item_obj.to_graph()
See [[building/creating-an-ontology|Creating an Ontology]] for onto2py setup.
Registering in Dagster
Pipelines can be scheduled via Dagster. Define a Dagster asset in your module:
from dagster import asset
@asset
def my_pipeline_asset(context):
pipeline = MyPipeline(MyPipelineConfiguration(...))
return pipeline.run(MyPipelineParameters())
Best practices
- Idempotent: running the same pipeline twice should not create duplicate triples. Use
owl:hasKeyin your ontology for entity deduplication, or check for existing triples before inserting. - Schema-first: define (or reuse) an ontology class for every entity type you ingest before writing the pipeline.
- Small batches: for large datasets, paginate the integration and commit triples in batches to avoid memory pressure.
- Log progress: use
abi.loggerfor structured logging; avoidprint().