Skip to content

Commit e98847e

Browse files
authored
feat: record latency metrics and request metrics for each instance. (#15)
1 parent 1cae803 commit e98847e

File tree

12 files changed

+242
-32
lines changed

12 files changed

+242
-32
lines changed

xllm_service/common/types.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,43 @@ struct LoadMetrics {
112112
bool empty() const { return false; }
113113
};
114114

115+
// Record the latency monitoring metrics of the instance over the recent period
116+
struct LatencyMetrics {
117+
LatencyMetrics(const int64_t& recent_max_ttft, const int64_t& recent_max_tbt)
118+
: recent_max_ttft(recent_max_ttft), recent_max_tbt(recent_max_tbt) {}
119+
120+
// The unit is milliseconds.
121+
int64_t recent_max_ttft;
122+
int64_t recent_max_tbt;
123+
};
124+
125+
enum class RequestAction : int32_t {
126+
SCHEDULE = 0,
127+
FINISH_PREFILL = 1,
128+
FINISH_DECODE = 2,
129+
CANCEL = 3,
130+
};
131+
132+
// Record the request metrics of the instance
133+
struct RequestMetrics {
134+
RequestMetrics()
135+
: prefill_request_num(0),
136+
prefill_token_num(0),
137+
decode_request_num(0),
138+
decode_token_num(0),
139+
estimated_prefill_time(0) {}
140+
141+
int64_t prefill_request_num;
142+
int64_t prefill_token_num;
143+
144+
int64_t decode_request_num;
145+
int64_t decode_token_num;
146+
147+
// Estimated execution time for all prefill requests on the instance.
148+
// The unit is milliseconds.
149+
int64_t estimated_prefill_time;
150+
};
151+
115152
struct InstanceMetaInfo {
116153
public:
117154
InstanceMetaInfo() { set_init_timestamp(); }

xllm_service/http_service/service.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,12 @@ void handle_non_stream_response(brpc::Controller* cntl,
9292
template <typename T>
9393
void handle_first_response(brpc::Controller* cntl,
9494
std::shared_ptr<T> call_data,
95+
Scheduler* scheduler,
96+
std::string service_request_id,
9597
bool stream) {
98+
// update request metrics for prefill finished request
99+
scheduler->update_request_metrics_for_prefill(service_request_id);
100+
96101
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
97102
if (cntl->Failed()) {
98103
LOG(WARNING) << "Fail to send stream generation, " << cntl->ErrorText();
@@ -150,7 +155,6 @@ void XllmHttpServiceImpl::handle(std::shared_ptr<T> call_data,
150155
LOG(ERROR) << "rpc service add new request error: "
151156
<< request->service_request_id;
152157
call_data->finish_with_error("Internal runtime error.");
153-
scheduler_->finish_request(request->service_request_id);
154158
return;
155159
}
156160
}
@@ -177,14 +181,19 @@ void XllmHttpServiceImpl::handle(std::shared_ptr<T> call_data,
177181
// 1. tokens will be received via rpc channel.
178182
//
179183
if (enable_decode_response_to_service_) {
180-
google::protobuf::Closure* done = brpc::NewCallback(
181-
&handle_first_response<T>, redirect_cntl, call_data, request->stream);
184+
google::protobuf::Closure* done =
185+
brpc::NewCallback(&handle_first_response<T>,
186+
redirect_cntl,
187+
call_data,
188+
scheduler_,
189+
request->service_request_id,
190+
request->stream);
182191
channel_ptr->CallMethod(NULL, redirect_cntl, NULL, NULL, done);
183192
if (redirect_cntl->Failed()) {
184193
LOG(ERROR) << "Redirect to instance error: "
185194
<< redirect_cntl->ErrorText();
186195
call_data->finish_with_error(redirect_cntl->ErrorText());
187-
scheduler_->finish_request(request->service_request_id);
196+
scheduler_->finish_request(request->service_request_id, /*error=*/true);
188197
delete done;
189198
delete redirect_cntl;
190199
return;

xllm_service/proto/xllm_rpc_service.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,16 @@ message LoadMetrics {
5151
float gpu_cache_usage_perc = 2;
5252
}
5353

54+
message LatencyMetrics {
55+
int64 recent_max_ttft = 1;
56+
int64 recent_max_tbt = 2;
57+
}
58+
5459
message HeartbeatRequest {
5560
string name = 1;
5661
KvCacheEvent cache_event = 2;
5762
LoadMetrics load_metrics = 3;
63+
LatencyMetrics latency_metrics = 4;
5864
}
5965

6066
message InstanceID {

xllm_service/request/request.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ struct Request {
4747
// instance routing
4848
Routing routing;
4949

50+
// the estimated TTFT obtained from the TTFT predictor
51+
int64_t estimated_ttft = 0;
52+
5053
// output callback
5154
OutputCallback output_callback;
5255

xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,22 @@ limitations under the License.
1717

1818
#include "common/macros.h"
1919
#include "loadbalance_policy.h"
20+
#include "scheduler/managers/global_kvcache_mgr.h"
2021

2122
namespace xllm_service {
2223

2324
class CacheAwareRouting final : public LoadBalancePolicy {
2425
public:
2526
CacheAwareRouting(std::shared_ptr<InstanceMgr> instance_mgr,
2627
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr)
27-
: LoadBalancePolicy(instance_mgr, global_kvcache_mgr) {};
28+
: global_kvcache_mgr_(global_kvcache_mgr),
29+
LoadBalancePolicy(instance_mgr) {};
2830

2931
virtual ~CacheAwareRouting() = default;
3032

3133
bool select_instances_pair(std::shared_ptr<Request> request) override;
3234

33-
protected:
35+
private:
3436
DISALLOW_COPY_AND_ASSIGN(CacheAwareRouting);
3537

3638
void cost_function(
@@ -39,6 +41,8 @@ class CacheAwareRouting final : public LoadBalancePolicy {
3941
const std::unordered_map<std::string, LoadMetrics>& load_metrics,
4042
const int64_t& max_waiting_requests_num,
4143
std::string* best_choice);
44+
45+
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr_;
4246
};
4347

4448
} // namespace xllm_service

xllm_service/scheduler/loadbalance_policy/loadbalance_policy.h

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,23 @@ limitations under the License.
1515

1616
#pragma once
1717

18-
#include "../managers/global_kvcache_mgr.h"
19-
#include "../managers/instance_mgr.h"
2018
#include "common/types.h"
2119
#include "request/request.h"
20+
#include "scheduler/managers/instance_mgr.h"
2221

2322
namespace xllm_service {
2423

2524
class LoadBalancePolicy {
2625
public:
27-
LoadBalancePolicy(std::shared_ptr<InstanceMgr> instance_mgr,
28-
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr)
29-
: instance_mgr_(instance_mgr), global_kvcache_mgr_(global_kvcache_mgr) {}
26+
LoadBalancePolicy(std::shared_ptr<InstanceMgr> instance_mgr)
27+
: instance_mgr_(instance_mgr) {}
3028

3129
virtual ~LoadBalancePolicy() = default;
3230

3331
virtual bool select_instances_pair(std::shared_ptr<Request> request) = 0;
3432

3533
protected:
3634
std::shared_ptr<InstanceMgr> instance_mgr_;
37-
38-
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr_;
3935
};
4036

4137
} // namespace xllm_service

xllm_service/scheduler/loadbalance_policy/round_robin.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@ namespace xllm_service {
2222

2323
class RoundRobin final : public LoadBalancePolicy {
2424
public:
25-
RoundRobin(std::shared_ptr<InstanceMgr> instance_mgr,
26-
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr)
27-
: LoadBalancePolicy(instance_mgr, global_kvcache_mgr) {};
25+
RoundRobin(std::shared_ptr<InstanceMgr> instance_mgr)
26+
: LoadBalancePolicy(instance_mgr) {};
2827

2928
virtual ~RoundRobin() = default;
3029

3130
bool select_instances_pair(std::shared_ptr<Request> request) override;
3231

33-
protected:
32+
private:
3433
DISALLOW_COPY_AND_ASSIGN(RoundRobin);
3534
};
3635

xllm_service/scheduler/managers/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ cc_library(
1111
instance_mgr.cpp
1212
global_kvcache_mgr.cpp
1313
DEPS
14+
:chat_template
1415
:common
1516
:etcd_client
17+
:request
1618
absl::random_random
1719
absl::strings
1820
glog::glog

xllm_service/scheduler/managers/instance_mgr.cpp

Lines changed: 106 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,15 @@ void InstanceMgr::init() {
6666
for (auto& it : ETCD_KEYS_PREFIX_MAP) {
6767
etcd_client_->get_prefix(it.second, &instances_);
6868
}
69-
// create ttft predictor for each instance
70-
for (auto& pair : instances_) {
71-
ttft_predictors_.insert_or_assign(
72-
pair.first, TtftPredictor(pair.second.ttft_profiling_data));
69+
// create ttft predictor and request metrics for each instance
70+
{
71+
std::lock_guard<std::mutex> ttft_predictor_lock(ttft_predictor_mutex_);
72+
std::lock_guard<std::mutex> request_metrics_lock(request_metrics_mutex_);
73+
for (auto& pair : instances_) {
74+
ttft_predictors_.insert_or_assign(
75+
pair.first, TtftPredictor(pair.second.ttft_profiling_data));
76+
request_metrics_.insert_or_assign(pair.first, RequestMetrics());
77+
}
7378
}
7479
LOG(INFO) << "Load instance info from etcd:" << instances_.size();
7580
std::vector<std::string> channel_creat_fail_insts;
@@ -99,7 +104,13 @@ void InstanceMgr::init() {
99104
}
100105
for (auto& name : channel_creat_fail_insts) {
101106
instances_.erase(name);
102-
ttft_predictors_.erase(name);
107+
{
108+
std::lock_guard<std::mutex> ttft_predictor_lock(ttft_predictor_mutex_);
109+
std::lock_guard<std::mutex> request_metrics_lock(
110+
request_metrics_mutex_);
111+
ttft_predictors_.erase(name);
112+
request_metrics_.erase(name);
113+
}
103114
}
104115
}
105116
{
@@ -340,9 +351,18 @@ void InstanceMgr::update_instance_metainfo(const etcd::Response& response,
340351
continue;
341352
}
342353

343-
// create ttft predictor for instance
344-
ttft_predictors_.emplace(
345-
iter.first, TtftPredictor(iter.second.ttft_profiling_data));
354+
{
355+
std::lock_guard<std::mutex> ttft_predictor_lock(
356+
ttft_predictor_mutex_);
357+
std::lock_guard<std::mutex> request_metrics_lock(
358+
request_metrics_mutex_);
359+
// create ttft predictor for instance
360+
ttft_predictors_.emplace(
361+
iter.first, TtftPredictor(iter.second.ttft_profiling_data));
362+
363+
// create request metrics for instance
364+
request_metrics_.emplace(iter.first, RequestMetrics());
365+
}
346366

347367
instances_.insert(std::make_pair(iter.first, std::move(iter.second)));
348368

@@ -395,8 +415,15 @@ void InstanceMgr::update_instance_metainfo(const etcd::Response& response,
395415
}
396416

397417
instances_.erase(iter);
398-
ttft_predictors_.erase(iter);
399418
cached_channels_.erase(iter);
419+
{
420+
std::lock_guard<std::mutex> ttft_predictor_lock(
421+
ttft_predictor_mutex_);
422+
std::lock_guard<std::mutex> request_metrics_lock(
423+
request_metrics_mutex_);
424+
ttft_predictors_.erase(iter);
425+
request_metrics_.erase(iter);
426+
}
400427
{
401428
std::lock_guard<std::mutex> lock(update_mutex_);
402429
updated_metrics_.erase(iter);
@@ -450,4 +477,74 @@ void InstanceMgr::update_load_metrics(const etcd::Response& response,
450477
});
451478
}
452479

480+
void InstanceMgr::update_latency_metrics(
481+
const std::string& instance_name,
482+
const proto::LatencyMetrics& latency_metrics) {
483+
std::lock_guard<std::mutex> lock(latency_metrics_mutex_);
484+
485+
latency_metrics_.insert_or_assign(
486+
instance_name,
487+
LatencyMetrics(latency_metrics.recent_max_ttft(),
488+
latency_metrics.recent_max_tbt()));
489+
}
490+
491+
void InstanceMgr::update_request_metrics(std::shared_ptr<Request> request,
492+
RequestAction action) {
493+
std::lock_guard<std::mutex> lock(request_metrics_mutex_);
494+
495+
auto prefill_it = request_metrics_.find(request->routing.prefill_name);
496+
if (prefill_it == request_metrics_.end()) {
497+
LOG(ERROR) << "Failed to find instance request metrics, instance name : "
498+
<< request->routing.prefill_name;
499+
return;
500+
}
501+
502+
auto decode_it = request_metrics_.find(request->routing.decode_name);
503+
if (decode_it == request_metrics_.end()) {
504+
LOG(ERROR) << "Failed to find instance request metrics, instance name : "
505+
<< request->routing.decode_name;
506+
return;
507+
}
508+
509+
int64_t token_length = request->token_ids.size();
510+
switch (action) {
511+
case RequestAction::SCHEDULE:
512+
// update the request metrics for prefill and decode instances when
513+
// request is scheduled
514+
prefill_it->second.prefill_request_num += 1;
515+
prefill_it->second.prefill_token_num += token_length;
516+
prefill_it->second.estimated_prefill_time += request->estimated_ttft;
517+
518+
decode_it->second.decode_request_num += 1;
519+
decode_it->second.decode_token_num += token_length;
520+
break;
521+
case RequestAction::FINISH_PREFILL:
522+
// only update the request metrics for prefill instance when request
523+
// finishes the prefill phase
524+
prefill_it->second.prefill_request_num -= 1;
525+
prefill_it->second.prefill_token_num -= token_length;
526+
prefill_it->second.estimated_prefill_time -= request->estimated_ttft;
527+
break;
528+
case RequestAction::FINISH_DECODE:
529+
// update the request metrics for decode instance when request finishes
530+
// the decode phase
531+
decode_it->second.decode_request_num -= 1;
532+
decode_it->second.decode_token_num -= token_length;
533+
break;
534+
case RequestAction::CANCEL:
535+
// update the request metrics for prefill and decode instances when
536+
// request is cancelled
537+
prefill_it->second.prefill_request_num -= 1;
538+
prefill_it->second.prefill_token_num -= token_length;
539+
prefill_it->second.estimated_prefill_time -= request->estimated_ttft;
540+
541+
decode_it->second.decode_request_num -= 1;
542+
decode_it->second.decode_token_num -= token_length;
543+
break;
544+
default:
545+
LOG(ERROR) << "Unknown RequestAction: " << static_cast<int32_t>(action);
546+
break;
547+
}
548+
}
549+
453550
} // namespace xllm_service

0 commit comments

Comments
 (0)