高性能 Rust 库,提供微秒级延迟的 Solana DEX 事件解析
中文 | English | Website | Telegram | Discord
本 SDK 提供多种语言版本:
| 语言 | 仓库 | 描述 |
|---|---|---|
| Rust | sol-parser-sdk | 超低延迟,SIMD 优化 |
| Node.js | sol-parser-sdk-nodejs | TypeScript/JavaScript,Node.js 支持 |
| Python | sol-parser-sdk-python | 原生 async/await 支持 |
| Go | sol-parser-sdk-golang | 并发安全,goroutine 支持 |
- 10-20μs 解析延迟(Release 模式)
- 零拷贝 栈缓冲区解析
- SIMD 加速 模式匹配(memchr)
- 无锁队列 ArrayQueue 事件传递
| 模式 | 延迟 | 说明 |
|---|---|---|
| Unordered | 10-20μs | 立即输出,超低延迟 |
| MicroBatch | 50-200μs | 微批次排序,时间窗口内排序 |
| StreamingOrdered | 0.1-5ms | 流式排序,连续序列立即释放 |
| Ordered | 1-50ms | 完整 slot 排序,等待整个 slot 完成 |
- ✅ 零堆分配 热路径无堆分配
- ✅ SIMD 模式匹配 所有协议检测 SIMD 加速
- ✅ 静态预编译查找器 字符串搜索零开销
- ✅ 激进内联 关键函数强制内联
- ✅ 事件类型过滤 精准解析目标事件
- ✅ 条件 Create 检测 仅在需要时检测
- ✅ 多种顺序模式 延迟与顺序的灵活平衡
克隆仓库:
cd your_project_dir
git clone https://github.com/0xfnzero/sol-parser-sdk在 Cargo.toml 中添加:
[dependencies]
# 默认:Borsh 解析器
sol-parser-sdk = { path = "../sol-parser-sdk" }
# 或:零拷贝解析器(最高性能)
sol-parser-sdk = { path = "../sol-parser-sdk", default-features = false, features = ["parse-zero-copy"] }# 在 Cargo.toml 中添加
sol-parser-sdk = "0.3.1"或使用零拷贝解析器(最高性能):
sol-parser-sdk = { version = "0.3.1", default-features = false, features = ["parse-zero-copy"] }使用优化示例测试解析延迟:
# PumpFun 详细性能指标
cargo run --example pumpfun_with_metrics --release
# PumpSwap 详细性能指标(单事件明细 + 每 10 秒统计)
cargo run --example pumpswap_with_metrics --release
# PumpSwap 超低延迟测试
cargo run --example pumpswap_low_latency --release
# PumpSwap 事件 + MicroBatch 有序模式
cargo run --example pumpswap_ordered --release
# 预期输出:
# gRPC接收时间: 1234567890 μs
# 事件接收时间: 1234567900 μs
# 延迟时间: 10 μs <-- 超低延迟!| 描述 | 运行命令 | 源码 |
|---|---|---|
| PumpFun | ||
| PumpFun 事件解析 + 性能指标 | cargo run --example pumpfun_with_metrics --release |
examples/pumpfun_with_metrics.rs |
| PumpFun 交易类型过滤 | cargo run --example pumpfun_trade_filter --release |
examples/pumpfun_trade_filter.rs |
| PumpFun 有序模式交易过滤 | cargo run --example pumpfun_trade_filter_ordered --release |
examples/pumpfun_trade_filter_ordered.rs |
| PumpFun 快速连接测试 | cargo run --example pumpfun_quick_test --release |
examples/pumpfun_quick_test.rs |
| 按签名解析 PumpFun 交易 | TX_SIGNATURE=<sig> cargo run --example parse_pump_tx --release |
examples/parse_pump_tx.rs |
| 调试 PumpFun 交易 | cargo run --example debug_pump_tx --release |
examples/debug_pump_tx.rs |
| PumpSwap | ||
| PumpSwap 事件 + 性能统计 | cargo run --example pumpswap_with_metrics --release |
examples/pumpswap_with_metrics.rs |
| PumpSwap 超低延迟 | cargo run --example pumpswap_low_latency --release |
examples/pumpswap_low_latency.rs |
| PumpSwap MicroBatch 有序 | cargo run --example pumpswap_ordered --release |
examples/pumpswap_ordered.rs |
| 按签名解析 PumpSwap 交易 | TX_SIGNATURE=<sig> cargo run --example parse_pumpswap_tx --release |
examples/parse_pumpswap_tx.rs |
| 调试 PumpSwap 交易 | cargo run --example debug_pumpswap_tx --release |
examples/debug_pumpswap_tx.rs |
| Meteora DAMM | ||
| Meteora DAMM V2 事件 | cargo run --example meteora_damm_grpc --release |
examples/meteora_damm_grpc.rs |
| 按签名解析 Meteora DAMM 交易 | TX_SIGNATURE=<sig> cargo run --example parse_meteora_damm_tx --release |
examples/parse_meteora_damm_tx.rs |
| 账户订阅 | ||
| Token 账户余额变化 | TOKEN_ACCOUNT=<pubkey> cargo run --example token_balance_listen --release |
examples/token_balance_listen.rs |
| Nonce 账户状态变化 | NONCE_ACCOUNT=<pubkey> cargo run --example nonce_listen --release |
examples/nonce_listen.rs |
| Mint 账户信息 | MINT_ACCOUNT=<pubkey> cargo run --example token_decimals_listen --release |
examples/token_decimals_listen.rs |
| PumpSwap 池账户 memcmp 订阅 | cargo run --example pumpswap_pool_account_listen --release |
examples/pumpswap_pool_account_listen.rs |
| 所有 ATA 订阅 | cargo run --example mint_all_ata_account_listen --release |
examples/mint_all_ata_account_listen.rs |
| ShredStream | ||
| Jito ShredStream 订阅 | cargo run --example shredstream_example --release |
examples/shredstream_example.rs |
| 工具 | ||
| 动态更新订阅过滤器 | cargo run --example dynamic_subscription --release |
examples/dynamic_subscription.rs |
| 调试 PumpSwap 账户填充 | cargo run --example test_account_filling --release |
examples/test_account_filling.rs |
use sol_parser_sdk::grpc::{YellowstoneGrpc, ClientConfig, OrderMode, EventTypeFilter, EventType};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建 gRPC 客户端(默认 Unordered 模式)
let grpc = YellowstoneGrpc::new(
"https://solana-yellowstone-grpc.publicnode.com:443".to_string(),
None,
)?;
// 或使用自定义配置启用有序模式
let config = ClientConfig {
order_mode: OrderMode::MicroBatch, // 低延迟 + 有序
micro_batch_us: 100, // 100μs 批次窗口
..ClientConfig::default()
};
let grpc = YellowstoneGrpc::new_with_config(
"https://solana-yellowstone-grpc.publicnode.com:443".to_string(),
None,
config,
)?;
// 仅过滤 PumpFun Trade 事件(超快路径)
let event_filter = EventTypeFilter::include_only(vec![
EventType::PumpFunTrade
]);
// 订阅并获取无锁队列
let queue = grpc.subscribe_dex_events(
vec![transaction_filter],
vec![account_filter],
Some(event_filter),
).await?;
// 最小延迟消费事件
tokio::spawn(async move {
let mut spin_count = 0;
loop {
if let Some(event) = queue.pop() {
spin_count = 0;
// 处理事件(10-20μs 延迟!)
println!("{:?}", event);
} else {
// 混合自旋等待策略
spin_count += 1;
if spin_count < 1000 {
std::hint::spin_loop();
} else {
tokio::task::yield_now().await;
spin_count = 0;
}
}
}
});
Ok(())
}ShredStream 通过直接订阅 Jito 的 ShredStream 服务提供超低延迟(比 gRPC 快约 50-100ms):
use sol_parser_sdk::shredstream::{ShredStreamClient, ShredStreamConfig};
use sol_parser_sdk::DexEvent;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建 ShredStream 客户端
let client = ShredStreamClient::new("http://127.0.0.1:10800").await?;
// 或使用自定义配置
let config = ShredStreamConfig {
connection_timeout_ms: 5000,
request_timeout_ms: 30000,
max_decoding_message_size: 1024 * 1024 * 1024,
reconnect_delay_ms: 1000,
max_reconnect_attempts: 0, // 0 = 无限重连
};
let client = ShredStreamClient::new_with_config("http://127.0.0.1:10800", config).await?;
// 订阅并获取无锁队列
let queue = client.subscribe().await?;
// 消费事件
loop {
if let Some(event) = queue.pop() {
match &event {
DexEvent::PumpFunTrade(e) => {
println!("PumpFun Trade: mint={}, is_buy={}", e.mint, e.is_buy);
}
DexEvent::PumpSwapBuy(e) => {
println!("PumpSwap Buy: pool={}", e.pool);
}
_ => {}
}
} else {
std::hint::spin_loop();
}
}
}ShredStream 限制:
- 仅
static_account_keys()- 使用 ALT 的交易可能有错误的账户 - 无 Inner Instructions - 无法解析 CPI 调用
- 无 block_time - 恒为 0
- tx_index 是 entry 内索引而非 slot 内索引
- ✅ PumpFun - Meme 币交易(超快零拷贝路径)
- ✅ PumpSwap - PumpFun 交换协议
- ✅ Raydium AMM V4 - 自动做市商
- ✅ Raydium CLMM - 集中流动性做市
- ✅ Raydium CPMM - 集中池做市
- ✅ Orca Whirlpool - 集中流动性 AMM
- ✅ Meteora AMM - 动态 AMM
- ✅ Meteora DAMM - 动态 AMM V2
- ✅ Meteora DLMM - 动态流动性做市
- ✅ Bonk Launchpad - 代币发射平台
每个协议支持:
- 📈 交易/兑换事件 - 买入/卖出交易
- 💧 流动性事件 - 存款/提款
- 🏊 池事件 - 池创建/初始化
- 🎯 仓位事件 - 开仓/平仓(CLMM)
// PumpFun Trade 使用 512 字节栈缓冲区
const MAX_DECODE_SIZE: usize = 512;
let mut decode_buf: [u8; MAX_DECODE_SIZE] = [0u8; MAX_DECODE_SIZE];
// 直接解码到栈,无堆分配
general_purpose::STANDARD
.decode_slice(data_part.as_bytes(), &mut decode_buf)
.ok()?;// 预编译 SIMD 查找器(初始化一次)
static PUMPFUN_FINDER: Lazy<memmem::Finder> =
Lazy::new(|| memmem::Finder::new(b"6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"));
// 比 .contains() 快 3-10 倍
if PUMPFUN_FINDER.find(log_bytes).is_some() {
return LogType::PumpFun;
}// 单一事件类型超快路径
if include_only.len() == 1 && include_only[0] == EventType::PumpFunTrade {
if log_type == LogType::PumpFun {
return parse_pumpfun_trade( // 零拷贝路径
log, signature, slot, block_time, grpc_recv_us, is_created_buy
);
}
}// 100,000 容量的 ArrayQueue
let queue = Arc::new(ArrayQueue::<DexEvent>::new(100_000));
// 非阻塞 push/pop(无互斥锁开销)
let _ = queue.push(event);
if let Some(event) = queue.pop() {
// 处理事件
}通过过滤特定事件减少处理开销:
let event_filter = EventTypeFilter::include_only(vec![
EventType::PumpFunTrade,
EventType::RaydiumAmmV4Swap,
EventType::RaydiumClmmSwap,
EventType::OrcaWhirlpoolSwap,
]);let event_filter = EventTypeFilter::include_only(vec![
EventType::PumpFunCreate,
EventType::RaydiumClmmCreatePool,
EventType::OrcaWhirlpoolInitialize,
]);性能影响:
- 减少 60-80% 的处理开销
- 降低内存使用
- 减少网络带宽
自动检测代币创建后立即购买的交易:
// 检测 "Program data: GB7IKAUcB3c..." 模式
let has_create = detect_pumpfun_create(logs);
// 在 Trade 事件上设置 is_created_buy 标志
if has_create {
trade_event.is_created_buy = true;
}无需重连即可更新过滤器:
grpc.update_subscription(
vec![new_transaction_filter],
vec![new_account_filter],
).await?;根据场景选择延迟与顺序的平衡:
use sol_parser_sdk::grpc::{ClientConfig, OrderMode};
// 超低延迟(无顺序保证)
let config = ClientConfig {
order_mode: OrderMode::Unordered,
..ClientConfig::default()
};
// 低延迟微批次排序(50-200μs)
let config = ClientConfig {
order_mode: OrderMode::MicroBatch,
micro_batch_us: 100, // 100μs 批次窗口
..ClientConfig::default()
};
// 流式排序,连续序列立即释放(0.1-5ms)
let config = ClientConfig {
order_mode: OrderMode::StreamingOrdered,
order_timeout_ms: 50, // 不完整序列超时
..ClientConfig::default()
};
// 完整 slot 排序(1-50ms,等待整个 slot)
let config = ClientConfig {
order_mode: OrderMode::Ordered,
order_timeout_ms: 100,
..ClientConfig::default()
};let config = ClientConfig {
enable_metrics: true,
..ClientConfig::default()
};
let grpc = YellowstoneGrpc::new_with_config(endpoint, token, config)?;src/
├── core/
│ └── events.rs # 事件定义
├── grpc/
│ ├── client.rs # Yellowstone gRPC 客户端
│ ├── buffers.rs # SlotBuffer 和 MicroBatchBuffer
│ └── types.rs # OrderMode、ClientConfig、过滤器
├── shredstream/
│ ├── client.rs # Jito ShredStream 客户端
│ ├── config.rs # ShredStreamConfig
│ └── proto/ # Protobuf 定义
├── logs/
│ ├── optimized_matcher.rs # SIMD 日志检测
│ ├── zero_copy_parser.rs # 零拷贝解析
│ ├── pumpfun.rs # PumpFun 解析器
│ ├── raydium_*.rs # Raydium 解析器
│ ├── orca_*.rs # Orca 解析器
│ └── meteora_*.rs # Meteora 解析器
├── instr/
│ └── *.rs # 指令解析器
├── warmup/
│ └── mod.rs # 解析器预热(自动调用)
└── lib.rs
- 所有
.contains()替换为memmem::Finder - 性能提升 3-10 倍
- 预编译静态查找器
- 栈分配缓冲区(512 字节)
- 热路径无堆分配
- 内联辅助函数
- 协议级别早期过滤
- 条件 Create 检测
- 单类型超快路径
- ArrayQueue(100K 容量)
- 自旋等待混合策略
- 无互斥锁开销
#[inline(always)]
fn read_u64_le_inline(data: &[u8], offset: usize) -> Option<u64> {
if offset + 8 <= data.len() {
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&data[offset..offset + 8]);
Some(u64::from_le_bytes(bytes))
} else {
None
}
}| 协议 | 平均延迟 | 最小 | 最大 |
|---|---|---|---|
| PumpFun Trade(零拷贝) | 10-15μs | 8μs | 20μs |
| Raydium AMM V4 Swap | 15-20μs | 12μs | 25μs |
| Orca Whirlpool Swap | 15-20μs | 12μs | 25μs |
| 操作 | 优化前(contains) | 优化后(SIMD) | 提升 |
|---|---|---|---|
| 协议检测 | 50-100ns | 10-20ns | 3-10x |
| Create 事件检测 | 150ns | 30ns | 5x |
MIT License
- 仓库: https://github.com/0xfnzero/solana-streamer
- Telegram: https://t.me/fnzero_group
- Discord: https://discord.gg/vuazbGkqQE
- 使用事件过滤 - 源头过滤可获得 60-80% 性能提升
- Release 模式运行 -
cargo build --release获得完整优化 - 使用 sudo 测试 -
sudo cargo run --example basic --release获得精确计时 - 监控延迟 - 生产环境检查
grpc_recv_us和队列延迟 - 调整队列大小 - 根据吞吐量调整 ArrayQueue 容量
- 自旋等待策略 - 根据使用场景调整自旋计数(默认:1000)
# 运行测试
cargo test
# 构建 release 二进制
cargo build --release
# 生成文档
cargo doc --open