Skip to content

Commit 6e27456

Browse files
JimHsiungliutongxuan
authored andcommitted
refactor: unify the handling of requests and responses into the scheduler.
1 parent c282da4 commit 6e27456

39 files changed

+700
-778
lines changed

xllm_service/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ add_subdirectory(rpc_service)
88
add_subdirectory(tokenizer)
99
add_subdirectory(chat_template)
1010
add_subdirectory(http_service)
11+
add_subdirectory(scheduler)
1112

1213
cc_binary(
1314
NAME

xllm_service/common/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,6 @@ cc_library(
3232
gflags::gflags
3333
nlohmann_json::nlohmann_json
3434
SMHasherSupport
35+
proto_xllm
3536
)
3637
add_dependencies(common brpc-static)

xllm_service/common/call_data.h

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@ limitations under the License.
1919
#include <brpc/controller.h>
2020
#include <butil/iobuf.h>
2121
#include <glog/logging.h>
22-
#include <grpcpp/grpcpp.h>
2322
#include <json2pb/pb_to_json.h>
2423

2524
#include <functional>
2625
#include <string>
2726

27+
#include "chat.pb.h"
28+
#include "completion.pb.h"
29+
30+
namespace xllm_service {
31+
2832
// Interface for the classes that are used to handle grpc requests.
2933
class CallData {
3034
public:
@@ -111,28 +115,13 @@ class StreamCallData : public CallData {
111115
return true;
112116
}
113117

114-
bool finish_with_error(const grpc::StatusCode& code,
115-
const std::string& error_message) {
116-
if (!stream_) {
117-
controller_->SetFailed(error_message);
118-
119-
} else {
120-
io_buf_.clear();
121-
io_buf_.append(error_message);
122-
pa_->Write(io_buf_);
123-
}
124-
125-
return true;
126-
}
127-
128-
bool write_and_finish(Response& response,
129-
grpc::Status grpc_status = grpc::Status::OK) {
118+
bool write_and_finish(Response& response) {
130119
butil::IOBufAsZeroCopyOutputStream json_output(
131120
&controller_->response_attachment());
132121
std::string err_msg;
133122
if (!json2pb::ProtoMessageToJson(
134123
response, &json_output, json_options_, &err_msg)) {
135-
return finish_with_error(grpc::StatusCode::INTERNAL, err_msg);
124+
return finish_with_error(err_msg);
136125
}
137126

138127
if (trace_callback_) {
@@ -229,3 +218,11 @@ class StreamCallData : public CallData {
229218
json2pb::Pb2JsonOptions json_options_;
230219
std::function<void(const std::string&)> trace_callback_;
231220
};
221+
222+
using CompletionCallData = StreamCallData<llm::proto::CompletionRequest,
223+
llm::proto::CompletionResponse>;
224+
225+
using ChatCallData =
226+
StreamCallData<llm::proto::ChatRequest, llm::proto::ChatResponse>;
227+
228+
} // namespace xllm_service

xllm_service/common/global_gflags.cpp

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ limitations under the License.
1515

1616
#include "common/global_gflags.h"
1717

18-
DEFINE_string(http_server_host,
18+
DEFINE_string(server_host,
1919
"",
20-
"Http server listen address, may be IPV4/IPV6/UDS."
20+
"Server listen address, may be IPV4/IPV6/UDS."
2121
" If this is set, the flag port will be ignored");
2222

2323
DEFINE_int32(http_server_port, 8888, "Port for xllm http service to listen on");
@@ -60,10 +60,6 @@ DEFINE_int32(max_concurrency,
6060
128,
6161
"Limit number of requests processed in parallel");
6262

63-
DEFINE_string(test_instance_addr,
64-
"0.0.0.0:9999",
65-
"Xllm instance listen addr for testing.");
66-
6763
DEFINE_int32(timeout_ms,
6864
-1,
6965
"Max duration of bRPC Channel. -1 means wait indefinitely.");
@@ -92,6 +88,4 @@ DEFINE_int32(block_size,
9288

9389
DEFINE_string(tokenizer_path, "", "tokenizer config path.");
9490

95-
DEFINE_string(model_type, "", "model type.");
96-
9791
DEFINE_bool(enable_request_trace, false, "Whether to enable request trace");

xllm_service/common/global_gflags.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ limitations under the License.
1717

1818
#include <gflags/gflags.h>
1919

20-
DECLARE_string(http_server_host);
20+
DECLARE_string(server_host);
2121

2222
DECLARE_int32(http_server_port);
2323

@@ -37,8 +37,6 @@ DECLARE_int32(rpc_server_max_concurrency);
3737

3838
DECLARE_uint32(murmur_hash3_seed);
3939

40-
DECLARE_string(test_instance_addr);
41-
4240
DECLARE_int32(timeout_ms);
4341

4442
DECLARE_string(listen_addr);
@@ -61,6 +59,4 @@ DECLARE_int32(block_size);
6159

6260
DECLARE_string(tokenizer_path);
6361

64-
DECLARE_string(model_type);
65-
6662
DECLARE_bool(enable_request_trace);

xllm_service/common/options.h

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/* Copyright 2025 The xLLM Authors. All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
https://github.com/jd-opensource/xllm-service/blob/main/LICENSE
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
==============================================================================*/
15+
16+
#pragma once
17+
18+
#include <string>
19+
20+
#include "common/macros.h"
21+
22+
namespace xllm_service {
23+
24+
class Options {
25+
public:
26+
Options() = default;
27+
~Options() = default;
28+
29+
// http server options
30+
PROPERTY(std::string, server_host);
31+
32+
PROPERTY(int32_t, http_port) = 9998;
33+
34+
PROPERTY(int32_t, http_idle_timeout_s) = -1;
35+
36+
PROPERTY(int32_t, http_num_threads) = 32;
37+
38+
PROPERTY(int32_t, http_max_concurrency) = 0;
39+
40+
// rpc server options
41+
PROPERTY(int32_t, rpc_port) = 9999;
42+
43+
PROPERTY(int32_t, rpc_idle_timeout_s) = -1;
44+
45+
PROPERTY(int32_t, rpc_num_threads) = 32;
46+
47+
PROPERTY(int32_t, rpc_max_concurrency) = 0;
48+
49+
PROPERTY(int32_t, num_threads) = 32;
50+
51+
PROPERTY(int32_t, max_concurrency) = 32;
52+
53+
PROPERTY(int32_t, timeout_ms) = 32;
54+
55+
// instance manager options
56+
PROPERTY(std::string, etcd_addr);
57+
58+
PROPERTY(int32_t, detect_disconnected_instance_interval) = 15;
59+
60+
// scheduler options
61+
PROPERTY(std::string, load_balance_policy);
62+
63+
PROPERTY(int32_t, block_size) = 128;
64+
65+
PROPERTY(uint32_t, murmur_hash3_seed) = 1024;
66+
67+
PROPERTY(std::string, service_name);
68+
69+
// tokenizer options
70+
PROPERTY(std::string, tokenizer_path);
71+
72+
// trace options
73+
PROPERTY(bool, enable_request_trace) = false;
74+
};
75+
76+
} // namespace xllm_service

xllm_service/common/types.h

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,6 @@ using Murmur3KeyCacheMap = std::unordered_map<Murmur3Key,
3636
FixedStringKeyHash,
3737
FixedStringKeyEqual>;
3838

39-
struct HttpServiceConfig {
40-
int num_threads = 16;
41-
int timeout_ms = -1;
42-
std::string test_instance_addr = "";
43-
bool enable_request_trace = false;
44-
};
45-
46-
struct RpcServiceConfig {
47-
std::string etcd_addr = "";
48-
std::string load_balance_policy = "";
49-
int detect_disconnected_instance_interval = 15; // seconds
50-
std::string service_name = "";
51-
};
52-
53-
struct ModelConfig {
54-
int32_t block_size = 16;
55-
std::string model_type = "chatglm";
56-
std::string tokenizer_path = "";
57-
};
58-
5939
struct Routing {
6040
std::string prefill_name;
6141
std::string decode_name;

xllm_service/common/xllm/output.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,7 @@ inline std::optional<std::string> to_string(FinishReason reason) {
126126
}
127127

128128
} // namespace llm
129+
130+
using OutputCallback = std::function<bool(llm::RequestOutput output)>;
131+
129132
} // namespace xllm_service

xllm_service/http_service/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ cc_library(
1212
request_tracer.cpp
1313
DEPS
1414
:common
15-
:xllm_rpc_service
15+
:scheduler
1616
absl::random_random
1717
absl::synchronization
1818
glog::glog

xllm_service/http_service/main.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ limitations under the License.
1919
#include <grpcpp/grpcpp.h>
2020

2121
#include "common/global_gflags.h"
22+
#include "common/options.h"
2223
#include "http_service/service.h"
2324

2425
int main(int argc, char** argv) {
@@ -31,11 +32,8 @@ int main(int argc, char** argv) {
3132

3233
LOG(INFO) << "Starting xllm http service, port: " << FLAGS_port;
3334

34-
xllm_service::HttpServiceConfig config;
35-
config.num_threads = FLAGS_num_threads;
36-
config.timeout_ms = FLAGS_timeout_ms;
37-
config.test_instance_addr = FLAGS_test_instance_addr;
38-
xllm_service::XllmHttpServiceImpl service_impl(config);
35+
xllm_service::Options service_options;
36+
xllm_service::XllmHttpServiceImpl service_impl(service_options, nullptr);
3937

4038
// register http methods here
4139
brpc::Server server;

0 commit comments

Comments
 (0)