Skip to content

πŸ€–πŸ•ΈοΈ Production-grade multi-agent orchestration framework powered by Pregel BSP. Build sophisticated AI workflows with parallel execution, state management, and observability.

License

Notifications You must be signed in to change notification settings

hupe1980/agentmesh

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

πŸ€–πŸ•ΈοΈ AgentMesh

Go Version License Go Report Card GoDoc

Production-grade multi-agent orchestration framework powered by Pregel-style bulk-synchronous parallel (BSP) graph processing. Build sophisticated AI agent workflows with parallel execution, state management, and enterprise-grade observability.

Requires Go 1.24+


✨ Key Features

Core Engine

  • πŸ”„ Pregel BSP Execution - Parallel graph processing with optimized concurrency (4-10x faster state access)
  • 🧠 LLM Integration - Native support for OpenAI, Anthropic, Gemini, Amazon Bedrock, Ollama with streaming and reasoning models
  • πŸ’Ύ State Management - Lock-free channel-based state with checkpointing, managed value descriptors, and resume-time rehydration hooks
  • πŸ› οΈ Tool Orchestration - Type-safe function calling with automatic schema generation
  • πŸ”€ Model Routing - Intelligent model selection based on cost, capabilities, and availability

Production Ready

  • βœ… Graph Validation - Comprehensive compile-time error checking (cycles, missing nodes, unreachable paths)
  • πŸ’Ύ Checkpointing - Persistent state with auto-resume, encryption, and signing
  • ♻️ Zero-Copy Resume - Copy-on-write checkpoint restores reuse the saved map and only allocate when keys mutate (10k+ key checkpoints resume without GC spikes)
  • ⏸️ Human-in-the-Loop - Approval workflows with conditional guards and audit trails
  • πŸ“Š Observability - Built-in OpenTelemetry metrics, non-blocking event bus fan-out, and distributed tracing
  • πŸ” Resilience - Configurable retry policies, circuit breakers, and timeouts
  • πŸ”’ Security - WASM sandboxing for untrusted code, input validation, and integrity checks

AI/ML Features

  • πŸ”’ Embeddings & Memory - Semantic search and long-term conversation storage
  • πŸ” RAG Integration - AWS Bedrock Knowledge Bases and Kendra support
  • 🧠 Native Reasoning - First-class support for o1, o3, Gemini 2.0, Claude reasoning models
  • πŸ“ Prompt Templates - Variable substitution with reusable patterns

Extensibility

  • 🀝 A2A Protocol - Multi-agent collaboration with standardized communication
  • πŸ”Œ MCP Support - Dynamic tool discovery from Model Context Protocol servers
  • 🌐 LangChainGo Tools - Import existing tool ecosystem
  • βš™οΈ Custom Backends - Pluggable MessageBus for distributed execution (Redis, Kafka)

πŸš€ Quick Start

Installation

go get github.com/hupe1980/agentmesh@latest

Hello World ReAct Agent

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/graph"
    "github.com/hupe1980/agentmesh/pkg/message"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
    "github.com/hupe1980/agentmesh/pkg/tool"
)

// WeatherArgs defines the JSON schema for the weather tool.
type WeatherArgs struct {
    Location string `json:"location" jsonschema:"description=The city to get weather for"`
}

func main() {
    ctx := context.Background()

    // Validate API key
    if strings.TrimSpace(os.Getenv("OPENAI_API_KEY")) == "" {
        log.Fatal("OPENAI_API_KEY environment variable is required")
    }

    // Create OpenAI model (uses OPENAI_API_KEY env var)
    model := openai.NewModel()

    // Define a tool with typed arguments
    weatherTool, err := tool.NewFuncTool(
        "get_weather",
        "Get current weather for a location",
        func(ctx context.Context, args WeatherArgs) (string, error) {
            return fmt.Sprintf("Weather in %s: Sunny, 72Β°F", args.Location), nil
        },
    )
    if err != nil {
        log.Fatal(err)
    }

    // Create ReAct agent
    reactAgent, err := agent.NewReAct(model,
        agent.WithTools(weatherTool),
        agent.WithMaxIterations(5),
    )
    if err != nil {
        log.Fatal(err)
    }

    // Execute agent and get the final result
    messages := []message.Message{
        message.NewHumanMessage("What's the weather in San Francisco?"),
    }

    lastMsg, err := graph.Last(reactAgent.Run(ctx, messages))
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(message.Stringify(lastMsg))
}

