CapabilityMesh Technical Documentation
Universal capability discovery and negotiation for multi-agent systems
Introduction
CapabilityMesh is the first and only Python package providing universal capability discovery and negotiation across all major agent frameworks including CrewAI, AutoGen, LangGraph, A2A, and custom agents.
What Makes CapabilityMesh Unique?
🔍 Universal Discovery
Discover agents across ANY framework with semantic search and natural language queries.
🤝 Multi-Framework
Works with CrewAI, AutoGen, LangGraph, A2A, and custom agents in the same workflow.
🛡️ Built-in Trust
Automatic trust scoring based on execution success rates with 5 trust levels.
💾 Flexible Storage
InMemory, SQLite with FTS5, or Redis backends for any deployment scenario.
🧠 Semantic Search
Find agents using natural language, not just keyword matching.
⚡ Zero Config
Works out of the box with sensible defaults. 5 lines to working discovery!
Problem: Framework Fragmentation
Today's multi-agent ecosystem is fragmented:
- CrewAI agents can't discover AutoGen agents
- LangGraph workflows can't find A2A services
- No standard way to query "which agents can translate?"
- Manual integration required for every framework pair
- No trust or reputation across frameworks
Solution: CapabilityMesh
CapabilityMesh provides a universal discovery layer that works with ALL frameworks:
- ✅ Register agents from any framework
- ✅ Discover capabilities with natural language
- ✅ Track trust and reliability automatically
- ✅ Store agents in memory, SQLite, or Redis
- ✅ Execute tasks through unified interface
- ✅ A2A protocol compatible
Installation
Core Package
# Basic installation (includes semantic search, trust, in-memory storage)
pip install capabilitymesh
With Optional Dependencies
# Quote the extras so shells such as zsh do not expand the brackets
# SQLite persistence with full-text search
pip install "capabilitymesh[sqlite]"
# Redis for distributed discovery (see DISCOVERY_ARCHITECTURE.md)
pip install "capabilitymesh[redis]"
# Framework integrations
pip install "capabilitymesh[crewai]"
pip install "capabilitymesh[autogen]"
pip install "capabilitymesh[langgraph]"
# Everything
pip install "capabilitymesh[all]"
Requirements
- Python 3.9+
- Core dependencies: pydantic, httpx, cryptography, pyjwt, nest-asyncio
- Optional: aiosqlite, redis, sentence-transformers
Quick Start
Get started with CapabilityMesh in just a few lines of code:
import asyncio
from capabilitymesh import Mesh
mesh = Mesh()  # Zero-config, works immediately!
@mesh.agent(name="translator", capabilities=["translation", "nlp"])
def translate(text: str, target_lang: str = "es") -> str:
    return f"[Translated to {target_lang}]: {text}"
async def main():
    # Discover agents with natural language
    agents = await mesh.discover("translate text to Spanish")
    result = await mesh.execute(agents[0].id, "Hello world!", target_lang="es")
    print(result)
asyncio.run(main())
Complete Example
import asyncio
from capabilitymesh import Mesh, TrustLevel
async def main():
    # Initialize mesh
    mesh = Mesh()
    # Register multiple agents
    @mesh.agent(name="summarizer", capabilities=["summarization", "nlp"])
    async def summarize(text: str) -> str:
        return f"Summary: {text[:100]}..."
    @mesh.agent(name="translator", capabilities=["translation", "nlp"])
    def translate(text: str, target_lang: str = "es") -> str:
        return f"[{target_lang}] {text}"
    @mesh.agent(name="analyzer", capabilities=["sentiment-analysis", "nlp"])
    def analyze_sentiment(text: str) -> dict:
        return {"sentiment": "positive", "confidence": 0.95}
    # List all agents
    agents = await mesh.list_agents()
    print(f"Registered agents: {len(agents)}")
    # Discover by semantic query
    nlp_agents = await mesh.discover("process natural language")
    print(f"NLP agents: {[a.name for a in nlp_agents]}")
    # Execute tasks
    result = await mesh.execute(nlp_agents[0].id, "Long text to summarize...")
    print(f"Result: {result}")
    # Check trust scores
    trust_score = await mesh.trust.get_score(nlp_agents[0].id)
    print(f"Trust: {trust_score.level.name}")
if __name__ == "__main__":
    asyncio.run(main())
Key Concepts
1. Mesh
The Mesh is the central hub that manages all agents, capabilities, and coordination. It provides a unified interface for registration, discovery, and execution.
2. Capabilities
Capabilities describe what an agent can do. They can be simple strings ("translation") or rich objects with schemas, versioning, and constraints.
3. Discovery
Discovery is the process of finding agents that match a query. CapabilityMesh supports both exact matching and semantic search using natural language.
4. Trust
Trust scores track agent reliability based on execution results. Scores auto-adjust from UNTRUSTED to VERIFIED based on success rates.
5. Storage
Storage backends persist agent registrations. CapabilityMesh supports InMemory (default), SQLite (with FTS5), and Redis.
Architecture Overview
┌───────────────────────────────────────────────┐
│                     Mesh                      │
│       (Central hub for all operations)        │
└──────────┬──────────────┬──────────────┬──────┘
           │              │              │
    ┌──────▼─────┐  ┌─────▼─────┐  ┌─────▼─────┐
    │  Storage   │  │ Embedder  │  │   Trust   │
    │  Backend   │  │ (Search)  │  │  Manager  │
    └────────────┘  └───────────┘  └───────────┘
           │              │              │
    ┌──────▼──────────────▼──────────────▼─────┐
    │              Agent Registry              │
    │       (Framework-agnostic storage)       │
    └──────────────────────────────────────────┘
Mesh API
The Mesh class is the main entry point for CapabilityMesh.
Constructor
Mesh.__init__()
Mesh(storage=None, embedder=None, trust_manager=None)
Initialize a new Mesh instance.
Parameters:
- storage: Storage backend for agent persistence. Defaults to InMemoryStorage().
- embedder: Embedder for semantic search. Auto-selected if None (uses KeywordEmbedder by default).
- trust_manager: Trust manager for tracking reliability. Auto-created if None.
Example:
from capabilitymesh import Mesh
from capabilitymesh.storage import SQLiteStorage
# Default (in-memory)
mesh = Mesh()
# With SQLite persistence
mesh = Mesh(storage=SQLiteStorage("agents.db"))
Registration Methods
mesh.register()
async def register(agent, name=None, capabilities=None, agent_type=None, metadata=None) -> AgentIdentity
Register an agent with the mesh.
Parameters:
- agent: Agent to register (function, class, or framework agent)
- name: Agent name (auto-generated if None)
- capabilities: List of capabilities (strings or Capability objects)
- agent_type: Agent type (auto-detected from framework if None)
- metadata: Additional metadata
Returns: AgentIdentity - The identity of the registered agent
Example:
# Register a function
def my_agent(task: str) -> str:
    return f"Processed: {task}"
identity = await mesh.register(
    agent=my_agent,
    name="my-agent",
    capabilities=["processing", "task-execution"]
)
@mesh.agent()
@mesh.agent(name=None, capabilities=None, agent_type=None, metadata=None)
Decorator for registering agent functions. The agent is registered immediately when the decorator is applied (fixed in v1.0.0-alpha.2).
Example:
import asyncio
@mesh.agent(name="translator", capabilities=["translation", "nlp"])
def translate(text: str, target_lang: str = "es") -> str:
    return f"[{target_lang}] {text}"
# Agent is immediately discoverable - no need to call the function first!
agents = await mesh.discover("translation")  # ✅ Works immediately
# Async functions work too!
@mesh.agent(name="analyzer", capabilities=["analysis"])
async def analyze(data: dict) -> dict:
    await asyncio.sleep(0.1)  # Async processing
    return {"result": "analyzed"}
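Registration at decoration time, rather than on first call, can be sketched with a plain-Python decorator factory. This is a minimal illustration of the pattern, not CapabilityMesh's implementation; `TinyRegistry` and its `agents` attribute are hypothetical names used only for this sketch.

```python
from typing import Callable, Dict, List, Optional


class TinyRegistry:
    """Hypothetical stand-in for a mesh; not the CapabilityMesh API."""

    def __init__(self) -> None:
        self.agents: Dict[str, dict] = {}

    def agent(self, name: Optional[str] = None,
              capabilities: Optional[List[str]] = None) -> Callable:
        def decorator(func: Callable) -> Callable:
            # Registration happens here, while the decorator is applied,
            # so the function is discoverable before it is ever called.
            agent_name = name or func.__name__
            self.agents[agent_name] = {
                "callable": func,
                "capabilities": list(capabilities or []),
            }
            return func  # the original function is returned unchanged
        return decorator


registry = TinyRegistry()


@registry.agent(name="translator", capabilities=["translation", "nlp"])
def translate(text: str, target_lang: str = "es") -> str:
    return f"[{target_lang}] {text}"


# The agent is registered even though translate() has not been called yet.
print(sorted(registry.agents))
```

Because the decorator returns the original function, the decorated name can still be called directly like any other function.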
Discovery Methods
mesh.discover()
async def discover(query, limit=5, filters=None, min_similarity=0.001, min_trust=None) -> List[AgentInfo]
Discover agents matching a query using semantic search.
Parameters:
- query: Search query (natural language or keyword)
- limit: Maximum number of results (default: 5)
- filters: Additional filters (agent_type, tags, etc.)
- min_similarity: Minimum similarity score (0.0 - 1.0, default: 0.001)
- min_trust: Minimum trust level (filters by trust)
Returns: List[AgentInfo] - List of matching agents
Example:
# Natural language query
agents = await mesh.discover("translate text to Spanish")
# With filters
agents = await mesh.discover(
"data processing",
limit=10,
min_trust=TrustLevel.MEDIUM
)
# Keyword match
agents = await mesh.discover("nlp", limit=5)
mesh.list_agents()
async def list_agents() -> List[AgentInfo]
List all registered agents.
Returns: List[AgentInfo] - All registered agents
Example:
agents = await mesh.list_agents()
for agent in agents:
    print(f"{agent.name}: {[c.name for c in agent.capabilities]}")
Execution Methods
mesh.execute()
async def execute(agent_id, task, **kwargs) -> Any
Execute a task with an agent. Automatically tracks trust scores.
Parameters:
- agent_id: Agent's unique identifier
- task: Task data to execute
- **kwargs: Additional parameters passed to agent
Returns: Result from agent execution
Raises: ExecutionError if execution fails
Example:
agents = await mesh.discover("translation")
if agents:
    result = await mesh.execute(
        agents[0].id,
        "Hello world",
        target_lang="fr"
    )
    print(result)  # "[fr] Hello world"
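Since mesh.execute() accepts both sync and async agents and records each outcome for trust scoring, it may help to see how such a wrapper could be built. The sketch below is an assumption about the general pattern, not the library's code; `execute_and_record` and the `stats` counters are hypothetical names.

```python
import asyncio
import inspect
from collections import defaultdict
from typing import Any, Callable, Dict

# Hypothetical per-agent counters; the real trust manager is not shown here.
stats: Dict[str, Dict[str, int]] = defaultdict(lambda: {"success": 0, "failure": 0})


async def execute_and_record(agent_id: str, func: Callable[..., Any],
                             task: Any, **kwargs: Any) -> Any:
    """Call a sync or async agent and record whether it succeeded."""
    try:
        result = func(task, **kwargs)
        if inspect.isawaitable(result):
            # Async agents return an awaitable that still has to be awaited.
            result = await result
    except Exception:
        stats[agent_id]["failure"] += 1
        raise  # the caller still sees the original error
    stats[agent_id]["success"] += 1
    return result


def double(x: int) -> int:
    return x * 2


async def flaky(x: int) -> int:
    if x % 2 == 0:
        raise ValueError("even input")
    return x


async def demo() -> None:
    for i in range(4):
        await execute_and_record("double", double, i)
        try:
            await execute_and_record("flaky", flaky, i)
        except ValueError:
            pass  # the failure has already been counted


asyncio.run(demo())
print(dict(stats))
```

Re-raising after counting keeps the bookkeeping invisible to callers: they handle errors exactly as they would without the wrapper.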
mesh.get_native()
def get_native(agent_id) -> Any
Get the native agent object (function, class, or framework agent).
Example:
native_func = mesh.get_native(agent_id)
if native_func:
    result = native_func("direct call")  # Call directly
AgentInfo
Information about a registered agent returned from discovery methods.
Attributes
| Attribute | Type | Description |
|---|---|---|
| id | str | Unique agent identifier (UUID) |
| name | str | Human-readable agent name |
| agent_type | AgentType | Agent type (FUNCTION, SERVICE, A2A, etc.) |
| capabilities | List[Capability] | List of agent capabilities |
| metadata | Dict | Additional metadata |
| registered_at | datetime | Registration timestamp |
Example:
agents = await mesh.discover("translation")
for agent in agents:
    print(f"ID: {agent.id}")
    print(f"Name: {agent.name}")
    print(f"Type: {agent.agent_type}")
    print(f"Capabilities: {[c.name for c in agent.capabilities]}")
    print(f"Registered: {agent.registered_at}")
Capability API
Capabilities define what agents can do. They can be simple strings or rich objects with schemas, versioning, and constraints.
Simple Capabilities
The quickest way is to use string capabilities:
@mesh.agent(capabilities=["translation", "nlp", "language"])
def my_agent(text: str) -> str:
    return text
Rich Capabilities
For production systems, use the full Capability class:
Capability.create_simple()
Capability.create_simple(name, description="", tags=[], version="1.0.0")
Create a simple capability with minimal configuration.
Example:
from capabilitymesh import Capability
capability = Capability.create_simple(
name="text-summarization",
description="Summarize long documents into key points",
tags=["nlp", "summarization", "text-processing"],
version="2.1.0"
)
Advanced Capability Features
1. Capability Versioning
from capabilitymesh import Capability, CapabilityVersion
capability = Capability(
name="translate-v2",
version=CapabilityVersion(major=2, minor=1, patch=0),
description="Translation with context awareness"
)
# Check compatibility
v1 = CapabilityVersion(major=1, minor=0, patch=0)
v2 = CapabilityVersion(major=2, minor=0, patch=0)
print(v1.is_compatible(v2)) # False (different major version)
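The compatibility check above can be pictured with a small stand-alone class. This sketch assumes the rule implied by the example, namely that two versions are compatible when their major numbers match; the actual rule used by CapabilityVersion may be stricter. `Version` is a hypothetical stand-in, not the library class.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Version:
    """Hypothetical stand-in for CapabilityVersion."""
    major: int
    minor: int = 0
    patch: int = 0

    def is_compatible(self, other: "Version") -> bool:
        # Assumed rule: versions are compatible when the major numbers match.
        return self.major == other.major

    def __str__(self) -> str:
        return f"{self.major}.{self.minor}.{self.patch}"


v1 = Version(1, 0, 0)
v2 = Version(2, 0, 0)
v2_1 = Version(2, 1, 0)

print(v1.is_compatible(v2))    # different major version
print(v2.is_compatible(v2_1))  # same major version
print(max(v1, v2, v2_1))       # order=True compares (major, minor, patch)
```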
2. Performance Constraints
from capabilitymesh import Capability, CapabilityConstraints
capability = Capability(
name="fast-translation",
constraints=CapabilityConstraints(
max_response_time_ms=100, # Must respond in 100ms
max_cost_per_call=0.001, # Max $0.001 per call
min_availability=0.999, # 99.9% uptime
rate_limit_per_minute=1000 # 1000 calls/minute
)
)
3. Semantic Metadata
from capabilitymesh import Capability, SemanticMetadata
capability = Capability(
name="ml-translation",
semantic=SemanticMetadata(
tags=["nlp", "translation", "ml"],
categories=["Natural Language Processing", "Machine Learning"],
domains=["linguistics", "ai"],
keywords=["translate", "language", "convert"]
)
)
4. Input/Output Schemas
from capabilitymesh import (
Capability,
CapabilityInputOutput,
IOFormat
)
capability = Capability(
name="structured-api",
input_spec=CapabilityInputOutput(
format=IOFormat.JSON,
json_schema={
"type": "object",
"properties": {
"text": {"type": "string"},
"language": {"type": "string"}
},
"required": ["text"]
},
description="Input must be JSON with text and optional language"
),
output_spec=CapabilityInputOutput(
format=IOFormat.JSON,
json_schema={
"type": "object",
"properties": {
"translated": {"type": "string"},
"confidence": {"type": "number"}
}
},
description="Returns translated text with confidence score"
)
)
Identity & Address API
Every agent has a unique identity and one or more network addresses.
AgentIdentity
Unique identifier for an agent with cryptographic capabilities.
| Attribute | Type | Description |
|---|---|---|
| id | str | Unique identifier (UUID) |
| name | str | Human-readable name |
| agent_type | AgentType | Type of agent |
| public_key | Optional[str] | RSA public key (PEM format) |
| addresses | List[AgentAddress] | Network addresses |
AgentAddress
Network address for communicating with an agent.
| Attribute | Type | Description |
|---|---|---|
| protocol | str | Protocol (http, grpc, mqtt, etc.) |
| host | str | Hostname or IP |
| port | int | Port number |
| path | Optional[str] | Path (for HTTP/REST) |
Example:
from capabilitymesh import AgentIdentity, AgentAddress, AgentType
identity = AgentIdentity(
name="my-service",
agent_type=AgentType.SERVICE,
addresses=[
AgentAddress(
protocol="http",
host="api.example.com",
port=8080,
path="/agent/execute"
)
]
)
Storage Backends
CapabilityMesh supports multiple storage backends for different deployment scenarios.
Comparison
| Backend | Persistence | Search | Distribution | Best For |
|---|---|---|---|---|
| InMemory | No | Basic | Single process | Development, testing |
| SQLite | Yes (file) | Full-text (FTS5) | Single instance | Production (single server) |
| Redis | Yes (remote) | Basic | Multi-instance | Distributed systems, cloud |
When to Use Each
- InMemory: Fast development, unit tests, ephemeral agents
- SQLite: Production single-instance, excellent search, file-based
- Redis: Microservices, horizontal scaling, shared registry
InMemory Storage
Default storage backend. Fast, simple, no persistence.
Usage
from capabilitymesh import Mesh, InMemoryStorage
# Automatic (default)
mesh = Mesh()
# Explicit
mesh = Mesh(storage=InMemoryStorage())
Characteristics
- ✅ Zero configuration
- ✅ Fastest performance
- ✅ Perfect for development
- ❌ No persistence (data lost on restart)
- ❌ Single process only
SQLite Storage
File-based persistence with FTS5 full-text search.
Installation
pip install "capabilitymesh[sqlite]"
Usage
from capabilitymesh import Mesh
from capabilitymesh.storage import SQLiteStorage
# File-based
mesh = Mesh(storage=SQLiteStorage("agents.db"))
# In-memory SQLite (for testing)
mesh = Mesh(storage=SQLiteStorage(":memory:"))
Features
- ✅ File-based persistence
- ✅ FTS5 full-text search
- ✅ SQL queries for complex filters
- ✅ ACID transactions
- ✅ No external dependencies
Full-Text Search
SQLite storage automatically indexes agents for fast full-text search:
# Search by name, description, or capabilities
agents = await mesh.discover("translate natural language processing")
# FTS5 finds matches in:
# - Agent name
# - Agent description
# - All capability names and descriptions
Database Schema
-- Agents table
CREATE TABLE agents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
agent_type TEXT NOT NULL,
description TEXT,
registered_at TIMESTAMP,
metadata TEXT -- JSON
);
-- Capabilities table
CREATE TABLE capabilities (
id TEXT PRIMARY KEY,
agent_id TEXT,
name TEXT NOT NULL,
description TEXT,
capability_data TEXT, -- JSON
FOREIGN KEY(agent_id) REFERENCES agents(id)
);
-- Trust scores table
CREATE TABLE trust_scores (
agent_id TEXT PRIMARY KEY,
trust_level INTEGER,
success_count INTEGER,
failure_count INTEGER,
total_executions INTEGER,
last_execution TIMESTAMP,
FOREIGN KEY(agent_id) REFERENCES agents(id)
);
-- FTS5 full-text search index
CREATE VIRTUAL TABLE agents_fts USING fts5(
agent_id UNINDEXED,
name,
description,
capabilities -- concatenated capability names
);
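To see how an FTS5 index like agents_fts behaves, the following stand-alone sketch builds the same virtual table with Python's built-in sqlite3 module and runs MATCH queries against it. It assumes the local SQLite build includes FTS5 (most do); the sample rows and the `search` helper are made up for illustration and are not part of CapabilityMesh.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE VIRTUAL TABLE agents_fts USING fts5("
    "agent_id UNINDEXED, name, description, capabilities)"
)

# Illustrative rows only; a real registry would populate these on registration.
rows = [
    ("a1", "translator", "Translate text between languages", "translation nlp"),
    ("a2", "summarizer", "Summarize long documents", "summarization nlp"),
    ("a3", "calculator", "Add two integers", "calculation"),
]
conn.executemany("INSERT INTO agents_fts VALUES (?, ?, ?, ?)", rows)


def search(query: str) -> list:
    """Return matching agent IDs, best match first (FTS5 built-in rank)."""
    cursor = conn.execute(
        "SELECT agent_id FROM agents_fts WHERE agents_fts MATCH ? ORDER BY rank",
        (query,),
    )
    return [row[0] for row in cursor.fetchall()]


print(search("nlp"))
print(search("translation OR documents"))
print(search("calculation"))
```

Note that the default FTS5 tokenizer does no stemming, so a query for "translate" would not match a row that only contains "translation". Purely keyword-based search has this limitation, which is one reason to combine it with an embedder.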
Redis Storage
Distributed storage for multi-instance deployments.
Installation
pip install "capabilitymesh[redis]"
Usage
from capabilitymesh import Mesh
from capabilitymesh.storage import RedisStorage
# Connect to Redis
mesh = Mesh(storage=RedisStorage(
host="localhost",
port=6379,
db=0,
password=None, # Optional
prefix="capabilitymesh:", # Key prefix
ttl=None # Optional TTL in seconds
))
Features
- ✅ Distributed storage (multiple instances)
- ✅ Horizontal scaling
- ✅ High availability (Redis Cluster)
- ✅ Optional TTL for ephemeral agents
- ✅ Pub/sub for real-time updates
Redis Key Schema
# Agent data
capabilitymesh:agents:{agent_id} -> Hash
# Capabilities
capabilitymesh:capabilities:{cap_id} -> Hash
# Agent ID index
capabilitymesh:agent_ids -> Set
# Trust scores
capabilitymesh:trust:{agent_id} -> Hash
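The key layout above is easy to reproduce when inspecting a Redis instance by hand. The helpers below are a hypothetical sketch that only builds key strings following that layout; they do not talk to Redis and are not part of the CapabilityMesh API.

```python
PREFIX = "capabilitymesh:"  # same value as the prefix shown in the usage example


def agent_key(agent_id: str, prefix: str = PREFIX) -> str:
    """Key of the hash holding one agent's data."""
    return f"{prefix}agents:{agent_id}"


def capability_key(cap_id: str, prefix: str = PREFIX) -> str:
    """Key of the hash holding one capability."""
    return f"{prefix}capabilities:{cap_id}"


def agent_index_key(prefix: str = PREFIX) -> str:
    """Key of the set that indexes all agent IDs."""
    return f"{prefix}agent_ids"


def trust_key(agent_id: str, prefix: str = PREFIX) -> str:
    """Key of the hash holding one agent's trust score."""
    return f"{prefix}trust:{agent_id}"


print(agent_key("1234"))
print(trust_key("1234", prefix="staging:"))
```

Using a different prefix per environment, as in the second call, is one way to keep several registries apart on a shared Redis server.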
Multi-Instance Setup
# Instance 1
mesh1 = Mesh(storage=RedisStorage(host="redis.example.com"))
await mesh1.register(agent1)
# Instance 2 (sees agent1!)
mesh2 = Mesh(storage=RedisStorage(host="redis.example.com"))
agents = await mesh2.list_agents() # Contains agent1!
Prerequisite: a running Redis server. Start one locally with redis-server, or use Docker: docker run -d -p 6379:6379 redis
- Use A2A adapters for HTTP-based agents
- Implement custom RPC/message queue coordination
- See Discovery Architecture for patterns
Trust Management
CapabilityMesh includes a built-in trust system that automatically tracks agent reliability based on execution results.
How It Works
- Every time you call mesh.execute(), the result (success/failure) is recorded
- Trust scores automatically adjust based on success rate and execution count
- You can filter discovery by minimum trust level
- Manual trust levels can override automatic calculations
Automatic Trust Tracking
from capabilitymesh import Mesh
mesh = Mesh()
@mesh.agent(name="reliable", capabilities=["task"])
def reliable_agent(x):
    return x * 2  # Always succeeds
@mesh.agent(name="unstable", capabilities=["task"])
def unstable_agent(x):
    if x % 2 == 0:
        raise ValueError("Failed!")
    return x * 2
# Look up the IDs assigned at registration
agents = await mesh.list_agents()
reliable_id = next(a.id for a in agents if a.name == "reliable")
unstable_id = next(a.id for a in agents if a.name == "unstable")
# Execute multiple times
for i in range(20):
    await mesh.execute(reliable_id, i)
    try:
        await mesh.execute(unstable_id, i)
    except Exception:
        pass  # unstable_agent fails on even inputs (50% of the time)
# Check trust scores
reliable_score = await mesh.trust.get_score(reliable_id)
print(f"Reliable: {reliable_score.level.name}")  # HIGH or VERIFIED
print(f"Success rate: {reliable_score.success_rate:.1%}")  # ~100%
unstable_score = await mesh.trust.get_score(unstable_id)
print(f"Unstable: {unstable_score.level.name}")  # LOW or MEDIUM
print(f"Success rate: {unstable_score.success_rate:.1%}")  # ~50%
Trust-Based Discovery
from capabilitymesh import TrustLevel
# Only discover agents with MEDIUM or higher trust
trusted_agents = await mesh.discover(
"task processing",
min_trust=TrustLevel.MEDIUM
)
# Only get highly trusted agents
verified_agents = await mesh.discover(
"critical operations",
min_trust=TrustLevel.HIGH
)
Trust Levels
CapabilityMesh uses a 5-level trust system:
| Level | Value | Criteria | Description |
|---|---|---|---|
| UNTRUSTED | 0 | Never executed | New agents start here |
| LOW | 1 | < 50% success OR < 5 executions | Unreliable or untested |
| MEDIUM | 2 | 50-80% success, ≥ 5 executions | Moderately reliable |
| HIGH | 3 | 80-95% success, ≥ 10 executions | Very reliable |
| VERIFIED | 4 | > 95% success, ≥ 20 executions OR manual | Extremely reliable or manually verified |
Trust Level Progression
UNTRUSTED (0 executions)
↓
LOW (< 5 executions OR low success rate)
↓
MEDIUM (5+ executions, 50-80% success)
↓
HIGH (10+ executions, 80-95% success)
↓
VERIFIED (20+ executions, > 95% success OR manually set)
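The thresholds above translate into a small classification function. The sketch below is one possible reading, not the library's algorithm: it assumes that a rate of exactly 80% counts as HIGH, that exactly 50% counts as MEDIUM, and that an agent with a high success rate but too few executions falls back to the highest level whose execution minimum it meets. Manual overrides are left out. `Level` and `classify` are hypothetical names.

```python
from enum import IntEnum


class Level(IntEnum):
    """Mirrors the five trust levels and their numeric values."""
    UNTRUSTED = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    VERIFIED = 4


def classify(successes: int, failures: int) -> Level:
    """Derive a trust level from raw success and failure counts."""
    total = successes + failures
    if total == 0:
        return Level.UNTRUSTED  # never executed
    rate = successes / total
    # Check the strictest level first and fall through to weaker ones.
    if rate > 0.95 and total >= 20:
        return Level.VERIFIED
    if rate >= 0.80 and total >= 10:
        return Level.HIGH
    if rate >= 0.50 and total >= 5:
        return Level.MEDIUM
    return Level.LOW


for counts in [(0, 0), (3, 0), (7, 0), (10, 10), (9, 1), (20, 0)]:
    print(counts, classify(*counts).name)
```

Using an IntEnum keeps comparisons such as `level >= Level.MEDIUM` straightforward, which is the kind of check a minimum-trust filter needs.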
Trust API
SimpleTrustManager
Accessible via mesh.trust.
mesh.trust.get_score()
async def get_score(agent_id: str) -> TrustScore
Get trust score for an agent.
Returns: TrustScore object with metrics
score = await mesh.trust.get_score(agent_id)
print(f"Level: {score.level.name}")
print(f"Success rate: {score.success_rate:.1%}")
print(f"Total executions: {score.total_executions}")
print(f"Last execution: {score.last_execution}")
mesh.trust.set_level()
async def set_level(agent_id: str, level: TrustLevel, reason: str = None)
Manually set trust level (overrides auto-calculation).
from capabilitymesh import TrustLevel
# Manually verify an agent
await mesh.trust.set_level(
agent_id,
TrustLevel.VERIFIED,
reason="Passed security audit"
)
# Blacklist an agent
await mesh.trust.set_level(
agent_id,
TrustLevel.UNTRUSTED,
reason="Security vulnerability found"
)
mesh.trust.record_execution()
async def record_execution(agent_id: str, success: bool, duration: float = 0.0)
Record execution result. Automatically called by mesh.execute().
# Usually you don't call this directly, but you can:
await mesh.trust.record_execution(agent_id, success=True, duration=0.123)
mesh.trust.list_trusted()
async def list_trusted(min_level: TrustLevel = TrustLevel.MEDIUM) -> List[TrustScore]
List all agents meeting minimum trust level.
# Get all highly trusted agents
trusted = await mesh.trust.list_trusted(TrustLevel.HIGH)
for score in trusted:
    print(f"{score.agent_id}: {score.level.name} ({score.success_rate:.0%})")
TrustScore Object
| Attribute | Type | Description |
|---|---|---|
| agent_id | str | Agent identifier |
| level | TrustLevel | Current trust level |
| success_count | int | Number of successful executions |
| failure_count | int | Number of failed executions |
| total_executions | int | Total execution count |
| success_rate | float | Success rate (0.0 - 1.0) |
| last_execution | datetime | Timestamp of last execution |
| manually_set | bool | Whether level was manually set |
Embeddings & Semantic Search
CapabilityMesh uses embeddings to enable semantic discovery with natural language queries.
Default: KeywordEmbedder
By default, CapabilityMesh uses a TF-IDF based keyword embedder that requires no external dependencies:
from capabilitymesh import Mesh
mesh = Mesh() # Automatically uses KeywordEmbedder
# Natural language queries work!
agents = await mesh.discover("translate text to Spanish")
agents = await mesh.discover("process natural language")
agents = await mesh.discover("analyze sentiment of reviews")
How Semantic Search Works
- When you register an agent, its capabilities are embedded into vectors
- When you search, the query is also embedded
- Cosine similarity finds the best matches
- Results are ranked by similarity score
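The four steps above can be sketched in a few lines of plain Python. This toy version uses raw term counts as the "embedding" rather than TF-IDF, so it only approximates what a keyword embedder does; `embed`, `cosine`, and `rank` are hypothetical helper names, and the agent descriptions are made up.

```python
import math
from collections import Counter
from typing import Dict, List, Tuple


def embed(text: str) -> Counter:
    # Toy embedding: term counts over lowercase words (hyphens split words).
    return Counter(text.lower().replace("-", " ").split())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[term] * b[term] for term in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def rank(query: str, agents: Dict[str, str],
         min_similarity: float = 0.001) -> List[Tuple[str, float]]:
    """Score every agent against the query and sort best-first."""
    query_vec = embed(query)
    scored = [(name, cosine(query_vec, embed(text))) for name, text in agents.items()]
    hits = [(name, score) for name, score in scored if score >= min_similarity]
    return sorted(hits, key=lambda item: item[1], reverse=True)


agents = {
    "translator": "translation nlp translate text between languages",
    "summarizer": "summarization nlp summarize long text",
    "calculator": "calculation add numbers",
}

for name, score in rank("translate text to spanish", agents):
    print(f"{name}: {score:.3f}")
```

The `min_similarity` cutoff drops agents that share no terms with the query, which is why the calculator never appears in the results here.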
Custom Embedders
You can provide your own embedder:
from typing import List
from capabilitymesh import Mesh, Embedder
class MyEmbedder(Embedder):
    async def embed(self, text: str) -> List[float]:
        # Your embedding logic here
        # Could use OpenAI, sentence-transformers, etc.
        raise NotImplementedError("compute and return the embedding vector")
mesh = Mesh(embedder=MyEmbedder())
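A custom embedder has to turn arbitrary text into a fixed-length list of floats. As a dependency-free starting point, the sketch below uses the hashing trick: each token is hashed into one of `dim` buckets and the result is L2-normalized. `hash_embed` is a hypothetical helper, not part of CapabilityMesh, and real deployments would more likely call an embedding model.

```python
import hashlib
import math
from typing import List


def hash_embed(text: str, dim: int = 64) -> List[float]:
    """Map text to a fixed-length, L2-normalized bag-of-words vector."""
    vector = [0.0] * dim
    for token in text.lower().split():
        # A stable hash keeps the token-to-bucket mapping identical across runs
        # (unlike the built-in hash(), which is randomized per process).
        digest = hashlib.sha256(token.encode("utf-8")).digest()
        index = int.from_bytes(digest[:4], "big") % dim
        vector[index] += 1.0
    norm = math.sqrt(sum(v * v for v in vector))
    return [v / norm for v in vector] if norm else vector


vec = hash_embed("translate text to Spanish")
print(len(vec), round(sum(v * v for v in vec), 6))
```

Inside an embedder subclass, the body of `embed` could simply return `hash_embed(text)`. Distinct tokens can collide in the same bucket, so a larger `dim` trades memory for fewer collisions.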
Framework Integration
CapabilityMesh works with agents from any framework. Here's how to integrate popular frameworks:
CrewAI
from capabilitymesh import Mesh
from crewai import Agent as CrewAgent
mesh = Mesh()
# Create CrewAI agent
researcher = CrewAgent(
role="Senior Research Analyst",
goal="Uncover cutting-edge developments in AI",
backstory="Expert researcher with deep domain knowledge",
verbose=True
)
# Register with CapabilityMesh
await mesh.register(
researcher,
name="ai-researcher",
capabilities=["research", "analysis", "information-gathering"]
)
# Discover and use
agents = await mesh.discover("research AI developments")
# agents[0] is the CrewAI researcher!
AutoGen
from capabilitymesh import Mesh
from autogen import AssistantAgent, UserProxyAgent
mesh = Mesh()
# Create AutoGen agents
coder = AssistantAgent(
name="coder",
system_message="You are an expert Python developer",
llm_config={"config_list": [...]}
)
reviewer = AssistantAgent(
name="code_reviewer",
system_message="You review code for quality and security"
)
# Register with CapabilityMesh
await mesh.register(coder, name="python-coder", capabilities=["coding", "python"])
await mesh.register(reviewer, name="code-reviewer", capabilities=["review", "security"])
# Discover across frameworks
coders = await mesh.discover("write python code")
# Returns AutoGen coder!
LangGraph
from capabilitymesh import Mesh
from langgraph.graph import StateGraph
mesh = Mesh()
# Create LangGraph workflow
workflow = StateGraph(...)
# ... define nodes and edges ...
# Register with CapabilityMesh
await mesh.register(
workflow,
name="data-pipeline",
capabilities=["data-processing", "workflow", "etl"]
)
# Discover
pipelines = await mesh.discover("process and transform data")
A2A Protocol
from capabilitymesh import Mesh
mesh = Mesh()
# A2A agents are automatically detected and work seamlessly
# CapabilityMesh can convert any agent to/from A2A format
Custom Agents
from capabilitymesh import Mesh
mesh = Mesh()
# Any callable works!
class MyAgent:
    def execute(self, task):
        return f"Processed: {task}"
agent = MyAgent()
await mesh.register(agent, name="my-agent", capabilities=["task-processing"])
# Or just functions
@mesh.agent(capabilities=["calculation"])
def calculator(a: int, b: int) -> int:
    return a + b
Multi-Agent Workflows
Coordinate multiple agents to accomplish complex tasks.
Sequential Pipeline
from capabilitymesh import Mesh
mesh = Mesh()
# Register pipeline agents
@mesh.agent(capabilities=["extraction"])
async def extract(pdf_path):
    return "Extracted text..."
@mesh.agent(capabilities=["summarization"])
async def summarize(text):
    return "Summary..."
@mesh.agent(capabilities=["translation"])
async def translate(text):
    return "Translated..."
# Execute pipeline
pdf = "document.pdf"
extractors = await mesh.discover("extract text from pdf")
text = await mesh.execute(extractors[0].id, pdf)
summarizers = await mesh.discover("summarize text")
summary = await mesh.execute(summarizers[0].id, text)
translators = await mesh.discover("translate to spanish")
result = await mesh.execute(translators[0].id, summary)
Parallel Processing
import asyncio
from capabilitymesh import Mesh
mesh = Mesh()
# Process multiple items in parallel
items = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
extractors = await mesh.discover("extract text")
extractor_id = extractors[0].id
# Execute in parallel
results = await asyncio.gather(*[
mesh.execute(extractor_id, item)
for item in items
])
# results = ["text1", "text2", "text3"]
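With many items, an unbounded asyncio.gather() starts every call at once, which can overwhelm a slow or rate-limited agent. One common refinement is to cap concurrency with asyncio.Semaphore. The sketch below is independent of CapabilityMesh: `gather_bounded` is a hypothetical helper and `fake_extract` stands in for a call such as mesh.execute().

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List


async def gather_bounded(func: Callable[[Any], Awaitable[Any]],
                         items: Iterable[Any], limit: int = 2) -> List[Any]:
    """Run func over items concurrently, at most `limit` calls at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: Any) -> Any:
        async with semaphore:  # wait here while `limit` calls are in flight
            return await func(item)

    # gather() returns results in the same order as the input items.
    return await asyncio.gather(*(run_one(item) for item in items))


async def fake_extract(path: str) -> str:
    await asyncio.sleep(0.01)  # simulate I/O-bound work
    return f"text from {path}"


results = asyncio.run(
    gather_bounded(fake_extract, ["doc1.pdf", "doc2.pdf", "doc3.pdf"], limit=2)
)
print(results)
```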
Conditional Routing
from capabilitymesh import Mesh
mesh = Mesh()
async def process_document(doc):
    # Analyze document type
    analyzers = await mesh.discover("analyze document")
    doc_type = await mesh.execute(analyzers[0].id, doc)
    # Route to appropriate handler
    if doc_type == "pdf":
        handlers = await mesh.discover("extract pdf")
    elif doc_type == "image":
        handlers = await mesh.discover("ocr text extraction")
    else:
        handlers = await mesh.discover("generic text extraction")
    return await mesh.execute(handlers[0].id, doc)
Error Handling with Fallbacks
from capabilitymesh import Mesh, TrustLevel
mesh = Mesh()
async def robust_execution(capability, task):
    # Get agents sorted by trust
    agents = await mesh.discover(
        capability,
        min_trust=TrustLevel.MEDIUM
    )
    # Try agents in order until one succeeds
    for agent in agents:
        try:
            result = await mesh.execute(agent.id, task)
            return result
        except Exception as e:
            print(f"Agent {agent.name} failed: {e}")
            continue
    raise Exception("All agents failed")
# Usage
result = await robust_execution("translation", "Hello world")
Fan-Out / Fan-In
import asyncio
from capabilitymesh import Mesh
mesh = Mesh()
async def parallel_analysis(text):
    # Fan-out: Send to multiple analyzers
    sentiment_agents = await mesh.discover("sentiment analysis")
    entity_agents = await mesh.discover("entity extraction")
    topic_agents = await mesh.discover("topic classification")
    # Execute all in parallel
    sentiment, entities, topics = await asyncio.gather(
        mesh.execute(sentiment_agents[0].id, text),
        mesh.execute(entity_agents[0].id, text),
        mesh.execute(topic_agents[0].id, text)
    )
    # Fan-in: Combine results
    return {
        "sentiment": sentiment,
        "entities": entities,
        "topics": topics
    }
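In a fan-out, a plain asyncio.gather() raises as soon as any branch fails, discarding the branches that succeeded. Passing return_exceptions=True lets the fan-in step keep partial results. The sketch below shows that variation with stand-in coroutines; `gather_named` and the three analyzer functions are hypothetical and do not use CapabilityMesh.

```python
import asyncio
from typing import Any, Awaitable, Dict


async def gather_named(tasks: Dict[str, Awaitable[Any]]) -> Dict[str, Dict[str, Any]]:
    """Await all tasks and split the outcomes into results and errors."""
    names = list(tasks)
    outcomes = await asyncio.gather(*tasks.values(), return_exceptions=True)
    combined: Dict[str, Dict[str, Any]] = {"results": {}, "errors": {}}
    for name, outcome in zip(names, outcomes):
        bucket = "errors" if isinstance(outcome, Exception) else "results"
        combined[bucket][name] = outcome
    return combined


async def sentiment(text: str) -> str:
    return "positive"


async def entities(text: str) -> list:
    raise RuntimeError("entity service unavailable")


async def topics(text: str) -> list:
    return ["ai"]


async def analyze(text: str) -> Dict[str, Dict[str, Any]]:
    return await gather_named({
        "sentiment": sentiment(text),
        "entities": entities(text),
        "topics": topics(text),
    })


report = asyncio.run(analyze("CapabilityMesh connects agents."))
print(report["results"])
print({name: str(err) for name, err in report["errors"].items()})
```

The caller can then decide whether a missing branch is fatal or whether the partial report is good enough to return.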
Best Practices
1. Use Descriptive Capability Names
# Good
capabilities=["translation", "english-to-spanish", "nlp", "language-processing"]
# Less effective
capabilities=["translate", "lang"]
2. Add Rich Metadata
from capabilitymesh import Capability, SemanticMetadata
capability = Capability(
name="ml-translation",
description="Neural machine translation with context awareness",
semantic=SemanticMetadata(
tags=["nlp", "translation", "ml", "neural"],
categories=["Natural Language Processing"],
domains=["linguistics", "ai"]
)
)
3. Use Trust Levels in Production
# Always filter by trust in production
agents = await mesh.discover(
"critical financial calculation",
min_trust=TrustLevel.HIGH # Only highly reliable agents
)
4. Choose the Right Storage
# Development
mesh = Mesh() # InMemory (default)
# Production (single instance)
mesh = Mesh(storage=SQLiteStorage("agents.db"))
# Production (distributed)
mesh = Mesh(storage=RedisStorage(host="redis.example.com"))
5. Handle Errors Gracefully
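from capabilitymesh.core.exceptions import ExecutionError
try:
    result = await mesh.execute(agent_id, task)
except ExecutionError as e:
    print(f"Execution failed: {e}")
    # Try a fallback: the best match other than the agent that just failed
    agents = await mesh.discover(capability)
    fallbacks = [a for a in agents if a.id != agent_id]
    if not fallbacks:
        raise  # no alternative agent is available
    result = await mesh.execute(fallbacks[0].id, task)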
6. Use Async Where Possible
# Preferred (async)
agents = await mesh.discover("query")
result = await mesh.execute(agent_id, task)
# Sync wrappers available but less efficient
# agents = mesh.discover_sync("query")
# result = mesh.execute_sync(agent_id, task)
7. Version Your Capabilities
from capabilitymesh import Capability, CapabilityVersion
# Good: Semantic versioning
capability = Capability(
name="translation-api",
version=CapabilityVersion(major=2, minor=1, patch=0)
)
# Check compatibility
if capability.version.is_compatible(client_version):
    # Safe to use
    ...
8. Monitor Trust Scores
# Regularly check agent health
async def health_check():
    agents = await mesh.list_agents()
    for agent in agents:
        score = await mesh.trust.get_score(agent.id)
        if score.level < TrustLevel.MEDIUM:
            print(f"⚠️ {agent.name} has low trust: {score.success_rate:.1%}")
        if score.total_executions > 100 and score.success_rate < 0.5:
            print(f"🚨 {agent.name} is unreliable, consider removing")
Example: Basic Usage
Complete example demonstrating registration, discovery, and execution.
import asyncio
from capabilitymesh import Mesh, Capability
async def main():
    # Initialize mesh
    mesh = Mesh()
    # Method 1: Register with decorator
    @mesh.agent(name="summarizer", capabilities=["summarization", "nlp"])
    async def summarize(text: str) -> str:
        return f"Summary: {text[:100]}..."
    # Method 2: Register with Capability objects
    capability = Capability.create_simple(
        name="translation",
        description="Translate text between languages",
        tags=["nlp", "language", "translation"]
    )
    @mesh.agent(name="translator", capabilities=[capability])
    def translate(text: str, target_lang: str = "es") -> str:
        return f"[{target_lang}] {text}"
    # Method 3: Register explicitly
    def analyze(text: str) -> dict:
        return {"sentiment": "positive", "confidence": 0.95}
    await mesh.register(
        agent=analyze,
        name="sentiment-analyzer",
        capabilities=["sentiment-analysis", "nlp"]
    )
    # List all agents
    print("\n=== Registered Agents ===")
    agents = await mesh.list_agents()
    for agent in agents:
        caps = ", ".join([c.name for c in agent.capabilities])
        print(f"{agent.name}: [{caps}]")
    # Discover by semantic query
    print("\n=== Discovery ===")
    nlp_agents = await mesh.discover("process natural language")
    print(f"NLP agents: {[a.name for a in nlp_agents]}")
    # Execute tasks
    print("\n=== Execution ===")
    text = "CapabilityMesh is a powerful framework for multi-agent systems."
    # Summarize
    summarizers = await mesh.discover("summarize")
    if summarizers:
        result = await mesh.execute(summarizers[0].id, text)
        print(f"Summary: {result}")
    # Translate
    translators = await mesh.discover("translate")
    if translators:
        result = await mesh.execute(translators[0].id, text, target_lang="fr")
        print(f"Translation: {result}")
    # Analyze sentiment
    analyzers = await mesh.discover("sentiment")
    if analyzers:
        result = await mesh.execute(analyzers[0].id, text)
        print(f"Sentiment: {result}")
if __name__ == "__main__":
    asyncio.run(main())
Example: Storage Backends
import asyncio
from capabilitymesh import Mesh
from capabilitymesh.storage import InMemoryStorage, SQLiteStorage
async def demo_storage():
    print("=== InMemory Storage ===")
    mesh1 = Mesh(storage=InMemoryStorage())
    @mesh1.agent(capabilities=["task"])
    def agent1(x):
        return x * 2
    agents = await mesh1.list_agents()
    print(f"Agents: {len(agents)}")
    print("\n=== SQLite Storage ===")
    mesh2 = Mesh(storage=SQLiteStorage("demo.db"))
    @mesh2.agent(capabilities=["task"])
    def agent2(x):
        return x * 3
    # Persists to disk!
    agents = await mesh2.list_agents()
    print(f"Agents: {len(agents)}")
    # Full-text search
    results = await mesh2.discover("task processing agent")
    print(f"Found: {[a.name for a in results]}")
asyncio.run(demo_storage())
Example: Trust Management
import asyncio
import random
from capabilitymesh import Mesh, TrustLevel
async def demo_trust():
    mesh = Mesh()
    @mesh.agent(name="reliable", capabilities=["task"])
    def reliable_agent(x):
        return x * 2  # Always works
    @mesh.agent(name="unstable", capabilities=["task"])
    def unstable_agent(x):
        # Fails roughly half of the time
        if random.random() < 0.5:
            raise ValueError("Random failure")
        return x * 2
    # Get agent IDs
    agents = await mesh.list_agents()
    reliable_id = next(a.id for a in agents if a.name == "reliable")
    unstable_id = next(a.id for a in agents if a.name == "unstable")
    # Execute multiple times
    print("Building trust history...")
    for i in range(20):
        try:
            await mesh.execute(reliable_id, i)
        except Exception:
            pass
        try:
            await mesh.execute(unstable_id, i)
        except Exception:
            pass  # Failures are expected for the unstable agent
    # Check trust scores
    print("\n=== Trust Scores ===")
    reliable_score = await mesh.trust.get_score(reliable_id)
    print("Reliable agent:")
    print(f" Level: {reliable_score.level.name}")
    print(f" Success rate: {reliable_score.success_rate:.1%}")
    print(f" Executions: {reliable_score.total_executions}")
    unstable_score = await mesh.trust.get_score(unstable_id)
    print("\nUnstable agent:")
    print(f" Level: {unstable_score.level.name}")
    print(f" Success rate: {unstable_score.success_rate:.1%}")
    print(f" Executions: {unstable_score.total_executions}")
    # Trust-based discovery
    print("\n=== Trust-Based Discovery ===")
    trusted = await mesh.discover("task", min_trust=TrustLevel.MEDIUM)
    print(f"Trusted agents (MEDIUM+): {[a.name for a in trusted]}")
asyncio.run(demo_trust())
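The documentation states that trust is scored from execution success rates on a five-level scale running from UNTRUSTED to VERIFIED, with MEDIUM somewhere in between. It does not give the thresholds. The sketch below shows one way such a mapping could work. It is a standalone illustration that does not import CapabilityMesh: the class names, the level names LOW and HIGH, the thresholds, and the minimum-history rule are all assumptions.

```python
from dataclasses import dataclass
from enum import IntEnum


class SketchTrustLevel(IntEnum):
    """Hypothetical five-level scale (LOW and HIGH are assumed names)."""
    UNTRUSTED = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    VERIFIED = 4


@dataclass
class SketchTrustScore:
    """Hypothetical score record built from success and failure counts."""
    successes: int = 0
    failures: int = 0

    @property
    def total_executions(self) -> int:
        return self.successes + self.failures

    @property
    def success_rate(self) -> float:
        total = self.total_executions
        return self.successes / total if total else 0.0

    @property
    def level(self) -> SketchTrustLevel:
        # Assumed rule: with fewer than 5 runs there is not enough history.
        if self.total_executions < 5:
            return SketchTrustLevel.UNTRUSTED
        rate = self.success_rate
        # Assumed thresholds, chosen only for illustration.
        if rate >= 0.95:
            return SketchTrustLevel.VERIFIED
        if rate >= 0.80:
            return SketchTrustLevel.HIGH
        if rate >= 0.60:
            return SketchTrustLevel.MEDIUM
        if rate >= 0.40:
            return SketchTrustLevel.LOW
        return SketchTrustLevel.UNTRUSTED


reliable = SketchTrustScore(successes=20, failures=0)
unstable = SketchTrustScore(successes=10, failures=10)
print(reliable.level.name, f"{reliable.success_rate:.1%}")
print(unstable.level.name, f"{unstable.success_rate:.1%}")
```

Because the levels are ordered integers, a filter such as "MEDIUM or better" reduces to a simple comparison (`score.level >= SketchTrustLevel.MEDIUM`).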
Example: Semantic Search
import asyncio
from capabilitymesh import Mesh, Capability
async def demo_semantic_search():
    mesh = Mesh()
    # Register agents with rich descriptions
    @mesh.agent(
        name="summarizer",
        capabilities=[
            Capability.create_simple(
                "text-summarization",
                "Extract key points and create concise summaries of long documents",
                tags=["nlp", "summarization", "text-processing", "extraction"]
            )
        ]
    )
    def summarize(text):
        return text[:100]
    @mesh.agent(
        name="translator",
        capabilities=[
            Capability.create_simple(
                "translation",
                "Convert text from one language to another with context awareness",
                tags=["nlp", "translation", "language", "localization"]
            )
        ]
    )
    def translate(text, lang="es"):
        return f"[{lang}] {text}"
    @mesh.agent(
        name="sentiment-analyzer",
        capabilities=[
            Capability.create_simple(
                "sentiment-analysis",
                "Analyze emotional tone and sentiment of text reviews and feedback",
                tags=["nlp", "sentiment", "emotion", "analysis"]
            )
        ]
    )
    def analyze_sentiment(text):
        return {"sentiment": "positive"}
    # Semantic queries
    print("=== Semantic Search Examples ===\n")
    queries = [
        "I need to make this document shorter",
        "Convert text to Spanish",
        "What is the emotional tone of customer reviews?",
        "Process natural language",
    ]
    for query in queries:
        print(f"Query: '{query}'")
        agents = await mesh.discover(query, limit=2)
        print(f"Found: {[a.name for a in agents]}\n")
asyncio.run(demo_semantic_search())
Troubleshooting
Common Issues
1. ModuleNotFoundError
ModuleNotFoundError: No module named 'capabilitymesh'
Solution:
pip install capabilitymesh
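If the error persists after installing, pip may have installed the package into a different environment from the interpreter running your script. The following check uses only the standard library to show which interpreter is active and whether it can locate the package. The helper name `is_importable` is illustrative.

```python
import importlib.util
import sys


def is_importable(module_name: str) -> bool:
    """Return True if the current interpreter can locate a top-level module."""
    return importlib.util.find_spec(module_name) is not None


print(f"Interpreter: {sys.executable}")
print(f"capabilitymesh importable: {is_importable('capabilitymesh')}")
```

If the result is False, running `python -m pip install capabilitymesh` with that same interpreter installs the package where it will be found.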
2. Redis Connection Error
Redis connection failed: Error connecting to localhost:6379
Solution:
# Start Redis server
redis-server
# Or use Docker
docker run -d -p 6379:6379 redis
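Before retrying, it can help to confirm that something is listening on the Redis port at all. The sketch below uses only the standard library and assumes the default `localhost:6379` from the error message. The helper name `port_is_open` is illustrative. It tests TCP reachability only, not that the listener is actually Redis.

```python
import socket


def port_is_open(host: str = "localhost", port: int = 6379, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


print(f"Redis port reachable: {port_is_open()}")
```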
3. SQLite FTS5 Not Available
SQLite FTS5 extension not available
Solution:
pip install pysqlite3-binary # Includes FTS5
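To confirm whether FTS5 is actually missing, you can probe the SQLite build directly. This sketch checks the standard-library `sqlite3` module by trying to create an FTS5 virtual table in an in-memory database. The function name `fts5_available` is illustrative.

```python
import sqlite3


def fts5_available() -> bool:
    """Return True if the sqlite3 module in use was built with FTS5."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE VIRTUAL TABLE fts5_probe USING fts5(content)")
        return True
    except sqlite3.OperationalError:
        # Raised with "no such module: fts5" when the extension is absent.
        return False
    finally:
        conn.close()


print(f"SQLite library version: {sqlite3.sqlite_version}")
print(f"FTS5 available: {fts5_available()}")
```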
4. Async Execution Errors
RuntimeError: This event loop is already running
Solution:
# Use asyncio.run() for top-level
asyncio.run(main())
# Or use nest_asyncio in Jupyter
import nest_asyncio
nest_asyncio.apply()
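Which approach applies depends on whether an event loop is already running, as it is in Jupyter. The sketch below detects this with `asyncio.get_running_loop()`. The helper `in_running_loop` and the placeholder `main` coroutine are illustrative.

```python
import asyncio


async def main() -> str:
    """Placeholder coroutine standing in for your own entry point."""
    await asyncio.sleep(0)
    return "done"


def in_running_loop() -> bool:
    """Return True when called from code already executing inside an event loop."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


if in_running_loop():
    # Notebook-style hosts: use a top-level `await main()` in a cell instead.
    print("Event loop already running; await main() directly")
else:
    print(asyncio.run(main()))
```

In a plain script the `else` branch runs. Inside a notebook cell, awaiting the coroutine directly avoids the error without patching the loop.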
5. No Agents Found
agents = await mesh.discover("query")
# agents is empty []
Solution:
- Check that agents are registered before discovery
- Lower the min_similarity threshold
- Use more specific capability names
- Try exact keyword match instead of semantic
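The effect of the threshold and of the keyword fallback can be seen on a toy example. Everything below is a standalone sketch that does not import CapabilityMesh. The scores, the capability table, and both helper functions are hypothetical, so this is not how the library implements filtering.

```python
from typing import Dict, List

# Hypothetical similarity scores for a single query (higher means closer).
SCORES: Dict[str, float] = {
    "summarizer": 0.42,
    "translator": 0.18,
    "sentiment-analyzer": 0.05,
}

# Hypothetical capability names per agent, used for the exact-match fallback.
CAPABILITIES: Dict[str, List[str]] = {
    "summarizer": ["summarization", "nlp"],
    "translator": ["translation", "nlp"],
    "sentiment-analyzer": ["sentiment-analysis", "nlp"],
}


def filter_by_similarity(scores: Dict[str, float], min_similarity: float) -> List[str]:
    """Keep agents scoring at or above the threshold, best match first."""
    kept = [(name, score) for name, score in scores.items() if score >= min_similarity]
    return [name for name, _ in sorted(kept, key=lambda pair: pair[1], reverse=True)]


def keyword_match(capabilities: Dict[str, List[str]], keyword: str) -> List[str]:
    """Return agents that declare exactly this capability name."""
    return [name for name, caps in capabilities.items() if keyword in caps]


print(filter_by_similarity(SCORES, 0.5))   # strict threshold: nothing passes
print(filter_by_similarity(SCORES, 0.1))   # lower threshold: two agents pass
print(keyword_match(CAPABILITIES, "sentiment-analysis"))
```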
6. Trust Scores Not Updating
Solution: Trust scores update after each mesh.execute() call. Ensure you're using mesh.execute() and not calling agents directly.
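The reason direct calls go unnoticed is that outcome tracking lives in the execution path, not in the agent function. The toy class below illustrates the idea. It is a synchronous, standalone stand-in with hypothetical names (`SketchMesh`, `stats`), not CapabilityMesh's implementation.

```python
from collections import defaultdict
from typing import Any, Callable, Dict


class SketchMesh:
    """Toy stand-in: only calls routed through execute() are counted."""

    def __init__(self) -> None:
        self._agents: Dict[str, Callable[..., Any]] = {}
        self.stats: Dict[str, Dict[str, int]] = defaultdict(
            lambda: {"success": 0, "failure": 0}
        )

    def register(self, agent_id: str, fn: Callable[..., Any]) -> None:
        self._agents[agent_id] = fn

    def execute(self, agent_id: str, *args: Any, **kwargs: Any) -> Any:
        fn = self._agents[agent_id]
        try:
            result = fn(*args, **kwargs)
        except Exception:
            # Record the failure, then let the caller see the error.
            self.stats[agent_id]["failure"] += 1
            raise
        self.stats[agent_id]["success"] += 1
        return result


def double(x: int) -> int:
    return x * 2


mesh = SketchMesh()
mesh.register("doubler", double)
double(21)                    # direct call: invisible to the mesh
mesh.execute("doubler", 21)   # routed call: recorded
print(dict(mesh.stats["doubler"]))
```

Two calls were made but only one was recorded, which is the symptom described above.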
FAQ
General Questions
Q: Do I need to use a specific framework?
A: No! CapabilityMesh works with ANY framework (CrewAI, AutoGen, LangGraph, A2A) and plain Python functions.
Q: Is CapabilityMesh production-ready?
A: v1.0.0-alpha.2 is feature-complete with 139 passing tests. It's ready for production use, but expect some refinement based on community feedback.
Q: What's the performance impact?
A: Minimal. InMemory discovery is < 1ms. SQLite is < 10ms. Redis depends on network latency.
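These figures will vary with hardware and registry size, so it is worth measuring in your own environment. The sketch below times repeated calls to any coroutine with `time.perf_counter()`. The helper `measure_latency_ms` is illustrative, and `fake_discover` is a placeholder to be replaced with a real `mesh.discover(...)` call.

```python
import asyncio
import statistics
import time
from typing import Awaitable, Callable, List


async def measure_latency_ms(
    call: Callable[[], Awaitable[object]], runs: int = 100
) -> List[float]:
    """Await `call` repeatedly and return per-call latency in milliseconds."""
    samples: List[float] = []
    for _ in range(runs):
        start = time.perf_counter()
        await call()
        samples.append((time.perf_counter() - start) * 1000.0)
    return samples


async def fake_discover() -> list:
    """Placeholder for a real discovery call; swap in your own coroutine."""
    await asyncio.sleep(0)
    return []


samples = asyncio.run(measure_latency_ms(fake_discover, runs=50))
print(f"median: {statistics.median(samples):.3f} ms, max: {max(samples):.3f} ms")
```

Reporting the median alongside the maximum helps separate typical latency from occasional outliers.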
Q: Can I use multiple storage backends?
A: Not in the same Mesh instance, but you can create multiple Mesh instances with different storage backends.
Technical Questions
Q: How does semantic search work without an LLM?
A: By default, CapabilityMesh uses TF-IDF based keyword embeddings. For better results, you will be able to plug in OpenAI or sentence-transformers embedders (coming in v1.0-beta).
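To illustrate the general idea of TF-IDF matching, here is a minimal standalone sketch that ranks capability descriptions against a query by cosine similarity. It is not CapabilityMesh's implementation. The tokenizer, the smoothed IDF formula, and all function names are assumptions. The descriptions are taken from the semantic search example.

```python
import math
from collections import Counter
from typing import Dict, List, Tuple

DOCS: Dict[str, str] = {
    "summarizer": "extract key points and create concise summaries of long documents",
    "translator": "convert text from one language to another with context awareness",
    "sentiment-analyzer": "analyze emotional tone and sentiment of text reviews and feedback",
}


def tokenize(text: str) -> List[str]:
    """Naive whitespace tokenizer (an assumption; real tokenizers do more)."""
    return text.lower().split()


def idf_table(docs: Dict[str, str]) -> Dict[str, float]:
    """Smoothed inverse document frequency for every term in the corpus."""
    n = len(docs)
    df: Counter = Counter()
    for text in docs.values():
        df.update(set(tokenize(text)))
    return {term: math.log((1 + n) / (1 + count)) + 1.0 for term, count in df.items()}


def tfidf(text: str, idf: Dict[str, float]) -> Dict[str, float]:
    """Sparse TF-IDF vector; terms unknown to the corpus are dropped."""
    counts = Counter(t for t in tokenize(text) if t in idf)
    return {term: count * idf[term] for term, count in counts.items()}


def cosine(a: Dict[str, float], b: Dict[str, float]) -> float:
    dot = sum(weight * b.get(term, 0.0) for term, weight in a.items())
    norm_a = math.sqrt(sum(w * w for w in a.values()))
    norm_b = math.sqrt(sum(w * w for w in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def rank(query: str, docs: Dict[str, str]) -> List[Tuple[str, float]]:
    """Score every description against the query, best match first."""
    idf = idf_table(docs)
    q = tfidf(query, idf)
    scored = [(name, cosine(q, tfidf(text, idf))) for name, text in docs.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


for name, score in rank("emotional tone of reviews", DOCS):
    print(f"{name}: {score:.3f}")
```

A purely lexical scheme like this one only matches words that literally appear in a description, so paraphrased queries can score poorly. That limitation is the usual motivation for learned embedders such as the ones planned above.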
Q: Are trust scores persistent?
A: Yes! Trust scores are stored in the storage backend (SQLite or Redis). They persist across restarts.
Q: Can I disable trust tracking?
A: Not currently, but you can ignore trust scores in discovery by not setting min_trust.
Q: How do I migrate between storage backends?
A: You'll need to re-register agents with the new storage backend. We're working on a migration tool for v1.1.
Framework Integration
Q: Does CapabilityMesh execute framework agents directly?
A: It depends. For Python functions, yes. For framework agents, you may need to call the framework's execution API. mesh.get_native() gives you the original agent.
Q: Can I mix agents from different frameworks?
A: Yes! That's the whole point. Register agents from CrewAI, AutoGen, and LangGraph, then discover and coordinate them together.
Q: Is A2A protocol required?
A: No. A2A compatibility is built-in, but it's not required. CapabilityMesh works with or without A2A.
Changelog
v1.0.0-alpha.2 (2025-12-30)
Decorator Fix Release
Core Features
- ✅ Mesh API for agent registration and discovery
- ✅ Multi-framework support (CrewAI, AutoGen, LangGraph, A2A)
- ✅ Fixed @mesh.agent() decorator - Immediate registration, no wrapper overhead
- ✅ Semantic search with keyword embeddings
- ✅ Three storage backends (InMemory, SQLite, Redis)
- ✅ SimpleTrustManager with 5-level trust system
- ✅ Rich capability schemas with versioning
- ✅ A2A protocol compatibility
Storage
- ✅ InMemoryStorage (default, zero-config)
- ✅ SQLiteStorage with FTS5 full-text search
- ✅ RedisStorage for distributed deployments
Trust System
- ✅ Automatic trust scoring based on execution results
- ✅ 5 trust levels (UNTRUSTED → VERIFIED)
- ✅ Manual trust level overrides
- ✅ Trust-based discovery filtering
- ✅ Trust statistics and reporting
Testing
- ✅ 139 Tests passing (100%)
- ✅ Comprehensive unit tests
- ✅ Integration tests for all features
- ✅ Framework integration tests
Documentation
- ✅ Complete README with examples
- ✅ 6 comprehensive examples
- ✅ EXAMPLES_GUIDE.md
- ✅ DISCOVERY_ARCHITECTURE.md - Deployment patterns guide
- ✅ DECORATOR_FIX_SUMMARY.md - Technical details of decorator improvements
- ✅ CHANGELOG.md - Complete version history
- ✅ API documentation (this file)
- ✅ CONTRIBUTING.md
Planned Releases
v1.0.0-beta.1 (Coming Soon)
- LocalEmbedder (sentence-transformers)
- OpenAIEmbedder for better semantic search
- Additional storage backends
- Performance optimizations
- Community feedback incorporation
v1.0.0 (Stable)
- Production hardening
- Documentation site
- Tutorial videos
- Case studies
v1.1.0
- P2P discovery (mDNS, Gossip, DHT)
- Advanced negotiation protocols
- CLI tools
- Migration utilities
CapabilityMesh v1.0.0-alpha.2
The first and only universal capability mesh for multi-agent systems
Built with ❤️ by Sai Kumar Yava