Skip to content

Commit 3cc894b

Browse files
committed
dev
1 parent 42e66be commit 3cc894b

13 files changed

+3775
-0
lines changed

src/queue/IMPLEMENTATION.md

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# 消息队列实现方案总结
2+
3+
## 实现概述
4+
5+
提供了**两种实现方案**,系统会自动检测 Redis 版本并选择最佳实现:
6+
7+
### 方案 1: QueueService(基于 List + Sorted Set)
8+
- **Redis 版本要求**: 2.0+
9+
- **数据结构**: List + Sorted Set + String
10+
- **特点**: 兼容性好,实现简单
11+
- **适用场景**: Redis 版本较低或需求简单
12+
13+
### 方案 2: QueueStreamsService(基于 Redis Streams)
14+
- **Redis 版本要求**: 5.0+
15+
- **数据结构**: Stream + Sorted Set(延迟消息)
16+
- **特点**: 功能丰富,原生支持消费者组和 ACK
17+
- **适用场景**: Redis 5.0+,需要更强大的功能
18+
19+
## 自动选择机制
20+
21+
系统通过 `QueueFactoryService` 自动检测:
22+
23+
1. **启动时检查** Redis 版本
24+
2. **测试** Streams 功能是否可用
25+
3. **自动选择** 最佳实现
26+
27+
## 使用方式
28+
29+
### 方式 1: 直接使用 QueueService(推荐)
30+
31+
```typescript
32+
import { QueueService } from '../queue/queue.service';
33+
34+
@Injectable()
35+
export class YourService {
36+
constructor(private readonly queueService: QueueService) {}
37+
38+
async someMethod() {
39+
// 生产消息
40+
await this.queueService.produce('queue-name', 'type', data);
41+
42+
// 消费消息
43+
const messages = await this.queueService.consume('queue-name');
44+
45+
// 确认消息
46+
await this.queueService.ack('queue-name', messageId);
47+
}
48+
}
49+
```
50+
51+
系统会自动使用最佳实现,你无需关心底层细节。
52+
53+
### 方式 2: 使用 QueueFactoryService
54+
55+
```typescript
56+
import { QueueFactoryService } from '../queue/queue-factory.service';
57+
58+
@Injectable()
59+
export class YourService {
60+
constructor(private readonly factory: QueueFactoryService) {}
61+
62+
async someMethod() {
63+
const queueService = await this.factory.getService();
64+
const isUsingStreams = this.factory.isUsingStreams();
65+
66+
// 使用 queueService...
67+
}
68+
}
69+
```
70+
71+
## Redis Streams 命令说明
72+
73+
如果使用 Streams 实现,底层使用以下 Redis 命令:
74+
75+
- **XADD**: 添加消息到 Stream
76+
- **XREADGROUP**: 从消费者组读取消息
77+
- **XACK**: 确认消息处理完成
78+
- **XPENDING**: 查看 Pending List
79+
- **XDEL**: 删除消息
80+
- **XLEN**: 获取 Stream 长度
81+
82+
## 数据结构对比
83+
84+
### List 实现
85+
```
86+
queue:{queue} - List/Sorted Set,存储消息ID
87+
queue:delay:{queue} - Sorted Set,延迟消息
88+
queue:processing:{queue} - List,处理中消息
89+
queue:message:{queue}:{id} - String,消息详情
90+
```
91+
92+
### Streams 实现
93+
```
94+
stream:{queue} - Stream,消息流
95+
queue:delay:{queue} - Sorted Set,延迟消息
96+
queue:message:{queue}:{id} - String,消息详情(用于查询)
97+
```
98+
99+
## 性能对比
100+
101+
| 操作 | List 实现 | Streams 实现 |
102+
|------|-----------|--------------|
103+
| 生产消息 | O(1) | O(1) |
104+
| 消费消息 | O(1) | O(1) |
105+
| ACK | O(1) | O(1) |
106+
| 查询 Pending | O(N) | O(1) |
107+
| 消费者组 | 需手动实现 | 原生支持 |
108+
109+
## 推荐使用
110+
111+
- **直接注入 QueueService**:系统会自动选择最佳实现
112+
- **接口完全兼容**:两种实现接口一致,可以无缝切换
113+
- **无需关心底层**:系统会自动处理版本检测和实现选择
114+

0 commit comments

Comments
 (0)