Output:

Thought: I need to check the weather in San Francisco
Action: get_weather("San Francisco")
Observation: Weather in San Francisco: Sunny, 72Β°F
The weather in San Francisco is currently sunny with a temperature of 72Β°F.

πŸ“š Documentation

Getting Started

Core Concepts

Advanced Features


🎨 Examples

Explore 31 comprehensive examples in the examples/ directory:

Example Description
basic_agent Simple ReAct agent with tools
conversational_agent Agent with long-term memory across turns
document_loader Document loading and ingestion pipeline
supervisor_agent Multi-agent coordination with supervisor
reflection_agent Self-critique and iterative refinement
checkpointing State persistence and resume
human_approval Approval workflows with conditional guards
parallel_tasks Concurrent node execution
streaming Real-time response streaming
middleware Rate limiting and circuit breakers
custom_scheduler Priority and resource-aware scheduling
observability OpenTelemetry integration
a2a_integration Agent-to-agent communication
wasm_tool Sandboxed tool execution

See all examples β†’

Running Examples

# Run any example
cd examples/basic_agent
go run main.go

# Set required environment variables
export OPENAI_API_KEY=your-key-here
export ANTHROPIC_API_KEY=your-key-here  # For Anthropic examples

πŸ—οΈ Architecture

AgentMesh uses a layered architecture with clean separation of concerns:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Application Layer (pkg/agent)                     β”‚
β”‚  β€’ ReActAgent, SupervisorAgent, RAGAgent                    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                         β”‚
                         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Graph Orchestration (pkg/graph)                   β”‚
β”‚  β€’ Workflow construction β€’ State management β€’ Validation    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                         β”‚
                         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Execution Engine (pkg/pregel)                     β”‚
β”‚                                                             β”‚
β”‚  β€’ BSP Runtime      β€’ Worker pools      β€’ MessageBus        β”‚
β”‚  β€’ Superstep sync   β€’ Sharded frontier  β€’ Backpressure      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Key Components:

  • Graph - Define nodes, edges, and execution flow with NodeFunc
  • BSPState - Copy-on-write state with typed Key[T] and ListKey[T]
  • Pregel Runtime - Bulk-synchronous parallel execution with sharded message passing
  • Checkpointer - State persistence with encryption, signing, and two-phase commit
  • Agents - High-level abstractions (ReAct, Supervisor, RAG)

Learn more about the architecture β†’


πŸ§ͺ Testing

# Run all tests
go test ./...

# With coverage
go test ./... -coverprofile=coverage.out
go tool cover -html=coverage.out

# Run benchmarks
go test ./... -bench=. -benchmem

🀝 Contributing

Contributions are welcome!

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Write tests for your changes
  4. Commit your changes (git commit -m 'Add amazing feature')
  5. Push to the branch (git push origin feature/amazing-feature)
  6. Open a Pull Request

Development Guidelines

  • All new code must have tests (target 85%+ coverage)
  • Run go fmt and golangci-lint before committing
  • Update documentation for public API changes
  • Add examples for new features

πŸ“„ License

Licensed under the Apache License 2.0 - see LICENSE for details.


πŸ™ Acknowledgments

  • Pregel Paper - Google's bulk-synchronous parallel graph processing model
  • Go Community - Exceptional tooling and ecosystem
  • OpenTelemetry - Production-grade observability standards

⭐ Star this repo if you find it useful!

Made with ❀️ by hupe1980

About

πŸ€–πŸ•ΈοΈ Production-grade multi-agent orchestration framework powered by Pregel BSP. Build sophisticated AI workflows with parallel execution, state management, and observability.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages