TinyWorkflow Usage Guide

TinyWorkflow is a lightweight Python workflow orchestration library designed for learning workflow concepts, prototyping, and AI experimentation. This guide covers everything you need to build durable workflows with state persistence, retry logic, and human-in-the-loop capabilities.

Important

TinyWorkflow is designed for learning and lightweight use cases. For production-grade durable workflows, use Temporal, Azure Durable Functions, or DBOS.

Installation

From PyPI (once published)

pip install tinyworkflow

From Source

git clone https://github.com/scionoftech/tinyworkflow
cd tinyworkflow
pip install -e .

With Development Dependencies

pip install -e ".[dev]"

With All Database Drivers

pip install -e ".[all-databases]"

Quick Start

Create your first workflow in 5 minutes:

import asyncio
from tinyworkflow import workflow, activity, WorkflowContext, TinyWorkflowClient

# Define an activity
@activity(name="fetch_data")
async def fetch_data(url: str):
    # Your data fetching logic
    return {"data": "sample data"}

# Define a workflow
@workflow(name="simple_etl")
async def simple_workflow(ctx: WorkflowContext):
    url = ctx.get_input("url")

    # Execute activity
    data = await ctx.execute_activity(fetch_data, url)

    return {"status": "completed", "data": data}

# Run the workflow
async def main():
    async with TinyWorkflowClient() as client:
        run_id = await client.start_workflow(
            "simple_etl",
            input_data={"url": "https://api.example.com"}
        )
        print(f"Workflow started: {run_id}")

asyncio.run(main())

Core Concepts

Workflows

Workflows orchestrate multiple activities and define business logic. They are automatically persisted and can recover from failures.

from tinyworkflow import workflow, WorkflowContext, RetryPolicy

@workflow(
    name="user_onboarding",
    retry_policy=RetryPolicy(max_retries=3),
    timeout=300.0  # 5 minutes
)
async def user_onboarding_workflow(ctx: WorkflowContext):
    user_id = ctx.get_input("user_id")

    # Sequential execution
    user = await ctx.execute_activity(fetch_user, user_id)
    await ctx.execute_activity(send_welcome_email, user)

    return {"status": "completed"}

Activities

Activities are reusable tasks that perform a single unit of work. They support automatic retries and timeouts.

from tinyworkflow import activity, RetryPolicy

@activity(
    name="fetch_user",
    retry_policy=RetryPolicy(
        max_retries=5,
        initial_delay=1.0,
        backoff_multiplier=2.0
    ),
    timeout=30.0
)
async def fetch_user(user_id: str):
    # Activity implementation
    return {"id": user_id, "name": "John Doe"}

Workflow Context

The WorkflowContext provides runtime capabilities for your workflows:

Method Description
get_input(key, default) Retrieve input data by key
execute_activity(func, *args, **kwargs) Execute a single activity with retry logic
execute_parallel(*activities) Execute multiple activities concurrently (fan-out/fan-in)
wait_for_approval(key, timeout) Pause workflow for human approval

Retry Policies

Configure automatic retry behavior with exponential backoff and jitter:

from tinyworkflow import RetryPolicy

retry_policy = RetryPolicy(
    max_retries=5,           # Maximum retry attempts
    initial_delay=1.0,       # First retry after 1 second
    max_delay=60.0,          # Cap delays at 60 seconds
    backoff_multiplier=2.0,  # Exponential: 1s, 2s, 4s, 8s, 16s...
    jitter=True,             # Add randomness to prevent thundering herd
    jitter_factor=0.1        # ±10% randomness
)
Retry Calculation

Delay = min(initial_delay * (multiplier ^ attempt), max_delay) + jitter

Client API

Initialization

The TinyWorkflowClient is the main entry point for workflow operations:

# Context manager (recommended)
async with TinyWorkflowClient() as client:
    # Use client
    pass

