Production-grade multi-agent orchestration framework powered by Pregel-style bulk-synchronous parallel (BSP) graph processing. Build sophisticated AI agent workflows with parallel execution, state management, and enterprise-grade observability.
Requires Go 1.24+
- π Pregel BSP Execution - Parallel graph processing with optimized concurrency (4-10x faster state access)
- π§ LLM Integration - Native support for OpenAI, Anthropic, Gemini, Amazon Bedrock, Ollama with streaming and reasoning models
- πΎ State Management - Lock-free channel-based state with checkpointing, managed value descriptors, and resume-time rehydration hooks
- π οΈ Tool Orchestration - Type-safe function calling with automatic schema generation
- π Model Routing - Intelligent model selection based on cost, capabilities, and availability
- β Graph Validation - Comprehensive compile-time error checking (cycles, missing nodes, unreachable paths)
- πΎ Checkpointing - Persistent state with auto-resume, encryption, and signing
- β»οΈ Zero-Copy Resume - Copy-on-write checkpoint restores reuse the saved map and only allocate when keys mutate (10k+ key checkpoints resume without GC spikes)
- βΈοΈ Human-in-the-Loop - Approval workflows with conditional guards and audit trails
- π Observability - Built-in OpenTelemetry metrics, non-blocking event bus fan-out, and distributed tracing
- π Resilience - Configurable retry policies, circuit breakers, and timeouts
- π Security - WASM sandboxing for untrusted code, input validation, and integrity checks
- π’ Embeddings & Memory - Semantic search and long-term conversation storage
- π RAG Integration - AWS Bedrock Knowledge Bases and Kendra support
- π§ Native Reasoning - First-class support for o1, o3, Gemini 2.0, Claude reasoning models
- π Prompt Templates - Variable substitution with reusable patterns
- π€ A2A Protocol - Multi-agent collaboration with standardized communication
- π MCP Support - Dynamic tool discovery from Model Context Protocol servers
- π LangChainGo Tools - Import existing tool ecosystem
- βοΈ Custom Backends - Pluggable MessageBus for distributed execution (Redis, Kafka)
go get github.com/hupe1980/agentmesh@latestpackage main
import (
"context"
"fmt"
"log"
"os"
"strings"
"github.com/hupe1980/agentmesh/pkg/agent"
"github.com/hupe1980/agentmesh/pkg/graph"
"github.com/hupe1980/agentmesh/pkg/message"
"github.com/hupe1980/agentmesh/pkg/model/openai"
"github.com/hupe1980/agentmesh/pkg/tool"
)
// WeatherArgs defines the JSON schema for the weather tool.
type WeatherArgs struct {
Location string `json:"location" jsonschema:"description=The city to get weather for"`
}
func main() {
ctx := context.Background()
// Validate API key
if strings.TrimSpace(os.Getenv("OPENAI_API_KEY")) == "" {
log.Fatal("OPENAI_API_KEY environment variable is required")
}
// Create OpenAI model (uses OPENAI_API_KEY env var)
model := openai.NewModel()
// Define a tool with typed arguments
weatherTool, err := tool.NewFuncTool(
"get_weather",
"Get current weather for a location",
func(ctx context.Context, args WeatherArgs) (string, error) {
return fmt.Sprintf("Weather in %s: Sunny, 72Β°F", args.Location), nil
},
)
if err != nil {
log.Fatal(err)
}
// Create ReAct agent
reactAgent, err := agent.NewReAct(model,
agent.WithTools(weatherTool),
agent.WithMaxIterations(5),
)
if err != nil {
log.Fatal(err)
}
// Execute agent and get the final result
messages := []message.Message{
message.NewHumanMessage("What's the weather in San Francisco?"),
}
lastMsg, err := graph.Last(reactAgent.Run(ctx, messages))
if err != nil {
log.Fatal(err)
}
fmt.Println(message.Stringify(lastMsg))
}Output:
Thought: I need to check the weather in San Francisco
Action: get_weather("San Francisco")
Observation: Weather in San Francisco: Sunny, 72Β°F
The weather in San Francisco is currently sunny with a temperature of 72Β°F.
- π Getting Started Guide - Complete tutorial with examples
- ποΈ Architecture Overview - Understanding the Pregel BSP design
- π API Reference - Complete godoc
- πΈοΈ Graph Building - Nodes, edges, and execution flow
- ποΈ State Management - Channels, reducers, checkpointing, and approval workflows
- π§ Tools Guide - Building and integrating tools
- π€ Model Integration - LLM provider setup and configuration
- π€ Agent Patterns - ReAct, RAG, and Supervisor agents
- π Observability - Metrics, tracing, and monitoring
- π Middleware - Caching, rate limiting, circuit breakers
- π Custom Schedulers - Priority-based and resource-aware vertex execution
- π§ Memory & Embeddings - Semantic search and conversation storage
- π€ A2A Protocol - Multi-agent collaboration
- π WASM Sandboxing - Secure untrusted code execution
Explore 31 comprehensive examples in the examples/ directory:
| Example | Description |
|---|---|
| basic_agent | Simple ReAct agent with tools |
| conversational_agent | Agent with long-term memory across turns |
| document_loader | Document loading and ingestion pipeline |
| supervisor_agent | Multi-agent coordination with supervisor |
| reflection_agent | Self-critique and iterative refinement |
| checkpointing | State persistence and resume |
| human_approval | Approval workflows with conditional guards |
| parallel_tasks | Concurrent node execution |
| streaming | Real-time response streaming |
| middleware | Rate limiting and circuit breakers |
| custom_scheduler | Priority and resource-aware scheduling |
| observability | OpenTelemetry integration |
| a2a_integration | Agent-to-agent communication |
| wasm_tool | Sandboxed tool execution |
# Run any example
cd examples/basic_agent
go run main.go
# Set required environment variables
export OPENAI_API_KEY=your-key-here
export ANTHROPIC_API_KEY=your-key-here # For Anthropic examplesAgentMesh uses a layered architecture with clean separation of concerns:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Application Layer (pkg/agent) β
β β’ ReActAgent, SupervisorAgent, RAGAgent β
ββββββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Graph Orchestration (pkg/graph) β
β β’ Workflow construction β’ State management β’ Validation β
ββββββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Execution Engine (pkg/pregel) β
β β
β β’ BSP Runtime β’ Worker pools β’ MessageBus β
β β’ Superstep sync β’ Sharded frontier β’ Backpressure β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Key Components:
- Graph - Define nodes, edges, and execution flow with
NodeFunc - BSPState - Copy-on-write state with typed
Key[T]andListKey[T] - Pregel Runtime - Bulk-synchronous parallel execution with sharded message passing
- Checkpointer - State persistence with encryption, signing, and two-phase commit
- Agents - High-level abstractions (ReAct, Supervisor, RAG)
Learn more about the architecture β
# Run all tests
go test ./...
# With coverage
go test ./... -coverprofile=coverage.out
go tool cover -html=coverage.out
# Run benchmarks
go test ./... -bench=. -benchmemContributions are welcome!
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Write tests for your changes
- Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
- All new code must have tests (target 85%+ coverage)
- Run
go fmtandgolangci-lintbefore committing - Update documentation for public API changes
- Add examples for new features
Licensed under the Apache License 2.0 - see LICENSE for details.
- Pregel Paper - Google's bulk-synchronous parallel graph processing model
- Go Community - Exceptional tooling and ecosystem
- OpenTelemetry - Production-grade observability standards
β Star this repo if you find it useful!
Made with β€οΈ by hupe1980