DocumentOrchestration
What it is
- A Dagster-based orchestration builder for the document domain.
- Dynamically creates Dagster jobs and sensors based on ABIModule configuration:
  - File ingestion pipelines (one job + sensor per configured input path)
  - Optional format-to-Markdown pipelines (PDF/DOCX/PPTX)
Public API
class DocumentOrchestration(DagsterOrchestration)
@classmethod New() -> DocumentOrchestration
- Builds and returns a DocumentOrchestration instance containing a dagster.Definitions object with:
  - jobs: generated Dagster jobs
  - sensors: generated Dagster sensors
  - assets: empty
  - schedules: empty
Module-level helpers (_build_*_job_sensor) exist, but they are internal (prefixed with _) and are not part of the public API.
Configuration/Dependencies
Dependencies
- dagster (imported as dg)
- naas_abi_core.orchestrations.DagsterOrchestration.DagsterOrchestration
- naas_abi_marketplace.domains.document.ABIModule
- naas_abi_marketplace.domains.document.FileIngestionConfiguration
- Pipelines are imported lazily inside ops:
  - FilesIngestionPipeline
  - PdfToMarkdownPipeline
  - DocxToMarkdownPipeline
  - PptxToMarkdownPipeline
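Deferring an import into the op body means a pipeline module is loaded only when a run executes that op, not when Dagster loads the Definitions. The sketch below shows the effect using only the standard library. heavy_pipeline, HeavyPipeline and ingest_op_body are hypothetical stand-ins written to a temporary directory, not the real pipeline modules or ops.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Write a throwaway module that stands in for a heavy pipeline (hypothetical).
module_dir = tempfile.mkdtemp()
Path(module_dir, "heavy_pipeline.py").write_text(
    "class HeavyPipeline:\n"
    "    def run(self, path):\n"
    "        return f'processed {path}'\n"
)
sys.path.insert(0, module_dir)
importlib.invalidate_caches()  # make sure the new file is visible to the import system


def ingest_op_body(path: str) -> str:
    # Lazy import: the module is resolved only when this function runs.
    from heavy_pipeline import HeavyPipeline

    return HeavyPipeline().run(path)


loaded_before = "heavy_pipeline" in sys.modules
result = ingest_op_body("data/in")
loaded_after = "heavy_pipeline" in sys.modules
print(loaded_before, loaded_after, result)  # prints: False True processed data/in
```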
Configuration inputs (via ABIModule.get_instance().configuration)
- file_ingestion_pipelines: iterable of FileIngestionConfiguration, used to create the ingestion jobs/sensors.
  - Each config supplies input_path, output_path, graph_name, and recursive.
- pdftomarkdown_enabled: if truthy, adds a PDF-to-Markdown job + sensor.
- docxtomarkdown_enabled: defaults to True if the attribute is missing; if truthy, adds a DOCX-to-Markdown job + sensor.
- pptxtomarkdown_enabled: defaults to True if the attribute is missing; if truthy, adds a PPTX-to-Markdown job + sensor.
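The flag rules above can be sketched as a small planning function. This is a hedged illustration only.

- FileIngestionConfig is a hypothetical stand-in for FileIngestionConfiguration.
- A SimpleNamespace plays the ABIModule configuration object.
- The pipeline labels are made up.
- Treating a missing pdftomarkdown_enabled as False is an assumption. The text only says the PDF job is added when the flag is truthy.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class FileIngestionConfig:
    # Hypothetical stand-in for FileIngestionConfiguration.
    input_path: str
    output_path: str
    graph_name: str
    recursive: bool


def planned_pipelines(configuration) -> list[str]:
    """Return labels for the pipelines a configuration would enable (illustrative)."""
    plans = [f"ingest:{cfg.input_path}" for cfg in configuration.file_ingestion_pipelines]
    # Assumption: a missing PDF flag counts as disabled.
    if getattr(configuration, "pdftomarkdown_enabled", False):
        plans.append("pdf_to_markdown")
    # DOCX and PPTX are enabled unless the flag is present and falsy.
    if getattr(configuration, "docxtomarkdown_enabled", True):
        plans.append("docx_to_markdown")
    if getattr(configuration, "pptxtomarkdown_enabled", True):
        plans.append("pptx_to_markdown")
    return plans


config = SimpleNamespace(
    file_ingestion_pipelines=[
        FileIngestionConfig("storage/inbox", "storage/processed", "documents", recursive=True),
    ],
    pdftomarkdown_enabled=True,
    pptxtomarkdown_enabled=False,  # docxtomarkdown_enabled deliberately left unset
)
print(planned_pipelines(config))
# prints: ['ingest:storage/inbox', 'pdf_to_markdown', 'docx_to_markdown']
```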
Generated Dagster objects
- Each job is a single-op graph converted to a job.
- Each sensor:
  - is tied to its job
  - uses minimum_interval_seconds=60
  - currently always yields [dg.RunRequest(run_key=None)]
Usage
from naas_abi_marketplace.domains.document.orchestrations.DocumentOrchestration import (
    DocumentOrchestration,
)
# Build Dagster Definitions (jobs + sensors) from ABIModule configuration
orch = DocumentOrchestration.New()
# Access underlying dagster Definitions if needed
defs = orch.definitions # provided by DagsterOrchestration base (implementation-dependent)
Caveats
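- Sensors do not implement file-change detection. They emit a RunRequest every time they are evaluated (subject to minimum_interval_seconds=60). Because run_key is None, Dagster does not deduplicate these requests, so each evaluation requests a new run.
- Job/op names for file ingestion are derived from input_path and sanitized to alphanumeric/underscore characters. Different paths can therefore still collide after sanitization (for example, if separators are replaced with underscores, data/in and data_in would yield the same name).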
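Two hedged, standard-library sketches for the caveats above; all three helpers are hypothetical.

- ingestion_name assumes sanitization replaces every non-alphanumeric, non-underscore character with an underscore. The real rule may differ.
- find_name_collisions reports configured paths that would map to the same name.
- directory_fingerprint illustrates one possible form of change detection, which the current sensors do not implement. It hashes each file's relative path, size and modification time into a digest that changes when the directory contents change.

```python
import hashlib
import re
import tempfile
from collections import defaultdict
from pathlib import Path


def ingestion_name(input_path: str) -> str:
    # Assumed rule: anything outside [0-9A-Za-z_] becomes "_".
    return re.sub(r"[^0-9A-Za-z_]", "_", input_path)


def find_name_collisions(input_paths: list[str]) -> dict[str, list[str]]:
    """Map each sanitized name to the paths producing it, when more than one does."""
    groups: dict[str, list[str]] = defaultdict(list)
    for path in input_paths:
        groups[ingestion_name(path)].append(path)
    return {name: paths for name, paths in groups.items() if len(paths) > 1}


def directory_fingerprint(root: str, recursive: bool = True) -> str:
    """SHA-256 over (relative path, size, mtime_ns) of every file under root."""
    base = Path(root)
    pattern = "**/*" if recursive else "*"
    digest = hashlib.sha256()
    for file in sorted(p for p in base.glob(pattern) if p.is_file()):
        stat = file.stat()
        entry = f"{file.relative_to(base).as_posix()}|{stat.st_size}|{stat.st_mtime_ns}\n"
        digest.update(entry.encode())
    return digest.hexdigest()


print(find_name_collisions(["data/in", "data-in", "archive/2024"]))
# prints: {'data_in': ['data/in', 'data-in']}

with tempfile.TemporaryDirectory() as tmp:
    before = directory_fingerprint(tmp)
    Path(tmp, "report.pdf").write_bytes(b"%PDF-1.7")
    after = directory_fingerprint(tmp)
    print(before != after)  # prints: True
```

A sensor could yield RunRequest(run_key=fingerprint). Dagster skips run keys it has already seen for that sensor, so unchanged directories would not trigger new runs. Whether that suits these pipelines is a design choice outside this module's current behaviour.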