# With custom database
async with TinyWorkflowClient(
    database_url="postgresql+asyncpg://user:pass@localhost/tinyworkflow"
) as client:
    pass

# With auto-worker
async with TinyWorkflowClient(auto_start_worker=True) as client:
    pass

Workflow Execution

Start Workflow (Non-blocking)

run_id = await client.start_workflow(
    "workflow_name",
    input_data={"key": "value"},
    workflow_id="optional_group_id"
)
print(f"Started: {run_id}")

Start and Wait for Completion

run_id = await client.start_workflow(
    "workflow_name",
    input_data={"key": "value"},
    wait=True
)
workflow = await client.get_workflow_status(run_id)
print(f"Result: {workflow.output_data}")

Get Workflow Status

workflow = await client.get_workflow_status(run_id)

print(f"Status: {workflow.status}")
print(f"Output: {workflow.output_data}")
print(f"Error: {workflow.error}")
print(f"Retries: {workflow.retry_count}/{workflow.max_retries}")

List Workflow Executions

from tinyworkflow.models import WorkflowStatus

workflows = await client.list_workflow_executions(
    status=WorkflowStatus.COMPLETED,
    workflow_name="my_workflow",
    limit=100,
    offset=0
)

Get Audit Trail

events = await client.get_workflow_events(run_id, limit=100)

for event in events:
    print(f"{event.timestamp}: {event.event_type} - {event.event_data}")

Scheduling

Cron Scheduling

# Run daily at 2 AM
job_id = await client.schedule_workflow(
    "daily_backup",
    cron_expression="0 2 * * *",
    input_data={}
)

# Run every 5 minutes
await client.schedule_workflow("health_check", "*/5 * * * *")

# Run weekly on Monday at 9 AM
await client.schedule_workflow("weekly_report", "0 9 * * 1")
Common Cron Expressions:
  • 0 2 * * * - Every day at 2:00 AM
  • 0 9 * * 1 - Every Monday at 9:00 AM
  • 0 * * * * - Every hour at minute 0
  • */5 * * * * - Every 5 minutes
  • 0 3 1 * * - 1st of month at 3:00 AM

Delayed Execution

# Run after 1 hour
await client.schedule_delayed_workflow(
    "reminder_task",
    delay_seconds=3600,
    input_data={"task_id": "123"}
)

Approvals

Get Pending Approvals

pending = await client.get_pending_approvals()

for workflow in pending:
    print(f"{workflow.run_id}: {workflow.approval_required}")

Approve/Reject Workflow

# Approve
await client.approve_workflow(run_id)

# Reject
await client.reject_workflow(run_id)

CLI Commands

Server & Worker

Start Web UI Server

# Basic (includes worker)
tinyworkflow server --import-workflows examples.workflows

# With custom port and database
tinyworkflow server \
  --import-workflows myproject.workflows \
  --port 8080 \
  --db "postgresql+asyncpg://user:pass@localhost/tinyworkflow"
Critical: Workflow Registration

Always use --import-workflows to register workflows. Must run from project root directory.

Start Background Worker

tinyworkflow worker --import-workflows examples.workflows

Workflow Control

List Registered Workflows

tinyworkflow workflows

Start Workflow

tinyworkflow start simple_etl --input '{"url": "https://api.example.com"}'

# Wait for completion
tinyworkflow start my_workflow --input '{"key": "value"}' --wait

Check Status

tinyworkflow status <run_id>

List Executions

# All executions
tinyworkflow list

# Filter by status
tinyworkflow list --status running

# Filter by workflow name
tinyworkflow list --workflow my_workflow --limit 50

Cancel Workflow

tinyworkflow cancel <run_id>

View Audit Trail

tinyworkflow events <run_id> --limit 100

Scheduling

Schedule with Cron

tinyworkflow schedule daily_backup "0 2 * * *" --input '{}'

Approvals

# List pending
tinyworkflow approvals

# Approve
tinyworkflow approve <run_id> --approve

