forked from qinxuewu/docs
-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathstorm06.md
More file actions
122 lines (110 loc) · 3.95 KB
/
storm06.md
File metadata and controls
122 lines (110 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
## DRPC简介
Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。
DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语spout,bolt, topology而成的一种模式(pattern)。
## DRPC服务调用过程
- 接收一个RPC请求。
- 发送请求到storm topology
- 从storm topology接收结果
- 把结果发回给等待的客户端。从客户端的角度来看一个DRPC调用跟一个普通的RPC调用没有任何区别
<img src="_media/storm6.png">
## 要使用DRPC首先要修改storm配置文件
apache-storm-1.2.2/conf/storm.yaml
```bash
storm.zookeeper.servers:
- "192.168.1.191"
storm.zookeeper.port: 2181
storm.local.dir: "/usr/local/apache-storm-1.2.2/data"
nimbus.seeds: ["192.168.1.191"]
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.health.check.dir: "healthchecks"
storm.health.check.timeout.ms: 5000
#配置drpc
drpc.servers:
- "192.168.1.191"
```
## 启动drpc服务
```
bin/storm drpc &
```
## 编写drpc服务代码
```java
public class DrpcTopology {
public static class ExclaimBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "1"));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
/**
* Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)。
* DRPC服务调用过程:
* 接收一个RPC请求。
* 发送请求到storm topology
* 从storm topology接收结果
* 把结果发回给等待的客户端。从客户端的角度来看一个DRPC调用跟一个普通的RPC调用没有任何区别
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
try {
// LocalDRPC drpc = new LocalDRPC();
// LocalCluster cluster = new LocalCluster();
// cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
// System.out.println("DRPC测试 'hello':" + drpc.execute("exclamation", "hello"));
//
// cluster.shutdown();
// drpc.shutdown();
//集群模式
conf.setNumWorkers(3);
StormSubmitter.submitTopology("exclamation", conf,builder.createRemoteTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
```
## 打包提交到storm集群
语法:bin/storm jar (jar包名) | 主函数路径 | Topology名称
```
bin/storm jar stom-demo-1.0.jar com.qxw.drpc.DrpcTopology exclamation
```
## 访问UI查看是否提交成功
http://192.168.1.191:8080
<img src="_media/storm7.png">
**linux查看正在运行的Topology**
```bash
[root@web1 apache-storm-1.2.2]# bin/storm list
3893 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : web1:6627
Topology_name Status Num_tasks Num_workers Uptime_secs
-------------------------------------------------------------------
exclamation ACTIVE 0 0 1020
```
## 调用集群的drpc
```java
public class DrpcTest {
public static void main(String[] args) {
try {
Config conf = new Config();
conf.setDebug(false);
Map config = Utils.readDefaultConfig();
DRPCClient client = new DRPCClient(config,"192.168.1.191", 3772); //drpc服务
String result = client.execute("exclamation", "hello");/// 调用drpcTest函数,传递参数为hello
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
参考链接:https://blog.csdn.net/qq403977698/article/details/49025345