---
title: Task Runner - Scalable Workflow Engine
slug: task-runner-workflow-engine
description: DuckDB/S3 data operations with DAG support and per-task isolation
status: Product
published: published-wip
category: Developer Tools
technologies:
  - Python
  - DuckDB
  - S3
  - Dramatiq
  - Docker
  - ECS
github: https://github.com/macleodlabs/task-runner
date: 2025-01-15
featured: false
hero: false
---
Task Runner - Scalable Workflow Engine
Scalable workflow engine for DuckDB/S3 data operations with DAG support, built on Dramatiq with comprehensive error handling, scheduling, and data lineage tracking.
Workflow execution dashboard showing DAG visualization and real-time progress
Architecture
graph TB
subgraph "Workflow Definition"
YAML[YAML Workflow<br/>Spec]
CLI[CLI Interface]
end
subgraph "Task Execution"
ENGINE[Workflow Engine<br/>DAG Resolver]
RUNNER[Isolated Task<br/>Runner]
DOCKER[Docker<br/>Containers]
end
subgraph "Data Layer"
DUCKDB[(DuckDB<br/>Analytics)]
S3[(S3<br/>Storage)]
REDIS[(Redis<br/>Queue)]
end
subgraph "Monitoring"
LINEAGE[Data Lineage<br/>Tracker]
INCR[Incremental<br/>Processor]
METRICS[CloudWatch<br/>Metrics]
end
YAML --> ENGINE
CLI --> ENGINE
ENGINE --> RUNNER
RUNNER --> DOCKER
DOCKER --> DUCKDB
DOCKER --> S3
ENGINE --> REDIS
RUNNER --> LINEAGE
RUNNER --> INCR
RUNNER --> METRICS
Key Innovation: Per-Task Isolation
Each task runs in its own Docker container with a task-specific Python version and dependencies
Unlike traditional workflow engines that share a single runtime across all tasks, each task gets:
nodes:
  - id: "process-data"
    type: "duckdb_sql"
    params:
      sql: "SELECT * FROM data"
    environment:
      python_version: "3.11"   # Task-specific Python
      requirements:
        - "duckdb==0.9.2"      # Task-specific deps
        - "pandas==2.1.4"
      system_packages:
        - "build-essential"
      memory_limit: "2Gi"
      cpu_limit: "1.0"
    timeout_seconds: 3600
Benefits:
- No dependency conflicts between tasks
- Independent scaling per task type
- Version flexibility (Python 3.9-3.12)
- Resource optimization (right-size each task)
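To make this concrete, here is a minimal sketch of how a per-task image could be derived from an environment block like the one above. The render_dockerfile helper and the entrypoint module are illustrative assumptions, not the project's actual image builder.

# Hypothetical sketch: derive a Dockerfile from a task's `environment` block.
# The helper name and entrypoint module are assumptions for illustration.
def render_dockerfile(env: dict) -> str:
    lines = [f"FROM python:{env.get('python_version', '3.11')}-slim"]
    if env.get("system_packages"):
        packages = " ".join(env["system_packages"])
        lines.append(f"RUN apt-get update && apt-get install -y {packages}")
    if env.get("requirements"):
        requirements = " ".join(env["requirements"])
        lines.append(f"RUN pip install --no-cache-dir {requirements}")
    lines.append('ENTRYPOINT ["python", "-m", "task_runner.worker"]')  # assumed entrypoint
    return "\n".join(lines)

print(render_dockerfile({
    "python_version": "3.11",
    "requirements": ["duckdb==0.9.2", "pandas==2.1.4"],
    "system_packages": ["build-essential"],
}))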
DRY Task Development
Inherit from BaseTask for a consistent interface and common functionality
All tasks inherit from a common base:
from task_runner import BaseTask, TaskResult

class MyCustomTask(BaseTask):
    """
    Base class provides:
    - Configuration loading
    - Parameter validation
    - Logging with context
    - Output saving
    - Error handling
    """

    def get_required_params(self) -> list:
        """Define required parameters"""
        return ["input_file", "output_file"]

    def execute(self) -> TaskResult:
        """Implement task logic"""
        try:
            input_file = self.get_param("input_file")
            output_file = self.get_param("output_file")

            # Your task logic here
            result_data = self.process(input_file)
            self.save_output(output_file, result_data)

            return TaskResult(
                success=True,
                message="Task completed successfully",
                data={"processed": True}
            )
        except Exception as e:
            return TaskResult(
                success=False,
                message=f"Task failed: {e}",
                error=str(e)
            )
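A quick usage sketch; the constructor shown here (passing params as a dict) is an assumption for illustration, since only the BaseTask hook methods are shown above.

# Assumed instantiation pattern, for illustration only: the real BaseTask
# constructor may take a task spec or execution context instead of raw params.
task = MyCustomTask(params={
    "input_file": "s3://my-bucket/raw/input.csv",
    "output_file": "s3://my-bucket/processed/output.parquet",
})
result = task.execute()
print(result.success, result.message)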
Data Lineage Tracking
Automatic tracking of data dependencies and transformations
Track data flow through workflows:
from task_runner import DataLineageTracker

tracker = DataLineageTracker(redis_client)

# Automatically records:
# - Data reads (which files/tables consumed)
# - Data transforms (source → target mappings)
# - Task dependencies (execution order)
# - Temporal lineage (when data was created)

# Query lineage
lineage = await tracker.get_asset_lineage(
    asset_id="s3://bucket/processed/data.parquet",
    direction="both"  # upstream + downstream
)

# Result:
{
    "asset": "s3://bucket/processed/data.parquet",
    "upstream": [
        {"asset": "s3://bucket/raw/data.json", "task": "download"},
        {"asset": "s3://bucket/raw/data2.json", "task": "download"}
    ],
    "downstream": [
        {"asset": "s3://bucket/reports/summary.csv", "task": "analyze"}
    ],
    "transformations": [
        {
            "task": "convert",
            "operation": "json_to_parquet",
            "timestamp": "2025-01-15T10:30:00Z"
        }
    ]
}
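For example, given per-asset lineage records in the shape returned above, the full set of upstream ancestors can be computed with a plain graph walk. This sketch operates on the result dict only and does not assume any additional tracker API.

# Plain-Python sketch: walk `upstream` edges to collect every ancestor of an
# asset, given a dict of lineage records shaped like the result above.
def upstream_closure(lineage_by_asset: dict, asset_id: str) -> set:
    seen, stack = set(), [asset_id]
    while stack:
        current = stack.pop()
        for edge in lineage_by_asset.get(current, {}).get("upstream", []):
            parent = edge["asset"]
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen

ancestors = upstream_closure(
    {"s3://bucket/processed/data.parquet": lineage},
    "s3://bucket/processed/data.parquet",
)
# {"s3://bucket/raw/data.json", "s3://bucket/raw/data2.json"}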
Incremental Processing
Skip tasks when data hasn't changed, based on S3 ETags and checksums
Intelligent skip logic:
import time

class IncrementalProcessor:
    """
    Determines when tasks can be safely skipped
    """

    async def should_skip_task(self, task_spec, context) -> bool:
        """
        Skip if:
        1. Data hasn't changed (S3 ETag match)
        2. Parameters identical (checksum match)
        3. Dependencies unchanged (upstream checksums)
        4. Within max_age_seconds window
        """
        # Check data staleness
        if task_spec.incremental.max_age_seconds:
            last_run = await self.get_last_run(task_spec.id)
            if time.time() - last_run < task_spec.incremental.max_age_seconds:
                # Check if source data changed
                current_etag = await self.get_s3_etag(task_spec.source)
                last_etag = await self.get_cached_etag(task_spec.id)
                if current_etag == last_etag:
                    self.logger.info(f"Skipping {task_spec.id}: data unchanged")
                    return True

        # Check parameter changes
        param_checksum = self.calculate_checksum(task_spec.params)
        last_checksum = await self.get_cached_checksum(task_spec.id)
        if param_checksum != last_checksum:
            return False  # Parameters changed, must run

        # Check dependency changes
        if task_spec.incremental.dependency_check:
            for dep in task_spec.dependencies:
                dep_changed = await self.dependency_changed(dep)
                if dep_changed:
                    return False

        return True  # Safe to skip
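The parameter check can be as simple as hashing a canonical JSON encoding of the task's params. A minimal sketch of such a checksum, not necessarily the project's actual calculate_checksum implementation:

import hashlib
import json

# Minimal sketch of a parameter checksum over a canonical JSON encoding;
# the project's actual calculate_checksum may hash different fields.
def calculate_checksum(params: dict) -> str:
    encoded = json.dumps(params, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()

# Key order does not affect the checksum, so logically identical params match.
assert calculate_checksum({"a": 1, "b": 2}) == calculate_checksum({"b": 2, "a": 1})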
Workflow Example
DAG visualization showing parallel execution and dependencies
Complete data pipeline in YAML:
name: "daily-data-pipeline"

schedule:
  cron: "0 2 * * *"  # 2 AM daily

nodes:
  - id: "download-data"
    type: "download_to_s3"
    params:
      url: "https://api.example.com/data"
      s3_uri: "s3://my-bucket/raw/data.json"
    environment:
      python_version: "3.9"
      requirements: ["requests==2.31.0"]
      memory_limit: "512Mi"

  - id: "convert-to-parquet"
    type: "s3_json_to_parquet_duckdb"
    params:
      src_glob: "s3://my-bucket/raw/*.json"
      dst_uri: "s3://my-bucket/processed/data.parquet"
    environment:
      python_version: "3.11"
      requirements: ["duckdb==0.9.2", "pandas==2.1.4"]
      memory_limit: "2Gi"
    incremental:
      enabled: true
      strategy: "hybrid"
      max_age_seconds: 3600
      checksum_fields: ["date", "source"]
      dependency_check: true
    needs: ["download-data"]

  - id: "run-analysis"
    type: "duckdb_sql"
    params:
      sql: |
        SELECT
          date,
          COUNT(*) as record_count,
          AVG(value) as avg_value
        FROM read_parquet('s3://my-bucket/processed/data.parquet')
        GROUP BY date
        ORDER BY date
      output_uri: "s3://my-bucket/reports/analysis.csv"
    environment:
      python_version: "3.12"
      requirements: ["duckdb==0.9.2"]
      memory_limit: "1Gi"
    needs: ["convert-to-parquet"]
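The needs entries above define the workflow DAG. As an illustration of how those edges resolve into an execution order, here is a sketch using the standard library's TopologicalSorter, which is not necessarily what the engine's DAG resolver uses internally:

from graphlib import TopologicalSorter

# Illustration only: resolve the `needs` edges from the YAML above into
# batches of tasks whose dependencies are already satisfied.
dependencies = {
    "download-data": set(),
    "convert-to-parquet": {"download-data"},
    "run-analysis": {"convert-to-parquet"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
while sorter.is_active():
    ready = sorter.get_ready()   # tasks that can run now, possibly in parallel
    print(ready)                 # ('download-data',), ('convert-to-parquet',), ('run-analysis',)
    sorter.done(*ready)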
Available Task Types
DuckDB Operations
- `duckdb_sql` - Execute SQL with S3 support
- `s3_json_to_parquet_duckdb` - Convert JSON/CSV to Parquet
- `duckdb_aggregate` - Pre-built aggregation tasks
S3 Operations
- `download_to_s3` - Download URLs to S3
- `s3_extract_archive` - Extract compressed archives
- `git_clone_to_s3` - Clone repos to S3
- `s3_sync` - Sync between buckets
Infrastructure
- `ecs_run_task` - Execute ECS Fargate tasks
- `lambda_invoke` - Invoke Lambda functions
Scheduling System
Workflow scheduling with cron expressions and manual triggers
from task_runner import WorkflowScheduler, ScheduleConfig, ScheduleType

scheduler = WorkflowScheduler(redis_client)

# Cron-based schedule
schedule = ScheduleConfig(
    id="daily-pipeline",
    name="Daily Data Pipeline",
    workflow=workflow_spec,
    schedule_type=ScheduleType.CRON,
    cron_expression="0 2 * * *",  # 2 AM daily
    timezone="UTC"
)
await scheduler.add_schedule(schedule)

# Interval-based schedule
schedule = ScheduleConfig(
    id="frequent-pipeline",
    name="Hourly Updates",
    workflow=workflow_spec,
    schedule_type=ScheduleType.INTERVAL,
    interval_seconds=3600  # Every hour
)
await scheduler.add_schedule(schedule)
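Schedules can also be fired manually (the CLI below exposes schedule trigger); a Python-side sketch, where trigger_now is an assumed method name rather than a documented API:

# `trigger_now` is an assumed method name for illustration; the CLI
# equivalent shown later is `task-runner schedule trigger --id daily-pipeline`.
await scheduler.trigger_now("daily-pipeline")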
Error Handling & Retries
Automatic retries with exponential backoff and dead letter queue
@broker.task(
    retry_on_error=True,
    max_retries=3,
    retry_backoff=True,   # Exponential backoff
    acks_late=True        # Acknowledge after completion
)
async def resilient_task(task_spec: TaskSpec):
    """
    Automatic error handling:
    - Retry with backoff (1s, 2s, 4s)
    - Dead letter queue after max retries
    - Error context preservation
    - Alerting on failures
    """
    try:
        return await execute_task(task_spec)
    except TemporaryError as e:
        # Will be retried
        raise
    except PermanentError as e:
        # Sent to DLQ immediately
        await send_to_dlq(task_spec, e)
        raise
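The backoff schedule in the docstring doubles the delay on each attempt; a quick sketch of that arithmetic (Dramatiq's built-in retry middleware additionally applies jitter and computes delays in milliseconds):

# Exponential backoff delays for the retry policy above: 1s, 2s, 4s.
def retry_delays(max_retries: int = 3, base_seconds: float = 1.0) -> list:
    return [base_seconds * (2 ** attempt) for attempt in range(max_retries)]

print(retry_delays())  # [1.0, 2.0, 4.0]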
Performance Metrics
Real-time metrics showing throughput, latency, and resource usage
Metric | Value | Notes
--------------------------|---------------|---------------------------
Tasks/second | 100+ | With auto-scaling
Task startup time | 2-5 seconds | Container initialization
DuckDB query perf | 1GB/s | S3 read throughput
Incremental skip rate | 40-60% | Typical data pipelines
Concurrent workflows | 50+ | Per worker pool
End-to-end latency | 30s - 5min | Depends on pipeline
Deployment Options
Docker Compose (Development)
docker compose up -d
task-runner workflow run --file pipeline.yaml
AWS ECS (Production)
# Auto-scaling based on queue depth
cdk deploy TaskRunnerStack
# Deploys:
# - ECS cluster with Fargate
# - Application Load Balancer
# - CloudWatch dashboards
# - Auto-scaling policies
Kubernetes (Enterprise)
helm install task-runner ./charts/task-runner
# Includes:
# - Worker deployments
# - Redis StatefulSet
# - Horizontal Pod Autoscaler
CLI Interface
Interactive CLI for workflow management and monitoring
# Run workflow
task-runner workflow run --tenant mycompany --file pipeline.yaml
# Check status
task-runner workflow status --job-id job-123
# View logs
task-runner workflow logs --job-id job-123 --follow
# Schedule management
task-runner schedule add --name "daily" --cron "0 2 * * *"
task-runner schedule list
task-runner schedule trigger --id daily-123
# Data lineage
task-runner lineage job --job-id job-123
task-runner lineage asset --uri s3://bucket/file.parquet
# Incremental stats
task-runner incremental summary --tenant mycompany
Use Cases
1. ETL Pipelines
Transform data from APIs/databases to analytics-ready formats with incremental processing.
2. ML Feature Engineering
Generate features from raw data with lineage tracking for model debugging.
3. Data Quality Checks
Scheduled validation workflows with alerting on failures.
4. Multi-Tenant Analytics
Isolated workflows per customer with quota management.
Technical Highlights
- Dramatiq Workers - Type-safe async job processing
- DuckDB Analytics - In-process OLAP database with S3 integration
- Docker Isolation - Per-task containers for clean dependencies
- Redis Queue - High-throughput job distribution
- CloudWatch Integration - Comprehensive monitoring and alerting
Scalable data engineering from MacLeod Labs