# Reject
tinyworkflow approve <run_id> --reject

Workflow Patterns

Sequential Execution

Execute activities one after another in order:

@workflow(name="etl_pipeline")
async def etl_workflow(ctx: WorkflowContext):
    # Extract
    data = await ctx.execute_activity(fetch_data, url)

    # Transform
    processed = await ctx.execute_activity(process_data, data)

    # Load
    result = await ctx.execute_activity(save_result, processed)

    return {"status": "completed", "result_id": result["id"]}

Use Cases: ETL pipelines, data processing, ordered task chains

Parallel Execution (Fan-out/Fan-in)

Execute multiple activities concurrently for better performance:

@workflow(name="user_report")
async def parallel_workflow(ctx: WorkflowContext):
    user_id = ctx.get_input("user_id")

    # Execute 3 activities in parallel
    user, orders, preferences = await ctx.execute_parallel(
        (fetch_user, (user_id,), {}),
        (fetch_orders, (user_id,), {}),
        (fetch_preferences, (user_id,), {})
    )

    # Continue with results
    report = await ctx.execute_activity(generate_report, user, orders, preferences)

    return {"report_id": report["id"]}
Performance Benefit: 3 activities × 2 seconds each = 2 seconds total (vs 6 seconds sequential)

Use Cases: Multi-source data fetching, independent API calls, batch operations

Approval Workflows (Human-in-the-Loop)

Pause workflow execution for human decision-making:

@workflow(name="expense_approval")
async def approval_workflow(ctx: WorkflowContext):
    amount = ctx.get_input("amount")

    # Create claim
    claim = await ctx.execute_activity(create_expense_claim, amount)

    # High amounts require approval
    if amount > 1000:
        await ctx.execute_activity(notify_manager, claim)

        # Pause and wait for approval
        approved = await ctx.wait_for_approval(
            f"manager_approval_{claim['claim_id']}",
            timeout=300  # 5 minutes
        )

        if not approved:
            return {"status": "rejected"}

    # Process payment
    payment = await ctx.execute_activity(process_payment, claim)

    return {"status": "completed", "payment_id": payment["id"]}

Approve via CLI:

tinyworkflow approve <run_id> --approve
tinyworkflow approve <run_id> --reject

Use Cases: Purchase orders, expense approval, compliance workflows, content moderation

Retry with Exponential Backoff

Automatically retry failed activities with increasing delays:

@activity(
    name="flaky_api",
    retry_policy=RetryPolicy(
        max_retries=5,
        initial_delay=1.0,
        max_delay=10.0,
        backoff_multiplier=2.0,
        jitter=True
    )
)
async def flaky_api_call(endpoint: str):
    # Automatically retried on failure
    response = await http_client.get(endpoint)
    return response.json()

@workflow(name="api_workflow")
async def retry_workflow(ctx: WorkflowContext):
    # Activity automatically retries with exponential backoff
    result = await ctx.execute_activity(flaky_api_call, "/api/data")
    return {"status": "completed", "result": result}
Retry Delays:
  • Attempt 1: 1s (1 × 2⁰)
  • Attempt 2: 2s (1 × 2¹)
  • Attempt 3: 4s (1 × 2²)
  • Attempt 4: 8s (1 × 2³)
  • Attempt 5: 10s (capped at max_delay)

Use Cases: Flaky APIs, transient failures, network retries, rate limiting

AI/ML Workflows

Orchestrate multi-step AI pipelines with retry logic and parallel processing:

