TinyWorkflow Usage Guide
TinyWorkflow is a lightweight Python workflow orchestration library designed for learning workflow concepts, prototyping, and AI experimentation. This guide covers everything you need to build durable workflows with state persistence, retry logic, and human-in-the-loop capabilities.
TinyWorkflow is designed for learning and lightweight use cases. For production-grade durable workflows, use Temporal, Azure Durable Functions, or DBOS.
Installation
From PyPI (once published)
pip install tinyworkflow
From Source
git clone https://github.com/scionoftech/tinyworkflow
cd tinyworkflow
pip install -e .
With Development Dependencies
pip install -e ".[dev]"
With All Database Drivers
pip install -e ".[all-databases]"
Quick Start
Create your first workflow in 5 minutes:
import asyncio
from tinyworkflow import workflow, activity, WorkflowContext, TinyWorkflowClient
# Define an activity
@activity(name="fetch_data")
async def fetch_data(url: str):
# Your data fetching logic
return {"data": "sample data"}
# Define a workflow
@workflow(name="simple_etl")
async def simple_workflow(ctx: WorkflowContext):
url = ctx.get_input("url")
# Execute activity
data = await ctx.execute_activity(fetch_data, url)
return {"status": "completed", "data": data}
# Run the workflow
async def main():
async with TinyWorkflowClient() as client:
run_id = await client.start_workflow(
"simple_etl",
input_data={"url": "https://api.example.com"}
)
print(f"Workflow started: {run_id}")
asyncio.run(main())
Core Concepts
Workflows
Workflows orchestrate multiple activities and define business logic. Workflow state is automatically persisted, and failed runs recover by retrying from the beginning (see Limitations).
from tinyworkflow import workflow, WorkflowContext, RetryPolicy
@workflow(
name="user_onboarding",
retry_policy=RetryPolicy(max_retries=3),
timeout=300.0 # 5 minutes
)
async def user_onboarding_workflow(ctx: WorkflowContext):
user_id = ctx.get_input("user_id")
# Sequential execution
user = await ctx.execute_activity(fetch_user, user_id)
await ctx.execute_activity(send_welcome_email, user)
return {"status": "completed"}
Activities
Activities are reusable tasks that perform a single unit of work. They support automatic retries and timeouts.
from tinyworkflow import activity, RetryPolicy
@activity(
name="fetch_user",
retry_policy=RetryPolicy(
max_retries=5,
initial_delay=1.0,
backoff_multiplier=2.0
),
timeout=30.0
)
async def fetch_user(user_id: str):
# Activity implementation
return {"id": user_id, "name": "John Doe"}
Workflow Context
The WorkflowContext provides runtime capabilities for your workflows:
| Method | Description |
|---|---|
| `get_input(key, default)` | Retrieve input data by key |
| `execute_activity(func, *args, **kwargs)` | Execute a single activity with retry logic |
| `execute_parallel(*activities)` | Execute multiple activities concurrently (fan-out/fan-in) |
| `wait_for_approval(key, timeout)` | Pause workflow for human approval |
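For orientation, here is a compact sketch that exercises most of these methods together. The activities, field names, and risk threshold are illustrative, not part of the library:

```python
from tinyworkflow import workflow, activity, WorkflowContext

@activity(name="score_order")
async def score_order(order_id: str):
    return {"order_id": order_id, "risk": 0.2}  # placeholder scoring

@activity(name="fetch_history")
async def fetch_history(order_id: str):
    return {"order_id": order_id, "orders": []}  # placeholder lookup

@workflow(name="order_review")
async def order_review(ctx: WorkflowContext):
    # get_input: read workflow input, with a fallback default
    order_id = ctx.get_input("order_id", "unknown")

    # execute_parallel: fan out both lookups concurrently
    score, history = await ctx.execute_parallel(
        (score_order, (order_id,), {}),
        (fetch_history, (order_id,), {}),
    )

    # wait_for_approval: pause for a human decision on risky orders
    if score["risk"] > 0.5:
        approved = await ctx.wait_for_approval(f"review_{order_id}", timeout=600)
        if not approved:
            return {"status": "rejected"}

    return {"status": "approved", "risk": score["risk"]}
```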
Retry Policies
Configure automatic retry behavior with exponential backoff and jitter:
from tinyworkflow import RetryPolicy
retry_policy = RetryPolicy(
max_retries=5, # Maximum retry attempts
initial_delay=1.0, # First retry after 1 second
max_delay=60.0, # Cap delays at 60 seconds
backoff_multiplier=2.0, # Exponential: 1s, 2s, 4s, 8s, 16s...
jitter=True, # Add randomness to prevent thundering herd
jitter_factor=0.1 # ±10% randomness
)
delay = min(initial_delay * backoff_multiplier ** attempt, max_delay) ± jitter, where attempt is counted from 0.
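For illustration (not the library's internal code), the schedule can be computed like this:

```python
import random

def retry_delay(attempt: int, initial_delay: float = 1.0,
                backoff_multiplier: float = 2.0, max_delay: float = 60.0,
                jitter: bool = True, jitter_factor: float = 0.1) -> float:
    """Compute the delay before retry number `attempt` (0-based)."""
    delay = min(initial_delay * backoff_multiplier ** attempt, max_delay)
    if jitter:
        delay += delay * random.uniform(-jitter_factor, jitter_factor)  # ±10%
    return delay

for attempt in range(5):
    print(f"retry {attempt + 1}: ~{retry_delay(attempt, jitter=False):.0f}s")
# retry 1: ~1s, retry 2: ~2s, retry 3: ~4s, retry 4: ~8s, retry 5: ~16s
```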
Client API
Initialization
The TinyWorkflowClient is the main entry point for workflow operations:
# Context manager (recommended)
async with TinyWorkflowClient() as client:
# Use client
pass
# With custom database
async with TinyWorkflowClient(
database_url="postgresql+asyncpg://user:pass@localhost/tinyworkflow"
) as client:
pass
# With auto-worker
async with TinyWorkflowClient(auto_start_worker=True) as client:
pass
Workflow Execution
Start Workflow (Non-blocking)
run_id = await client.start_workflow(
"workflow_name",
input_data={"key": "value"},
workflow_id="optional_group_id"
)
print(f"Started: {run_id}")
Start and Wait for Completion
run_id = await client.start_workflow(
"workflow_name",
input_data={"key": "value"},
wait=True
)
workflow = await client.get_workflow_status(run_id)
print(f"Result: {workflow.output_data}")
Get Workflow Status
workflow = await client.get_workflow_status(run_id)
print(f"Status: {workflow.status}")
print(f"Output: {workflow.output_data}")
print(f"Error: {workflow.error}")
print(f"Retries: {workflow.retry_count}/{workflow.max_retries}")
List Workflow Executions
from tinyworkflow.models import WorkflowStatus
workflows = await client.list_workflow_executions(
status=WorkflowStatus.COMPLETED,
workflow_name="my_workflow",
limit=100,
offset=0
)
Get Audit Trail
events = await client.get_workflow_events(run_id, limit=100)
for event in events:
print(f"{event.timestamp}: {event.event_type} - {event.event_data}")
Scheduling
Cron Scheduling
# Run daily at 2 AM
job_id = await client.schedule_workflow(
"daily_backup",
cron_expression="0 2 * * *",
input_data={}
)
# Run every 5 minutes
await client.schedule_workflow("health_check", "*/5 * * * *")
# Run weekly on Monday at 9 AM
await client.schedule_workflow("weekly_report", "0 9 * * 1")
Common expressions (fields: minute, hour, day of month, month, day of week):
- `0 2 * * *` - Every day at 2:00 AM
- `0 9 * * 1` - Every Monday at 9:00 AM
- `0 * * * *` - Every hour at minute 0
- `*/5 * * * *` - Every 5 minutes
- `0 3 1 * *` - 1st of month at 3:00 AM
Delayed Execution
# Run after 1 hour
await client.schedule_delayed_workflow(
"reminder_task",
delay_seconds=3600,
input_data={"task_id": "123"}
)
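To fire at a specific wall-clock time rather than after a fixed interval, compute delay_seconds yourself (a sketch on top of the call above):

```python
from datetime import datetime, timedelta

# e.g. tomorrow at 09:00 local time
target = (datetime.now() + timedelta(days=1)).replace(
    hour=9, minute=0, second=0, microsecond=0
)
delay = max(0, int((target - datetime.now()).total_seconds()))

await client.schedule_delayed_workflow(
    "reminder_task",
    delay_seconds=delay,
    input_data={"task_id": "123"},
)
```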
Approvals
Get Pending Approvals
pending = await client.get_pending_approvals()
for workflow in pending:
print(f"{workflow.run_id}: {workflow.approval_required}")
Approve/Reject Workflow
# Approve
await client.approve_workflow(run_id)
# Reject
await client.reject_workflow(run_id)
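These calls combine naturally into small review scripts; a sketch using only the APIs shown above:

```python
async def approve_all_pending(client):
    """Blanket-approve every workflow that is waiting (use with care)."""
    for workflow in await client.get_pending_approvals():
        print(f"approving {workflow.run_id}: {workflow.approval_required}")
        await client.approve_workflow(workflow.run_id)
```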
CLI Commands
Server & Worker
Start Web UI Server
# Basic (includes worker)
tinyworkflow server --import-workflows examples.workflows
# With custom port and database
tinyworkflow server \
--import-workflows myproject.workflows \
--port 8080 \
--db "postgresql+asyncpg://user:pass@localhost/tinyworkflow"
- Always use `--import-workflows` to register workflows
- Must run from project root directory
Start Background Worker
tinyworkflow worker --import-workflows examples.workflows
Workflow Control
List Registered Workflows
tinyworkflow workflows
Start Workflow
tinyworkflow start simple_etl --input '{"url": "https://api.example.com"}'
# Wait for completion
tinyworkflow start my_workflow --input '{"key": "value"}' --wait
Check Status
tinyworkflow status <run_id>
List Executions
# All executions
tinyworkflow list
# Filter by status
tinyworkflow list --status running
# Filter by workflow name
tinyworkflow list --workflow my_workflow --limit 50
Cancel Workflow
tinyworkflow cancel <run_id>
View Audit Trail
tinyworkflow events <run_id> --limit 100
Scheduling
Schedule with Cron
tinyworkflow schedule daily_backup "0 2 * * *" --input '{}'
Approvals
# List pending
tinyworkflow approvals
# Approve
tinyworkflow approve <run_id> --approve
# Reject
tinyworkflow approve <run_id> --reject
Workflow Patterns
Sequential Execution
Execute activities one after another in order:
@workflow(name="etl_pipeline")
async def etl_workflow(ctx: WorkflowContext):
# Extract
data = await ctx.execute_activity(fetch_data, url)
# Transform
processed = await ctx.execute_activity(process_data, data)
# Load
result = await ctx.execute_activity(save_result, processed)
return {"status": "completed", "result_id": result["id"]}
Use Cases: ETL pipelines, data processing, ordered task chains
Parallel Execution (Fan-out/Fan-in)
Execute multiple activities concurrently for better performance:
@workflow(name="user_report")
async def parallel_workflow(ctx: WorkflowContext):
user_id = ctx.get_input("user_id")
# Execute 3 activities in parallel
user, orders, preferences = await ctx.execute_parallel(
(fetch_user, (user_id,), {}),
(fetch_orders, (user_id,), {}),
(fetch_preferences, (user_id,), {})
)
# Continue with results
report = await ctx.execute_activity(generate_report, user, orders, preferences)
return {"report_id": report["id"]}
Use Cases: Multi-source data fetching, independent API calls, batch operations
Approval Workflows (Human-in-the-Loop)
Pause workflow execution for human decision-making:
@workflow(name="expense_approval")
async def approval_workflow(ctx: WorkflowContext):
amount = ctx.get_input("amount")
# Create claim
claim = await ctx.execute_activity(create_expense_claim, amount)
# High amounts require approval
if amount > 1000:
await ctx.execute_activity(notify_manager, claim)
# Pause and wait for approval
approved = await ctx.wait_for_approval(
f"manager_approval_{claim['claim_id']}",
timeout=300 # 5 minutes
)
if not approved:
return {"status": "rejected"}
# Process payment
payment = await ctx.execute_activity(process_payment, claim)
return {"status": "completed", "payment_id": payment["id"]}
Approve via CLI:
tinyworkflow approve <run_id> --approve
tinyworkflow approve <run_id> --reject
Use Cases: Purchase orders, expense approval, compliance workflows, content moderation
Retry with Exponential Backoff
Automatically retry failed activities with increasing delays:
@activity(
name="flaky_api",
retry_policy=RetryPolicy(
max_retries=5,
initial_delay=1.0,
max_delay=10.0,
backoff_multiplier=2.0,
jitter=True
)
)
async def flaky_api_call(endpoint: str):
# Automatically retried on failure
response = await http_client.get(endpoint)
return response.json()
@workflow(name="api_workflow")
async def retry_workflow(ctx: WorkflowContext):
# Activity automatically retries with exponential backoff
result = await ctx.execute_activity(flaky_api_call, "/api/data")
return {"status": "completed", "result": result}
- Attempt 1: 1s (1 × 2⁰)
- Attempt 2: 2s (1 × 2¹)
- Attempt 3: 4s (1 × 2²)
- Attempt 4: 8s (1 × 2³)
- Attempt 5: 10s (capped at max_delay)
Use Cases: Flaky APIs, transient failures, network retries, rate limiting
AI/ML Workflows
Orchestrate multi-step AI pipelines with retry logic and parallel processing:
@workflow(name="ai_content_pipeline")
async def ai_pipeline(ctx: WorkflowContext):
prompt = ctx.get_input("prompt")
# Generate content (with retry for API failures)
content = await ctx.execute_activity(generate_content, prompt)
# Parallel analysis: sentiment, moderation, keywords
sentiment, moderation, keywords = await ctx.execute_parallel(
(analyze_sentiment, (content["content"],), {}),
(moderate_content, (content["content"],), {}),
(extract_keywords, (content["content"],), {})
)
# Check moderation
if moderation["flagged"]:
return {"status": "rejected", "reason": "content_moderation"}
# Parallel translations
translations = await ctx.execute_parallel(
(translate, (content["content"], "es"), {}),
(translate, (content["content"], "fr"), {}),
(translate, (content["content"], "de"), {})
)
# Save results
await ctx.execute_activity(save_results, {
"content": content,
"sentiment": sentiment,
"keywords": keywords,
"translations": translations
})
return {"status": "completed", "content": content["content"]}
Use Cases: Content generation, document processing, sentiment analysis, ML pipelines, batch inference
Configuration
Database Configuration
SQLite (Default)
# Default database
async with TinyWorkflowClient() as client:
pass
# Custom path
async with TinyWorkflowClient(
database_url="sqlite+aiosqlite:///custom/path/db.db"
) as client:
pass
PostgreSQL
async with TinyWorkflowClient(
database_url="postgresql+asyncpg://user:password@localhost:5432/tinyworkflow"
) as client:
pass
Setup:
createdb tinyworkflow
# or
psql -c "CREATE DATABASE tinyworkflow;"
MySQL
async with TinyWorkflowClient(
database_url="mysql+asyncmy://user:password@localhost:3306/tinyworkflow?charset=utf8mb4"
) as client:
pass
Setup:
mysql -u root -p -e "CREATE DATABASE tinyworkflow CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
Environment Variables
export TINYWORKFLOW_DATABASE_URL="postgresql+asyncpg://user:pass@localhost/tinyworkflow"
tinyworkflow server --import-workflows examples.workflows
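The CLI picks this variable up as shown above; whether TinyWorkflowClient also reads it is not documented here, so in application code you can pass it through explicitly (the SQLite fallback URL below is an assumption, not a documented default):

```python
import os
from tinyworkflow import TinyWorkflowClient

database_url = os.environ.get(
    "TINYWORKFLOW_DATABASE_URL",
    "sqlite+aiosqlite:///tinyworkflow.db",  # assumed fallback, adjust as needed
)

async with TinyWorkflowClient(database_url=database_url) as client:
    pass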
Connection Pooling
PostgreSQL and MySQL automatically use connection pooling:
- Pool Size: 10 connections
- Max Overflow: 20 additional connections
- Pool Pre-Ping: Enabled (verifies connections)
- Pool Recycle: 3600 seconds (1 hour)
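The async driver URLs suggest SQLAlchemy underneath; for reference, the defaults listed above correspond roughly to this SQLAlchemy engine configuration (a sketch, not TinyWorkflow's actual internals):

```python
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/tinyworkflow",
    pool_size=10,        # base pool of 10 connections
    max_overflow=20,     # up to 20 extra connections under load
    pool_pre_ping=True,  # verify each connection before use
    pool_recycle=3600,   # recycle connections after 1 hour
)
```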
Web UI
Start the web interface to manage workflows visually:
# IMPORTANT: Run from project root directory
cd /path/to/tinyworkflow
# Start server with workflow imports
tinyworkflow server --import-workflows examples.workflows --port 8080
Open http://localhost:8080 in your browser.
Features
- Dashboard with workflow statistics
- Start new workflows with custom input
- List and filter workflow executions
- View detailed workflow status and events
- Schedule workflows with cron expressions
- Approve/reject pending workflows
- Browse registered workflows and activities
- Must use `--import-workflows` to make workflows available
- Must run from project root directory
- See Workflow Registration Guide for troubleshooting
Limitations
TinyWorkflow intentionally omits some advanced features to keep the codebase simple (~2000 LOC):
| Feature | TinyWorkflow | Temporal/DBOS |
|---|---|---|
| Workflow Replay | ❌ Retries from scratch | ✅ Resume from failure point |
| Deterministic Execution | ❌ Can use random(), datetime.now() | ✅ Enforced determinism |
| Durable Timers | ❌ asyncio.sleep() not persisted | ✅ Timers survive crashes |
| Signal System | ❌ No external events | ✅ Send signals to running workflows |
| Saga/Compensation | ❌ Manual rollback | ✅ Automatic compensation |
| Workflow Versioning | ❌ Code changes may break in-flight workflows | ✅ Version management |
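In practice the first two rows matter most: because a failed run retries from the beginning rather than replaying, activities with side effects should be idempotent. A sketch of one common mitigation; the idempotency store is your own code, not a TinyWorkflow feature, and a real store would need to be durable:

```python
from tinyworkflow import activity

_already_charged: set[str] = set()  # stand-in for a durable idempotency store

@activity(name="charge_card")
async def charge_card(order_id: str, amount: float):
    key = f"charge-{order_id}"
    if key in _already_charged:
        # A retry-from-scratch lands here instead of charging twice
        return {"status": "already_charged", "key": key}
    _already_charged.add(key)
    # ... call the real payment provider here ...
    return {"status": "charged", "key": key, "amount": amount}
```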
What TinyWorkflow DOES Provide
- ✅ State persistence (SQLite/PostgreSQL/MySQL)
- ✅ Retry policies with exponential backoff
- ✅ Parallel execution (fan-out/fan-in)
- ✅ Event sourcing (audit trail)
- ✅ Human-in-the-loop (basic approvals)
- ✅ Scheduling (cron expressions)
- ✅ Web UI for workflow monitoring
- ✅ Zero infrastructure setup
Examples
Check the examples/ directory for complete working examples:
| Example | Description |
|---|---|
| `simple_workflow.py` | Basic ETL workflow with sequential activities |
| `parallel_workflow.py` | Parallel activity execution (fan-out/fan-in) |
| `approval_workflow.py` | Human-in-the-loop approval pattern |
| `retry_workflow.py` | Retry policies and failure handling |
| `scheduling_workflow.py` | Cron scheduling and delayed execution |
| `ai_content_pipeline.py` | AI content generation with analysis and moderation |
| `ai_document_processor.py` | AI document processing with parallel analysis |
| `database_configuration.py` | Multi-database configuration examples |
Running Examples
# Run directly
python examples/simple_workflow.py
python examples/retry_workflow.py
python examples/ai_content_pipeline.py
# Via web UI
tinyworkflow server --import-workflows examples.workflows
# Open http://localhost:8080 and run from dashboard
Start with simple_workflow.py, then progress through
parallel_workflow.py, retry_workflow.py, and
approval_workflow.py to understand core patterns.
TinyWorkflow v0.1.0 | GitHub | MIT License