Dagster

Dagster is ABI's orchestration layer for scheduled jobs, data pipelines, and asset materialization.


Starting Dagster

make dagster-dev           # Development server with live reload
make dagster-up            # Background service
make dagster-ui            # Open web UI in browser (port 3001)

What Dagster does in ABI

Dagster orchestrates ABI pipelines as assets: materialized data artifacts with lineage tracking, scheduling, and failure recovery. Use it when you want to:

  • Run data ingestion pipelines on a schedule (e.g. sync LinkedIn data every hour).
  • Track which data assets are stale and need re-computation.
  • Retry failed pipeline runs automatically.
  • Monitor pipeline health in a visual UI.

Defining a Dagster asset from a pipeline

# orchestrations/MyOrchestration.py
from dagster import asset, AssetExecutionContext
from naas_abi.modules.custom.my_module.pipelines.MyPipeline import (
    MyPipeline, MyPipelineConfiguration, MyPipelineParameters,
)

@asset(
    name="my_service_items",
    description="Items from MyService, ingested into the knowledge graph",
    group_name="my_module",
)
def my_service_items_asset(context: AssetExecutionContext) -> None:
    pipeline = MyPipeline(
        MyPipelineConfiguration(
            integration_config=...,
            triple_store_service=...,
        )
    )
    result = pipeline.run(MyPipelineParameters())
    context.log.info(f"Ingested {result['triples_added']} triples")

Scheduling assets

from dagster import ScheduleDefinition, define_asset_job

hourly_sync_job = define_asset_job(
    name="hourly_myservice_sync",
    selection=["my_service_items"],
)

hourly_schedule = ScheduleDefinition(
    job=hourly_sync_job,
    cron_schedule="0 * * * *",  # Every hour
)

Service management

make dagster-status        # Check asset materialization status
make dagster-materialize   # Trigger all assets immediately
make dagster-down          # Stop Dagster service