@workflow(name="ai_content_pipeline")
async def ai_pipeline(ctx: WorkflowContext):
    prompt = ctx.get_input("prompt")

    # Generate content (with retry for API failures)
    content = await ctx.execute_activity(generate_content, prompt)

    # Parallel analysis: sentiment, moderation, keywords
    sentiment, moderation, keywords = await ctx.execute_parallel(
        (analyze_sentiment, (content["content"],), {}),
        (moderate_content, (content["content"],), {}),
        (extract_keywords, (content["content"],), {})
    )

    # Check moderation
    if moderation["flagged"]:
        return {"status": "rejected", "reason": "content_moderation"}

    # Parallel translations
    translations = await ctx.execute_parallel(
        (translate, (content["content"], "es"), {}),
        (translate, (content["content"], "fr"), {}),
        (translate, (content["content"], "de"), {})
    )

    # Save results
    await ctx.execute_activity(save_results, {
        "content": content,
        "sentiment": sentiment,
        "keywords": keywords,
        "translations": translations
    })

    return {"status": "completed", "content": content["content"]}

Use Cases: Content generation, document processing, sentiment analysis, ML pipelines, batch inference

Configuration

Database Configuration

SQLite (Default)

# Default database
async with TinyWorkflowClient() as client:
    pass

# Custom path
async with TinyWorkflowClient(
    database_url="sqlite+aiosqlite:///custom/path/db.db"
) as client:
    pass

PostgreSQL

async with TinyWorkflowClient(
    database_url="postgresql+asyncpg://user:password@localhost:5432/tinyworkflow"
) as client:
    pass

Setup:

createdb tinyworkflow
# or
psql -c "CREATE DATABASE tinyworkflow;"

MySQL

async with TinyWorkflowClient(
    database_url="mysql+asyncmy://user:password@localhost:3306/tinyworkflow?charset=utf8mb4"
) as client:
    pass

Setup:

mysql -u root -p -e "CREATE DATABASE tinyworkflow CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"

Environment Variables

export TINYWORKFLOW_DATABASE_URL="postgresql+asyncpg://user:pass@localhost/tinyworkflow"
tinyworkflow server --import-workflows examples.workflows

Connection Pooling

PostgreSQL and MySQL automatically use connection pooling:

Web UI

Start the web interface to manage workflows visually:

# IMPORTANT: Run from project root directory
cd /path/to/tinyworkflow

# Start server with workflow imports
tinyworkflow server --import-workflows examples.workflows --port 8080

Open http://localhost:8080 in your browser.

Features

Requirements
  • Must use --import-workflows to make workflows available
  • Must run from project root directory
  • See Workflow Registration Guide for troubleshooting

Limitations

TinyWorkflow intentionally omits some advanced features to keep the codebase simple (~2000 LOC):

Feature TinyWorkflow Temporal/DBOS
Workflow Replay ❌ Retries from scratch ✅ Resume from failure point
Deterministic Execution ❌ Can use random(), datetime.now() ✅ Enforced determinism
Durable Timers ❌ asyncio.sleep() not persisted ✅ Timers survive crashes
Signal System ❌ No external events ✅ Send signals to running workflows
Saga/Compensation ❌ Manual rollback ✅ Automatic compensation
Workflow Versioning ❌ Code changes may break in-flight workflows ✅ Version management

What TinyWorkflow DOES Provide

Examples

Check the examples/ directory for complete working examples:

Example Description
simple_workflow.py Basic ETL workflow with sequential activities
parallel_workflow.py Parallel activity execution (fan-out/fan-in)
approval_workflow.py Human-in-the-loop approval pattern
retry_workflow.py Retry policies and failure handling
scheduling_workflow.py Cron scheduling and delayed execution
ai_content_pipeline.py AI content generation with analysis and moderation
ai_document_processor.py AI document processing with parallel analysis
database_configuration.py Multi-database configuration examples

Running Examples

# Run directly
python examples/simple_workflow.py
python examples/retry_workflow.py
python examples/ai_content_pipeline.py

# Via web UI
tinyworkflow server --import-workflows examples.workflows
# Open http://localhost:8080 and run from dashboard
Learn by Example

Start with simple_workflow.py, then progress through parallel_workflow.py, retry_workflow.py, and approval_workflow.py to understand core patterns.


TinyWorkflow v0.1.0 | GitHub | MIT License