# FilesIngestionPipeline

## What it is

A pipeline that lists file objects under an object-storage prefix, uploads each file into the document knowledge graph (as `File` resources), and optionally deletes the source objects after ingestion.
## Public API

- `FilesIngestionPipelineConfiguration(PipelineConfiguration)`: configuration container for the pipeline (currently no additional fields).
- `FilesIngestionPipelineParameters(PipelineParameters)`:
  - `input_path: str`: object-storage prefix to ingest (recursive).
  - `output_path: str`: destination directory/prefix where ingested files are saved.
  - `graph_name: str` (default: `"http://ontology.naas.ai/graph/document"`): target RDF graph IRI.
  - `recursive: bool` (default: `True`): whether to traverse prefixes recursively.
  - `processor_iri: str` (default: `"http://ontology.naas.ai/abi/document/FileIngestionProcessor"`): IRI recorded as the processor.
  - `delete_from_input: bool` (default: `False`): delete source objects after ingestion (also applies to already-ingested files).
- `FilesIngestionPipeline(Pipeline)`:
  - `run(parameters: PipelineParameters) -> rdflib.Graph`: lists objects under `input_path`, fetches content and metadata, deduplicates by SHA-256 (via `file_already_ingested`), creates/uploads `File` resources, and returns an RDFLib `Graph` containing the generated triples.
  - `as_tools() -> list[BaseTool]`: returns an empty list.
  - `as_api()`: declared but not implemented (`pass`).
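The effect of the `recursive` flag on prefix listing can be sketched with plain string handling. The `list_keys` helper and the sample keys below are illustrative stand-ins, not the pipeline's actual implementation:

```python
# Illustrative object keys; keys are treated as POSIX-like paths.
keys = [
    "documents/my_folder/a.txt",
    "documents/my_folder/sub/b.txt",
    "documents/other/c.txt",
]

def list_keys(prefix: str, recursive: bool = True) -> list[str]:
    """Filter object keys under a prefix; optionally skip nested 'directories'."""
    base = prefix.rstrip("/") + "/"
    out = []
    for key in keys:
        if not key.startswith(base):
            continue
        rest = key[len(base):]
        if not recursive and "/" in rest:
            continue  # entry lives in a nested prefix; skip in non-recursive mode
        out.append(key)
    return out

print(list_keys("documents/my_folder", recursive=False))
# only documents/my_folder/a.txt; the sub/ entry is excluded
print(list_keys("documents/my_folder", recursive=True))
# both a.txt and sub/b.txt are included
```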
## Configuration / Dependencies

- Requires an initialized `ABIModule` (singleton via `ABIModule.get_instance()`) providing `engine.services.object_storage` with:
  - `list_objects(prefix=...)`
  - `get_object(prefix="", key=...)`
  - `get_object_metadata(prefix="", key=...)`
  - `delete_object(prefix="", key=...)`
- `engine.services.triple_store` is referenced by a private helper (`__ensure_graph_exists`) but is not used in `run()`.
- Uses:
  - `file_already_ingested(sha256: str, graph_name: str)` for deduplication.
  - `File.UploadAndCreateFile(...)` to create/upload files and emit RDF triples.
  - `rdflib.Graph` for the output.
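The SHA-256 deduplication that `run()` performs can be sketched as follows. The in-memory `objects` dict stands in for the object-storage service, and the `already_ingested` set stands in for `file_already_ingested`; neither is the real ABI API:

```python
import hashlib

# Stand-in for object storage: key -> content bytes.
objects = {
    "documents/my_folder/a.txt": b"hello",
    "documents/my_folder/a_copy.txt": b"hello",  # same bytes as a.txt
    "documents/my_folder/b.txt": b"world",
}

already_ingested: set[str] = set()  # stand-in for file_already_ingested()

def ingest(prefix: str) -> list[str]:
    """Ingest objects under a prefix, skipping content already seen by SHA-256."""
    ingested = []
    for key, content in sorted(objects.items()):
        if not key.startswith(prefix):
            continue
        digest = hashlib.sha256(content).hexdigest()
        if digest in already_ingested:
            continue  # duplicate content: skip upload
        already_ingested.add(digest)
        ingested.append(key)
    return ingested

result = ingest("documents/my_folder")
print(result)  # a_copy.txt is skipped as a duplicate of a.txt
```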
## Usage

```python
from naas_abi_marketplace.domains.document.pipelines.FilesIngestion.FilesIngestionPipeline import (
    FilesIngestionPipeline,
    FilesIngestionPipelineConfiguration,
    FilesIngestionPipelineParameters,
)

pipeline = FilesIngestionPipeline(FilesIngestionPipelineConfiguration())
g = pipeline.run(
    FilesIngestionPipelineParameters(
        input_path="documents/my_folder",
        output_path="ingested/documents",
        graph_name="http://ontology.naas.ai/graph/document",
        recursive=True,
        delete_from_input=False,
    )
)
print(g.serialize(format="turtle"))
```
## Caveats

- `run()` requires `FilesIngestionPipelineParameters`; otherwise it raises `ValueError`.
- Listing treats object-storage keys as POSIX-like paths; directory detection depends on `list_objects()` behavior and may probe entries by attempting to list them as prefixes.
- The pipeline computes SHA-256 by downloading the full object content; large objects may be expensive.
- `graph_name` is annotated as `str`, but the `__main__` example passes an `rdflib.URIRef`; ensure your call site matches what your pipeline framework expects.
- Graph creation is not ensured during `run()`: the private `__ensure_graph_exists()` is defined but never invoked.
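If full-content downloads for hashing are a concern, SHA-256 can in principle be computed incrementally over streamed chunks so the whole object never sits in memory at once. A minimal stdlib sketch; `sha256_streaming` is a hypothetical helper, not part of the pipeline:

```python
import hashlib
import io

def sha256_streaming(stream: io.BufferedIOBase, chunk_size: int = 1 << 20) -> str:
    """Hash a readable stream in fixed-size chunks (default 1 MiB)."""
    h = hashlib.sha256()
    while chunk := stream.read(chunk_size):
        h.update(chunk)
    return h.hexdigest()

data = b"example content" * 1000
# The chunked digest matches hashing the content in one shot.
assert sha256_streaming(io.BytesIO(data)) == hashlib.sha256(data).hexdigest()
print("streaming digest matches one-shot digest")
```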