CapabilityMesh Technical Documentation

Universal capability discovery and negotiation for multi-agent systems

v1.0.0-alpha.2 | 139 Tests Passing | Production Ready

Introduction

CapabilityMesh is a Python package that provides universal capability discovery and negotiation across agent frameworks, including CrewAI, AutoGen, LangGraph, A2A, and custom agents.

🎯 Core Value: Build multi-agent systems where agents from different frameworks can discover and collaborate with each other seamlessly.

What Makes CapabilityMesh Unique?

🔍 Universal Discovery

Discover agents across ANY framework with semantic search and natural language queries.

🤝 Multi-Framework

Works with CrewAI, AutoGen, LangGraph, A2A, and custom agents in the same workflow.

🛡️ Built-in Trust

Automatic trust scoring based on execution success rates with 5 trust levels.

💾 Flexible Storage

InMemory, SQLite with FTS5, or Redis backends for any deployment scenario.

🧠 Semantic Search

Find agents using natural language, not just keyword matching.

⚡ Zero Config

Works out of the box with sensible defaults. 5 lines to working discovery!

Problem: Framework Fragmentation

Today's multi-agent ecosystem is fragmented:

  • CrewAI agents can't discover AutoGen agents
  • LangGraph workflows can't find A2A services
  • No standard way to query "which agents can translate?"
  • Manual integration required for every framework pair
  • No trust or reputation across frameworks

Solution: CapabilityMesh

CapabilityMesh provides a universal discovery layer that works with ALL frameworks:

  • ✅ Register agents from any framework
  • ✅ Discover capabilities with natural language
  • ✅ Track trust and reliability automatically
  • ✅ Store agents in memory, SQLite, or Redis
  • ✅ Execute tasks through unified interface
  • ✅ A2A protocol compatible

Installation

Core Package

# Basic installation (includes semantic search, trust, in-memory storage)
pip install capabilitymesh

With Optional Dependencies

# SQLite persistence with full-text search
pip install capabilitymesh[sqlite]

# Redis for distributed discovery (see DISCOVERY_ARCHITECTURE.md)
pip install capabilitymesh[redis]

# Framework integrations
pip install capabilitymesh[crewai]
pip install capabilitymesh[autogen]
pip install capabilitymesh[langgraph]

# Everything
pip install capabilitymesh[all]

Requirements

  • Python 3.9+
  • Core dependencies: pydantic, httpx, cryptography, pyjwt, nest-asyncio
  • Optional: aiosqlite, redis, sentence-transformers
✅ Verified: Tested on Python 3.9, 3.10, 3.11, and 3.12. All 139 tests pass on Windows, macOS, and Linux.

Quick Start

Get started with CapabilityMesh in just 5 lines of code:

from capabilitymesh import Mesh

mesh = Mesh()  # Zero-config, works immediately!

@mesh.agent(name="translator", capabilities=["translation", "nlp"])
def translate(text: str, target_lang: str = "es") -> str:
    return f"[Translated to {target_lang}]: {text}"

# Discover agents with natural language
# (run these calls inside an async function, or in a REPL that supports top-level await)
agents = await mesh.discover("translate text to Spanish")
result = await mesh.execute(agents[0].id, "Hello world!", target_lang="es")
💡 That's it! No configuration, no setup, no complexity. Just register and discover.

Complete Example

import asyncio
from capabilitymesh import Mesh, TrustLevel

async def main():
    # Initialize mesh
    mesh = Mesh()

    # Register multiple agents
    @mesh.agent(name="summarizer", capabilities=["summarization", "nlp"])
    async def summarize(text: str) -> str:
        return f"Summary: {text[:100]}..."

    @mesh.agent(name="translator", capabilities=["translation", "nlp"])
    def translate(text: str, target_lang: str = "es") -> str:
        return f"[{target_lang}] {text}"

    @mesh.agent(name="analyzer", capabilities=["sentiment-analysis", "nlp"])
    def analyze_sentiment(text: str) -> dict:
        return {"sentiment": "positive", "confidence": 0.95}

    # List all agents
    agents = await mesh.list_agents()
    print(f"Registered agents: {len(agents)}")

    # Discover by semantic query
    nlp_agents = await mesh.discover("process natural language")
    print(f"NLP agents: {[a.name for a in nlp_agents]}")

    # Execute tasks
    result = await mesh.execute(nlp_agents[0].id, "Long text to summarize...")
    print(f"Result: {result}")

    # Check trust scores
    trust_score = await mesh.trust.get_score(nlp_agents[0].id)
    print(f"Trust: {trust_score.level.name}")

if __name__ == "__main__":
    asyncio.run(main())
💡 Deployment Note: This example runs in a single process with all agents in memory. For distributed systems (microservices, multi-process deployments), see Discovery Architecture for patterns using Redis storage and A2A adapters.

Key Concepts

1. Mesh

The Mesh is the central hub that manages all agents, capabilities, and coordination. It provides a unified interface for registration, discovery, and execution.

2. Capabilities

Capabilities describe what an agent can do. They can be simple strings ("translation") or rich objects with schemas, versioning, and constraints.

3. Discovery

Discovery is the process of finding agents that match a query. CapabilityMesh supports both exact matching and semantic search using natural language.

4. Trust

Trust scores track agent reliability based on execution results. Scores auto-adjust from UNTRUSTED to VERIFIED based on success rates.

5. Storage

Storage backends persist agent registrations. CapabilityMesh supports InMemory (default), SQLite (with FTS5), and Redis.

Architecture Overview

┌─────────────────────────────────────────────┐
│                    Mesh                     │
│  (Central hub for all operations)          │
└──────────┬──────────────┬──────────────┬───┘
           │              │              │
    ┌──────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
    │  Storage   │ │ Embedder  │ │   Trust   │
    │  Backend   │ │ (Search)  │ │  Manager  │
    └────────────┘ └───────────┘ └───────────┘
           │              │              │
    ┌──────▼──────────────▼──────────────▼───┐
    │         Agent Registry                  │
    │  (Framework-agnostic storage)           │
    └─────────────────────────────────────────┘

Mesh API

The Mesh class is the main entry point for CapabilityMesh.

Constructor

Mesh.__init__()

Mesh(storage=None, embedder=None, trust_manager=None)

Initialize a new Mesh instance.

storage Optional[Storage]

Storage backend for agent persistence. Defaults to InMemoryStorage().

embedder Optional[Embedder]

Embedder for semantic search. Auto-selected if None (uses KeywordEmbedder by default).

trust_manager Optional[SimpleTrustManager]

Trust manager for tracking reliability. Auto-created if None.

Example:

from capabilitymesh import Mesh
from capabilitymesh.storage import SQLiteStorage

# Default (in-memory)
mesh = Mesh()

# With SQLite persistence
mesh = Mesh(storage=SQLiteStorage("agents.db"))

Registration Methods

mesh.register()

async def register(agent, name=None, capabilities=None, agent_type=None, metadata=None) -> AgentIdentity

Register an agent with the mesh.

agent Any

Agent to register (function, class, or framework agent)

name Optional[str]

Agent name (auto-generated if None)

capabilities Optional[List[str | Capability]]

List of capabilities (strings or Capability objects)

agent_type Optional[AgentType]

Agent type (auto-detected from framework if None)

metadata Optional[Dict]

Additional metadata

Returns: AgentIdentity - The identity of the registered agent

Example:

# Register a function
def my_agent(task: str) -> str:
    return f"Processed: {task}"

identity = await mesh.register(
    agent=my_agent,
    name="my-agent",
    capabilities=["processing", "task-execution"]
)

@mesh.agent()

@mesh.agent(name=None, capabilities=None, agent_type=None, metadata=None)

Decorator for registering agent functions. As of v1.0.0-alpha.2, the agent is registered as soon as the decorator is applied, with no wrapper overhead. The decorator works with both sync and async functions.

Example:

import asyncio

@mesh.agent(name="translator", capabilities=["translation", "nlp"])
def translate(text: str, target_lang: str = "es") -> str:
    return f"[{target_lang}] {text}"

# Agent is immediately discoverable - no need to call the function first!
agents = await mesh.discover("translation")  # ✅ Works immediately

# Async functions work too!
@mesh.agent(name="analyzer", capabilities=["analysis"])
async def analyze(data: dict) -> dict:
    await asyncio.sleep(0.1)  # Async processing
    return {"result": "analyzed"}
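
To make "registered when the decorator is applied" concrete, here is a minimal sketch of how such a decorator can work. TinyRegistry is a hypothetical stand-in written for this illustration; it is not the Mesh class and makes no claim about CapabilityMesh internals.

```python
from typing import Callable, Dict, List, Optional


class TinyRegistry:
    """Hypothetical stand-in for Mesh, only to illustrate decorator registration."""

    def __init__(self) -> None:
        self.agents: Dict[str, dict] = {}

    def agent(self, name: Optional[str] = None,
              capabilities: Optional[List[str]] = None) -> Callable:
        def decorator(func: Callable) -> Callable:
            # Registration happens here, at decoration time.
            self.agents[name or func.__name__] = {
                "func": func,
                "capabilities": list(capabilities or []),
            }
            # Return the original function unchanged: no wrapper is added.
            return func
        return decorator


registry = TinyRegistry()


@registry.agent(name="translator", capabilities=["translation", "nlp"])
def translate(text: str, target_lang: str = "es") -> str:
    return f"[{target_lang}] {text}"


# The agent is already registered, although translate() was never called.
print(sorted(registry.agents))
```

Because the decorator returns the original function, calling translate() directly behaves exactly as it would without the decorator.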

Discovery Methods

mesh.discover()

async def discover(query, limit=5, filters=None, min_similarity=0.001, min_trust=None) -> List[AgentInfo]

Discover agents matching a query using semantic search.

query str

Search query (natural language or keyword)

limit int

Maximum number of results (default: 5)

filters Optional[Dict]

Additional filters (agent_type, tags, etc.)

min_similarity float

Minimum similarity score (0.0 - 1.0, default: 0.001)

min_trust Optional[TrustLevel]

Minimum trust level (filters by trust)

Returns: List[AgentInfo] - List of matching agents

Example:

from capabilitymesh import TrustLevel

# Natural language query
agents = await mesh.discover("translate text to Spanish")

# With filters
agents = await mesh.discover(
    "data processing",
    limit=10,
    min_trust=TrustLevel.MEDIUM
)

# Keyword match
agents = await mesh.discover("nlp", limit=5)

mesh.list_agents()

async def list_agents() -> List[AgentInfo]

List all registered agents.

Returns: List[AgentInfo] - All registered agents

Example:

agents = await mesh.list_agents()
for agent in agents:
    print(f"{agent.name}: {[c.name for c in agent.capabilities]}")

Execution Methods

mesh.execute()

async def execute(agent_id, task, **kwargs) -> Any

Execute a task with an agent. Automatically tracks trust scores.

agent_id str

Agent's unique identifier

task Any

Task data to execute

**kwargs Any

Additional parameters passed to agent

Returns: Result from agent execution

Raises: ExecutionError if execution fails

Example:

agents = await mesh.discover("translation")
if agents:
    result = await mesh.execute(
        agents[0].id,
        "Hello world",
        target_lang="fr"
    )
    print(result)  # "[fr] Hello world"

mesh.get_native()

def get_native(agent_id) -> Any

Get the native agent object (function, class, or framework agent).

Example:

native_func = mesh.get_native(agent_id)
if native_func:
    result = native_func("direct call")  # Call directly

AgentInfo

Information about a registered agent returned from discovery methods.

Attributes

Attribute | Type | Description
id | str | Unique agent identifier (UUID)
name | str | Human-readable agent name
agent_type | AgentType | Agent type (FUNCTION, SERVICE, A2A, etc.)
capabilities | List[Capability] | List of agent capabilities
metadata | Dict | Additional metadata
registered_at | datetime | Registration timestamp

Example:

agents = await mesh.discover("translation")
for agent in agents:
    print(f"ID: {agent.id}")
    print(f"Name: {agent.name}")
    print(f"Type: {agent.agent_type}")
    print(f"Capabilities: {[c.name for c in agent.capabilities]}")
    print(f"Registered: {agent.registered_at}")

Capability API

Capabilities define what agents can do. They can be simple strings or rich objects with schemas, versioning, and constraints.

Simple Capabilities

The quickest way is to use string capabilities:

@mesh.agent(capabilities=["translation", "nlp", "language"])
def my_agent(text: str) -> str:
    return text

Rich Capabilities

For production systems, use the full Capability class:

Capability.create_simple()

Capability.create_simple(name, description="", tags=[], version="1.0.0")

Create a simple capability with minimal configuration.

Example:

from capabilitymesh import Capability

capability = Capability.create_simple(
    name="text-summarization",
    description="Summarize long documents into key points",
    tags=["nlp", "summarization", "text-processing"],
    version="2.1.0"
)

Advanced Capability Features

1. Capability Versioning

from capabilitymesh import Capability, CapabilityVersion

capability = Capability(
    name="translate-v2",
    version=CapabilityVersion(major=2, minor=1, patch=0),
    description="Translation with context awareness"
)

# Check compatibility
v1 = CapabilityVersion(major=1, minor=0, patch=0)
v2 = CapabilityVersion(major=2, minor=0, patch=0)
print(v1.is_compatible(v2))  # False (different major version)
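
The compatibility rule implied by the comment above (different major version means incompatible) can be sketched as follows. Version is a hypothetical stand-in for CapabilityVersion, and the rule that only the major number matters is an assumption; the library may also compare minor versions.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Version:
    """Hypothetical stand-in for CapabilityVersion."""
    major: int
    minor: int = 0
    patch: int = 0

    def is_compatible(self, other: "Version") -> bool:
        # Assumption: only the major version decides compatibility.
        return self.major == other.major

    def __str__(self) -> str:
        return f"{self.major}.{self.minor}.{self.patch}"


v1 = Version(1, 0, 0)
v2 = Version(2, 0, 0)
v2_1 = Version(2, 1, 0)

print(v1.is_compatible(v2))    # Different major versions
print(v2.is_compatible(v2_1))  # Same major version
```

With order=True the dataclass also compares field by field, so versions can be sorted to pick the newest compatible one.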

2. Performance Constraints

from capabilitymesh import Capability, CapabilityConstraints

capability = Capability(
    name="fast-translation",
    constraints=CapabilityConstraints(
        max_response_time_ms=100,     # Must respond in 100ms
        max_cost_per_call=0.001,       # Max $0.001 per call
        min_availability=0.999,        # 99.9% uptime
        rate_limit_per_minute=1000     # 1000 calls/minute
    )
)

3. Semantic Metadata

from capabilitymesh import Capability, SemanticMetadata

capability = Capability(
    name="ml-translation",
    semantic=SemanticMetadata(
        tags=["nlp", "translation", "ml"],
        categories=["Natural Language Processing", "Machine Learning"],
        domains=["linguistics", "ai"],
        keywords=["translate", "language", "convert"]
    )
)

4. Input/Output Schemas

from capabilitymesh import (
    Capability,
    CapabilityInputOutput,
    IOFormat
)

capability = Capability(
    name="structured-api",
    input_spec=CapabilityInputOutput(
        format=IOFormat.JSON,
        json_schema={
            "type": "object",
            "properties": {
                "text": {"type": "string"},
                "language": {"type": "string"}
            },
            "required": ["text"]
        },
        description="Input must be JSON with text and optional language"
    ),
    output_spec=CapabilityInputOutput(
        format=IOFormat.JSON,
        json_schema={
            "type": "object",
            "properties": {
                "translated": {"type": "string"},
                "confidence": {"type": "number"}
            }
        },
        description="Returns translated text with confidence score"
    )
)
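
To show what a declared input schema buys you, the sketch below checks a payload against the input schema from the example above. check_payload is a hypothetical helper that handles only the top-level type, "required", and property types; it is not part of CapabilityMesh, and a full JSON Schema validator covers far more.

```python
from typing import Any, Dict, List

# Mapping from JSON Schema type names to Python types (subset only).
_JSON_TYPES = {
    "string": str,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def check_payload(payload: Any, schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the payload passed."""
    if schema.get("type") == "object" and not isinstance(payload, dict):
        return ["payload must be an object"]
    errors = []
    for key in schema.get("required", []):
        if key not in payload:
            errors.append(f"missing required field: {key}")
    for key, spec in schema.get("properties", {}).items():
        expected = _JSON_TYPES.get(spec.get("type"))
        if key not in payload or expected is None:
            continue
        value = payload[key]
        # bool is a subclass of int in Python, so reject it for non-boolean types.
        is_stray_bool = isinstance(value, bool) and spec["type"] != "boolean"
        if not isinstance(value, expected) or is_stray_bool:
            errors.append(f"{key} must be of type {spec['type']}")
    return errors


input_schema = {
    "type": "object",
    "properties": {
        "text": {"type": "string"},
        "language": {"type": "string"},
    },
    "required": ["text"],
}

print(check_payload({"text": "Hello", "language": "es"}, input_schema))
print(check_payload({"language": "es"}, input_schema))
```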

Identity & Address API

Every agent has a unique identity and one or more network addresses.

AgentIdentity

Unique identifier for an agent with cryptographic capabilities.

Attribute | Type | Description
id | str | Unique identifier (UUID)
name | str | Human-readable name
agent_type | AgentType | Type of agent
public_key | Optional[str] | RSA public key (PEM format)
addresses | List[AgentAddress] | Network addresses

AgentAddress

Network address for communicating with an agent.

Attribute | Type | Description
protocol | str | Protocol (http, grpc, mqtt, etc.)
host | str | Hostname or IP
port | int | Port number
path | Optional[str] | Path (for HTTP/REST)

Example:

from capabilitymesh import AgentIdentity, AgentAddress, AgentType

identity = AgentIdentity(
    name="my-service",
    agent_type=AgentType.SERVICE,
    addresses=[
        AgentAddress(
            protocol="http",
            host="api.example.com",
            port=8080,
            path="/agent/execute"
        )
    ]
)
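
The four address fields combine naturally into an endpoint URL. The sketch below shows one way to do that; Address and its url() method are hypothetical names for this illustration, and AgentAddress may or may not offer an equivalent helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Address:
    """Hypothetical stand-in mirroring the AgentAddress fields."""
    protocol: str
    host: str
    port: int
    path: Optional[str] = None

    def url(self) -> str:
        # The path is optional, so fall back to an empty string.
        return f"{self.protocol}://{self.host}:{self.port}{self.path or ''}"


http_address = Address("http", "api.example.com", 8080, "/agent/execute")
grpc_address = Address("grpc", "10.0.0.5", 50051)

print(http_address.url())
print(grpc_address.url())
```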

Storage Backends

CapabilityMesh supports multiple storage backends for different deployment scenarios.

Comparison

Backend | Persistence | Search | Distribution | Best For
InMemory | No | Basic | Single process | Development, testing
SQLite | Yes (file) | Full-text (FTS5) | Single instance | Production (single server)
Redis | Yes (remote) | Basic | Multi-instance | Distributed systems, cloud

When to Use Each

  • InMemory: Fast development, unit tests, ephemeral agents
  • SQLite: Production single-instance, excellent search, file-based
  • Redis: Microservices, horizontal scaling, shared registry
📘 Discovery Architecture: Redis enables discovery across multiple processes/machines, but execution is process-local for Python functions. For true distributed execution, use A2A adapters (HTTP-based agents). See Discovery Architecture Guide for deployment patterns.

InMemory Storage

Default storage backend. Fast, simple, no persistence.

Usage

from capabilitymesh import Mesh, InMemoryStorage

# Automatic (default)
mesh = Mesh()

# Explicit
mesh = Mesh(storage=InMemoryStorage())

Characteristics

  • ✅ Zero configuration
  • ✅ Fastest performance
  • ✅ Perfect for development
  • ❌ No persistence (data lost on restart)
  • ❌ Single process only
💡 Tip: InMemory is perfect for getting started and unit tests. Switch to SQLite for production.

SQLite Storage

File-based persistence with FTS5 full-text search.

Installation

pip install capabilitymesh[sqlite]

Usage

from capabilitymesh import Mesh
from capabilitymesh.storage import SQLiteStorage

# File-based
mesh = Mesh(storage=SQLiteStorage("agents.db"))

# In-memory SQLite (for testing)
mesh = Mesh(storage=SQLiteStorage(":memory:"))

Features

  • ✅ File-based persistence
  • ✅ FTS5 full-text search
  • ✅ SQL queries for complex filters
  • ✅ ACID transactions
  • ✅ No external services required

Full-Text Search

SQLite storage automatically indexes agents for fast full-text search:

# Search by name, description, or capabilities
agents = await mesh.discover("translate natural language processing")

# FTS5 finds matches in:
# - Agent name
# - Agent description
# - All capability names and descriptions

Database Schema

-- Agents table
CREATE TABLE agents (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    agent_type TEXT NOT NULL,
    description TEXT,
    registered_at TIMESTAMP,
    metadata TEXT  -- JSON
);

-- Capabilities table
CREATE TABLE capabilities (
    id TEXT PRIMARY KEY,
    agent_id TEXT,
    name TEXT NOT NULL,
    description TEXT,
    capability_data TEXT,  -- JSON
    FOREIGN KEY(agent_id) REFERENCES agents(id)
);

-- Trust scores table
CREATE TABLE trust_scores (
    agent_id TEXT PRIMARY KEY,
    trust_level INTEGER,
    success_count INTEGER,
    failure_count INTEGER,
    total_executions INTEGER,
    last_execution TIMESTAMP,
    FOREIGN KEY(agent_id) REFERENCES agents(id)
);

-- FTS5 full-text search index
CREATE VIRTUAL TABLE agents_fts USING fts5(
    agent_id UNINDEXED,
    name,
    description,
    capabilities  -- concatenated capability names
);
✅ Recommended: SQLite is the recommended storage for production single-instance deployments. It's fast, reliable, and requires no external services.
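
To see how an FTS5 index like agents_fts answers a query, the sketch below builds the same virtual table with Python's built-in sqlite3 module and searches it. The sample rows and the search() helper are made up for illustration, and it assumes your Python's SQLite build includes FTS5 (most do). It does not show how SQLiteStorage issues its queries.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE VIRTUAL TABLE agents_fts USING fts5("
    "agent_id UNINDEXED, name, description, capabilities)"
)

# Illustrative rows; 'capabilities' holds concatenated capability names.
rows = [
    ("a1", "translator", "Translates text between languages", "translation nlp"),
    ("a2", "summarizer", "Summarizes long documents", "summarization nlp"),
    ("a3", "calculator", "Adds numbers", "calculation"),
]
conn.executemany(
    "INSERT INTO agents_fts (agent_id, name, description, capabilities) "
    "VALUES (?, ?, ?, ?)",
    rows,
)


def search(term: str) -> list:
    """Return agent IDs whose indexed columns match the FTS5 query."""
    cursor = conn.execute(
        "SELECT agent_id FROM agents_fts WHERE agents_fts MATCH ? ORDER BY rank",
        (term,),
    )
    return [row[0] for row in cursor]


print(search("translation"))
print(search("nlp"))
```

The default FTS5 tokenizer matches whole tokens without stemming, so a query for "translation" does not match "Translates". That gap is one reason to combine full-text search with embedding-based discovery.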

Redis Storage

Distributed storage for multi-instance deployments.

Installation

pip install capabilitymesh[redis]

Usage

from capabilitymesh import Mesh
from capabilitymesh.storage import RedisStorage

# Connect to Redis
mesh = Mesh(storage=RedisStorage(
    host="localhost",
    port=6379,
    db=0,
    password=None,  # Optional
    prefix="capabilitymesh:",  # Key prefix
    ttl=None  # Optional TTL in seconds
))

Features

  • ✅ Distributed storage (multiple instances)
  • ✅ Horizontal scaling
  • ✅ High availability (Redis Cluster)
  • ✅ Optional TTL for ephemeral agents
  • ✅ Pub/sub for real-time updates

Redis Key Schema

# Agent data
capabilitymesh:agents:{agent_id}  -> Hash

# Capabilities
capabilitymesh:capabilities:{cap_id}  -> Hash

# Agent ID index
capabilitymesh:agent_ids  -> Set

# Trust scores
capabilitymesh:trust:{agent_id}  -> Hash
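
The key schema above can be exercised without a Redis server. In the sketch below, FakeRedis is a dict-backed stand-in that mimics only the hash and set operations needed here, and the helper names (agent_key, trust_key, index_key, register, list_agent_names) are hypothetical; RedisStorage's actual internals may differ.

```python
from typing import Dict, List, Set

PREFIX = "capabilitymesh:"


def agent_key(agent_id: str, prefix: str = PREFIX) -> str:
    return f"{prefix}agents:{agent_id}"


def trust_key(agent_id: str, prefix: str = PREFIX) -> str:
    return f"{prefix}trust:{agent_id}"


def index_key(prefix: str = PREFIX) -> str:
    return f"{prefix}agent_ids"


class FakeRedis:
    """Dict-backed stand-in for the Redis hash and set commands used below."""

    def __init__(self) -> None:
        self.hashes: Dict[str, Dict[str, str]] = {}
        self.sets: Dict[str, Set[str]] = {}

    def hset(self, key: str, mapping: Dict[str, str]) -> None:
        self.hashes.setdefault(key, {}).update(mapping)

    def hgetall(self, key: str) -> Dict[str, str]:
        return dict(self.hashes.get(key, {}))

    def sadd(self, key: str, member: str) -> None:
        self.sets.setdefault(key, set()).add(member)

    def smembers(self, key: str) -> Set[str]:
        return set(self.sets.get(key, set()))


def register(store: FakeRedis, agent_id: str, name: str) -> None:
    # Write the agent hash, then add the ID to the index set.
    store.hset(agent_key(agent_id), {"name": name})
    store.sadd(index_key(), agent_id)


def list_agent_names(store: FakeRedis) -> List[str]:
    # Listing walks the index set instead of scanning all keys.
    ids = store.smembers(index_key())
    return sorted(store.hgetall(agent_key(i))["name"] for i in ids)


store = FakeRedis()
register(store, "a1", "translator")
register(store, "a2", "summarizer")
print(list_agent_names(store))
```

Keeping a separate ID set means a listing never needs a key scan, which matters once the same Redis database also holds unrelated keys.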

Multi-Instance Setup

# Instance 1
mesh1 = Mesh(storage=RedisStorage(host="redis.example.com"))
await mesh1.register(agent1)

# Instance 2 (sees agent1!)
mesh2 = Mesh(storage=RedisStorage(host="redis.example.com"))
agents = await mesh2.list_agents()  # Contains agent1!
⚠️ Note: Requires a running Redis server. Start with: redis-server or use Docker: docker run -d -p 6379:6379 redis
📘 Important: This example shows cross-process discovery. Instance 2 can discover agents registered by Instance 1. However, execution is process-local - Instance 2 cannot execute Instance 1's Python functions directly. For distributed execution:
  • Use A2A adapters for HTTP-based agents
  • Implement custom RPC/message queue coordination
  • See Discovery Architecture for patterns

Trust Management

CapabilityMesh includes a built-in trust system that automatically tracks agent reliability based on execution results.

How It Works

  1. Every time you call mesh.execute(), the result (success/failure) is recorded
  2. Trust scores automatically adjust based on success rate and execution count
  3. You can filter discovery by minimum trust level
  4. Manual trust levels can override automatic calculations
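
The first two steps above can be sketched as a thin wrapper that runs an agent and records the outcome. TrustLedger and execute_tracked are hypothetical names for this illustration; they are not the SimpleTrustManager API and only show the bookkeeping idea.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, Tuple


class TrustLedger:
    """Hypothetical success/failure counter keyed by agent ID."""

    def __init__(self) -> None:
        self.counts: Dict[str, Tuple[int, int]] = {}

    def record(self, agent_id: str, success: bool) -> None:
        ok, failed = self.counts.get(agent_id, (0, 0))
        self.counts[agent_id] = (ok + 1, failed) if success else (ok, failed + 1)

    def success_rate(self, agent_id: str) -> float:
        ok, failed = self.counts.get(agent_id, (0, 0))
        total = ok + failed
        return ok / total if total else 0.0


async def execute_tracked(ledger: TrustLedger, agent_id: str,
                          func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Run a sync or async agent function and record whether it succeeded."""
    try:
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
    except Exception:
        ledger.record(agent_id, success=False)
        raise  # The caller still sees the original error.
    ledger.record(agent_id, success=True)
    return result


def unstable(x: int) -> int:
    if x % 2 == 0:
        raise ValueError("Failed!")
    return x * 2


async def demo() -> TrustLedger:
    ledger = TrustLedger()
    for i in range(20):
        try:
            await execute_tracked(ledger, "unstable", unstable, i)
        except ValueError:
            pass
    return ledger


demo_ledger = asyncio.run(demo())
print(demo_ledger.counts["unstable"], demo_ledger.success_rate("unstable"))
```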

Automatic Trust Tracking

from capabilitymesh import Mesh

mesh = Mesh()

@mesh.agent(name="reliable", capabilities=["task"])
def reliable_agent(x):
    return x * 2  # Always succeeds

@mesh.agent(name="unstable", capabilities=["task"])
def unstable_agent(x):
    if x % 2 == 0:
        raise ValueError("Failed!")
    return x * 2

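# Look up the agent IDs assigned at registration
ids = {a.name: a.id for a in await mesh.list_agents()}
reliable_id, unstable_id = ids["reliable"], ids["unstable"]

# Execute multiple times
for i in range(20):
    await mesh.execute(reliable_id, i)
    try:
        await mesh.execute(unstable_id, i)
    except Exception:
        pass  # unstable_agent fails on even inputs (50% of the time)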

# Check trust scores
reliable_score = await mesh.trust.get_score(reliable_id)
print(f"Reliable: {reliable_score.level.name}")  # HIGH or VERIFIED
print(f"Success rate: {reliable_score.success_rate:.1%}")  # ~100%

unstable_score = await mesh.trust.get_score(unstable_id)
print(f"Unstable: {unstable_score.level.name}")  # LOW or MEDIUM
print(f"Success rate: {unstable_score.success_rate:.1%}")  # ~50%

Trust-Based Discovery

from capabilitymesh import TrustLevel

# Only discover agents with MEDIUM or higher trust
trusted_agents = await mesh.discover(
    "task processing",
    min_trust=TrustLevel.MEDIUM
)

# Only get highly trusted agents
verified_agents = await mesh.discover(
    "critical operations",
    min_trust=TrustLevel.HIGH
)

Trust Levels

CapabilityMesh uses a 5-level trust system:

Level | Value | Criteria | Description
UNTRUSTED | 0 | Never executed | New agents start here
LOW | 1 | < 50% success OR < 5 executions | Unreliable or untested
MEDIUM | 2 | 50-80% success, ≥ 5 executions | Moderately reliable
HIGH | 3 | 80-95% success, ≥ 10 executions | Very reliable
VERIFIED | 4 | > 95% success, ≥ 20 executions OR manual | Extremely reliable or manually verified

Trust Level Progression

UNTRUSTED (0 executions)
    ↓
LOW (< 5 executions OR low success rate)
    ↓
MEDIUM (5+ executions, 50-80% success)
    ↓
HIGH (10+ executions, 80-95% success)
    ↓
VERIFIED (20+ executions, > 95% success OR manually set)
💡 Note: Trust levels are auto-calculated unless manually set. Manual trust levels are never overridden by auto-calculation.
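
The criteria above can be read as a small decision function. The sketch below is one interpretation, not the library's implementation: the local TrustLevel enum only mirrors the documented values, and the handling of boundary cases (exactly 80% or 95%, or a high success rate with too few executions) is an assumption, because the table does not spell those out.

```python
from enum import IntEnum


class TrustLevel(IntEnum):
    """Local stand-in mirroring the documented level values."""
    UNTRUSTED = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    VERIFIED = 4


def classify_trust(success_count: int, failure_count: int) -> TrustLevel:
    """Map execution counts to a trust level (automatic calculation only)."""
    total = success_count + failure_count
    if total == 0:
        return TrustLevel.UNTRUSTED
    rate = success_count / total
    if total < 5 or rate < 0.5:
        return TrustLevel.LOW
    if total >= 20 and rate > 0.95:
        return TrustLevel.VERIFIED
    # Assumption: a strong rate without enough executions for the
    # next tier falls back to the highest tier it qualifies for.
    if total >= 10 and rate >= 0.8:
        return TrustLevel.HIGH
    return TrustLevel.MEDIUM


print(classify_trust(20, 0).name)
print(classify_trust(10, 10).name)
```

Under this reading, the "reliable" agent from the earlier example (20 successes, 0 failures) reaches VERIFIED, and the "unstable" agent (10 of 20) lands on MEDIUM.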

Trust API

SimpleTrustManager

Accessible via mesh.trust.

mesh.trust.get_score()

async def get_score(agent_id: str) -> TrustScore

Get trust score for an agent.

Returns: TrustScore object with metrics

score = await mesh.trust.get_score(agent_id)
print(f"Level: {score.level.name}")
print(f"Success rate: {score.success_rate:.1%}")
print(f"Total executions: {score.total_executions}")
print(f"Last execution: {score.last_execution}")

mesh.trust.set_level()

async def set_level(agent_id: str, level: TrustLevel, reason: str = None)

Manually set trust level (overrides auto-calculation).

from capabilitymesh import TrustLevel

# Manually verify an agent
await mesh.trust.set_level(
    agent_id,
    TrustLevel.VERIFIED,
    reason="Passed security audit"
)

# Blacklist an agent
await mesh.trust.set_level(
    agent_id,
    TrustLevel.UNTRUSTED,
    reason="Security vulnerability found"
)

mesh.trust.record_execution()

async def record_execution(agent_id: str, success: bool, duration: float = 0.0)

Record execution result. Automatically called by mesh.execute().

# Usually you don't call this directly, but you can:
await mesh.trust.record_execution(agent_id, success=True, duration=0.123)

mesh.trust.list_trusted()

async def list_trusted(min_level: TrustLevel = TrustLevel.MEDIUM) -> List[TrustScore]

List all agents meeting minimum trust level.

# Get all highly trusted agents
trusted = await mesh.trust.list_trusted(TrustLevel.HIGH)
for score in trusted:
    print(f"{score.agent_id}: {score.level.name} ({score.success_rate:.0%})")

TrustScore Object

Attribute | Type | Description
agent_id | str | Agent identifier
level | TrustLevel | Current trust level
success_count | int | Number of successful executions
failure_count | int | Number of failed executions
total_executions | int | Total execution count
success_rate | float | Success rate (0.0 - 1.0)
last_execution | datetime | Timestamp of last execution
manually_set | bool | Whether level was manually set

Embeddings & Semantic Search

CapabilityMesh uses embeddings to enable semantic discovery with natural language queries.

Default: KeywordEmbedder

By default, CapabilityMesh uses a TF-IDF based keyword embedder that requires no external dependencies:

from capabilitymesh import Mesh

mesh = Mesh()  # Automatically uses KeywordEmbedder

# Natural language queries work!
agents = await mesh.discover("translate text to Spanish")
agents = await mesh.discover("process natural language")
agents = await mesh.discover("analyze sentiment of reviews")

How Semantic Search Works

  1. When you register an agent, its capabilities are embedded into vectors
  2. When you search, the query is also embedded
  3. Cosine similarity finds the best matches
  4. Results are ranked by similarity score
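
The four steps above can be sketched with plain word-count vectors. This is a simplified illustration and not the KeywordEmbedder: it uses raw term counts instead of TF-IDF weights, and embed, cosine, and rank are hypothetical helper names. The min_similarity and limit defaults mirror the documented discover() parameters.

```python
import math
from collections import Counter
from typing import Dict, List


def embed(text: str) -> Counter:
    """Step 1 and 2: turn text into a sparse word-count vector."""
    return Counter(text.lower().replace("-", " ").split())


def cosine(a: Counter, b: Counter) -> float:
    """Step 3: cosine similarity between two sparse vectors."""
    dot = sum(a[term] * b[term] for term in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def rank(query: str, agents: Dict[str, List[str]],
         min_similarity: float = 0.001, limit: int = 5) -> List[str]:
    """Step 4: return agent names ordered by similarity to the query."""
    query_vec = embed(query)
    scored = [
        (cosine(query_vec, embed(" ".join(capabilities))), name)
        for name, capabilities in agents.items()
    ]
    hits = [(score, name) for score, name in scored if score >= min_similarity]
    hits.sort(key=lambda pair: (-pair[0], pair[1]))  # Best match first
    return [name for _, name in hits[:limit]]


agents = {
    "translator": ["translation", "translate text", "nlp"],
    "summarizer": ["summarize text", "nlp"],
    "calculator": ["math"],
}

print(rank("translate text to spanish", agents))
```

Agents that share no words with the query score 0.0 and fall below min_similarity, which is why "calculator" is absent from the result.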

Custom Embedders

You can provide your own embedder:

from typing import List

from capabilitymesh import Mesh, Embedder

class MyEmbedder(Embedder):
    async def embed(self, text: str) -> List[float]:
        # Your embedding logic here
        # Could use OpenAI, sentence-transformers, etc.
        vector = [float(len(text))]  # Placeholder; replace with a real embedding
        return vector

mesh = Mesh(embedder=MyEmbedder())
🔮 Coming Soon: v1.0.0-beta.1 will include LocalEmbedder (sentence-transformers) and OpenAIEmbedder for even better semantic search.

Framework Integration

CapabilityMesh works with agents from any framework. Here's how to integrate popular frameworks:

CrewAI

from capabilitymesh import Mesh
from crewai import Agent as CrewAgent

mesh = Mesh()

# Create CrewAI agent
researcher = CrewAgent(
    role="Senior Research Analyst",
    goal="Uncover cutting-edge developments in AI",
    backstory="Expert researcher with deep domain knowledge",
    verbose=True
)

# Register with CapabilityMesh
await mesh.register(
    researcher,
    name="ai-researcher",
    capabilities=["research", "analysis", "information-gathering"]
)

# Discover and use
agents = await mesh.discover("research AI developments")
# agents[0] is the CrewAI researcher!

AutoGen

from capabilitymesh import Mesh
from autogen import AssistantAgent, UserProxyAgent

mesh = Mesh()

# Create AutoGen agents
coder = AssistantAgent(
    name="coder",
    system_message="You are an expert Python developer",
    llm_config={"config_list": [...]}
)

reviewer = AssistantAgent(
    name="code_reviewer",
    system_message="You review code for quality and security"
)

# Register with CapabilityMesh
await mesh.register(coder, name="python-coder", capabilities=["coding", "python"])
await mesh.register(reviewer, name="code-reviewer", capabilities=["review", "security"])

# Discover across frameworks
coders = await mesh.discover("write python code")
# Returns AutoGen coder!

LangGraph

from capabilitymesh import Mesh
from langgraph.graph import StateGraph

mesh = Mesh()

# Create LangGraph workflow
workflow = StateGraph(...)
# ... define nodes and edges ...

# Register with CapabilityMesh
await mesh.register(
    workflow,
    name="data-pipeline",
    capabilities=["data-processing", "workflow", "etl"]
)

# Discover
pipelines = await mesh.discover("process and transform data")

A2A Protocol

from capabilitymesh import Mesh

mesh = Mesh()

# A2A agents are automatically detected and work seamlessly
# CapabilityMesh can convert any agent to/from A2A format

Custom Agents

from capabilitymesh import Mesh

mesh = Mesh()

# Plain class instances can be registered
class MyAgent:
    def execute(self, task):
        return f"Processed: {task}"

agent = MyAgent()
await mesh.register(agent, name="my-agent", capabilities=["task-processing"])

# Or just functions
@mesh.agent(capabilities=["calculation"])
def calculator(a: int, b: int) -> int:
    return a + b

Multi-Agent Workflows

Coordinate multiple agents to accomplish complex tasks.

Sequential Pipeline

from capabilitymesh import Mesh

mesh = Mesh()

# Register pipeline agents
@mesh.agent(capabilities=["extraction"])
async def extract(pdf_path):
    return "Extracted text..."

@mesh.agent(capabilities=["summarization"])
async def summarize(text):
    return "Summary..."

@mesh.agent(capabilities=["translation"])
async def translate(text):
    return "Translated..."

# Execute pipeline
pdf = "document.pdf"

extractors = await mesh.discover("extract text from pdf")
text = await mesh.execute(extractors[0].id, pdf)

summarizers = await mesh.discover("summarize text")
summary = await mesh.execute(summarizers[0].id, text)

translators = await mesh.discover("translate to spanish")
result = await mesh.execute(translators[0].id, summary)

Parallel Processing

import asyncio
from capabilitymesh import Mesh

mesh = Mesh()

# Process multiple items in parallel
items = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]

extractors = await mesh.discover("extract text")
extractor_id = extractors[0].id

# Execute in parallel
results = await asyncio.gather(*[
    mesh.execute(extractor_id, item)
    for item in items
])

# results = ["text1", "text2", "text3"]
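
By default asyncio.gather raises the first exception it sees, so one failing document hides the other results from the caller. The sketch below shows the return_exceptions=True variant, which separates successes from failures. fake_execute is a hypothetical stand-in for mesh.execute so the example runs on its own.

```python
import asyncio
from typing import Dict, List, Tuple


async def fake_execute(item: str) -> str:
    """Stand-in for mesh.execute(extractor_id, item); fails for one input."""
    await asyncio.sleep(0)  # Yield control, as a real call would
    if item == "doc2.pdf":
        raise ValueError("unreadable")
    return f"text from {item}"


async def run_all(items: List[str]) -> Tuple[Dict[str, str], Dict[str, Exception]]:
    # return_exceptions=True returns errors as values instead of raising.
    outcomes = await asyncio.gather(
        *(fake_execute(item) for item in items),
        return_exceptions=True,
    )
    succeeded = {i: o for i, o in zip(items, outcomes) if not isinstance(o, Exception)}
    failed = {i: o for i, o in zip(items, outcomes) if isinstance(o, Exception)}
    return succeeded, failed


succeeded, failed = asyncio.run(run_all(["doc1.pdf", "doc2.pdf", "doc3.pdf"]))
print(succeeded)
print(list(failed))
```

gather preserves input order, which is what makes pairing outcomes with items through zip safe.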

Conditional Routing

from capabilitymesh import Mesh

mesh = Mesh()

async def process_document(doc):
    # Analyze document type
    analyzers = await mesh.discover("analyze document")
    doc_type = await mesh.execute(analyzers[0].id, doc)

    # Route to appropriate handler
    if doc_type == "pdf":
        handlers = await mesh.discover("extract pdf")
    elif doc_type == "image":
        handlers = await mesh.discover("ocr text extraction")
    else:
        handlers = await mesh.discover("generic text extraction")

    return await mesh.execute(handlers[0].id, doc)

Error Handling with Fallbacks

from capabilitymesh import Mesh, TrustLevel

mesh = Mesh()

async def robust_execution(capability, task):
    # Get candidate agents with at least MEDIUM trust
    agents = await mesh.discover(
        capability,
        min_trust=TrustLevel.MEDIUM
    )

    # Try agents in order until one succeeds
    for agent in agents:
        try:
            result = await mesh.execute(agent.id, task)
            return result
        except Exception as e:
            print(f"Agent {agent.name} failed: {e}")
            continue

    raise RuntimeError("All agents failed")

# Usage
result = await robust_execution("translation", "Hello world")

Fan-Out / Fan-In

import asyncio
from capabilitymesh import Mesh

mesh = Mesh()

async def parallel_analysis(text):
    # Fan-out: Send to multiple analyzers
    sentiment_agents = await mesh.discover("sentiment analysis")
    entity_agents = await mesh.discover("entity extraction")
    topic_agents = await mesh.discover("topic classification")

    # Execute all in parallel
    sentiment, entities, topics = await asyncio.gather(
        mesh.execute(sentiment_agents[0].id, text),
        mesh.execute(entity_agents[0].id, text),
        mesh.execute(topic_agents[0].id, text)
    )

    # Fan-in: Combine results
    return {
        "sentiment": sentiment,
        "entities": entities,
        "topics": topics
    }

Best Practices

1. Use Descriptive Capability Names

# Good
capabilities=["translation", "english-to-spanish", "nlp", "language-processing"]

# Less effective
capabilities=["translate", "lang"]

2. Add Rich Metadata

from capabilitymesh import Capability, SemanticMetadata

capability = Capability(
    name="ml-translation",
    description="Neural machine translation with context awareness",
    semantic=SemanticMetadata(
        tags=["nlp", "translation", "ml", "neural"],
        categories=["Natural Language Processing"],
        domains=["linguistics", "ai"]
    )
)

3. Use Trust Levels in Production

# Always filter by trust in production
agents = await mesh.discover(
    "critical financial calculation",
    min_trust=TrustLevel.HIGH  # Only highly reliable agents
)

4. Choose the Right Storage

# Development
mesh = Mesh()  # InMemory (default)

# Production (single instance)
mesh = Mesh(storage=SQLiteStorage("agents.db"))

# Production (distributed)
mesh = Mesh(storage=RedisStorage(host="redis.example.com"))

5. Handle Errors Gracefully

from capabilitymesh.core.exceptions import ExecutionError

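try:
    result = await mesh.execute(agent_id, task)
except ExecutionError as e:
    print(f"Execution failed: {e}")
    # Fall back to a different agent than the one that just failed
    agents = await mesh.discover(capability)
    fallback = next((a for a in agents if a.id != agent_id), None)
    if fallback is None:
        raise
    result = await mesh.execute(fallback.id, task)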

6. Use Async Where Possible

# Preferred (async)
agents = await mesh.discover("query")
result = await mesh.execute(agent_id, task)

# Sync wrappers available but less efficient
# agents = mesh.discover_sync("query")
# result = mesh.execute_sync(agent_id, task)

7. Version Your Capabilities

from capabilitymesh import Capability, CapabilityVersion

# Good: Semantic versioning
capability = Capability(
    name="translation-api",
    version=CapabilityVersion(major=2, minor=1, patch=0)
)

# Check compatibility
if capability.version.is_compatible(client_version):
    # Safe to use
    ...
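
The exact rule behind is_compatible() is not spelled out here. As an illustration only, the sketch below applies a common semantic-versioning convention: the major versions must match, and the provider must be at least as new as the version the client requires. The Version class is a hypothetical stand-in and not the library's CapabilityVersion.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Version:
    """Hypothetical stand-in; not the library's CapabilityVersion."""
    major: int
    minor: int = 0
    patch: int = 0

    def is_compatible(self, required: "Version") -> bool:
        # Assumed rule: a different major version signals a breaking change.
        if self.major != required.major:
            return False
        # Within one major version, newer minor/patch releases stay compatible.
        return (self.minor, self.patch) >= (required.minor, required.patch)

provider = Version(2, 1, 0)
print(provider.is_compatible(Version(2, 0, 5)))  # client needs an older 2.x release
print(provider.is_compatible(Version(2, 2, 0)))  # client needs a newer minor release
print(provider.is_compatible(Version(1, 9, 0)))  # client targets a different major
```

Under this assumed rule, bumping the major version is the signal that clients must opt in explicitly before using the new capability.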

8. Monitor Trust Scores

# Regularly check agent health
async def health_check():
    agents = await mesh.list_agents()
    for agent in agents:
        score = await mesh.trust.get_score(agent.id)
        if score.level < TrustLevel.MEDIUM:
            print(f"⚠️ {agent.name} has low trust: {score.success_rate:.1%}")
        if score.total_executions > 100 and score.success_rate < 0.5:
            print(f"🚨 {agent.name} is unreliable, consider removing")

Example: Basic Usage

Complete example demonstrating registration, discovery, and execution.

import asyncio
from capabilitymesh import Mesh, Capability

async def main():
    # Initialize mesh
    mesh = Mesh()

    # Method 1: Register with decorator
    @mesh.agent(name="summarizer", capabilities=["summarization", "nlp"])
    async def summarize(text: str) -> str:
        return f"Summary: {text[:100]}..."

    # Method 2: Register with Capability objects
    capability = Capability.create_simple(
        name="translation",
        description="Translate text between languages",
        tags=["nlp", "language", "translation"]
    )

    @mesh.agent(name="translator", capabilities=[capability])
    def translate(text: str, target_lang: str = "es") -> str:
        return f"[{target_lang}] {text}"

    # Method 3: Register explicitly
    def analyze(text: str) -> dict:
        return {"sentiment": "positive", "confidence": 0.95}

    await mesh.register(
        agent=analyze,
        name="sentiment-analyzer",
        capabilities=["sentiment-analysis", "nlp"]
    )

    # List all agents
    print("\n=== Registered Agents ===")
    agents = await mesh.list_agents()
    for agent in agents:
        caps = ", ".join([c.name for c in agent.capabilities])
        print(f"{agent.name}: [{caps}]")

    # Discover by semantic query
    print("\n=== Discovery ===")
    nlp_agents = await mesh.discover("process natural language")
    print(f"NLP agents: {[a.name for a in nlp_agents]}")

    # Execute tasks
    print("\n=== Execution ===")
    text = "CapabilityMesh is a powerful framework for multi-agent systems."

    # Summarize
    summarizers = await mesh.discover("summarize")
    if summarizers:
        result = await mesh.execute(summarizers[0].id, text)
        print(f"Summary: {result}")

    # Translate
    translators = await mesh.discover("translate")
    if translators:
        result = await mesh.execute(translators[0].id, text, target_lang="fr")
        print(f"Translation: {result}")

    # Analyze sentiment
    analyzers = await mesh.discover("sentiment")
    if analyzers:
        result = await mesh.execute(analyzers[0].id, text)
        print(f"Sentiment: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Example: Storage Backends

import asyncio
from capabilitymesh import Mesh
from capabilitymesh.storage import InMemoryStorage, SQLiteStorage

async def demo_storage():
    print("=== InMemory Storage ===")
    mesh1 = Mesh(storage=InMemoryStorage())

    @mesh1.agent(capabilities=["task"])
    def agent1(x):
        return x * 2

    agents = await mesh1.list_agents()
    print(f"Agents: {len(agents)}")

    print("\n=== SQLite Storage ===")
    mesh2 = Mesh(storage=SQLiteStorage("demo.db"))

    @mesh2.agent(capabilities=["task"])
    def agent2(x):
        return x * 3

    # Persists to disk!
    agents = await mesh2.list_agents()
    print(f"Agents: {len(agents)}")

    # Full-text search
    results = await mesh2.discover("task processing agent")
    print(f"Found: {[a.name for a in results]}")

asyncio.run(demo_storage())

Example: Trust Management

import asyncio
import random
from capabilitymesh import Mesh, TrustLevel

async def demo_trust():
    mesh = Mesh()

    @mesh.agent(name="reliable", capabilities=["task"])
    def reliable_agent(x):
        return x * 2  # Always works

    @mesh.agent(name="unstable", capabilities=["task"])
    def unstable_agent(x):
        if random.random() < 0.5:
            raise ValueError("Random failure")
        return x * 2

    # Get agent IDs
    agents = await mesh.list_agents()
    reliable_id = next(a.id for a in agents if a.name == "reliable")
    unstable_id = next(a.id for a in agents if a.name == "unstable")

    # Execute multiple times
    print("Building trust history...")
    for i in range(20):
        for agent_id in (reliable_id, unstable_id):
            try:
                await mesh.execute(agent_id, i)
            except Exception:
                # Failures are expected for the unstable agent; they still
                # count toward its trust history
                pass

    # Check trust scores
    print("\n=== Trust Scores ===")
    reliable_score = await mesh.trust.get_score(reliable_id)
    print("Reliable agent:")
    print(f"  Level: {reliable_score.level.name}")
    print(f"  Success rate: {reliable_score.success_rate:.1%}")
    print(f"  Executions: {reliable_score.total_executions}")

    unstable_score = await mesh.trust.get_score(unstable_id)
    print("\nUnstable agent:")
    print(f"  Level: {unstable_score.level.name}")
    print(f"  Success rate: {unstable_score.success_rate:.1%}")
    print(f"  Executions: {unstable_score.total_executions}")

    # Trust-based discovery
    print("\n=== Trust-Based Discovery ===")
    trusted = await mesh.discover("task", min_trust=TrustLevel.MEDIUM)
    print(f"Trusted agents (MEDIUM+): {[a.name for a in trusted]}")

asyncio.run(demo_trust())

Example: Semantic Search

import asyncio
from capabilitymesh import Mesh, Capability

async def demo_semantic_search():
    mesh = Mesh()

    # Register agents with rich descriptions
    @mesh.agent(
        name="summarizer",
        capabilities=[
            Capability.create_simple(
                "text-summarization",
                "Extract key points and create concise summaries of long documents",
                tags=["nlp", "summarization", "text-processing", "extraction"]
            )
        ]
    )
    def summarize(text):
        return text[:100]

    @mesh.agent(
        name="translator",
        capabilities=[
            Capability.create_simple(
                "translation",
                "Convert text from one language to another with context awareness",
                tags=["nlp", "translation", "language", "localization"]
            )
        ]
    )
    def translate(text, lang="es"):
        return f"[{lang}] {text}"

    @mesh.agent(
        name="sentiment-analyzer",
        capabilities=[
            Capability.create_simple(
                "sentiment-analysis",
                "Analyze emotional tone and sentiment of text reviews and feedback",
                tags=["nlp", "sentiment", "emotion", "analysis"]
            )
        ]
    )
    def analyze_sentiment(text):
        return {"sentiment": "positive"}

    # Semantic queries
    print("=== Semantic Search Examples ===\n")

    queries = [
        "I need to make this document shorter",
        "Convert text to Spanish",
        "What is the emotional tone of customer reviews?",
        "Process natural language",
    ]

    for query in queries:
        print(f"Query: '{query}'")
        agents = await mesh.discover(query, limit=2)
        print(f"Found: {[a.name for a in agents]}\n")

asyncio.run(demo_semantic_search())

Troubleshooting

Common Issues

1. ModuleNotFoundError

ModuleNotFoundError: No module named 'capabilitymesh'

Solution:

pip install capabilitymesh
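
If the error persists after installing, a frequent cause is that pip installed the package into a different interpreter than the one running your code. A minimal check using only the standard library (the is_installed helper is introduced here for illustration):

```python
import importlib.util
import sys

def is_installed(module_name: str) -> bool:
    """Return True if the current interpreter can locate the module."""
    return importlib.util.find_spec(module_name) is not None

print(f"Interpreter: {sys.executable}")
print(f"capabilitymesh importable: {is_installed('capabilitymesh')}")
```

If this reports False, running python -m pip install capabilitymesh with the interpreter shown installs the package into that same environment.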

2. Redis Connection Error

Redis connection failed: Error connecting to localhost:6379

Solution:

# Start Redis server
redis-server

# Or use Docker
docker run -d -p 6379:6379 redis
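
Before retrying, it can help to confirm that something is actually listening on the Redis port. The sketch below is a plain TCP check with the standard library; it does not speak the Redis protocol, and the port_is_open helper is a hypothetical name used for illustration.

```python
import socket

def port_is_open(host: str = "localhost", port: int = 6379, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False

print(f"Something is listening on localhost:6379: {port_is_open()}")
```

A True result only shows that the port accepts connections; authentication or a wrong host setting in RedisStorage can still cause failures.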

3. SQLite FTS5 Not Available

SQLite FTS5 extension not available

Solution:

pip install pysqlite3-binary  # Includes FTS5
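
To see whether the sqlite3 module in your interpreter already supports FTS5, you can probe it by creating a throwaway virtual table in an in-memory database. This is a generic SQLite check, not a CapabilityMesh API; the fts5_available helper is named here for illustration.

```python
import sqlite3

def fts5_available() -> bool:
    """Return True if this interpreter's sqlite3 module supports FTS5."""
    conn = sqlite3.connect(":memory:")
    try:
        # Creating an FTS5 virtual table fails if the extension is missing
        conn.execute("CREATE VIRTUAL TABLE fts_probe USING fts5(content)")
        return True
    except sqlite3.OperationalError:
        return False
    finally:
        conn.close()

print(f"SQLite library version: {sqlite3.sqlite_version}")
print(f"FTS5 available: {fts5_available()}")
```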

4. Async Execution Errors

RuntimeError: This event loop is already running

Solution:

# In a script, call asyncio.run() once at the top level
asyncio.run(main())

# In Jupyter a loop is already running: use `await main()` directly, or apply nest_asyncio
import nest_asyncio
nest_asyncio.apply()
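
To decide between the two approaches at runtime, code can ask asyncio whether it is currently executing inside a loop. A minimal sketch follows; the in_running_loop helper and the placeholder main() coroutine are hypothetical names used for illustration.

```python
import asyncio

async def main() -> str:
    await asyncio.sleep(0)  # placeholder for real async work
    return "done"

def in_running_loop() -> bool:
    """Return True when called from code that an event loop is executing."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running
        return False
    return True

if in_running_loop():
    # Notebook-style environments: top-level `await main()` is available there
    print("Event loop already running; use `await main()` instead of asyncio.run()")
else:
    result = asyncio.run(main())
    print(result)
```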

5. No Agents Found

agents = await mesh.discover("query")
# agents is empty []

Solution:

  • Check that agents are registered before discovery runs
  • Lower the min_similarity threshold
  • Use more specific capability names and descriptions
  • Try an exact keyword match instead of a semantic query

6. Trust Scores Not Updating

Solution: Trust scores update after each mesh.execute() call. Ensure you're using mesh.execute() and not calling agents directly.
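
The reason is that outcome tracking lives in the execution path, so a direct call never reaches it. The sketch below illustrates the idea with a hypothetical TrackingRunner stand-in; it is not the library's implementation, and real CapabilityMesh calls are asynchronous.

```python
from dataclasses import dataclass

@dataclass
class Stats:
    successes: int = 0
    failures: int = 0

    @property
    def total(self) -> int:
        return self.successes + self.failures

    @property
    def success_rate(self) -> float:
        return self.successes / self.total if self.total else 0.0

class TrackingRunner:
    """Hypothetical stand-in for a mesh that records outcomes on execute()."""

    def __init__(self):
        self.agents = {}
        self.stats = {}

    def register(self, name, func):
        self.agents[name] = func
        self.stats[name] = Stats()

    def execute(self, name, *args):
        try:
            result = self.agents[name](*args)
        except Exception:
            self.stats[name].failures += 1  # record the failure, then re-raise
            raise
        self.stats[name].successes += 1
        return result

def double(x):
    return x * 2

runner = TrackingRunner()
runner.register("doubler", double)

double(21)                     # direct call: nothing is recorded
runner.execute("doubler", 21)  # tracked call: one success is recorded
print(runner.stats["doubler"].total)
```

Only the call that goes through execute() changes the statistics, which mirrors why direct calls leave trust scores untouched.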

FAQ

General Questions

Q: Do I need to use a specific framework?

A: No! CapabilityMesh works with ANY framework (CrewAI, AutoGen, LangGraph, A2A) and plain Python functions.

Q: Is CapabilityMesh production-ready?

A: v1.0.0-alpha.2 is feature-complete with 139 passing tests. It is ready for production use, but as an alpha release it may still change in response to community feedback.

Q: What's the performance impact?

A: Minimal. InMemory discovery is < 1ms. SQLite is < 10ms. Redis depends on network latency.

Q: Can I use multiple storage backends?

A: Not in the same Mesh instance, but you can create multiple Mesh instances with different storage backends.

Technical Questions

Q: How does semantic search work without an LLM?

A: By default, CapabilityMesh uses TF-IDF-based keyword embeddings. For better results, you will be able to plug in OpenAI or sentence-transformers embedders (coming in v1.0-beta).
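
To make the idea concrete, here is a small, self-contained sketch of TF-IDF scoring with cosine similarity in pure Python. It illustrates the general technique only and is not CapabilityMesh's implementation; the function names, the smoothing formula, and the sample descriptions are assumptions chosen for the example.

```python
import math
import re
from collections import Counter

def tokenize(text: str) -> list:
    return re.findall(r"[a-z0-9]+", text.lower())

def tfidf(tokens: list, idf: dict) -> dict:
    """Weight each known term by its count times its inverse document frequency."""
    counts = Counter(tokens)
    return {term: n * idf[term] for term, n in counts.items() if term in idf}

def build_index(docs: dict):
    tokens = {name: tokenize(text) for name, text in docs.items()}
    n_docs = len(docs)
    doc_freq = Counter(t for toks in tokens.values() for t in set(toks))
    # Smoothed IDF: terms that appear in fewer documents get higher weight
    idf = {t: math.log((1 + n_docs) / (1 + df)) + 1.0 for t, df in doc_freq.items()}
    vectors = {name: tfidf(toks, idf) for name, toks in tokens.items()}
    return idf, vectors

def cosine(a: dict, b: dict) -> float:
    dot = sum(w * b.get(t, 0.0) for t, w in a.items())
    norm = math.sqrt(sum(w * w for w in a.values())) * math.sqrt(sum(w * w for w in b.values()))
    return dot / norm if norm else 0.0

def search(query: str, idf: dict, vectors: dict) -> list:
    q = tfidf(tokenize(query), idf)
    scored = [(cosine(q, vec), name) for name, vec in vectors.items()]
    return [name for score, name in sorted(scored, reverse=True) if score > 0]

docs = {
    "summarizer": "create concise summaries of long documents",
    "translator": "convert text from one language to another",
    "sentiment-analyzer": "analyze emotional tone and sentiment of text reviews",
}
idf, vectors = build_index(docs)
matches = search("convert text to another language", idf, vectors)
no_overlap = search("make this document shorter", idf, vectors)
print(matches)
print(no_overlap)
```

The second query shares no exact term with any description ("document" does not match "documents" without stemming), so it returns nothing. That limitation of keyword-based scoring is what embedding models are meant to address.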

Q: Are trust scores persistent?

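A: Yes, when you use a persistent backend. Trust scores are stored in the storage backend (SQLite or Redis) and persist across restarts. With the default InMemory storage they last only for the lifetime of the process.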

Q: Can I disable trust tracking?

A: Not currently, but you can ignore trust scores in discovery by not setting min_trust.

Q: How do I migrate between storage backends?

A: You'll need to re-register agents with the new storage backend. We're working on a migration tool for v1.1.

Framework Integration

Q: Does CapabilityMesh execute framework agents directly?

A: It depends. For Python functions, yes. For framework agents, you may need to call the framework's execution API. mesh.get_native() gives you the original agent.

Q: Can I mix agents from different frameworks?

A: Yes! That's the whole point. Register agents from CrewAI, AutoGen, and LangGraph, then discover and coordinate them together.

Q: Is A2A protocol required?

A: No. A2A compatibility is built-in, but it's not required. CapabilityMesh works with or without A2A.

Changelog

v1.0.0-alpha.2 (2025-12-30)

Decorator Fix Release

Core Features

  • ✅ Mesh API for agent registration and discovery
  • ✅ Multi-framework support (CrewAI, AutoGen, LangGraph, A2A)
  • ✅ Fixed @mesh.agent() decorator - Immediate registration, no wrapper overhead
  • ✅ Semantic search with keyword embeddings
  • ✅ Three storage backends (InMemory, SQLite, Redis)
  • ✅ SimpleTrustManager with 5-level trust system
  • ✅ Rich capability schemas with versioning
  • ✅ A2A protocol compatibility

Storage

  • ✅ InMemoryStorage (default, zero-config)
  • ✅ SQLiteStorage with FTS5 full-text search
  • ✅ RedisStorage for distributed deployments

Trust System

  • ✅ Automatic trust scoring based on execution results
  • ✅ 5 trust levels (UNTRUSTED → VERIFIED)
  • ✅ Manual trust level overrides
  • ✅ Trust-based discovery filtering
  • ✅ Trust statistics and reporting

Testing

  • ✅ 139 tests passing (100%)
  • ✅ Comprehensive unit tests
  • ✅ Integration tests for all features
  • ✅ Framework integration tests

Documentation

  • ✅ Complete README with examples
  • ✅ 6 comprehensive examples
  • ✅ EXAMPLES_GUIDE.md
  • ✅ DISCOVERY_ARCHITECTURE.md - Deployment patterns guide
  • ✅ DECORATOR_FIX_SUMMARY.md - Technical details of decorator improvements
  • ✅ CHANGELOG.md - Complete version history
  • ✅ API documentation (this file)
  • ✅ CONTRIBUTING.md

Planned Releases

v1.0.0-beta.1 (Coming Soon)

  • LocalEmbedder (sentence-transformers)
  • OpenAIEmbedder for better semantic search
  • Additional storage backends
  • Performance optimizations
  • Community feedback incorporation

v1.0.0 (Stable)

  • Production hardening
  • Documentation site
  • Tutorial videos
  • Case studies

v1.1.0

  • P2P discovery (mDNS, Gossip, DHT)
  • Advanced negotiation protocols
  • CLI tools
  • Migration utilities

CapabilityMesh v1.0.0-alpha.2

The first and only universal capability mesh for multi-agent systems

Built with ❤️ by Sai Kumar Yava