
Task Runner - Workflow Engine

Data Engineering

Scalable workflow engine for DuckDB/S3 operations with per-task isolation and data lineage tracking.

Python · DuckDB · S3 · Dramatiq · Docker · ECS

title: Task Runner - Scalable Workflow Engine
slug: task-runner-workflow-engine
description: DuckDB/S3 data operations with DAG support and per-task isolation
status: Product
published: published-wip
category: Developer Tools
technologies:
  - Python
  - DuckDB
  - S3
  - Dramatiq
  - Docker
  - ECS
github: https://github.com/macleodlabs/task-runner
date: 2025-01-15
featured: false
hero: false

Task Runner - Scalable Workflow Engine

Scalable workflow engine for DuckDB/S3 data operations with DAG support, built on Dramatiq with comprehensive error handling, scheduling, and data lineage tracking.

Task Runner Dashboard: workflow execution dashboard showing DAG visualization and real-time progress

Architecture

graph TB
    subgraph "Workflow Definition"
        YAML[YAML Workflow<br/>Spec]
        CLI[CLI Interface]
    end
    
    subgraph "Task Execution"
        ENGINE[Workflow Engine<br/>DAG Resolver]
        RUNNER[Isolated Task<br/>Runner]
        DOCKER[Docker<br/>Containers]
    end
    
    subgraph "Data Layer"
        DUCKDB[(DuckDB<br/>Analytics)]
        S3[(S3<br/>Storage)]
        REDIS[(Redis<br/>Queue)]
    end
    
    subgraph "Monitoring"
        LINEAGE[Data Lineage<br/>Tracker]
        INCR[Incremental<br/>Processor]
        METRICS[CloudWatch<br/>Metrics]
    end
    
    YAML --> ENGINE
    CLI --> ENGINE
    ENGINE --> RUNNER
    RUNNER --> DOCKER
    DOCKER --> DUCKDB
    DOCKER --> S3
    ENGINE --> REDIS
    RUNNER --> LINEAGE
    RUNNER --> INCR
    RUNNER --> METRICS

Key Innovation: Per-Task Isolation

Task Isolation Architecture: each task runs in its own Docker container with a custom Python version and dependencies

Unlike traditional workflow engines, each task gets its own execution environment (Python version, dependencies, system packages, and resource limits), declared directly in the workflow spec:

nodes:
  - id: "process-data"
    type: "duckdb_sql"
    params:
      sql: "SELECT * FROM data"
      environment:
        python_version: "3.11"        # Task-specific Python
        requirements:
          - "duckdb==0.9.2"           # Task-specific deps
          - "pandas==2.1.4"
        system_packages:
          - "build-essential"
        memory_limit: "2Gi"
        cpu_limit: "1.0"
        timeout_seconds: 3600

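To make the isolation concrete, here is a hypothetical sketch of how an environment block like the one above could be rendered into a Dockerfile. The function name, base image choice, and build flow are illustrative assumptions, not the project's actual implementation.

# Hypothetical sketch: turning a task's environment spec into a Dockerfile.
# render_dockerfile() and the python:*-slim base image are assumptions.
def render_dockerfile(env: dict) -> str:
    lines = [f"FROM python:{env['python_version']}-slim"]
    if env.get("system_packages"):
        packages = " ".join(env["system_packages"])
        lines.append(f"RUN apt-get update && apt-get install -y {packages}")
    if env.get("requirements"):
        requirements = " ".join(env["requirements"])
        lines.append(f"RUN pip install --no-cache-dir {requirements}")
    return "\n".join(lines)

print(render_dockerfile({
    "python_version": "3.11",
    "requirements": ["duckdb==0.9.2", "pandas==2.1.4"],
    "system_packages": ["build-essential"],
}))
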
Benefits:

- No dependency conflicts: each task pins its own Python version and packages
- Per-task resource limits (memory, CPU, timeout) instead of one shared worker footprint
- Failures stay contained: a misbehaving task cannot take down other tasks or the engine

DRY Task Development

BaseTask Interface: inherit from BaseTask for a consistent interface and common functionality

All tasks inherit from a common base:

from task_runner import BaseTask, TaskResult

class MyCustomTask(BaseTask):
    """
    Base class provides:
    - Configuration loading
    - Parameter validation  
    - Logging with context
    - Output saving
    - Error handling
    """
    
    def get_required_params(self) -> list:
        """Define required parameters"""
        return ["input_file", "output_file"]
    
    def execute(self) -> TaskResult:
        """Implement task logic"""
        try:
            input_file = self.get_param("input_file")
            output_file = self.get_param("output_file")
            
            # Your task logic here; process() is whatever helper you implement
            result_data = self.process(input_file)
            self.save_output(output_file, result_data)
            
            return TaskResult(
                success=True,
                message="Task completed successfully",
                data={"processed": True}
            )
        except Exception as e:
            return TaskResult(
                success=False,
                message=f"Task failed: {e}",
                error=str(e)
            )
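
A minimal usage sketch, assuming BaseTask accepts its parameters at construction time; the actual constructor and runner entry point may differ.

# Hypothetical invocation; the constructor signature is an assumption.
task = MyCustomTask(params={
    "input_file": "s3://my-bucket/raw/data.json",
    "output_file": "s3://my-bucket/processed/data.parquet",
})
result = task.execute()
print(result.success, result.message)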

Data Lineage Tracking

Data Lineage Graph: automatic tracking of data dependencies and transformations

Track data flow through workflows:

from task_runner import DataLineageTracker

tracker = DataLineageTracker(redis_client)

# Automatically records:
# - Data reads (which files/tables consumed)
# - Data transforms (source → target mappings)
# - Task dependencies (execution order)
# - Temporal lineage (when data was created)

# Query lineage
lineage = await tracker.get_asset_lineage(
    asset_id="s3://bucket/processed/data.parquet",
    direction="both"  # upstream + downstream
)

# Result:
{
    "asset": "s3://bucket/processed/data.parquet",
    "upstream": [
        {"asset": "s3://bucket/raw/data.json", "task": "download"},
        {"asset": "s3://bucket/raw/data2.json", "task": "download"}
    ],
    "downstream": [
        {"asset": "s3://bucket/reports/summary.csv", "task": "analyze"}
    ],
    "transformations": [
        {
            "task": "convert",
            "operation": "json_to_parquet",
            "timestamp": "2025-01-15T10:30:00Z"
        }
    ]
}
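
The tracker persists lineage in Redis. The exact schema isn't documented here; the sketch below shows one plausible layout using per-asset upstream/downstream sets, where the key names and member encoding are assumptions.

# Plausible Redis layout for lineage edges; keys and encoding are assumptions.
import redis.asyncio as redis

async def record_edge(r: redis.Redis, source: str, target: str, task: str) -> None:
    # Each asset keeps upstream and downstream neighbour sets so lineage
    # can be walked in either direction.
    await r.sadd(f"lineage:{target}:upstream", f"{task}|{source}")
    await r.sadd(f"lineage:{source}:downstream", f"{task}|{target}")

async def upstream_of(r: redis.Redis, asset: str) -> list[str]:
    members = await r.smembers(f"lineage:{asset}:upstream")
    return sorted(m.decode() for m in members)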

Incremental Processing

Incremental Processing: skip tasks when data hasn't changed, based on S3 ETags and checksums

Intelligent skip logic:

import time

class IncrementalProcessor:
    """
    Determines when tasks can be safely skipped
    """
    
    async def should_skip_task(self, task_spec, context) -> bool:
        """
        Skip if:
        1. Data hasn't changed (S3 ETag match)
        2. Parameters identical (checksum match)
        3. Dependencies unchanged (upstream checksums)
        4. Within max_age_seconds window
        """
        
        # Check data staleness
        if task_spec.incremental.max_age_seconds:
            last_run = await self.get_last_run(task_spec.id)
            if time.time() - last_run < task_spec.incremental.max_age_seconds:
                # Check if source data changed
                current_etag = await self.get_s3_etag(task_spec.source)
                last_etag = await self.get_cached_etag(task_spec.id)
                
                if current_etag == last_etag:
                    self.logger.info(f"Skipping {task_spec.id}: data unchanged")
                    return True
                return False  # Source data changed, must run
        
        # Check parameter changes
        param_checksum = self.calculate_checksum(task_spec.params)
        last_checksum = await self.get_cached_checksum(task_spec.id)
        
        if param_checksum != last_checksum:
            return False  # Parameters changed, must run
        
        # Check dependency changes
        if task_spec.incremental.dependency_check:
            for dep in task_spec.dependencies:
                dep_changed = await self.dependency_changed(dep)
                if dep_changed:
                    return False
        
        return True  # Safe to skip
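
The parameter checksum above is cheap to compute. Here is a minimal sketch of what calculate_checksum might look like; the real implementation may hash different fields.

import hashlib
import json

def calculate_checksum(params: dict) -> str:
    # Serialize with sorted keys so logically equal parameter dicts
    # always produce the same hash.
    canonical = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()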

Workflow Example

Workflow DAG: visualization showing parallel execution and dependencies

Complete data pipeline in YAML:

name: "daily-data-pipeline"
schedule:
  cron: "0 2 * * *"  # 2 AM daily

nodes:
  - id: "download-data"
    type: "download_to_s3"
    params:
      url: "https://api.example.com/data"
      s3_uri: "s3://my-bucket/raw/data.json"
      environment:
        python_version: "3.9"
        requirements: ["requests==2.31.0"]
        memory_limit: "512Mi"
  
  - id: "convert-to-parquet"
    type: "s3_json_to_parquet_duckdb"
    params:
      src_glob: "s3://my-bucket/raw/*.json"
      dst_uri: "s3://my-bucket/processed/data.parquet"
      environment:
        python_version: "3.11"
        requirements: ["duckdb==0.9.2", "pandas==2.1.4"]
        memory_limit: "2Gi"
      incremental:
        enabled: true
        strategy: "hybrid"
        max_age_seconds: 3600
        checksum_fields: ["date", "source"]
        dependency_check: true
    needs: ["download-data"]
  
  - id: "run-analysis"
    type: "duckdb_sql"
    params:
      sql: |
        SELECT 
          date,
          COUNT(*) as record_count,
          AVG(value) as avg_value
        FROM read_parquet('s3://my-bucket/processed/data.parquet')
        GROUP BY date
        ORDER BY date
      output_uri: "s3://my-bucket/reports/analysis.csv"
      environment:
        python_version: "3.12"
        requirements: ["duckdb==0.9.2"]
        memory_limit: "1Gi"
    needs: ["convert-to-parquet"]
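
The needs edges above form the DAG that the engine resolves before execution. As an illustration (not the engine's actual resolver), the standard-library graphlib can derive a valid execution order from those edges:

# Illustrative DAG resolution for the pipeline above using graphlib.
from graphlib import TopologicalSorter

dag = {
    "download-data": [],
    "convert-to-parquet": ["download-data"],
    "run-analysis": ["convert-to-parquet"],
}

order = list(TopologicalSorter(dag).static_order())
print(order)  # ['download-data', 'convert-to-parquet', 'run-analysis']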

Available Task Types

DuckDB Operations

S3 Operations

Infrastructure

Scheduling System

Scheduler Dashboard: workflow scheduling with cron expressions and manual triggers

from task_runner import WorkflowScheduler, ScheduleConfig, ScheduleType

scheduler = WorkflowScheduler(redis_client)

# Cron-based schedule
schedule = ScheduleConfig(
    id="daily-pipeline",
    name="Daily Data Pipeline",
    workflow=workflow_spec,
    schedule_type=ScheduleType.CRON,
    cron_expression="0 2 * * *",  # 2 AM daily
    timezone="UTC"
)

await scheduler.add_schedule(schedule)

# Interval-based schedule  
schedule = ScheduleConfig(
    id="frequent-pipeline",
    name="Hourly Updates",
    workflow=workflow_spec,
    schedule_type=ScheduleType.INTERVAL,
    interval_seconds=3600  # Every hour
)

await scheduler.add_schedule(schedule)
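
For cron schedules the scheduler has to compute the next fire time from the expression. A quick illustration using the croniter package; whether the project itself uses croniter is an assumption.

# Illustration only: compute the next fire time for "0 2 * * *" (2 AM daily).
from datetime import datetime, timezone
from croniter import croniter

now = datetime.now(timezone.utc)
next_run = croniter("0 2 * * *", now).get_next(datetime)
print(f"Next run: {next_run.isoformat()}")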

Error Handling & Retries

Error Recovery: automatic retries with exponential backoff and a dead-letter queue

@broker.task(
    retry_on_error=True,
    max_retries=3,
    retry_backoff=True,  # Exponential backoff
    acks_late=True  # Acknowledge after completion
)
async def resilient_task(task_spec: TaskSpec):
    """
    Automatic error handling:
    - Retry with backoff (1s, 2s, 4s)
    - Dead letter queue after max retries
    - Error context preservation
    - Alerting on failures
    """
    try:
        return await execute_task(task_spec)
    except TemporaryError as e:
        # Will be retried
        raise
    except PermanentError as e:
        # Sent to DLQ immediately
        await send_to_dlq(task_spec, e)
        raise
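
The backoff schedule in the docstring (1s, 2s, 4s) is plain exponential doubling. A small sketch of how the retry delay could be computed; the actual broker may add jitter or use a different cap.

def retry_delay(attempt: int, base_seconds: float = 1.0, cap_seconds: float = 60.0) -> float:
    # attempt 0 -> 1s, attempt 1 -> 2s, attempt 2 -> 4s, capped at cap_seconds
    return min(base_seconds * (2 ** attempt), cap_seconds)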

Performance Metrics

Performance Dashboard: real-time metrics showing throughput, latency, and resource usage

Metric                | Value        | Notes
----------------------|--------------|---------------------------
Tasks/second          | 100+         | With auto-scaling
Task startup time     | 2-5 seconds  | Container initialization
DuckDB query perf     | 1 GB/s       | S3 read throughput
Incremental skip rate | 40-60%       | Typical data pipelines
Concurrent workflows  | 50+          | Per worker pool
End-to-end latency    | 30 s - 5 min | Depends on pipeline

Deployment Options

Docker Compose (Development)

docker compose up -d
task-runner workflow run --file pipeline.yaml

AWS ECS (Production)

# Auto-scaling based on queue depth
cdk deploy TaskRunnerStack

# Deploys:
# - ECS cluster with Fargate
# - Application Load Balancer
# - CloudWatch dashboards
# - Auto-scaling policies

Kubernetes (Enterprise)

helm install task-runner ./charts/task-runner
# Includes:
# - Worker deployments
# - Redis StatefulSet
# - Horizontal Pod Autoscaler

CLI Interface

CLI Commands: interactive CLI for workflow management and monitoring

# Run workflow
task-runner workflow run --tenant mycompany --file pipeline.yaml

# Check status
task-runner workflow status --job-id job-123

# View logs
task-runner workflow logs --job-id job-123 --follow

# Schedule management
task-runner schedule add --name "daily" --cron "0 2 * * *"
task-runner schedule list
task-runner schedule trigger --id daily-123

# Data lineage
task-runner lineage job --job-id job-123
task-runner lineage asset --uri s3://bucket/file.parquet

# Incremental stats
task-runner incremental summary --tenant mycompany

Use Cases

1. ETL Pipelines

Transform data from APIs/databases to analytics-ready formats with incremental processing.

2. ML Feature Engineering

Generate features from raw data with lineage tracking for model debugging.

3. Data Quality Checks

Scheduled validation workflows with alerting on failures.

4. Multi-Tenant Analytics

Isolated workflows per customer with quota management.

Technical Highlights

- Per-task Docker isolation with task-specific Python versions, dependencies, and resource limits
- Automatic data lineage tracking: reads, transforms, task dependencies, and temporal lineage
- Incremental processing via S3 ETags and parameter checksums (40-60% skip rate on typical pipelines)
- YAML-defined DAG workflows with cron and interval scheduling
- Automatic retries with exponential backoff and a dead-letter queue
- Deployable via Docker Compose, AWS ECS/Fargate, or Kubernetes

Scalable data engineering from MacLeod Labs